How Collective Health uses Delta Live Tables and Structured Streaming for Data Integration

Engineering


Collective Health is not an insurance company. We’re a technology company that’s fundamentally making health insurance work better for everyone, starting with the 155M+ Americans covered by their employer.

We’ve created a powerful, flexible infrastructure to simplify employer-led healthcare, and started a movement that prioritizes the human experience within health benefits. We’ve built smarter technology that’s easy to use, gives people an advocate in their health journey, and helps employers manage costs. It’s a platform that’s data-powered and human-empowered.

Our mission is to change the way people experience health benefits by making it effortless to understand, navigate, and pay for healthcare. By reducing the administrative lift of delivering health benefits, providing an intuitive member experience, and improving health outcomes, Collective Health guides employees toward healthier lives and companies toward healthier bottom lines.

One of the numerous offerings on our platform is Premier Partner Program™, built on the Databricks Lakehouse Platform. In this blog, we’ll cover how we’re making it easier to share data at scale with our partners.

Pipeline Overview

Here is our integration architecture at a high level. In this post, we will focus on the Delta Live Tables portion.

Schema Definition

Before we begin the ingest process, we need to be explicit about what we expect from our partners. Since each partner may not be able to conform to our internal schema, we define an explicit schema for each partner's files.


from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DateType,
    TimestampType,
)

example_partner_schema = StructType(
    [
        StructField("Session ID", StringType(), True),
        StructField("Date of Session", DateType(), True),
        StructField("Date of Payment", DateType(), True),
        StructField("Amount", IntegerType(), True),
        StructField("Session Type", StringType(), True),
        StructField("Session Modality", StringType(), True),
        StructField("Member ID", StringType(), True),
        StructField("Member First Name", StringType(), True),
        StructField("Member Last Name", StringType(), True),
        StructField("Member Status", StringType(), True),
        StructField("Member Zip Code", StringType(), True),
        StructField("Member Date of Birth", DateType(), True),
        StructField("Member Gender", StringType(), True),
        StructField("Primary Member Employee ID", StringType(), True),
        StructField("sponsor", StringType(), True),
        StructField("file_name", StringType(), True),
        StructField("ingest_date", TimestampType(), True),
    ]
)
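One detail worth calling out: the expectation rules we generate later are driven by each field's nullable flag, so any column a partner must always populate should be declared with nullable=False. Which columns are truly required varies by partner; purely as an illustration (these particular choices are an assumption, not our actual contract), the identifiers might be marked like this:


from pyspark.sql.types import StructType, StructField, StringType

# Illustrative only: which fields are required is partner-specific.
required_fields_example = StructType(
    [
        StructField("Session ID", StringType(), False),  # assumed required
        StructField("Member ID", StringType(), False),   # assumed required
    ]
)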

Ingest Files

One of the benefits of working with Apache Spark on Databricks is the ability to read multiple files from a cloud storage provider.

We can read the files into a PySpark DataFrame and save it to a Delta table. We also include the ingest date and file name when ingesting the files so that we can revisit records should issues arise in the future.


import pyspark.sql.functions as F

df = (
    spark.read.csv(
        f"s3://{s3_bucket_name}/{root_file_path}/{partner}/{date}/*.csv",
        header=True,
        schema=example_partner_schema,
    )
    .withColumn("file_name", F.input_file_name())
    .withColumn("ingest_date", F.current_timestamp())
)

df.write.format("delta").mode("append").saveAsTable(f"{partner}_utilization_data_bronze")

This process worked well, but as business requirements changed, so did the schema: new fields were added, and columns that previously contained data began arriving as nulls. We explored several solutions, including custom logic to handle invalid records, but Delta Live Tables gives us validation tools, pipeline visualization, and a simple programmatic interface out of the box. We also wanted to ingest files incrementally, without re-reading every file we had previously ingested.

Structured Streaming

Because we expect new data on a set schedule, our pipeline only needs to run at particular times; we don’t want to listen for new files continuously. With Structured Streaming we can still ingest new files as they arrive, much like an event-driven model, without keeping compute resources running all the time. For this we use Databricks Auto Loader, which maintains a checkpoint of previously ingested files and records so we don’t have to track them ourselves. We also expose the _rescued_data column to capture data that could not be parsed against the specified schema.


df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("sep", "|")
    .option("rescuedDataColumn", "_rescued_data")
    .schema(schema)
    .load(file_path)
    .withColumn("file_name", F.input_file_name())
    .withColumn("ingest_date", F.current_timestamp())
)
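If you run this stream as a standalone job rather than inside a DLT pipeline, it still needs a sink and a checkpoint location so Auto Loader can track progress between scheduled runs. Here is a minimal sketch of that, assuming a checkpoint path and target table name of our choosing (both hypothetical), with an availableNow trigger so the job processes whatever is new and then stops:


# Hypothetical sink: the table name and checkpoint path are assumptions for illustration.
(
    df.writeStream.format("delta")
    .option("checkpointLocation", f"s3://{s3_bucket_name}/checkpoints/{partner}_utilization")
    .trigger(availableNow=True)  # process all newly arrived files, then stop
    .toTable(f"{partner}_utilization_data_bronze")
)

Because the checkpoint remembers which files have already been processed, a scheduled job that runs this a few times a day only picks up new arrivals.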

Delta Live Tables

Setting up a Delta Live Tables (DLT) pipeline is pretty straightforward. You define your existing DataFrame as a table:


import dlt

@dlt.table
def partner_utilization_data():
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("sep", "|")
        .option("rescuedDataColumn", "_rescued_data")
        .schema(schema)
        .load(file_path)
        .withColumn("file_name", F.input_file_name())
        .withColumn("ingest_date", F.current_timestamp())
    )
    return df

But before we create the table, we can validate that our records do not contain null values. To do this, we refer back to the schema and identify the required (non-nullable) columns. We use the @dlt.expect_all decorator to keep track of records that fail this validation; it does not drop records or fail the pipeline, but it lets us monitor how often null values show up in non-nullable columns.


import dlt

default_schema_rules = {}

# Build one expectation per non-nullable column, using the normalized column names.
for field in example_partner_schema:
    if not field.nullable:
        column = field.name.lower().replace(" ", "_")
        default_schema_rules[f"{column}_not_null"] = f"{column} IS NOT NULL"

@dlt.view(name=f"{partner}_utilization_data_view")
@dlt.expect_all(default_schema_rules)
def partner_utilization_data_view():
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("sep", "|")
        .option("rescuedDataColumn", "_rescued_data")
        .schema(example_partner_schema)
        .load(f"s3://{s3_bucket_name}/{root_file_path}/{partner}/")
        .withColumn("file_name", F.input_file_name())
        .withColumn("ingest_date", F.current_timestamp())
    )
    # Normalize column names to snake_case so downstream joins and expectations line up.
    for source_column in example_partner_schema.fieldNames():
        df = df.withColumnRenamed(source_column, source_column.lower().replace(" ", "_"))
    return df
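The pass/fail counts for these expectations show up in the DLT pipeline UI, and they are also recorded in the pipeline's event log, a Delta table stored under the pipeline's configured storage location. As a rough sketch (the storage path below is an assumption; point it at your own pipeline's storage location):


# Hypothetical path: the event log lives under <pipeline storage location>/system/events.
events = spark.read.format("delta").load(f"s3://{s3_bucket_name}/dlt/{partner}_pipeline/system/events")

# Expectation results are nested in the JSON `details` column of flow_progress events.
(
    events.where("event_type = 'flow_progress'")
    .selectExpr(
        "timestamp",
        "details:flow_progress.data_quality.expectations as expectations",
    )
    .where("expectations IS NOT NULL")
    .show(truncate=False)
)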

We do, however, want to drop records that have insufficient data or that we are unable to validate. We will do this using the @dlt.expect_all_or_drop decorator on a bronze table that reads from the view above. We also need to load a Delta table that lives outside our pipeline so we can validate records against it.


@dlt.view
def person_view():
    return spark.sql(
        f"""
        select lower(first_name) as first_name,
               lower(last_name) as last_name,
               date_of_birth,
               subscriber_id,
               person_id,
               sponsor_person_id,
               sponsor_name
        from ds_{env}.persons
        """
    )

@dlt.table(name=f"{partner}_utilization_bronze")
@dlt.expect_all_or_drop(dict(valid_members="(person_id IS NOT NULL)"))
def partner_utilization_bronze():
    partner_df = dlt.read_stream(f"{partner}_utilization_data_view")
    person = dlt.read("person_view")
    # Left join so unmatched records surface with a null person_id and are dropped
    # by the expectation above.
    matched_by_name_dob = (
        partner_df.alias("partner")
        .join(
            person.alias("person"),
            (partner_df["member_first_name"] == person["first_name"])
            & (partner_df["member_last_name"] == person["last_name"])
            & (partner_df["member_date_of_birth"] == person["date_of_birth"])
            & (partner_df["sponsor"] == person["sponsor_name"]),
            "left",
        )
        .dropDuplicates(["session_id"])
    )
    return matched_by_name_dob

Quarantine records

To capture all the records that failed the validation check in the previous step, we simply reverse the validation logic (i.e., person_id is expected to be null below) and load the invalid records into a separate table.


@dlt.table(name=f"{partner}_utilization_quarantine")
@dlt.expect_all_or_drop(dict(valid_members="(person_id IS NULL)"))
def partner_utilization_quarantine():
    partner_df = dlt.read_stream(f"{partner}_utilization_data_view")
    person = dlt.read("person_view")
    # Same left join as the bronze table; here the reversed expectation keeps only
    # the records that did not match a person.
    joined_by_name_dob = (
        partner_df.alias("partner")
        .join(
            person.alias("person"),
            (partner_df["member_first_name"] == person["first_name"])
            & (partner_df["member_last_name"] == person["last_name"])
            & (partner_df["member_date_of_birth"] == person["date_of_birth"])
            & (partner_df["sponsor"] == person["sponsor_name"]),
            "left",
        )
        .dropDuplicates(["session_id"])
    )
    return joined_by_name_dob
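Downstream, the quarantine table gives us an easy way to monitor match quality per partner file. A quick, hypothetical example of reviewing a run (this assumes the pipeline's target schema is the current database):


# Count quarantined records per source file to spot problematic partner files.
quarantined = spark.read.table(f"{partner}_utilization_quarantine")

(
    quarantined.groupBy("file_name")
    .count()
    .orderBy("count", ascending=False)
    .show(truncate=False)
)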

Conclusion

In this post we covered a use case at Collective Health where our partners send us files at a given cadence. By leveraging the Databricks Lakehouse Platform and Delta Live Tables, we are able to ingest files incrementally and also visualize and validate the quality of incoming records.

Learn more about the Databricks Lakehouse Platform: https://www.databricks.com/solutions/audience/digital-native


