Set up SubscriptionTopic
This page describes how to set up SubscriptionTopic for each Topic Queue Storage type.
SubscriptionTopic is configured by Aidbox configuration project by zen-lang.
In general, to set up the topic in Aidbox configuration project, you need the following steps:
- 1.Import
fhir.topic-based-subscription
namespace (andhl7-fhir-uv-subscriptions-backport-r4b
if you're using FHIR 4.3.0). - 2.Create Topic Definition which configures triggers and filters.
- 3.Specify Storage Type (e.g. PostgreSQL or Google Pub/Sub) and configuration values for it.
- 4.Link Topic Definition from step 2 and Storage Type from step 3 into one service.
- 5.Link this service from step 4 in the entry point (e.g. box).
SubscriptionTopic-based services rely on PostgreSQL Logical Replication. Aidbox plays the role of the
Subscriber
to receive and process relevant events from the database. A replication slot is created automatically for each service when it starts. Therefore, in general, the database should be configured to support logical replication, and the POSTGRES_USER
should have replication privileges.Otherwise, check that
wal_level
is set to logical
in postgresql.conf
file:wal_level = logical
To enable a database instance hosted with AWS RDS to work with the SubscriptionTopic services parameter
rds.logical_replication
should be set to 1
. One possible way to accomplish this is as follows:- 1.Navigate to RDS, this should take you to the RDS Dashboard.
- 2.Click Parameter Groups in the Resources panel on the dashboard.
- 3.Create parameter group
- 4.Click the
Edit parameters
- 5.Search for
rds.logical_replication
and set its value to1
. - 6.Navigate to the database instance, click Modify, and in the
DB parameter group
menu select the parameter group created in Step 3.


To check that the setting is applied run query
SHOW wal_level;
he result should be logical
.Setup your database instance according to the official guide (Prerequisites for logical replication and logical decoding):
- 1.Go to server parameters page on the portal.
- 2.Set the server parameter
wal_level
tological
. - 3.Update
max_worker_processes
parameter value to at least 16. Otherwise, you may run into issues likeWARNING: out of background worker slots
. - 4.Save the changes and restart the server to apply the changes.
- 5.Grant the user with which Aidbox connects to the database replication permissions: SQLCopy.ALTER ROLE <username> WITH REPLICATION;

Server Parameters
Topic Definition is a zen schema which is tagged with
fhir.topic-based-subscription/topic-definition
and corresponds to FHIR SubscriptionTopic resource.Topic Definition contains:
- url - should be absolute URI (globally unique)
- resourceTrigger - an array of triggers, each specifies what resource types and FhirPath (https://hl7.org/fhir/fhirpath.html) criteria should trigger Aidbox to send changes to subscribers of the topic.
- canFilterBy - list of properties by which Subscriptions on the SubscriptionTopic can be filtered, contains FHIR SubscriptionTopic.canFilterBy and also FhirPath:
description
resource
filterParameter
modifier
_fhirPath
Example:
...
patient-topic
{:zen/tags #{fhir.topic-based-subscription/topic-definition}
:url "<YOUR URL>"
:resourceTrigger [{:resource "Patient"
:fhirPathCriteria "%current.birthDate > '2023'}
{:resource "Organization"}
< ... other resources ... >]
:canFilterBy [{:resource "Patient"
:filterParameter "first-family-name"
:_fhirPath "%current.name[0].family"
:modifier ["eq"]}
{:resource "Patient"
:filterParameter "first-family-name-alternative"
:_fhirPath "name[0].family"
:modifier ["eq"]}
<... other filters ... >]}
...
Topic storage specifies where to store and how to process events queue.
It is zen schema tagged with
fhir.topic-based-subscription/topic-storage
, containing: - storage-type: Choose one of the following:
- PostgreSQL:
fhir.topic-based-subscription/postgres
- Google Pub/Sub:
fhir.topic-based-subscription/gcp-pubsub
- Aidbox Workflow Engine Connector:
fhir.topic-based-subscription/awf-connector
- heartbeat-rate: Heartbeat rate requested for current storage in sec. Default is 10sec.
- timeout: Timeout value for batching in seconds
- maxCount: Max number of resources to hold on in Aidbox before timeout (batch sending)
- maxContent:
empty
,id-only
orfull-resource
. Defines the permission of how much the event messages of Subscription can have information about the triggered resource. It isn't allowed to create Subscriptions withSubscription.content
value that requires more content thamaxContent
value of its own topic. Default isempty
. - include-transaction: For tests purposes. Turns on includeTransaction replication slot option. Default is false
- fhirpath-ignore-on-error: Ignore FhirPath execution errors on persisting queues.
- status-interval: For test purpose mainly. Specifies the number of time between status packets sent back to the server. This allows for easier monitoring of the progress from server. A value of zero disables the periodic status updates completely, although an update will still be sent when requested by the server, to avoid timeout disconnect. The default value is 10 seconds.
In addition to the above, there are several properties for each specific storage-type.
For PostgreSQL:
- table-name: Name for table to store events
- senders-number: Number of services that deliver subsctiptions.
For Google Pub/Sub:
- topic-name: Topic name in GCP.
- bytes-threshold: Max bytes to hold until publishing. Default is 10000.
- project-name: Project name in GCP.
- enable-message-ordering: If true, enables message ordering. Uses focusResourceType and focusId as messageOrderingKey. Applied only when maxContent is 'id-only' or 'full-resource'.
For Aidbox Workflow Engine Connector:
- rules: Trigger rule for Task/Workflow. Either
task
orworkflow
below should be used to indicate which activity should be triggered.- filterBy: The list of filtering criteria for events on Topic (has the same properties as FHIR Subscription.filterBy:
resourceType
,filterParameter
,comparator
,modifier
,value
) - task: Indicate which Task should be triggered by
definition
field. - workflow: Indicate which Workflow should be triggered by
definition
field.
Example:
...
pubsub-patient-storage
{:zen/tags #{fhir.topic-based-subscription/topic-storage}
:storage-type fhir.topic-based-subscription/gcp-pubsub
:status-interval 0.5
:maxContent "full-resource"
:heartbeat-rate 0.5
:project-name "local-project"
:topic-name "test-topic"
:enable-message-ordering true
:timeout 1
:bytes-threshold 1000000
:maxCount 10}
...
Examples of an Aidbox entry point for each storage type, configured with one topic:
1
{ns aidbox-with-subscriptions
2
import #{fhir.topic-based-subscription
3
;; the profile required for FHIR 4.3.0 version:
4
hl7-fhir-uv-subscriptions-backport-r4b}
5
6
observation-topic
7
{:zen/tags #{fhir.topic-based-subscription/topic-definition}
8
;; SubscriptionTopic url should be an absolute URI (globally unique)
9
:url "http://aidbox.app/SubscriptionTopic/observations"
10
:resourceTrigger [
11
;; an SubscriptionTopic may consist
12
;; of any number of resources
13
{:resource "Observation"
14
:fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
15
:canFilterBy [{:resource "Observation"
16
:filterParameter "value"
17
;; _fhirPath specifies how to calculate value for the filter
18
:_fhirPath "%current.value.ofType(Quantity).value"
19
:modifier ["eq" "gt" "lt" "ge" "le"]}
20
21
{:resource "Observation"
22
:filterParameter "value-increase"
23
;; both %current and %previous state of the
24
;; resource are available
25
:_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
26
:modifier ["eq"]}]}
27
28
29
postgres-observation-topic-storage
30
{:zen/tags #{fhir.topic-based-subscription/topic-storage}
31
;; At the moment only PostgreSQL is available as a storage
32
:storage-type fhir.topic-based-subscription/postgres
33
;; The name of the table which will be created to store topic events:
34
:table-name "observation_topic"
35
;; Possible value for maxContent are: "empty" | "id-only" | "full-resource"
36
;; The actual Resource is only stored in queue when "full-resource" value
37
;; are specified. When deciding which payload type to request,
38
;; systems SHOULD consider both ease of processing and security of PHI.
39
;; To mitigate the risk of information leakage, systems SHOULD use the
40
;; minimum level of detail consistent with the use case.
41
;; In practice, id-only provides a good balance between security
42
;; and performance for many real-world scenarios.
43
:maxContent "full-resource"
44
;; The period, in seconds, during which events from the replication slot
45
;; will be buffered before being written to storage:
46
:timeout 10
47
;; The maximum number of events the replication slot will buffer before
48
;; writing to storage:
49
:maxCount 100
50
;; Interval in seconds periodic heartbeat record generation
51
;; in cdc_topic_heartbeat_table, to reclaim the WAL space:
52
:heartbeat-rate 120
53
;; The number of workers responsible for notification delivery.
54
;; min number 4 is advised. One worker can handle up to 1024 subsrciptions:
55
:senders-number 4}
56
57
58
;; Service which binds storage with topic definition
59
observation-topic-srv
60
{:zen/tags #{aidbox/service}
61
:engine fhir.topic-based-subscription/change-data-capture-service-engine
62
:topic-definition observation-topic
63
:topic-storage postgres-observation-topic-storage}
64
65
66
;; Entrypoint for the instance with corresponding service
67
box
68
{:zen/tags #{aidbox/system}
69
:zen/desc "box for topic test"
70
:services {:observation-topic-srv observation-topic-srv}}}
{ns pub-sub-topic
import #{fhir.topic-based-subscription}
observation-topic
{:zen/tags #{fhir.topic-based-subscription/topic-definition}
;; SubscriptionTopic url should be an absolute URI (globally unique)
:url "http://aidbox.app/SubscriptionTopic/observations"
:resourceTrigger [
;; an SubscriptionTopic may consist
;; of any number of resources
{:resource "Observation"
:fhirPathCriteria "%current.value.ofType(Quantity).value > 10"}]
:canFilterBy [{:resource "Observation"
:filterParameter "value"
;; _fhirPath specifies how to calculate value for the filter
:_fhirPath "%current.value.ofType(Quantity).value"
:modifier ["eq" "gt" "lt" "ge" "le"]}
{:resource "Observation"
:filterParameter "value-increase"
;; both %current and %previous state of the
;; resource are available
:_fhirPath "%current.value.ofType(Quantity).value > %previous.value.ofType(Quantity).value"
:modifier ["eq"]}]}
gcp-pubsub-observation-topic-storage
{:zen/tags #{fhir.topic-based-subscription/topic-storage}
:storage-type fhir.topic-based-subscription/gcp-pubsub
:status-interval 10
:heartbeat-rate 120
:maxContent "full-resource"
:project-name "aidbox-cloud-demo"
:topic-name "aidbox-tbs-test"
:timeout 1
:maxCount 100
:bytes-threshold 10000
:enable-message-ordering false}
observation-topic-srv
{:zen/tags #{aidbox/service}
:engine fhir.topic-based-subscription/change-data-capture-service-engine
:topic-definition observation-topic
:topic-storage gcp-pubsub-observation-topic-storage}
box
{:zen/tags #{aidbox/system}
:zen/desc "box for topic test"
:services {:patient-topic-srv patient-topic-srv}}}
{ns awf-connector-topic
import #{fhir.topic-based-subscription}
topic-def
{:zen/tags #{fhir.topic-based-subscription/topic-definition}
:url "http://aidbox.app/SubscriptionTopic/observations"
:resourceTrigger [{:resource "Patient"}
{:resource "Encounter"
:fhirPathCriteria "%current.status = 'completed'"}]
:canFilterBy [{:resource "Patient"
:filterParameter "Patient-deceased"
:_fhirPath "(%previous.deceased.exists().not() or %previoius.deceased = false) and (%current.deceased.exists() and %current.deceased = true)"
:modifier ["eq"]}
{:filterParameter "resource-type"
:_fhirPath "resourceType"
:modifier ["eq"]}]}
awf-connector
{:zen/tags #{fhir.topic-based-subscription/topic-storage}
:storage-type fhir.topic-based-subscription/awf-connector
:maxContent "full-resource"
:rules [{:task {:definition encounter-completed-task}
:filterBy [{:filterParameter "resource-type"
:modifier "eq"
:value "Encounter"}]}
{:task {:definition patient-change-task}
:filterBy [{:filterParameter "resource-type"
:modifier "eq"
:value "Patient"}]}
{:task {:definition patient-deceased-task}
:filterBy [{:resourceType "Patient"
:filterParameter "Patient-deceased"
:modifier "eq"
:value "true"}]}]}
patient-topic-srv
{:zen/tags #{aidbox/service}
:engine fhir.topic-based-subscription/change-data-capture-service-engine
:topic-definition topic-def
:topic-storage awf-connector}
box
{:zen/tags #{aidbox/system}
:zen/desc "box for topic test"
:services {:patient-topic-srv patient-topic-srv}}}
- Run Aidbox and check logs for the following output
15:17:41 cro-1 :fhir.topic-based-subscription.topic/starting-replication-service {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-2 :fhir.topic-based-subscription.topic/starting-pg-persist-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 40c7a :fhir.topic-based-subscription.topic/service-started {:service aidbox-with-subscriptions/observation-topic-srv}
15:17:41 cro-3 :fhir.topic-based-subscription.topic/starting-delivery-thread {:service aidbox-with-subscriptions/observation-topic-srv, :slot-name "tbs_aidbox_with_subscriptions__observation_topic_srv"}
15:17:41 cro-4 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 0}
15:17:41 cro-5 :fhir.topic-based-subscription.subscription/start-notification-sender {:service aidbox-with-subscriptions/observation-topic-srv, :idx 1}
- Discover SubscriptionTopic resources in Aidbox using FHIR API
Request
Response
GET /fhir/SubscriptionTopic
content-type: application/json
accept: application/json
{
"resourceType": "Bundle",
"type": "searchset",
"meta": {
"versionId": "0"
},
"total": 1,
"link": [
{
"relation": "first",
"url": "http://localhost:8765/fhir/SubscriptionTopic?page=1"
},
{
"relation": "self",
"url": "http://localhost:8765/fhir/SubscriptionTopic?page=1"
}
],
"entry": [
{
"resource": {
"id": "cf153a1fde850de90215a6cd0f0abcf5",
"url": "http://aidbox.app/SubscriptionTopic/observations",
"meta": {
"slot_name": "tbs_aidbox_with_subscriptions__observation_topic_srv",
"queue_table_name": "observation_topic",
"subscription_status_table_name": "observation_topic_subs_status",
"lastUpdated": "2023-08-31T15:17:40.355938Z",
"versionId": "0",
"extension": [
{
"url": "ex:createdAt",
"valueInstant": "2023-08-31T15:17:40.355938Z"
}
]
},
"status": "active",
"canFilterBy": [
{
"modifier": [
"eq",
"gt",
"lt",
"ge",
"le"
],
"resource": "Observation",
"filterParameter": "value"
},
{
"modifier": [
"eq"
],
"resource": "Observation",
"filterParameter": "value-increase"
}
],
"resourceType": "SubscriptionTopic",
"resourceTrigger": [
{
"resource": "Observation",
"fhirPathCriteria": "%current.value.ofType(Quantity).value > 10"
}
]
},
"search": {
"mode": "match"
},
"fullUrl": "http://localhost:8765/SubscriptionTopic/cf153a1fde850de90215a6cd0f0abcf5",
"link": [
{
"relation": "self",
"url": "http://localhost:8765/SubscriptionTopic/cf153a1fde850de90215a6cd0f0abcf5"
}
]
}
]
}
- Open Aidbox UI -> Subscription Topics to check the topic status

Last modified 26d ago