Set up SubscriptionTopic

Last updated 5 days ago

FHIR topic-based subscriptions remain functional, but they no longer receive active development or new features. For enhanced capabilities and ongoing support, please use Aidbox topic-based subscriptions. This newer implementation offers improved performance and flexibility, and it will continue to be developed to meet future needs.

This page describes how to set up SubscriptionTopic for each Topic Queue Storage type.

SubscriptionTopic is configured through an Aidbox configuration project written in zen-lang.

Since the 2405 release, running Aidbox with the FHIR Schema validation engine is recommended. That mode is incompatible with the zen and Entity/Attribute options. See: Setup Aidbox with FHIR Schema validation engine

In general, to set up a topic in an Aidbox configuration project, follow these steps:

  1. Import fhir.topic-based-subscription namespace (and hl7-fhir-uv-subscriptions-backport-r4b if you're using FHIR 4.3.0).

  2. Create Topic Definition which configures triggers and filters.

  3. Specify Storage Type (e.g. PostgreSQL or Google Pub/Sub) and configuration values for it.

  4. Link Topic Definition from step 2 and Storage Type from step 3 into one service.

  5. Link this service from step 4 in the entry point (e.g. box).

Database Configuration

SubscriptionTopic-based services rely on PostgreSQL logical replication. Aidbox acts as the subscriber that receives and processes the relevant events from the database. A replication slot is created automatically for each service when it starts. Therefore, the database must be configured to support logical replication, and the POSTGRES_USER must have replication privileges.

Self Hosted Database

If you use AidboxDB, it is already configured to work properly with SubscriptionTopic.

Otherwise, check that wal_level is set to logical in the postgresql.conf file:

wal_level = logical
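
The check above can be scripted. The following is a minimal sketch, assuming the configuration lives in a single postgresql.conf file whose text you can read. The helper name `effective_setting` is hypothetical and not part of Aidbox or PostgreSQL. It returns the last uncommented value assigned to a parameter, since a later assignment in the file overrides an earlier one.

```python
import re
from typing import Optional


def effective_setting(conf_text: str, name: str) -> Optional[str]:
    """Return the last uncommented value assigned to `name`, or None.

    Hypothetical helper for illustration only. It does not follow
    include directives and does not read postgresql.auto.conf.
    """
    # Matches both "name = value" and "name value".
    pattern = re.compile(
        r"^\s*" + re.escape(name) + r"\s*(?:=\s*|\s+)(.+?)\s*$",
        re.IGNORECASE,
    )
    value = None
    for raw_line in conf_text.splitlines():
        # Drop trailing comments. This is good enough for simple values
        # such as wal_level, which never contain a '#'.
        line = raw_line.split("#", 1)[0]
        match = pattern.match(line)
        if match:
            # Later assignments override earlier ones.
            value = match.group(1).strip("'")
    return value


sample = """
#wal_level = replica
wal_level = replica
wal_level = 'logical'   # required for SubscriptionTopic services
"""
print(effective_setting(sample, "wal_level") == "logical")
```

Values set with ALTER SYSTEM or in included files can override what the file says, so running SHOW wal_level; against the live server remains the authoritative check.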

Cloud Databases

AWS RDS PostgreSQL

  1. Navigate to RDS to open the RDS Dashboard.

  2. Click Parameter Groups in the Resources panel on the dashboard.

  3. Click Create parameter group and create a new parameter group.

  4. Open the new parameter group and click Edit parameters.

  5. Search for rds.logical_replication and set its value to 1.

  6. Navigate to the database instance, click Modify, and in the DB parameter group menu select the parameter group created in step 3.

To check that the setting is applied, run the query SHOW wal_level; the result should be logical.

Azure Database for PostgreSQL - Flexible Server

  1. Go to server parameters page on the portal.

  2. Set the server parameter wal_level to logical.

  3. Update max_worker_processes parameter value to at least 16. Otherwise, you may run into issues like WARNING: out of background worker slots.

  4. Save the changes and restart the server to apply the changes.

  5. Grant replication permission to the user that Aidbox uses to connect to the database:

    ALTER ROLE <username> WITH REPLICATION;

Topic Definition configuration

Topic Definition contains:

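  • url - should be an absolute URI (globally unique)

  • resourceTrigger - resources that trigger the topic, each with:

    • resource

    • fhirPathCriteria

  • canFilterBy - filters that subscriptions may apply, each with:

    • description

    • resource

    • filterParameter

    • modifier

    • _fhirPath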

    Example:

    ...
    patient-topic
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     :url "<YOUR URL>"
     :resourceTrigger [{:resource "Patient"
                        :fhirPathCriteria "%current.birthDate > '2023'"}
                       {:resource "Organization"}
                       < ... other resources ... >]
     :canFilterBy [{:resource        "Patient"
                    :filterParameter "first-family-name"
                    :_fhirPath       "%current.name[0].family"
                    :modifier        ["eq"]}
                   {:resource        "Patient"
                    :filterParameter "first-family-name-alternative"
                    :_fhirPath       "name[0].family"
                    :modifier        ["eq"]}
                   <... other filters ... >]}
    ...
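
To make the role of modifier and _fhirPath in canFilterBy more concrete, here is a minimal sketch of how a single filter could be read. It assumes the _fhirPath expression has already been evaluated to a plain value, and that a subscription may only use a modifier the topic lists for that filter. The function `filter_matches` and its parameters are hypothetical and do not reflect Aidbox's internal evaluation code.

```python
import operator
from typing import Any, Callable, Dict, Sequence

# Comparison functions for the modifiers that appear in this page's examples.
MODIFIERS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": operator.eq,
    "gt": operator.gt,
    "lt": operator.lt,
    "ge": operator.ge,
    "le": operator.le,
}


def filter_matches(
    computed: Any,
    modifier: str,
    requested: Any,
    allowed_modifiers: Sequence[str],
) -> bool:
    """Compare the value computed by _fhirPath with the value a subscription asks for.

    `allowed_modifiers` is the :modifier list of the canFilterBy entry.
    Assumption: a modifier the topic does not list is rejected.
    """
    if modifier not in allowed_modifiers:
        raise ValueError(f"modifier {modifier!r} is not allowed for this filter")
    return MODIFIERS[modifier](computed, requested)


# A filter declared with :modifier ["eq"] on the patient's first family name.
print(filter_matches("Smith", "eq", "Smith", ["eq"]))
```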

Storage Type configuration

Topic storage specifies where the event queue is stored and how it is processed.

It is a zen schema tagged with fhir.topic-based-subscription/topic-storage, containing:

  • storage-type: Choose one of the following:

    • PostgreSQL: fhir.topic-based-subscription/postgres

    • Google Pub/Sub: fhir.topic-based-subscription/gcp-pubsub

    • Aidbox Workflow Engine Connector: fhir.topic-based-subscription/awf-connector

  • heartbeat-rate: Heartbeat rate requested for the current storage, in seconds. Default is 10 seconds.

  • timeout: Timeout value for batching, in seconds.

  • maxCount: Maximum number of resources to hold in Aidbox before the timeout expires (batch sending).

  • maxContent: empty, id-only or full-resource. Sets the upper limit on how much information about the triggering resource a Subscription's event messages may carry. A Subscription cannot be created with a Subscription.content value that requires more content than the maxContent value of its topic. Default is empty.

  • include-transaction: For test purposes. Turns on the includeTransaction replication slot option. Default is false.

  • fhirpath-ignore-on-error: Ignore FHIRPath execution errors when persisting queues.

  • status-interval: Mainly for test purposes. Specifies the number of seconds between status packets sent back to the server. This makes it easier to monitor progress from the server. A value of zero disables the periodic status updates completely, although an update is still sent when requested by the server, to avoid a timeout disconnect. The default value is 10 seconds.
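
The maxContent rule in the list above amounts to an ordering check: empty carries the least information, full-resource the most. The sketch below illustrates that reading. The function `content_allowed` is hypothetical, written only to show the rule, and is not Aidbox's validation code.

```python
# Content levels ordered from least to most information about the resource.
CONTENT_LEVELS = ["empty", "id-only", "full-resource"]


def content_allowed(subscription_content: str, topic_max_content: str = "empty") -> bool:
    """Return True if a Subscription.content value fits within the topic's maxContent.

    The default for `topic_max_content` mirrors the documented default, "empty".
    """
    requested = CONTENT_LEVELS.index(subscription_content)
    limit = CONTENT_LEVELS.index(topic_max_content)
    return requested <= limit


# A topic stored with :maxContent "id-only" accepts "empty" and "id-only"
# subscriptions, but not "full-resource" ones.
print(content_allowed("id-only", "id-only"))
print(content_allowed("full-resource", "id-only"))
```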

In addition to the above, there are several properties for each specific storage-type.

For PostgreSQL:

  • table-name: Name of the table that stores events.

  • senders-number: Number of services that deliver subscriptions.

For Google Pub/Sub:

  • topic-name: Topic name in GCP.

  • bytes-threshold: Max bytes to hold until publishing. Default is 10000.

  • project-name: Project name in GCP.

  • enable-message-ordering: If true, enables message ordering. Uses focusResourceType and focusId as messageOrderingKey. Applied only when maxContent is 'id-only' or 'full-resource'.

For Aidbox Workflow Engine Connector:

  • rules: Trigger rules for a Task or Workflow. Each rule uses either task or workflow to indicate which activity to trigger.

    • task: Specifies, in its definition field, which Task to trigger.

    • workflow: Specifies, in its definition field, which Workflow to trigger.

Example:

...
pubsub-patient-storage
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/gcp-pubsub
     :status-interval 0.5
     :maxContent "full-resource"
     :heartbeat-rate 0.5
     :project-name "local-project"
     :topic-name "test-topic"
     :enable-message-ordering true
     :timeout 1
     :bytes-threshold 1000000
     :maxCount 10}
...
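
The timeout and maxCount values in the example above can be read as a flush-on-whichever-comes-first batching rule: events are buffered until either maxCount events are held or timeout seconds have passed since the first buffered event. The sketch below illustrates that reading under this assumption. The class `EventBatcher` is hypothetical, it ignores bytes-threshold, and it is not Aidbox's implementation.

```python
import time
from typing import Any, Callable, List, Optional


class EventBatcher:
    """Buffer events and release them as a batch on count or timeout."""

    def __init__(
        self,
        max_count: int,
        timeout: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_count = max_count
        self.timeout = timeout
        self.clock = clock  # injectable so the behaviour can be tested
        self.buffer: List[Any] = []
        self.started_at: Optional[float] = None

    def add(self, event: Any) -> List[Any]:
        """Buffer an event; return the flushed batch, or [] if nothing is due."""
        if not self.buffer:
            # The timeout is measured from the first event in the batch.
            self.started_at = self.clock()
        self.buffer.append(event)
        return self._flush_if_due()

    def tick(self) -> List[Any]:
        """Call periodically so a partial batch is flushed once the timeout passes."""
        return self._flush_if_due()

    def _flush_if_due(self) -> List[Any]:
        if not self.buffer:
            return []
        full = len(self.buffer) >= self.max_count
        expired = self.clock() - self.started_at >= self.timeout
        if full or expired:
            batch, self.buffer = self.buffer, []
            return batch
        return []


# Mirrors :timeout 1 and :maxCount 10 from the storage example.
batcher = EventBatcher(max_count=10, timeout=1)
print(batcher.add({"resourceType": "Patient", "id": "pt-1"}))
```

A larger maxCount with a short timeout favours low latency, while a longer timeout favours fewer, larger batches.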

Configuration Examples

Examples of an Aidbox entry point for each storage type, configured with one topic:

PostgreSQL Queue Storage

{ns     aidbox-with-subscriptions
 import #{fhir.topic-based-subscription
          ;; the profile required for FHIR 4.3.0 version:
          hl7-fhir-uv-subscriptions-backport-r4b}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [
                    ;; a SubscriptionTopic may cover
                    ;; any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource        "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate value for the filter
                 :_fhirPath       "%current.value.ofType(Quantity).value"
                 :modifier        ["eq" "gt" "lt" "ge" "le"]}

                {:resource        "Observation"
                 :filterParameter "value-increase"
                 ;; both %current and %previous state of the 
                 ;; resource are available
                 :_fhirPath       "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier        ["eq"]}]}


 postgres-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  ;; Store the event queue in PostgreSQL:
  :storage-type fhir.topic-based-subscription/postgres
  ;; The name of the table which will be created to store topic events:
  :table-name "observation_topic"
  ;; Possible value for maxContent are: "empty" | "id-only" | "full-resource"
  ;; The actual resource is stored in the queue only when "full-resource"
  ;; is specified. When deciding which payload type to request,
  ;; systems SHOULD consider both ease of processing and security of PHI. 
  ;; To mitigate the risk of information leakage, systems SHOULD use the 
  ;; minimum level of detail consistent with the use case. 
  ;; In practice, id-only provides a good balance between security 
  ;; and performance for many real-world scenarios.
  :maxContent "full-resource"
  ;; The period, in seconds, during which events from the replication slot 
  ;; will be buffered before being written to storage:
  :timeout  10
  ;; The maximum number of events the replication slot will buffer before
  ;; writing to storage:
  :maxCount 100
  ;; Interval, in seconds, between periodic heartbeat records written
  ;; to cdc_topic_heartbeat_table, to reclaim the WAL space:
  :heartbeat-rate 120
  ;; The number of workers responsible for notification delivery.
  ;; A minimum of 4 is advised. One worker can handle up to 1024 subscriptions:
  :senders-number 4}


 ;; Service which binds storage with topic definition
 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage postgres-observation-topic-storage}


 ;; Entrypoint for the instance with corresponding service
 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  :services {:observation-topic-srv observation-topic-srv}}}

Google Cloud Pub/Sub

{ns pub-sub-topic
    import #{fhir.topic-based-subscription}

    observation-topic
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     ;; SubscriptionTopic url should be an absolute URI (globally unique)
     :url "http://aidbox.app/SubscriptionTopic/observations"
     :resourceTrigger [
                       ;; a SubscriptionTopic may cover
                       ;; any number of resources
                       {:resource "Observation"
                        :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
     :canFilterBy [{:resource        "Observation"
                    :filterParameter "value"
                    ;; _fhirPath specifies how to calculate value for the filter
                    :_fhirPath       "%current.value.ofType(Quantity).value"
                    :modifier        ["eq" "gt" "lt" "ge" "le"]}

                   {:resource        "Observation"
                    :filterParameter "value-increase"
                    ;; both %current and %previous state of the 
                    ;; resource are available
                    :_fhirPath       "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                    :modifier        ["eq"]}]}


    gcp-pubsub-observation-topic-storage
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/gcp-pubsub
     :status-interval 10
     :heartbeat-rate 120
     :maxContent "full-resource"
     :project-name "aidbox-cloud-demo"
     :topic-name "aidbox-tbs-test"
     :timeout  1
     :maxCount 100
     :bytes-threshold 10000
     :enable-message-ordering false}

    observation-topic-srv
    {:zen/tags #{aidbox/service}
     :engine fhir.topic-based-subscription/change-data-capture-service-engine
     :topic-definition observation-topic
     :topic-storage gcp-pubsub-observation-topic-storage}

    box
    {:zen/tags #{aidbox/system}
     :zen/desc "box for topic test"
     :services {:observation-topic-srv observation-topic-srv}}}

Aidbox Workflow Engine Connector

{ns awf-connector-topic
    import #{fhir.topic-based-subscription}

    topic-def
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     :url "http://aidbox.app/SubscriptionTopic/observations"
     :resourceTrigger [{:resource "Patient"}
                       {:resource "Encounter"
                        :fhirPathCriteria "%current.status = 'completed'"}]

     :canFilterBy [{:resource        "Patient"
                    :filterParameter "Patient-deceased"
                    :_fhirPath       "(%previous.deceased.exists().not() or %previous.deceased = false) and (%current.deceased.exists() and %current.deceased = true)"
                    :modifier        ["eq"]}
                   {:filterParameter "resource-type"
                    :_fhirPath       "resourceType"
                    :modifier        ["eq"]}]}

    awf-connector
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/awf-connector
     :maxContent "full-resource"
     :rules [{:task     {:definition      encounter-completed-task}
              :filterBy [{:filterParameter "resource-type"
                          :modifier        "eq"
                          :value           "Encounter"}]}
             {:task     {:definition      patient-change-task}
              :filterBy [{:filterParameter "resource-type"
                          :modifier        "eq"
                          :value           "Patient"}]}
             {:task     {:definition       patient-deceased-task}
              :filterBy [{:resourceType    "Patient"
                          :filterParameter "Patient-deceased"
                          :modifier        "eq"
                          :value           "true"}]}]}

    patient-topic-srv
    {:zen/tags #{aidbox/service}
     :engine fhir.topic-based-subscription/change-data-capture-service-engine
     :topic-definition topic-def
     :topic-storage awf-connector}

    box
    {:zen/tags #{aidbox/system}
     :zen/desc "box for topic test"
     :services {:patient-topic-srv patient-topic-srv}}}
     

Verify the SubscriptionTopic is available

  • Run Aidbox and check logs for the following output

15:17:41 cro-1 :fhir.topic-based-subscription.topic/starting-replication-service {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-2 :fhir.topic-based-subscription.topic/starting-pg-persist-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 40c7a :fhir.topic-based-subscription.topic/service-started {:service aidbox-with-subscriptions/observation-topic-srv}
15:17:41 cro-3 :fhir.topic-based-subscription.topic/starting-delivery-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-4 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 0}
15:17:41 cro-5 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 1}
  • Discover SubscriptionTopic resources in Aidbox using FHIR API

GET /fhir/SubscriptionTopic
content-type: application/json
accept: application/json
{
 "resourceType": "Bundle",
 "type": "searchset",
 "meta": {
  "versionId": "0"
 },
 "total": 1,
 "link": [
  {
   "relation": "first",
   "url": "http://localhost:8765/fhir/SubscriptionTopic?page=1"
  },
  {
   "relation": "self",
   "url": "http://localhost:8765/fhir/SubscriptionTopic?page=1"
  }
 ],
 "entry": [
  {
   "resource": {
    "id": "cf153a1fde850de90215a6cd0f0abcf5",
    "url": "http://aidbox.app/SubscriptionTopic/observations",
    "meta": {
     "slot_name": "tbs_aidbox_with_subscriptions__observation_topic_srv",
     "queue_table_name": "observation_topic",
     "subscription_status_table_name": "observation_topic_subs_status",
     "lastUpdated": "2023-08-31T15:17:40.355938Z",
     "versionId": "0",
     "extension": [
      {
       "url": "ex:createdAt",
       "valueInstant": "2023-08-31T15:17:40.355938Z"
      }
     ]
    },
    "status": "active",
    "canFilterBy": [
     {
      "modifier": [
       "eq",
       "gt",
       "lt",
       "ge",
       "le"
      ],
      "resource": "Observation",
      "filterParameter": "value"
     },
     {
      "modifier": [
       "eq"
      ],
      "resource": "Observation",
      "filterParameter": "value-increase"
     }
    ],
    "resourceType": "SubscriptionTopic",
    "resourceTrigger": [
     {
      "resource": "Observation",
      "fhirPathCriteria": "%current.value.ofType(Quantity).value > 10"
     }
    ]
   },
   "search": {
    "mode": "match"
   },
   "fullUrl": "http://localhost:8765/SubscriptionTopic/cf153a1fde850de90215a6cd0f0abcf5",
   "link": [
    {
     "relation": "self",
     "url": "http://localhost:8765/SubscriptionTopic/cf153a1fde850de90215a6cd0f0abcf5"
    }
   ]
  }
 ]
}
  • Open Aidbox UI -> Subscription Topics to check the topic status
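
The discovery request above can also be scripted. The following is a minimal Python sketch, not an official Aidbox client. It assumes Aidbox listens on http://localhost:8765 (the host that appears in the sample response) and that the caller supplies any authorization headers the instance requires. The helper names fetch_topics and summarize_topics are hypothetical.

```python
import json
import urllib.request


def fetch_topics(base_url="http://localhost:8765", headers=None):
    """GET /fhir/SubscriptionTopic and return the parsed searchset Bundle.

    Assumption: `headers` carries whatever Authorization header your
    Aidbox access policy requires; none is sent by default.
    """
    request = urllib.request.Request(
        base_url.rstrip("/") + "/fhir/SubscriptionTopic",
        headers={"Accept": "application/json", **(headers or {})},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def summarize_topics(bundle):
    """Return one summary dict per SubscriptionTopic entry in the Bundle."""
    summaries = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        if resource.get("resourceType") != "SubscriptionTopic":
            continue
        summaries.append({
            "url": resource.get("url"),
            "status": resource.get("status"),
            "triggers": [t.get("resource") for t in resource.get("resourceTrigger", [])],
            "filters": [f.get("filterParameter") for f in resource.get("canFilterBy", [])],
        })
    return summaries
```

Calling summarize_topics(fetch_topics()) against a running instance gives a compact view of each topic's url, status, trigger resources, and filter parameters, which is easier to scan than the full Bundle.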

Check the PostgreSQL documentation for other relevant settings.

To enable a database instance hosted with AWS RDS to work with the SubscriptionTopic services, the parameter rds.logical_replication should be set to 1. One possible way to accomplish this is as follows:
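
As one illustration, the parameter can be changed programmatically. This is a hedged sketch using boto3's modify_db_parameter_group call, not a procedure taken from the Aidbox documentation. It assumes boto3 is installed, AWS credentials are configured, and the instance uses a custom DB parameter group (default groups cannot be modified). The function names and the group name passed in are hypothetical.

```python
def logical_replication_parameters():
    """Parameter change that turns on logical replication for RDS PostgreSQL."""
    return [{
        "ParameterName": "rds.logical_replication",
        "ParameterValue": "1",
        # Static parameter: it takes effect only after the instance reboots.
        "ApplyMethod": "pending-reboot",
    }]


def enable_logical_replication(parameter_group_name):
    """Apply the change to a custom DB parameter group.

    Assumptions: boto3 is installed, AWS credentials are configured, and
    `parameter_group_name` is a custom group attached to your instance.
    """
    import boto3  # imported lazily so the helper above works without boto3

    rds = boto3.client("rds")
    return rds.modify_db_parameter_group(
        DBParameterGroupName=parameter_group_name,
        Parameters=logical_replication_parameters(),
    )
```

After the instance is rebooted, running SHOW wal_level; in PostgreSQL should report logical.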

Set up your database instance according to the official guide (Prerequisites for logical replication and logical decoding):

Server Parameters

Topic Definition is a zen schema that is tagged with fhir.topic-based-subscription/topic-definition and corresponds to the FHIR SubscriptionTopic resource.

resourceTrigger - an array of triggers; each one specifies which resource type and which FHIRPath (https://hl7.org/fhir/fhirpath.html) criteria should cause Aidbox to send changes to subscribers of the topic.

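canFilterBy - a list of properties by which Subscriptions on the SubscriptionTopic can be filtered. Each entry contains the FHIR SubscriptionTopic.canFilterBy properties and also _fhirPath, a FHIRPath expression that specifies how to calculate the value for the filter.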

filterBy - the list of filtering criteria for events on the topic. It has the same properties as FHIR Subscription.filterBy: resourceType, filterParameter, comparator, modifier, value.
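
To make the relationship between canFilterBy and filterBy concrete, here is a small Python sketch of how a filterBy entry could be checked against the value computed by a topic's _fhirPath expression. It illustrates the idea only and is not Aidbox's implementation. The `computed` mapping, the function names, the text fallback for non-numeric values, and the AND semantics across entries are all assumptions.

```python
import operator

# Comparison modifiers used in the canFilterBy examples above.
_NUMERIC_OPS = {
    "eq": operator.eq,
    "gt": operator.gt,
    "lt": operator.lt,
    "ge": operator.ge,
    "le": operator.le,
}


def _as_number(value):
    """Return value as float, or None if it is not numeric (booleans excluded)."""
    if isinstance(value, bool):
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def filter_matches(filter_by, computed):
    """Check one filterBy entry against values computed for a change event.

    `computed` maps filterParameter names to the result of the topic's
    _fhirPath expression for that event (hypothetical representation).
    """
    name = filter_by["filterParameter"]
    if name not in computed:
        return False
    actual, expected = computed[name], filter_by["value"]
    compare = _NUMERIC_OPS[filter_by["modifier"]]
    actual_num, expected_num = _as_number(actual), _as_number(expected)
    if actual_num is not None and expected_num is not None:
        return compare(actual_num, expected_num)
    # Non-numeric values: only equality is meaningful; compare as lowercase text.
    if filter_by["modifier"] != "eq":
        return False
    return str(actual).lower() == str(expected).lower()


def event_matches(filters, computed):
    """An event passes when every filterBy entry matches (AND semantics assumed)."""
    return all(filter_matches(f, computed) for f in filters)
```

For example, an Observation event whose computed "value" is 12.5 would pass a filter with modifier "gt" and value "10", while a Patient event would not pass a "resource-type" filter whose value is "Encounter".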

Aidbox topic-based subscriptions
Aidbox configuration project
PostgreSQL Logical Replication
aidboxdb-image
PostgreSQL documentation
AWS RDS
the official guide
FHIR SubscriptionTopic
https://hl7.org/fhir/fhirpath.html
SubscriptionTopic.canFilterBy
FHIR Subscription.filterBy