Kafka AidboxTopicDestination

To use the Kafka integration, first configure the AidboxSubscriptionTopic module.

See: Topic-based subscriptions using AidboxSubscriptionTopic

Behaviour Overview

  1. By default, Aidbox buffers notifications for a short period before sending them to Kafka, in accordance with the configuration of the Kafka AidboxTopicDestination resource.

  2. If Kafka is unavailable, Aidbox keeps notifications in the buffer for up to the time set by the delivery.timeout.ms parameter.

    1. If the connection is restored, the buffered notifications will be sent.

    2. If a notification cannot be sent within the specified timeout, it will be lost.

  3. These buffers are in-memory, so if Aidbox is restarted, the buffered data will be lost.

  4. The status of successful or failed notifications can be monitored via Status Introspection.
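
The buffering rules above can be pictured with a toy model. This is only a sketch, not how Aidbox or the Kafka client is implemented: the class ToyBuffer, its methods, and the 120000 ms timeout value are assumptions made for illustration. It shows the order of outcomes: a notification that waits longer than the timeout is dropped, and one that is still within the timeout when Kafka comes back is sent.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToyBuffer:
    """Hypothetical in-memory buffer with a per-notification delivery timeout."""
    delivery_timeout_ms: int
    pending: deque = field(default_factory=deque)  # items are (enqueued_at_ms, payload)
    delivered: list = field(default_factory=list)
    lost: list = field(default_factory=list)

    def enqueue(self, now_ms, payload):
        self.pending.append((now_ms, payload))

    def tick(self, now_ms, kafka_available):
        """Drop notifications past the timeout, then flush the rest if Kafka is reachable."""
        while self.pending and now_ms - self.pending[0][0] > self.delivery_timeout_ms:
            self.lost.append(self.pending.popleft()[1])
        if kafka_available:
            while self.pending:
                self.delivered.append(self.pending.popleft()[1])


buf = ToyBuffer(delivery_timeout_ms=120_000)  # assumed value for the example
buf.enqueue(0, "n1")
buf.enqueue(100_000, "n2")
buf.tick(130_000, kafka_available=False)  # n1 has waited longer than the timeout: lost
buf.tick(150_000, kafka_available=True)   # n2 is still within the timeout: sent
print(buf.delivered, buf.lost)
```

Because the buffer lives only in the object's memory, discarding the object (the analogue of restarting Aidbox) discards everything still pending.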

Kafka in Private Environment

See the full example on GitHub.

POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json

{
  "meta": {
    "profile": [
      "http://fhir.aidbox.app/StructureDefinition/AidboxTopicDestinationKafka"
    ]
  },
  "kind": "kafka",
  "id": "kafka-destination",
  "topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
  "parameter": [
    {
      "name": "kafkaTopic",
      "valueString": "aidbox-forms"
    },
    {
      "name": "bootstrap.servers",
      "valueString": "kafka:29092"
    }
  ]
}

MSK Kafka in AWS with IAM

POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json

{
  "meta": {
    "profile": [
      "http://fhir.aidbox.app/StructureDefinition/AidboxTopicDestinationKafka"
    ]
  },
  "kind": "kafka",
  "id": "kafka-destination",
  "topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
  "parameter": [
    {
      "name": "kafkaTopic",
      "valueString": "aidbox-forms"
    },
    {
      "name": "bootstrap.servers",
      "valueString": "b-1.checkmsktbd.9szqwn.c16.kafka.us-east-1.amazonaws.com:9098,b-3.checkmsktbd.9szqwn.c16.kafka.us-east-1.amazonaws.com:9098,b-2.checkmsktbd.9szqwn.c16.kafka.us-east-1.amazonaws.com:9098"
    },
    {
      "name": "security.protocol",
      "valueString": "SASL_SSL"
    },
    {
      "name": "sasl.mechanism",
      "valueString": "AWS_MSK_IAM"
    },
    {
      "name": "sasl.jaas.config",
      "valueString": "software.amazon.msk.auth.iam.IAMLoginModule required;"
    },
    {
      "name": "sasl.client.callback.handler.class",
      "valueString": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  ]
}

All Available Parameters

| Parameter name | Value type | Description |
| --- | --- | --- |
| kafkaTopic * | valueString | The Kafka topic where the data should be sent. |
| bootstrap.servers * | valueString | Comma-separated list of Kafka brokers (host:port) to connect to. A single broker is enough; several may be listed, as in the MSK example. |
| compression.type | valueString | Compression type for the data the producer sends. Accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). |
| batch.size | valueInteger | Controls the default batch size in bytes. |
| delivery.timeout.ms | valueInteger | Maximum time allowed for reporting the success or failure of a record sent by a producer, covering delays before sending, waiting for broker acknowledgment, and handling retriable errors. |
| max.block.ms | valueInteger | Controls how long the KafkaProducer's send() method will block. |
| max.request.size | valueInteger | The maximum size of a request in bytes. |
| request.timeout.ms | valueInteger | The maximum amount of time the client will wait for the response to a request. |
| ssl.keystore.key | valueString | Private key in the format specified by 'ssl.keystore.type'. |
| security.protocol | valueString | Protocol used to communicate with brokers. |
| sasl.mechanism | valueString | SASL mechanism used for client connections. |
| sasl.jaas.config | valueString | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
| sasl.client.callback.handler.class | valueString | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. |

* required parameter.

For additional details, see the Kafka Producer Configs documentation.
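
Since the parameters use two value types, a small helper can pick the right key for each one. The sketch below is an illustration only: the function kafka_destination is hypothetical, and the compression.type and delivery.timeout.ms values are example choices, not recommendations. It builds the same request body as the examples above, emitting valueInteger for integers and valueString for strings.

```python
import json

PROFILE = "http://fhir.aidbox.app/StructureDefinition/AidboxTopicDestinationKafka"


def kafka_destination(dest_id, topic_url, params):
    """Build an AidboxTopicDestination body from a plain dict of parameters."""
    parameter = []
    for name, value in params.items():
        # Only the two value types from the parameter table are handled here.
        if isinstance(value, bool) or not isinstance(value, (int, str)):
            raise TypeError(f"{name}: expected int or str, got {type(value).__name__}")
        key = "valueInteger" if isinstance(value, int) else "valueString"
        parameter.append({"name": name, key: value})
    return {
        "meta": {"profile": [PROFILE]},
        "kind": "kafka",
        "id": dest_id,
        "topic": topic_url,
        "parameter": parameter,
    }


body = kafka_destination(
    "kafka-destination",
    "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
    {
        "kafkaTopic": "aidbox-forms",
        "bootstrap.servers": "kafka:29092",
        "compression.type": "gzip",       # example value
        "delivery.timeout.ms": 120000,    # example value
    },
)
print(json.dumps(body, indent=2))
```

The printed JSON can be sent as the body of POST /fhir/AidboxTopicDestination.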

Status Introspection

GET /fhir/AidboxTopicDestination/<topic-destination-id>/$status
content-type: application/json
accept: application/json

Response format:

| Property | Type | Description |
| --- | --- | --- |
| start-time | string | AidboxTopicDestination start time in UTC. |
| status | string | AidboxTopicDestination status. Always active, which means that AidboxTopicDestination will try to send all received notifications. |
| in-process | number | Current number of events in the Kafka buffer being processed for delivery. |
| enqueued-events-count | number | Number of events pending in the queue for dispatch to the Kafka driver. This count remains 0 when atLeastOneGuarantee is set to false. |
| successfully-delivered | number | Total number of events that have been successfully delivered. |
| failed-delivery | string | Total number of events that failed to be delivered. This count is always 0 when atLeastOneGuarantee is true. |
| failed-delivery-attempt | Object | Number of delivery attempts that failed. When atLeastOneGuarantee is false, this matches the failed-delivery count. When atLeastOneGuarantee is true, it represents the overall number of failed delivery attempts. |
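
A monitoring script might reduce the $status response to a few numbers. The sketch below assumes the response is a flat JSON object keyed by the property names listed above; the function summarize_status and every value in sample are hypothetical, made up for illustration. Counts are passed through int() so that they work whether they arrive as numbers or as strings.

```python
def summarize_status(status):
    """Reduce a $status response (given as a dict) to delivered, failed and backlog counts."""
    delivered = int(status.get("successfully-delivered", 0))
    failed = int(status.get("failed-delivery", 0))
    # Backlog: events being processed plus events still waiting in the queue.
    backlog = int(status.get("in-process", 0)) + int(status.get("enqueued-events-count", 0))
    return {
        "delivered": delivered,
        "failed": failed,
        "backlog": backlog,
        "healthy": failed == 0,
    }


# Hypothetical response values, for illustration only.
sample = {
    "start-time": "2024-01-01T00:00:00Z",
    "status": "active",
    "in-process": 2,
    "enqueued-events-count": 0,
    "successfully-delivered": 40,
    "failed-delivery": "1",
}
summary = summarize_status(sample)
print(summary)
```

In practice the dict would come from parsing the JSON body returned by GET /fhir/AidboxTopicDestination/<topic-destination-id>/$status.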
