Kafka integration

Pathling supports Kafka as a streaming data source, and all operations in the library can run continuously over a stream of data.

Here is an example of streaming a source of FHIR data, encoding it and then performing a terminology operation upon it:

from pathling import PathlingContext, Coding, subsumes

pc = PathlingContext.create()

# Subscribe to a stream of FHIR Bundles from a Kafka topic.
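df = (
    pc.spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "some-topic")
    .load()
    # Kafka delivers the message value as bytes, so cast it to a JSON string.
    .selectExpr("CAST(value AS STRING) as json_bundle")
)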

# Pull out the MedicationAdministration resources and put them into a dataset.
med_administrations = (
    pc.encode_bundle(df, "MedicationAdministration")
    .selectExpr(
        "id", "status",
        # One output row per coding; resources with no coding are kept.
        "EXPLODE_OUTER(medicationCodeableConcept.coding) as coding"
    )
)

# Perform a subsumes operation on the medication coding to determine whether it is a type of
# anti-coagulant.
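result = med_administrations.select(
    med_administrations.id,
    med_administrations.status,
    med_administrations.coding,
    subsumes(
        # 372862008 |Anticoagulant| (SNOMED CT)
        left_coding=Coding("http://snomed.info/sct", "372862008"),
        # Assumption: keyword name for the column of codings to test;
        # check it against the Pathling version in use.
        right_coding_column="coding",
    ).alias("is_anticoagulant"),
)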
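
A streaming DataFrame such as result does no work until a streaming query is started on it. The following is a minimal sketch of one way to do that, using Spark's console sink, which is intended for debugging rather than production. The function name start_console_query and the checkpoint directory are hypothetical and not part of Pathling.

```python
def start_console_query(result, checkpoint_dir):
    """Start a streaming query that prints each micro-batch to the console.

    `result` is assumed to be a streaming Spark DataFrame, and
    `checkpoint_dir` a writable path chosen by the caller (hypothetical).
    """
    return (
        result.writeStream
        .format("console")
        # Append mode suits a plain projection with no aggregation.
        .outputMode("append")
        # Print full column values instead of truncating them.
        .option("truncate", "false")
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

With the example above, calling start_console_query(result, "/tmp/checkpoints/med-admin") and then awaitTermination() on the returned query would keep the stream running until it is stopped or fails. For a real deployment a durable sink, such as Kafka or files, would likely replace the console sink.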

For more information about Spark's Kafka integration, see the Structured Streaming + Kafka Integration Guide.