Set up SubscriptionTopic

This page describes how to set up SubscriptionTopic for each Topic Queue Storage type.

SubscriptionTopic is configured by Aidbox configuration project by zen-lang.

Aidbox configuration project

In general, to set up the topic in Aidbox configuration project, you need the following steps:

  1. Import fhir.topic-based-subscription namespace (and hl7-fhir-uv-subscriptions-backport-r4b if you're using FHIR 4.3.0).

  2. Create Topic Definition which configures triggers and filters.

  3. Specify Storage Type (e.g. PostgreSQL or Google Pub/Sub) and configuration values for it.

  4. Link Topic Definition from step 2 and Storage Type from step 3 into one service.

  5. Link this service from step 4 in the entry point (e.g. box).

Database Configuration

SubscriptionTopic-based services rely on PostgreSQL Logical Replication. Aidbox plays the role of the Subscriber to receive and process relevant events from the database. A replication slot is created automatically for each service when it starts. Therefore, in general, the database should be configured to support logical replication, and the POSTGRES_USER should have replication privileges.

Self Hosted Database

If you use AidboxDB then it's already configured to work properly with SubscriptionTopic.

Otherwise, check that wal_level is set to logical in postgresql.conf file:

wal_level = logical

Check for other relevant settings in PostgreSQL documentation.

Cloud Databases

AWS RDS PostgreSQL

To enable a database instance hosted with AWS RDS to work with the SubscriptionTopic services parameter rds.logical_replication should be set to 1. One possible way to accomplish this is as follows:

  1. Navigate to RDS, this should take you to the RDS Dashboard.

  2. Click Parameter Groups in the Resources panel on the dashboard.

  3. Create parameter group

  4. Click the Edit parameters

  5. Search for rds.logical_replication and set its value to 1.

  6. Navigate to the database instance, click Modify, and in the DB parameter group menu select the parameter group created in Step 3.

To check that the setting is applied run query SHOW wal_level; he result should be logical.

Azure Database for PostgreSQL - Flexible Server

Setup your database instance according to the official guide (Prerequisites for logical replication and logical decoding):

  1. Go to server parameters page on the portal.

  2. Set the server parameter wal_level to logical.

  3. Update max_worker_processes parameter value to at least 16. Otherwise, you may run into issues like WARNING: out of background worker slots.

  4. Save the changes and restart the server to apply the changes.

  5. Grant the user with which Aidbox connects to the database replication permissions: SQLCopy.

    ALTER ROLE <username> WITH REPLICATION;
Server Parameters

Topic Definition configuration

Topic Definition is a zen schema which is tagged with fhir.topic-based-subscription/topic-definition and corresponds to FHIR SubscriptionTopic resource.

Topic Definition contains:

  • url - should be absolute URI (globally unique)

  • resourceTrigger - an array of triggers, each specifies what resource types and FhirPath (https://hl7.org/fhir/fhirpath.html) criteria should trigger Aidbox to send changes to subscribers of the topic.

  • canFilterBy - list of properties by which Subscriptions on the SubscriptionTopic can be filtered, contains FHIR SubscriptionTopic.canFilterBy and also FhirPath:

    • description

    • resource

    • filterParameter

    • modifier

    • _fhirPath

    Example:

    ...
    patient-topic
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     :url "<YOUR URL>"
     :resourceTrigger [{:resource "Patient"
                        :fhirPathCriteria "%current.birthDate > '2023'"}
                       {:resource "Organization"}
                       < ... other resources ... >]
     :canFilterBy [{:resource        "Patient"
                    :filterParameter "first-family-name"
                    :_fhirPath       "%current.name[0].family"
                    :modifier        ["eq"]}
                   {:resource        "Patient"
                    :filterParameter "first-family-name-alternative"
                    :_fhirPath       "name[0].family"
                    :modifier        ["eq"]}
                   <... other filters ... >]}
    ...

Storage Type configuration

Topic storage specifies where to store and how to process events queue.

It is zen schema tagged with fhir.topic-based-subscription/topic-storage, containing:

  • storage-type: Choose one of the following:

    • PostgreSQL: fhir.topic-based-subscription/postgres

    • Google Pub/Sub: fhir.topic-based-subscription/gcp-pubsub

    • Aidbox Workflow Engine Connector: fhir.topic-based-subscription/awf-connector

  • heartbeat-rate: Heartbeat rate requested for current storage in sec. Default is 10sec.

  • timeout: Timeout value for batching in seconds

  • maxCount: Max number of resources to hold on in Aidbox before timeout (batch sending)

  • maxContent: empty, id-only or full-resource. Defines the permission of how much the event messages of Subscription can have information about the triggered resource. It isn't allowed to create Subscriptions with Subscription.content value that requires more content thamaxContent value of its own topic. Default is empty.

  • include-transaction: For tests purposes. Turns on includeTransaction replication slot option. Default is false

  • fhirpath-ignore-on-error: Ignore FhirPath execution errors on persisting queues.

  • status-interval: For test purpose mainly. Specifies the number of time between status packets sent back to the server. This allows for easier monitoring of the progress from server. A value of zero disables the periodic status updates completely, although an update will still be sent when requested by the server, to avoid timeout disconnect. The default value is 10 seconds.

In addition to the above, there are several properties for each specific storage-type.

For PostgreSQL:

  • table-name: Name for table to store events

  • senders-number: Number of services that deliver subsctiptions.

For Google Pub/Sub:

  • topic-name: Topic name in GCP.

  • bytes-threshold: Max bytes to hold until publishing. Default is 10000.

  • project-name: Project name in GCP.

  • enable-message-ordering: If true, enables message ordering. Uses focusResourceType and focusId as messageOrderingKey. Applied only when maxContent is 'id-only' or 'full-resource'.

For Aidbox Workflow Engine Connector:

  • rules: Trigger rule for Task/Workflow. Either task or workflow below should be used to indicate which activity should be triggered.

    • filterBy: The list of filtering criteria for events on Topic (has the same properties as FHIR Subscription.filterBy: resourceType, filterParameter, comparator, modifier, value)

    • task: Indicate which Task should be triggered by definition field.

    • workflow: Indicate which Workflow should be triggered by definition field.

Example:

...
pubsub-patient-storage
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/gcp-pubsub
     :status-interval 0.5
     :maxContent "full-resource"
     :heartbeat-rate 0.5
     :project-name "local-project"
     :topic-name "test-topic"
     :enable-message-ordering true
     :timeout 1
     :bytes-threshold 1000000
     :maxCount 10}
