This page describes how to set up SubscriptionTopic for each Topic Queue Storage type.
SubscriptionTopic is configured through the Aidbox configuration project, which is written in zen-lang.
In general, setting up a topic in the Aidbox configuration project takes the following steps:
1. Import the fhir.topic-based-subscription namespace (and hl7-fhir-uv-subscriptions-backport-r4b if you're using FHIR 4.3.0).
2. Create a Topic Definition, which configures triggers and filters.
3. Specify the Storage Type (e.g. PostgreSQL or Google Pub/Sub) and its configuration values.
4. Link the Topic Definition from step 2 and the Storage Type from step 3 into one service.
5. Link the service from step 4 in the entry point (e.g. box).
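The five steps can be sketched as a single minimal namespace. This is only an outline, assuming PostgreSQL storage: the namespace name, the symbols my-topic, my-topic-storage and my-topic-srv, the topic URL, and the table name are hypothetical placeholders, while the tags, engine, and property names are the ones used in the full examples on this page.

```edn
{ns my-project ;; hypothetical namespace name

 ;; Step 1: import the topic-based subscription namespace
 import #{fhir.topic-based-subscription}

 ;; Step 2: Topic Definition (triggers and filters)
 my-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  :url "http://example.org/SubscriptionTopic/my-topic" ;; hypothetical URL
  :resourceTrigger [{:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]}

 ;; Step 3: Storage Type and its configuration values
 my-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/postgres
  :table-name "my_topic" ;; hypothetical table name
  :maxContent "id-only"
  :timeout 10
  :maxCount 100
  :heartbeat-rate 120
  :senders-number 4}

 ;; Step 4: service that links the definition and the storage
 my-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition my-topic
  :topic-storage my-topic-storage}

 ;; Step 5: entry point that references the service
 box
 {:zen/tags #{aidbox/system}
  :services {:my-topic-srv my-topic-srv}}}
```

The key under :services and the service symbol are kept identical here, which makes it easier to spot a reference to a service that is not defined in the namespace.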
Database Configuration
SubscriptionTopic-based services rely on PostgreSQL Logical Replication. Aidbox plays the role of the Subscriber to receive and process relevant events from the database. A replication slot is created automatically for each service when it starts. Therefore, in general, the database should be configured to support logical replication, and the POSTGRES_USER should have replication privileges.
Self Hosted Database
If you use AidboxDB then it's already configured to work properly with SubscriptionTopic.
Otherwise, check that wal_level is set to logical in the postgresql.conf file:
wal_level = logical
AWS RDS
For a database instance hosted on AWS RDS to work with SubscriptionTopic services, the rds.logical_replication parameter must be set to 1. One possible way to accomplish this is as follows:
1. Navigate to RDS; this should take you to the RDS Dashboard.
2. Click Parameter Groups in the Resources panel on the dashboard.
3. Create a parameter group.
4. Click Edit parameters.
5. Search for rds.logical_replication and set its value to 1.
6. Navigate to the database instance, click Modify, and in the DB parameter group menu select the parameter group created in step 3.
To check that the setting has been applied, run the query SHOW wal_level; and confirm that the result is logical.
Azure Database for PostgreSQL - Flexible Server
Set up your database instance according to the official guide (Prerequisites for logical replication and logical decoding):
1. Go to the server parameters page on the portal.
2. Set the server parameter wal_level to logical.
3. Update the max_worker_processes parameter value to at least 16. Otherwise, you may run into issues like WARNING: out of background worker slots.
4. Save the changes and restart the server to apply them.
5. Grant replication permissions to the user that Aidbox uses to connect to the database:
ALTER ROLE <username> WITH REPLICATION;
Topic Definition configuration
Topic Definition is a zen schema that is tagged with fhir.topic-based-subscription/topic-definition and corresponds to the FHIR SubscriptionTopic resource.
Topic Definition contains:
url - should be absolute URI (globally unique)
resourceTrigger - an array of triggers, each of which specifies the resource type and FhirPath (https://hl7.org/fhir/fhirpath.html) criteria that should trigger Aidbox to send changes to subscribers of the topic.
canFilterBy - a list of properties by which Subscriptions on the SubscriptionTopic can be filtered. Each entry contains the FHIR SubscriptionTopic.canFilterBy fields plus a _fhirPath expression that specifies how to calculate the value for the filter.
Topic Storage is a zen schema tagged with fhir.topic-based-subscription/topic-storage. The following properties are common to all storage types:
heartbeat-rate: Heartbeat rate requested for the current storage, in seconds. Default is 10.
timeout: Timeout for batching, in seconds.
maxCount: Maximum number of resources Aidbox holds before the timeout expires (batch sending).
maxContent: empty, id-only, or full-resource. Defines how much information about the triggering resource the event messages of a Subscription may contain. A Subscription cannot be created with a Subscription.content value that requires more content than the maxContent value of its topic. Default is empty.
include-transaction: For testing purposes. Turns on the includeTransaction replication slot option. Default is false.
fhirpath-ignore-on-error: Ignore FhirPath execution errors when persisting queues.
status-interval: Mainly for testing purposes. Specifies the number of seconds between status packets sent back to the server, which makes it easier to monitor progress from the server. A value of zero disables the periodic status updates completely, although an update will still be sent when requested by the server, to avoid a timeout disconnect. The default value is 10 seconds.
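As an illustration of how these common properties sit together in one storage schema, here is a minimal sketch. It assumes PostgreSQL storage; the namespace, the symbol example-topic-storage, and the table name are hypothetical, and fhirpath-ignore-on-error is assumed to be a boolean flag, which this page does not state explicitly.

```edn
{ns example-common-storage-options ;; hypothetical namespace name
 import #{fhir.topic-based-subscription}

 example-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/postgres
  :table-name "example_topic" ;; hypothetical table name
  :senders-number 4

  ;; Subscriptions on this topic may request "empty" or "id-only" content,
  ;; but not "full-resource", because that would exceed maxContent.
  :maxContent "id-only"

  ;; Batching: flush after 10 seconds or after 100 buffered events.
  :timeout 10
  :maxCount 100

  ;; Values shown here are the documented defaults.
  :heartbeat-rate 10
  :status-interval 10
  :include-transaction false

  ;; Assumption: a boolean flag.
  :fhirpath-ignore-on-error true}}
```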
In addition to the above, there are several properties for each specific storage-type.
For PostgreSQL:
table-name: Name for table to store events
senders-number: Number of services that deliver subscriptions.
For Google Pub/Sub:
topic-name: Topic name in GCP.
bytes-threshold: Max bytes to hold until publishing. Default is 10000.
project-name: Project name in GCP.
enable-message-ordering: If true, enables message ordering. Uses focusResourceType and focusId as messageOrderingKey. Applied only when maxContent is 'id-only' or 'full-resource'.
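A short sketch of a Pub/Sub storage with message ordering turned on may help here. It assumes the fhir.topic-based-subscription/gcp-pubsub storage type used in the Google Cloud Pub/Sub example on this page; the namespace, the symbol name, and the project and topic names are hypothetical placeholders. maxContent is set to "id-only" because ordering is applied only for "id-only" or "full-resource".

```edn
{ns example-ordered-pubsub ;; hypothetical namespace name
 import #{fhir.topic-based-subscription}

 ordered-pubsub-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/gcp-pubsub
  :project-name "my-gcp-project" ;; hypothetical GCP project
  :topic-name "my-pubsub-topic"  ;; hypothetical Pub/Sub topic
  :heartbeat-rate 120
  :timeout 1
  :maxCount 100
  :bytes-threshold 10000

  ;; Ordering has no effect with "empty" content, so request at least "id-only".
  :maxContent "id-only"
  :enable-message-ordering true}}
```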
For Aidbox Workflow Engine Connector:
rules: Trigger rule for Task/Workflow. Either task or workflow below should be used to indicate which activity should be triggered.
filterBy: The list of filtering criteria for events on Topic (has the same properties as FHIR Subscription.filterBy: resourceType, filterParameter, comparator, modifier, value)
task: Indicate which Task should be triggered by definition field.
workflow: Indicate which Workflow should be triggered by definition field.
PostgreSQL
{ns aidbox-with-subscriptions
 import #{fhir.topic-based-subscription
          ;; the profile required for FHIR 4.3.0 version:
          hl7-fhir-uv-subscriptions-backport-r4b}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [;; a SubscriptionTopic may consist
                    ;; of any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate the value for the filter
                 :_fhirPath "%current.value.ofType(Quantity).value"
                 :modifier ["eq" "gt" "lt" "ge" "le"]}
                {:resource "Observation"
                 :filterParameter "value-increase"
                 ;; both the %current and %previous state of the
                 ;; resource are available
                 :_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier ["eq"]}]}

 postgres-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  ;; Storage type; this example uses PostgreSQL
  :storage-type fhir.topic-based-subscription/postgres
  ;; The name of the table which will be created to store topic events
  :table-name "observation_topic"
  ;; Possible values for maxContent are: "empty" | "id-only" | "full-resource"
  ;; The actual Resource is only stored in the queue when the "full-resource"
  ;; value is specified. When deciding which payload type to request,
  ;; systems SHOULD consider both ease of processing and security of PHI.
  ;; To mitigate the risk of information leakage, systems SHOULD use the
  ;; minimum level of detail consistent with the use case.
  ;; In practice, id-only provides a good balance between security
  ;; and performance for many real-world scenarios.
  :maxContent "full-resource"
  ;; The period, in seconds, during which events from the replication slot
  ;; will be buffered before being written to storage
  :timeout 10
  ;; The maximum number of events the replication slot will buffer before
  ;; writing to storage
  :maxCount 100
  ;; Interval, in seconds, between periodic heartbeat records generated
  ;; in cdc_topic_heartbeat_table, to reclaim the WAL space
  :heartbeat-rate 120
  ;; The number of workers responsible for notification delivery.
  ;; A minimum of 4 is advised. One worker can handle up to 1024 subscriptions
  :senders-number 4}

 ;; Service which binds storage with topic definition
 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage postgres-observation-topic-storage}

 ;; Entrypoint for the instance with corresponding service
 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  :services {:observation-topic-srv observation-topic-srv}}}
Google Cloud Pub/Sub
{ns pub-sub-topic
 import #{fhir.topic-based-subscription}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [;; a SubscriptionTopic may consist
                    ;; of any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate the value for the filter
                 :_fhirPath "%current.value.ofType(Quantity).value"
                 :modifier ["eq" "gt" "lt" "ge" "le"]}
                {:resource "Observation"
                 :filterParameter "value-increase"
                 ;; both the %current and %previous state of the
                 ;; resource are available
                 :_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier ["eq"]}]}

 gcp-pubsub-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/gcp-pubsub
  :status-interval 10
  :heartbeat-rate 120
  :maxContent "full-resource"
  :project-name "aidbox-cloud-demo"
  :topic-name "aidbox-tbs-test"
  :timeout 1
  :maxCount 100
  :bytes-threshold 10000
  :enable-message-ordering false}

 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage gcp-pubsub-observation-topic-storage}

 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  ;; references the service defined above
  :services {:observation-topic-srv observation-topic-srv}}}