Writes the data from a data source to a set of tables in the Spark catalog.

ds_write_tables(ds, schema = NULL, save_mode = SaveMode$OVERWRITE)

Arguments

ds

The DataSource object.

schema

The name of the schema to write the tables to.

save_mode

The save mode to use when writing the data: "overwrite" will replace any existing data, while "merge" will merge the new data with the existing data based on resource ID.
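
For illustration, a minimal sketch contrasting the two modes (assuming a data_source created as in the Examples below):

# Overwrite any existing data in the target tables.
data_source %>% ds_write_tables("default", save_mode = SaveMode$OVERWRITE)

# Merge the new data with the existing data based on resource ID.
data_source %>% ds_write_tables("default", save_mode = SaveMode$MERGE)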

Value

A list with a single element, file_infos, containing a list describing the tables created. Each entry has a fhir_resource_type and an absolute_url (the qualified name of the table).
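
The entries can be consumed like any R list. For example, a minimal sketch (assuming a data_source as in the Examples below) that collects the qualified table names:

result <- data_source %>% ds_write_tables("default")

# Extract the qualified table name for each written resource type.
sapply(result$file_infos, function(fi) fi$absolute_url)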

Examples

# Create a temporary warehouse location, which will be used when we call ds_write_tables().
temp_dir_path <- tempfile()
dir.create(temp_dir_path)
sc <- sparklyr::spark_connect(master = "local[*]", config = list(
  "sparklyr.shell.conf" = c(
    paste0("spark.sql.warehouse.dir=", temp_dir_path),
    "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
  )
), version = pathling_spark_info()$spark_version)

pc <- pathling_connect(sc)
data_source <- pc %>% pathling_read_ndjson(pathling_examples("ndjson"))

# Write the data to a set of Spark tables in the 'default' database.
data_source %>% ds_write_tables("default", save_mode = SaveMode$MERGE)
#> $file_infos
#> $file_infos[[1]]
#> $file_infos[[1]]$fhir_resource_type
#> [1] "Condition"
#> 
#> $file_infos[[1]]$absolute_url
#> [1] "default.Condition"
#> 
#> 
#> $file_infos[[2]]
#> $file_infos[[2]]$fhir_resource_type
#> [1] "Patient"
#> 
#> $file_infos[[2]]$absolute_url
#> [1] "default.Patient"
#> 
#> 
#> 
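
# A possible follow-up (a sketch, assuming the tables are registered in the
# Spark catalog as shown in the output above): query a written table back
# through the same connection using Spark SQL before disconnecting.
patients <- sparklyr::sdf_sql(sc, "SELECT * FROM default.Patient")
sparklyr::sdf_nrow(patients)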

pathling_disconnect(pc)
unlink(temp_dir_path, recursive = TRUE)