...

Configuration Examples

Examples of an Aidbox entry point for each storage type, configured with one topic:

PostgreSQL Queue Storage

Google Cloud Pub/Sub

Aidbox Workflow Engine Connector

PostgreSQL Queue Storage

{ns     aidbox-with-subscriptions
 import #{fhir.topic-based-subscription
          ;; the profile required for FHIR 4.3.0 version:
          hl7-fhir-uv-subscriptions-backport-r4b}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [
                    ;; an SubscriptionTopic may consist 
                    ;; of any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource        "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate value for the filter
                 :_fhirPath       "%current.value.ofType(Quantity).value"
                 :modifier        ["eq" "gt" "lt" "ge" "le"]}

                {:resource        "Observation"
                 :filterParameter "value-increase"
                 ;; both %current and %previous state of the 
                 ;; resource are available
                 :_fhirPath       "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier        ["eq"]}]}


 postgres-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  ;; At the moment only PostgreSQL is available as a storage
  :storage-type fhir.topic-based-subscription/postgres
  ;; The name of the table which will be created to store topic events:
  :table-name "observation_topic"
  ;; Possible value for maxContent are: "empty" | "id-only" | "full-resource"
  ;; The actual Resource is only stored in queue when "full-resource" value 
  ;; are specified. When deciding which payload type to request, 
  ;; systems SHOULD consider both ease of processing and security of PHI. 
  ;; To mitigate the risk of information leakage, systems SHOULD use the 
  ;; minimum level of detail consistent with the use case. 
  ;; In practice, id-only provides a good balance between security 
  ;; and performance for many real-world scenarios.
  :maxContent "full-resource"
  ;; The period, in seconds, during which events from the replication slot 
  ;; will be buffered before being written to storage:
  :timeout  10
  ;; The maximum number of events the replication slot will buffer before
  ;; writing to storage:
  :maxCount 100
  ;; Interval in seconds periodic heartbeat record generation 
  ;; in cdc_topic_heartbeat_table, to reclaim the WAL space:
  :heartbeat-rate 120
  ;; The number of workers responsible for notification delivery. 
  ;; min number 4 is advised. One worker can handle up to 1024 subsrciptions:
  :senders-number 4}


 ;; Service which binds storage with topic definition
 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage postgres-observation-topic-storage}


 ;; Entrypoint for the instance with corresponding service
 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  :services {:observation-topic-srv observation-topic-srv}}}

