
FHIR server synchronisation

This guide describes how to create a synchronisation pipeline that periodically retrieves data from a FHIR bulk export endpoint and maintains a set of Delta tables for analysis.

Overview

The FHIR (Fast Healthcare Interoperability Resources) Bulk Data Access specification defines a standard method for extracting large volumes of healthcare data from FHIR servers. When combined with Pathling's Delta Lake support, this approach enables data pipelines with the following characteristics:

  • Initial full data exports establish baseline datasets
  • Incremental exports use the since parameter to retrieve only changed records
  • Data merging occurs automatically through Pathling's merge functionality
  • Error handling and retry mechanisms are built into the bulk client
  • Delta Lake provides transactional consistency for data operations
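
Condensed to its essentials, the pattern that the rest of this guide builds on looks like this (the endpoint URL and Delta path are illustrative):

from pathling import PathlingContext
from pathling.datasink import SaveMode

pc = PathlingContext.create(enable_delta=True)

# First run: no since value, so the server exports everything.
data = pc.read.bulk(fhir_endpoint_url="https://example.com/fhir")
data.write.delta("/data/delta/fhir", save_mode=SaveMode.MERGE)

# Later runs pass the previous run's start time as since, so only
# resources changed after that instant are exported and merged.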

Authentication

The synchronisation process uses SMART Backend Services with the client-confidential-asymmetric profile for authentication. This authentication method was designed for automated backend services that require unattended access to FHIR resources. Common applications include:

  • Analytics platforms importing bulk healthcare data
  • Data integration services synchronising patient records
  • Public health surveillance systems
  • Healthcare utilisation tracking systems

This authentication approach uses asymmetric cryptography (public and private key pairs) instead of shared secrets, which reduces security risks in automated environments that handle sensitive healthcare information.
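
Registering a client for this profile requires a signing key pair in JWK format. One way to generate a suitable pair is with the third-party jwcrypto library; the RS384 algorithm, 2048-bit key size, and kid value below are assumptions to adapt to your server's registration requirements:

from jwcrypto import jwk

# Generate an RSA signing key; RS384 is commonly accepted by SMART servers.
key = jwk.JWK.generate(kty="RSA", size=2048, alg="RS384", use="sig",
                       kid="fhir-sync-key-1")

# The private JWK stays with the sync process; only the public JWK is
# registered with the FHIR server.
with open("private-key.jwk", "w") as f:
    f.write(key.export_private())
with open("public-key.jwk", "w") as f:
    f.write(key.export_public())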

Synchronisation script

The following Python script demonstrates the synchronisation process using Pathling's bulk export and Delta merge functionality:

#!/usr/bin/env python3
"""
FHIR Bulk Export to Delta Lake Synchronisation Script

This script performs periodic synchronisation of FHIR data from a bulk export
endpoint to Delta tables, supporting both initial loads and incremental updates.
"""

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any

from pathling import PathlingContext
from pathling.datasink import SaveMode

# Configure logging.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class SyncState:
    """Tracks the state of synchronisation for incremental updates."""

    def __init__(self, state_file: Path):
        self.state_file = state_file
        self.last_sync_time: Optional[datetime] = None
        self.load()

    def load(self):
        """Load sync state from the JSON file, if it exists."""
        if self.state_file.exists():
            with open(self.state_file, 'r') as f:
                data = json.load(f)
            if data.get('last_sync_time'):
                self.last_sync_time = datetime.fromisoformat(
                    data['last_sync_time']
                )

    def save(self):
        """Save sync state to the JSON file, creating directories as needed."""
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.state_file, 'w') as f:
            json.dump({
                'last_sync_time': self.last_sync_time.isoformat()
                if self.last_sync_time else None
            }, f, indent=2)


def sync_fhir_to_delta(
    fhir_endpoint: str,
    delta_path: str,
    state_file: Path,
    resource_types: Optional[List[str]] = None,
    group_id: Optional[str] = None,
    auth_config: Optional[Dict[str, Any]] = None,
    full_refresh: bool = False
):
    """
    Synchronise FHIR data from a bulk export endpoint to Delta tables.

    Args:
        fhir_endpoint: FHIR server bulk export endpoint URL
        delta_path: Base path for Delta tables
        state_file: Path to store sync state
        resource_types: List of resource types to sync
        group_id: Optional group ID for group-level export
        auth_config: Authentication configuration for the FHIR server
        full_refresh: Force a full export instead of an incremental one
    """
    # Record the start time before the export begins, so that resources
    # changed while the export runs are picked up by the next sync.
    start_time = datetime.now(timezone.utc)

    # Load sync state.
    state = SyncState(state_file)

    # Initialise Pathling context with Delta support.
    pc = PathlingContext.create(enable_delta=True)

    logger.info(f"Starting sync from {fhir_endpoint}")
    if state.last_sync_time and not full_refresh:
        logger.info(f"Incremental sync since {state.last_sync_time}")

    # Determine export parameters.
    since = None if full_refresh else state.last_sync_time

    # Execute bulk export to retrieve data from the FHIR server.
    # Retry logic is handled internally by the bulk client.
    data_source = pc.read.bulk(
        fhir_endpoint_url=fhir_endpoint,
        group_id=group_id,
        types=resource_types,
        since=since,
        auth_config=auth_config,
        timeout=3600  # 1 hour timeout.
    )

    # Save data to Delta tables using a merge operation. The merge handles
    # both initial table creation and updates to existing data.
    data_source.write.delta(delta_path, save_mode=SaveMode.MERGE)

    # Update sync state only after the merge has succeeded.
    state.last_sync_time = start_time
    state.save()

    duration = (datetime.now(timezone.utc) - start_time).total_seconds()
    logger.info(f"Sync completed successfully in {duration:.2f} seconds")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Sync FHIR data from bulk export to Delta tables"
    )
    parser.add_argument(
        "--fhir-endpoint",
        required=True,
        help="FHIR server bulk export endpoint URL"
    )
    parser.add_argument(
        "--delta-path",
        required=True,
        help="Base path for Delta tables"
    )
    parser.add_argument(
        "--state-file",
        default="/var/lib/fhir-sync/state.json",
        help="Path to sync state file"
    )
    parser.add_argument(
        "--resource-types",
        nargs="+",
        default=["Patient", "Condition", "Observation", "Encounter"],
        help="Resource types to sync"
    )
    parser.add_argument(
        "--group-id",
        help="Group ID for group-level export"
    )
    parser.add_argument(
        "--full-refresh",
        action="store_true",
        help="Force a full export instead of incremental"
    )
    parser.add_argument(
        "--auth-client-id",
        help="OAuth2 client ID for authentication"
    )
    parser.add_argument(
        "--auth-private-key",
        help="Path to private key file for authentication"
    )

    args = parser.parse_args()

    # Build the authentication configuration if credentials were provided.
    # The private key is only read when a client ID is present, since the
    # configuration dictionary does not exist otherwise.
    auth_config = None
    if args.auth_client_id:
        auth_config = {
            "enabled": True,
            "client_id": args.auth_client_id,
            "use_smart": True,
            "scope": "system/*.read"
        }

        if args.auth_private_key:
            with open(args.auth_private_key, 'r') as f:
                auth_config["private_key_jwk"] = f.read()

    # Run the synchronisation.
    sync_fhir_to_delta(
        fhir_endpoint=args.fhir_endpoint,
        delta_path=args.delta_path,
        state_file=Path(args.state_file),
        resource_types=args.resource_types,
        group_id=args.group_id,
        auth_config=auth_config,
        full_refresh=args.full_refresh
    )

Scheduling periodic synchronisation

Using cron

Periodic synchronisation can be scheduled using cron. The following shell script provides a basic wrapper:

#!/bin/bash
# /usr/local/bin/fhir-sync.sh

# Set up environment.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export SPARK_HOME=/opt/spark

# Run the sync script.
/usr/bin/python3 /opt/fhir-sync/sync.py \
    --fhir-endpoint "https://your-fhir-server.com/fhir" \
    --delta-path "/data/delta/fhir" \
    --state-file "/var/lib/fhir-sync/state.json" \
    --resource-types Patient Condition Observation Encounter \
    --auth-client-id "your-client-id" \
    --auth-private-key "/etc/fhir-sync/private-key.jwk" \
    >> /var/log/fhir-sync/sync.log 2>&1

Then add an entry to the crontab to run the script every hour:

# Run FHIR sync every hour at 15 minutes past.
15 * * * * /usr/local/bin/fhir-sync.sh
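
If a sync run can occasionally take longer than the schedule interval, it is worth guarding against overlapping runs. One common approach is to wrap the job in flock, so that a new run exits immediately if the previous one still holds the lock:

# Run FHIR sync hourly, skipping a run if the previous one is still active.
15 * * * * flock -n /var/run/fhir-sync.lock /usr/local/bin/fhir-sync.sh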

Using Python scheduler

More complex scheduling requirements can be addressed using the third-party schedule library:

import logging
import time
from datetime import datetime
from pathlib import Path

import schedule

# sync_fhir_to_delta as defined in the synchronisation script above,
# assumed here to be importable as the sync module.
from sync import sync_fhir_to_delta

logger = logging.getLogger(__name__)


def run_sync(full_refresh=False):
    """Execute the sync process."""
    logger.info(f"Starting scheduled sync at {datetime.now()}")

    sync_fhir_to_delta(
        fhir_endpoint="https://your-fhir-server.com/fhir",
        delta_path="/data/delta/fhir",
        state_file=Path("/var/lib/fhir-sync/state.json"),
        resource_types=["Patient", "Condition", "Observation"],
        full_refresh=full_refresh
    )


# Schedule different sync patterns.
schedule.every().hour.at(":15").do(run_sync)  # Hourly incremental.
schedule.every().sunday.at("03:00").do(
    lambda: run_sync(full_refresh=True)
)  # Weekly full refresh.

while True:
    schedule.run_pending()
    time.sleep(60)

Using Apache Airflow

Enterprise environments may implement scheduling through Apache Airflow using a DAG (Directed Acyclic Graph):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'fhir_bulk_sync',
    default_args=default_args,
    description='Sync FHIR data to Delta tables',
    schedule_interval='0 * * * *',  # Hourly.
    catchup=False
)


def sync_task(**context):
    """Airflow task to run the sync, configured via Airflow Variables."""
    from pathlib import Path

    from sync import sync_fhir_to_delta

    # Jinja templates are not rendered inside a python_callable, so the
    # configuration is read through the Variable API instead.
    sync_fhir_to_delta(
        fhir_endpoint=Variable.get("fhir_endpoint"),
        delta_path=Variable.get("delta_path"),
        state_file=Path(Variable.get("state_file")),
        resource_types=["Patient", "Condition", "Observation"]
    )


sync = PythonOperator(
    task_id='sync_fhir_data',
    python_callable=sync_task,
    dag=dag
)

notify = EmailOperator(
    task_id='send_notification',
    to=['data-team@example.com'],
    subject='FHIR Sync Completed',
    html_content='Sync completed at {{ ds }}',
    trigger_rule='all_done',  # Send whether the sync succeeded or failed.
    dag=dag
)

sync >> notify

Monitoring and health checks

Health check endpoints can be implemented for system monitoring:

from datetime import datetime, timezone
from pathlib import Path

from flask import Flask, jsonify

# SyncState as defined in the synchronisation script above, assumed here to
# be importable as the sync module.
from sync import SyncState

app = Flask(__name__)


@app.route('/health')
def health_check():
    """Health check endpoint for monitoring systems."""
    try:
        state = SyncState(Path("/var/lib/fhir-sync/state.json"))

        # Check if sync is running regularly.
        if state.last_sync_time:
            time_since_sync = datetime.now(timezone.utc) - state.last_sync_time
            is_healthy = time_since_sync.total_seconds() < 7200  # 2 hours.
        else:
            is_healthy = False

        return jsonify({
            'status': 'healthy' if is_healthy else 'unhealthy',
            'last_sync': state.last_sync_time.isoformat()
            if state.last_sync_time else None
        }), 200 if is_healthy else 503

    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 503


if __name__ == '__main__':
    # Development server only; use a production WSGI server when deploying.
    app.run(host='0.0.0.0', port=8080)
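
Assuming the endpoint is served on port 8080 as in the snippet above, a monitoring system can probe it with a plain HTTP check; curl's -f flag makes the command exit non-zero on the 503 returned when the sync is stale:

curl -fsS http://localhost:8080/health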

Production considerations

Security considerations

Credentials should be stored securely using environment variables or dedicated secret management systems:

import os

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient


def get_auth_config():
    """Retrieve authentication config from secure storage."""
    # Option 1: Environment variables.
    if os.getenv("FHIR_CLIENT_ID"):
        return {
            "enabled": True,
            "client_id": os.getenv("FHIR_CLIENT_ID"),
            "private_key_jwk": os.getenv("FHIR_PRIVATE_KEY"),
            "use_smart": True,
            "scope": "system/*.read"
        }

    # Option 2: Azure Key Vault.
    credential = DefaultAzureCredential()
    client = SecretClient(
        vault_url="https://your-vault.vault.azure.net/",
        credential=credential
    )

    return {
        "enabled": True,
        "client_id": client.get_secret("fhir-client-id").value,
        "private_key_jwk": client.get_secret("fhir-private-key").value,
        "use_smart": True,
        "scope": "system/*.read"
    }
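
The helper can then be wired into the sync entry point in place of the command-line arguments. A minimal sketch, assuming the synchronisation script is importable as the sync module and using illustrative paths:

from pathlib import Path

from sync import sync_fhir_to_delta

sync_fhir_to_delta(
    fhir_endpoint="https://your-fhir-server.com/fhir",
    delta_path="/data/delta/fhir",
    state_file=Path("/var/lib/fhir-sync/state.json"),
    auth_config=get_auth_config()
)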

Large dataset configuration

Large datasets may require parameter adjustments to the bulk export process:

# Timeout and concurrency parameters can be increased for large datasets.
data_source = pc.read.bulk(
    fhir_endpoint_url=fhir_endpoint,
    types=resource_types,
    since=since,
    auth_config=auth_config,
    timeout=7200,  # 2 hour timeout for large datasets.
    max_concurrent_downloads=10
)
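
Another option for very large servers is to sync each resource type in its own run with its own state file, which bounds the size of any single export and lets a failed type be retried independently. A sketch using only the script's own entry point (module name and paths are assumptions):

from pathlib import Path

from sync import sync_fhir_to_delta

for resource_type in ["Patient", "Condition", "Observation", "Encounter"]:
    sync_fhir_to_delta(
        fhir_endpoint="https://your-fhir-server.com/fhir",
        delta_path="/data/delta/fhir",
        state_file=Path(f"/var/lib/fhir-sync/state-{resource_type}.json"),
        resource_types=[resource_type]
    )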

Summary

This guide describes the implementation of a synchronisation pipeline between FHIR bulk export endpoints and Delta tables using Pathling's built-in functionality. The approach has the following key characteristics:

  • Incremental processing: The since parameter enables efficient synchronisation by retrieving only changed data
  • Data merging: Pathling's SaveMode.MERGE automatically handles conflicts and updates
  • Error handling: Built-in retry mechanisms address transient network and server issues
  • Code efficiency: The core synchronisation logic requires minimal custom implementation
  • Operational features: Scheduling, monitoring, and security components support production deployment

The combination of Pathling's bulk export client and Delta Lake's merge functionality makes it possible to keep analytical datasets current, and consistent with the source FHIR server, with very little custom development effort.