Set up SubscriptionTopic
While FHIR topic-based subscriptions are functional, they will no longer receive active development or new features. For enhanced capabilities and ongoing support, please use Aidbox topic-based subscriptions. This newer implementation offers improved performance, flexibility, and will continue to be developed to meet future needs.
This page describes how to set up SubscriptionTopic for each Topic Queue Storage type.
SubscriptionTopic is configured through an Aidbox configuration project written in zen-lang.
Since the 2405 release, the recommended way to run Aidbox is with the FHIR Schema validation engine, which is incompatible with the zen and Entity/Attribute options. See the guide "Setup Aidbox with FHIR Schema validation engine".
In general, to set up the topic in an Aidbox configuration project, you need the following steps:
1. Import the fhir.topic-based-subscription namespace (and hl7-fhir-uv-subscriptions-backport-r4b if you're using FHIR 4.3.0).
2. Create a Topic Definition, which configures triggers and filters.
3. Specify a Storage Type (e.g. PostgreSQL or Google Pub/Sub) and configuration values for it.
4. Link the Topic Definition from step 2 and the Storage Type from step 3 into one service.
5. Link the service from step 4 in the entry point (e.g. box).
Database Configuration
SubscriptionTopic-based services rely on PostgreSQL Logical Replication. Aidbox plays the role of the Subscriber to receive and process relevant events from the database. A replication slot is created automatically for each service when it starts. Therefore, in general, the database should be configured to support logical replication, and the POSTGRES_USER should have replication privileges.
Self Hosted Database
If you use aidboxdb-image then it's already configured to work properly with SubscriptionTopic.
Otherwise, check that wal_level is set to logical in the postgresql.conf file:
wal_level = logical
Check for other relevant settings in PostgreSQL documentation.
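The check above can also be scripted. The following is a minimal sketch, not part of Aidbox: it parses the text of a postgresql.conf file and reports whether the effective wal_level is logical. The function names are illustrative, and the two extra settings it reads (max_replication_slots and max_wal_senders, standard PostgreSQL replication parameters) are an assumption about what "other relevant settings" may include, not something this page states.

```python
import re

# wal_level comes from this page; the other two are standard PostgreSQL
# replication parameters, included here as an assumption.
SETTINGS = ("wal_level", "max_replication_slots", "max_wal_senders")


def read_settings(conf_text):
    """Return the last uncommented value of each setting in SETTINGS."""
    found = {}
    for raw_line in conf_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop trailing comments
        match = re.match(r"^(\w+)\s*=?\s*'?([^']*?)'?$", line)
        if match and match.group(1) in SETTINGS:
            # Later occurrences overwrite earlier ones.
            found[match.group(1)] = match.group(2).strip()
    return found


def logical_replication_enabled(conf_text):
    """True when the wal_level found in conf_text is 'logical'."""
    return read_settings(conf_text).get("wal_level") == "logical"


sample = """
# wal_level = replica
wal_level = logical        # required for SubscriptionTopic services
max_replication_slots = 10
"""
print(read_settings(sample))
print(logical_replication_enabled(sample))
```

This only inspects the file contents. The value actually in effect on a running server is the one reported by SHOW wal_level.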
Cloud Databases
AWS RDS PostgreSQL
To enable a database instance hosted on AWS RDS to work with the SubscriptionTopic services, the parameter rds.logical_replication should be set to 1. One possible way to accomplish this is as follows:
1. Navigate to RDS; this should take you to the RDS Dashboard.
2. Click Parameter Groups in the Resources panel on the dashboard.
3. Create a parameter group.
4. Click Edit parameters.
5. Search for rds.logical_replication and set its value to 1.
6. Navigate to the database instance, click Modify, and in the DB parameter group menu select the parameter group created in Step 3.


To check that the setting is applied, run the following query. The result should be logical.
SHOW wal_level;
Azure Database for PostgreSQL - Flexible Server
Set up your database instance according to the official guide (Prerequisites for logical replication and logical decoding):
1. Go to the server parameters page on the portal.
2. Set the server parameter wal_level to logical.
3. Update the max_worker_processes parameter value to at least 16. Otherwise, you may run into issues like WARNING: out of background worker slots.
4. Save the changes and restart the server to apply the changes.
5. Grant replication permissions to the user with which Aidbox connects to the database:
ALTER ROLE <username> WITH REPLICATION;

Topic Definition configuration
A Topic Definition is a zen schema that is tagged with fhir.topic-based-subscription/topic-definition and corresponds to the FHIR SubscriptionTopic resource.
Topic Definition contains:
url - should be an absolute URI (globally unique)
resourceTrigger - an array of triggers; each specifies which resource types and FhirPath (https://hl7.org/fhir/fhirpath.html) criteria should trigger Aidbox to send changes to subscribers of the topic.
canFilterBy - a list of properties by which Subscriptions on the SubscriptionTopic can be filtered. Each entry contains the FHIR SubscriptionTopic.canFilterBy properties and also a FhirPath expression:
  description
  resource
  filterParameter
  modifier
  _fhirPath
Example:
...
 patient-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  :url "<YOUR URL>"
  :resourceTrigger [{:resource "Patient"
                     :fhirPathCriteria "%current.birthDate > '2023'"}
                    {:resource "Organization"}
                    < ... other resources ... >]
  :canFilterBy [{:resource "Patient"
                 :filterParameter "first-family-name"
                 :_fhirPath "%current.name[0].family"
                 :modifier ["eq"]}
                {:resource "Patient"
                 :filterParameter "first-family-name-alternative"
                 :_fhirPath "name[0].family"
                 :modifier ["eq"]}
                <... other filters ... >]}
...
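Before loading a definition it can help to sanity-check its shape. The sketch below is an illustration only: it works on a plain Python dict that mirrors the properties listed above and is not how Aidbox or zen validates a Topic Definition. The function name, the messages, and the assumption that every trigger needs a resource and every canFilterBy entry needs a filterParameter are hypothetical.

```python
from urllib.parse import urlparse


def check_topic_definition(topic):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    # An absolute URI always carries a scheme (http:, urn:, ...).
    if not urlparse(topic.get("url", "")).scheme:
        problems.append("url should be an absolute URI")
    for trigger in topic.get("resourceTrigger", []):
        if "resource" not in trigger:
            problems.append("each trigger needs a resource type")
    for entry in topic.get("canFilterBy", []):
        if "filterParameter" not in entry:
            problems.append("each canFilterBy entry needs a filterParameter")
    return problems


patient_topic = {
    "url": "http://example.org/SubscriptionTopic/patients",  # placeholder URL
    "resourceTrigger": [
        {"resource": "Patient",
         "fhirPathCriteria": "%current.birthDate > '2023'"},
        {"resource": "Organization"},
    ],
    "canFilterBy": [
        {"resource": "Patient",
         "filterParameter": "first-family-name",
         "_fhirPath": "%current.name[0].family",
         "modifier": ["eq"]},
    ],
}
print(check_topic_definition(patient_topic))
```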
Storage Type configuration
Topic storage specifies where the event queue is stored and how it is processed.
It is a zen schema tagged with fhir.topic-based-subscription/topic-storage, containing:
storage-type: Choose one of the following:
  PostgreSQL: fhir.topic-based-subscription/postgres
  Google Pub/Sub: fhir.topic-based-subscription/gcp-pubsub
  Aidbox Workflow Engine Connector: fhir.topic-based-subscription/awf-connector
heartbeat-rate: Heartbeat rate requested for the current storage, in seconds. Default is 10 seconds.
timeout: Timeout value for batching, in seconds.
maxCount: Maximum number of resources to hold in Aidbox before the timeout (batch sending).
maxContent: empty, id-only or full-resource. Defines how much information about the triggered resource the event messages of a Subscription may contain. It isn't allowed to create Subscriptions with a Subscription.content value that requires more content than the maxContent value of their topic. Default is empty.
include-transaction: For test purposes. Turns on the includeTransaction replication slot option. Default is false.
fhirpath-ignore-on-error: Ignore FhirPath execution errors when persisting queues.
status-interval: Mainly for test purposes. Specifies the amount of time between status packets sent back to the server. This allows for easier monitoring of the progress from the server. A value of zero disables the periodic status updates completely, although an update will still be sent when requested by the server, to avoid timeout disconnect. The default value is 10 seconds.
In addition to the above, there are several properties for each specific storage-type.
For PostgreSQL:
table-name: Name of the table that stores events.
senders-number: Number of services that deliver subscriptions.
For Google Pub/Sub:
topic-name: Topic name in GCP.
bytes-threshold: Max bytes to hold until publishing. Default is 10000.
project-name: Project name in GCP.
enable-message-ordering: If true, enables message ordering. Uses focusResourceType and focusId as messageOrderingKey. Applied only when maxContent is 'id-only' or 'full-resource'.
For Aidbox Workflow Engine Connector:
rules: Trigger rules for Task/Workflow. Either task or workflow below should be used to indicate which activity should be triggered.
  filterBy: The list of filtering criteria for events on the Topic (has the same properties as FHIR Subscription.filterBy: resourceType, filterParameter, comparator, modifier, value).
  task: Indicates which Task should be triggered, via the definition field.
  workflow: Indicates which Workflow should be triggered, via the definition field.
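The maxContent rule described above amounts to an ordering of the three content levels. The following sketch shows that comparison; the function and constant names are hypothetical, and this is an illustration of the rule rather than Aidbox's own check.

```python
# Content levels ordered from least to most information about the resource.
CONTENT_LEVELS = ("empty", "id-only", "full-resource")


def content_allowed(subscription_content, topic_max_content="empty"):
    """True when Subscription.content asks for no more than maxContent allows.

    The default for topic_max_content mirrors the documented default, empty.
    """
    requested = CONTENT_LEVELS.index(subscription_content)
    permitted = CONTENT_LEVELS.index(topic_max_content)
    return requested <= permitted


print(content_allowed("id-only", "full-resource"))   # within the limit
print(content_allowed("full-resource", "id-only"))   # asks for too much
```

With the default maxContent of empty, only Subscriptions whose content is empty pass this check, so a topic that should deliver ids or full resources needs maxContent set explicitly.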
Example:
...
 pubsub-patient-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/gcp-pubsub
  :status-interval 0.5
  :maxContent "full-resource"
  :heartbeat-rate 0.5
  :project-name "local-project"
  :topic-name "test-topic"
  :enable-message-ordering true
  :timeout 1
  :bytes-threshold 1000000
  :maxCount 10}
...
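The interaction between timeout, maxCount and bytes-threshold can be pictured as a buffer that is flushed as soon as any one limit is reached. The sketch below models that idea under stated assumptions: the class and method names are hypothetical, and the exact buffering behaviour inside Aidbox may differ.

```python
from dataclasses import dataclass, field


@dataclass
class Batch:
    max_count: int            # corresponds to maxCount
    timeout: float            # corresponds to timeout, in seconds
    bytes_threshold: int = 0  # corresponds to bytes-threshold; 0 disables it
    started_at: float = 0.0
    events: list = field(default_factory=list)

    def add(self, event, now):
        """Buffer one event (bytes); return True when a flush is due."""
        if not self.events:
            self.started_at = now  # the timeout counts from the first event
        self.events.append(event)
        return self.should_flush(now)

    def should_flush(self, now):
        if not self.events:
            return False
        size = sum(len(event) for event in self.events)
        return (len(self.events) >= self.max_count
                or now - self.started_at >= self.timeout
                or (self.bytes_threshold > 0 and size >= self.bytes_threshold))

    def flush(self):
        """Return the buffered events and start a new, empty batch."""
        flushed, self.events = self.events, []
        return flushed


batch = Batch(max_count=10, timeout=1, bytes_threshold=1000000)
for second in (0.0, 0.4, 1.2):
    if batch.add(b'{"resourceType": "Patient"}', now=second):
        print("flushing", len(batch.flush()), "events at", second)
```

Passing the clock value in as an argument keeps the sketch deterministic; a real sender would read a monotonic clock instead.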
Configuration Examples
Examples of an Aidbox entry point for each storage type, configured with one topic:
PostgreSQL Queue Storage
{ns aidbox-with-subscriptions
 import #{fhir.topic-based-subscription
          ;; the profile required for FHIR 4.3.0 version:
          hl7-fhir-uv-subscriptions-backport-r4b}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [;; a SubscriptionTopic may consist
                    ;; of any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate value for the filter
                 :_fhirPath "%current.value.ofType(Quantity).value"
                 :modifier ["eq" "gt" "lt" "ge" "le"]}
                {:resource "Observation"
                 :filterParameter "value-increase"
                 ;; both %current and %previous state of the
                 ;; resource are available
                 :_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier ["eq"]}]}

 postgres-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  ;; Store topic events in PostgreSQL
  :storage-type fhir.topic-based-subscription/postgres
  ;; The name of the table which will be created to store topic events:
  :table-name "observation_topic"
  ;; Possible values for maxContent are: "empty" | "id-only" | "full-resource"
  ;; The actual Resource is only stored in the queue when the "full-resource"
  ;; value is specified. When deciding which payload type to request,
  ;; systems SHOULD consider both ease of processing and security of PHI.
  ;; To mitigate the risk of information leakage, systems SHOULD use the
  ;; minimum level of detail consistent with the use case.
  ;; In practice, id-only provides a good balance between security
  ;; and performance for many real-world scenarios.
  :maxContent "full-resource"
  ;; The period, in seconds, during which events from the replication slot
  ;; will be buffered before being written to storage:
  :timeout 10
  ;; The maximum number of events the replication slot will buffer before
  ;; writing to storage:
  :maxCount 100
  ;; Interval, in seconds, of periodic heartbeat record generation
  ;; in cdc_topic_heartbeat_table, to reclaim the WAL space:
  :heartbeat-rate 120
  ;; The number of workers responsible for notification delivery.
  ;; A minimum of 4 is advised. One worker can handle up to 1024 subscriptions:
  :senders-number 4}

 ;; Service which binds storage with topic definition
 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage postgres-observation-topic-storage}

 ;; Entrypoint for the instance with corresponding service
 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  :services {:observation-topic-srv observation-topic-srv}}}
Google Cloud Pub/Sub
{ns pub-sub-topic
 import #{fhir.topic-based-subscription}

 observation-topic
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  ;; SubscriptionTopic url should be an absolute URI (globally unique)
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [;; a SubscriptionTopic may consist
                    ;; of any number of resources
                    {:resource "Observation"
                     :fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
  :canFilterBy [{:resource "Observation"
                 :filterParameter "value"
                 ;; _fhirPath specifies how to calculate value for the filter
                 :_fhirPath "%current.value.ofType(Quantity).value"
                 :modifier ["eq" "gt" "lt" "ge" "le"]}
                {:resource "Observation"
                 :filterParameter "value-increase"
                 ;; both %current and %previous state of the
                 ;; resource are available
                 :_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
                 :modifier ["eq"]}]}

 gcp-pubsub-observation-topic-storage
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/gcp-pubsub
  :status-interval 10
  :heartbeat-rate 120
  :maxContent "full-resource"
  :project-name "aidbox-cloud-demo"
  :topic-name "aidbox-tbs-test"
  :timeout 1
  :maxCount 100
  :bytes-threshold 10000
  :enable-message-ordering false}

 observation-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition observation-topic
  :topic-storage gcp-pubsub-observation-topic-storage}

 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  ;; the entry point must reference the service defined above
  :services {:observation-topic-srv observation-topic-srv}}}
Aidbox Workflow Engine Connector
{ns awf-connector-topic
 import #{fhir.topic-based-subscription}

 topic-def
 {:zen/tags #{fhir.topic-based-subscription/topic-definition}
  :url "http://aidbox.app/SubscriptionTopic/observations"
  :resourceTrigger [{:resource "Patient"}
                    {:resource "Encounter"
                     :fhirPathCriteria "%current.status = 'completed'"}]
  :canFilterBy [{:resource "Patient"
                 :filterParameter "Patient-deceased"
                 :_fhirPath "(%previous.deceased.exists().not() or %previous.deceased = false) and (%current.deceased.exists() and %current.deceased = true)"
                 :modifier ["eq"]}
                {:filterParameter "resource-type"
                 :_fhirPath "resourceType"
                 :modifier ["eq"]}]}

 awf-connector
 {:zen/tags #{fhir.topic-based-subscription/topic-storage}
  :storage-type fhir.topic-based-subscription/awf-connector
  :maxContent "full-resource"
  :rules [{:task {:definition encounter-completed-task}
           :filterBy [{:filterParameter "resource-type"
                       :modifier "eq"
                       :value "Encounter"}]}
          {:task {:definition patient-change-task}
           :filterBy [{:filterParameter "resource-type"
                       :modifier "eq"
                       :value "Patient"}]}
          {:task {:definition patient-deceased-task}
           :filterBy [{:resourceType "Patient"
                       :filterParameter "Patient-deceased"
                       :modifier "eq"
                       :value "true"}]}]}

 patient-topic-srv
 {:zen/tags #{aidbox/service}
  :engine fhir.topic-based-subscription/change-data-capture-service-engine
  :topic-definition topic-def
  :topic-storage awf-connector}

 box
 {:zen/tags #{aidbox/system}
  :zen/desc "box for topic test"
  :services {:patient-topic-srv patient-topic-srv}}}
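One way to read the rules in this example is that each rule fires its task when every filterBy criterion matches the filter values computed for an event. The sketch below models that reading only. It assumes criteria within a rule are combined with AND and that only the eq modifier is used; neither assumption is confirmed by this page, and the function name and data layout are hypothetical.

```python
# The three rules from the example above, written as plain Python data.
RULES = [
    {"task": "encounter-completed-task",
     "filterBy": [{"filterParameter": "resource-type",
                   "modifier": "eq", "value": "Encounter"}]},
    {"task": "patient-change-task",
     "filterBy": [{"filterParameter": "resource-type",
                   "modifier": "eq", "value": "Patient"}]},
    {"task": "patient-deceased-task",
     "filterBy": [{"resourceType": "Patient",
                   "filterParameter": "Patient-deceased",
                   "modifier": "eq", "value": "true"}]},
]


def matching_tasks(rules, filter_values):
    """Return the tasks whose criteria all equal the event's filter values.

    filter_values maps a filterParameter name to the string value computed
    for the event (for example by the topic's _fhirPath expressions).
    """
    matched = []
    for rule in rules:
        if all(criterion["modifier"] == "eq"
               and filter_values.get(criterion["filterParameter"]) == criterion["value"]
               for criterion in rule["filterBy"]):
            matched.append(rule["task"])
    return matched


print(matching_tasks(RULES, {"resource-type": "Encounter"}))
print(matching_tasks(RULES, {"resource-type": "Patient",
                             "Patient-deceased": "true"}))
```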
Verify the SubscriptionTopic is available
Run Aidbox and check the logs for the following output:
15:17:41 cro-1 :fhir.topic-based-subscription.topic/starting-replication-service {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-2 :fhir.topic-based-subscription.topic/starting-pg-persist-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 40c7a :fhir.topic-based-subscription.topic/service-started {:service aidbox-with-subscriptions/observation-topic-srv}
15:17:41 cro-3 :fhir.topic-based-subscription.topic/starting-delivery-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-4 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 0}
15:17:41 cro-5 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 1}
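In the log output above, the slot name appears to be the service symbol with a tbs_ prefix, hyphens turned into underscores, and the slash turned into a double underscore. The sketch below encodes that pattern so the expected slot can be looked up, for example in the pg_replication_slots view. The rule is inferred from this single log sample and is an assumption, as are the function names; how other characters such as dots are handled is not shown here.

```python
STARTED_EVENT = ":fhir.topic-based-subscription.topic/service-started"


def expected_slot_name(service_symbol):
    """Guess the replication slot name for a 'namespace/name' service symbol."""
    namespace, name = service_symbol.split("/", 1)
    return "tbs_" + namespace.replace("-", "_") + "__" + name.replace("-", "_")


def service_started(log_text, service_symbol):
    """True when the log contains a service-started event for the service."""
    return any(STARTED_EVENT in line and ":service " + service_symbol in line
               for line in log_text.splitlines())


service = "aidbox-with-subscriptions/observation-topic-srv"
sample_log = (
    "15:17:41 40c7a :fhir.topic-based-subscription.topic/service-started "
    "{:service aidbox-with-subscriptions/observation-topic-srv}"
)
print(expected_slot_name(service))
print(service_started(sample_log, service))
```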
Discover SubscriptionTopic resources in Aidbox using the FHIR API:
GET /fhir/SubscriptionTopic
content-type: application/json
accept: application/json
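The same request can be prepared from a script. This is a minimal sketch using only the Python standard library: the base URL is a placeholder, authentication is omitted because it depends on how your instance is configured, and the sample response body is illustrative rather than captured from a real server. The helper names are hypothetical.

```python
import json
from urllib.request import Request

# Hypothetical base URL; replace with the address of your Aidbox instance.
BASE_URL = "http://localhost:8080"


def build_topic_request(base_url=BASE_URL):
    """Build (but do not send) the GET /fhir/SubscriptionTopic request."""
    return Request(
        base_url.rstrip("/") + "/fhir/SubscriptionTopic",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
        method="GET",
    )


def topic_urls(bundle):
    """Collect the url of every SubscriptionTopic in a FHIR search Bundle."""
    return [entry["resource"]["url"]
            for entry in bundle.get("entry", [])
            if "url" in entry.get("resource", {})]


# Illustrative response body, not captured from a real server.
sample_body = json.dumps({
    "resourceType": "Bundle",
    "type": "searchset",
    "entry": [{"resource": {
        "resourceType": "SubscriptionTopic",
        "url": "http://aidbox.app/SubscriptionTopic/observations"}}],
})

request = build_topic_request()
print(request.get_method(), request.full_url)
print(topic_urls(json.loads(sample_body)))
```

To actually send the request, pass it to urllib.request.urlopen after adding whatever authorization header your instance requires.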
Open Aidbox UI -> Subscription Topics to check the topic status