Google Cloud Pub/Sub

{ns pub-sub-topic
    import #{fhir.topic-based-subscription}

    observation-topic
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     ;; SubscriptionTopic url should be an absolute URI (globally unique)
     :url "http://aidbox.app/SubscriptionTopic/observations"
     :resourceTrigger [
                       ;; an SubscriptionTopic may consist 
                       ;; of any number of resources
                       {:resource "Observation"
                        :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
     :canFilterBy [{:resource        "Observation"
                    :filterParameter "value"
                    ;; _fhirPath specifies how to calculate value for the filter
                    :_fhirPath       "%current.value.ofType(Quantity).value"
                    :modifier        ["eq" "gt" "lt" "ge" "le"]}

                   {:resource        "Observation"
                    :filterParameter "value-increase"
                    ;; both %current and %previous state of the 
                    ;; resource are available
                    :_fhirPath       "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                    :modifier        ["eq"]}]}


    gcp-pubsub-observation-topic-storage
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/gcp-pubsub
     :status-interval 10
     :heartbeat-rate 120
     :maxContent "full-resource"
     :project-name "aidbox-cloud-demo"
     :topic-name "aidbox-tbs-test"
     :timeout  1
     :maxCount 100
     :bytes-threshold 10000
     :enable-message-ordering false}

    observation-topic-srv
    {:zen/tags #{aidbox/service}
     :engine fhir.topic-based-subscription/change-data-capture-service-engine
     :topic-definition observation-topic
     :topic-storage gcp-pubsub-observation-topic-storage}

    box
    {:zen/tags #{aidbox/system}
     :zen/desc "box for topic test"
     :services {:patient-topic-srv patient-topic-srv}}}

Aidbox Workflow Engine Connector

{ns awf-connector-topic
    import #{fhir.topic-based-subscription}

    topic-def
    {:zen/tags #{fhir.topic-based-subscription/topic-definition}
     :url "http://aidbox.app/SubscriptionTopic/observations"
     :resourceTrigger [{:resource "Patient"}
                       {:resource "Encounter"
                        :fhirPathCriteria "%current.status = 'completed'"}]

     :canFilterBy [{:resource        "Patient"
                    :filterParameter "Patient-deceased"
                    :_fhirPath       "(%previous.deceased.exists().not() or %previoius.deceased = false) and (%current.deceased.exists() and %current.deceased = true)"
                    :modifier        ["eq"]}
                   {:filterParameter "resource-type"
                    :_fhirPath       "resourceType"
                    :modifier        ["eq"]}]}

    awf-connector
    {:zen/tags #{fhir.topic-based-subscription/topic-storage}
     :storage-type fhir.topic-based-subscription/awf-connector
     :maxContent "full-resource"
     :rules [{:task     {:definition      encounter-completed-task}
              :filterBy [{:filterParameter "resource-type"
                          :modifier        "eq"
                          :value           "Encounter"}]}
             {:task     {:definition      patient-change-task}
              :filterBy [{:filterParameter "resource-type"
                          :modifier        "eq"
                          :value           "Patient"}]}
             {:task     {:definition       patient-deceased-task}
              :filterBy [{:resourceType    "Patient"
                          :filterParameter "Patient-deceased"
                          :modifier        "eq"
                          :value           "true"}]}]}

    patient-topic-srv
    {:zen/tags #{aidbox/service}
     :engine fhir.topic-based-subscription/change-data-capture-service-engine
     :topic-definition topic-def
     :topic-storage awf-connector}

    box
    {:zen/tags #{aidbox/system}
     :zen/desc "box for topic test"
     :services {:patient-topic-srv patient-topic-srv}}}
     

Verify the SubscriptionTopic is available

  • Run Aidbox and check logs for the following output

15:17:41 cro-1 :fhir.topic-based-subscription.topic/starting-replication-service {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-2 :fhir.topic-based-subscription.topic/starting-pg-persist-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 40c7a :fhir.topic-based-subscription.topic/service-started {:service aidbox-with-subscriptions/observation-topic-srv}
15:17:41 cro-3 :fhir.topic-based-subscription.topic/starting-delivery-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-4 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 0}
15:17:41 cro-5 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 1}
  • Discover SubscriptionTopic resources in Aidbox using FHIR API

GET /fhir/SubscriptionTopic
content-type: application/json
accept: application/json
  • Open Aidbox UI -> Subscription Topics to check the topic status

Last updated