Processing data simultaneously from multiple streaming platforms using Delta Live Tables

One of the major imperatives for organizations today is enabling decision making at the speed of business. Business teams and autonomous decisioning systems often need to act on information as soon as the source events happen, in real time or near real time. Such information, known as events in stream-processing parlance, is relayed asynchronously from a source to a destination, generally through a message broker or message bus.

As organizations grow and teams branch out, the usage patterns, number, and variety of message brokers increase. In merger and acquisition scenarios, companies often inherit new message brokers, which then need to be integrated with their existing data engineering processes. Managing a multitude of message brokers, their producers and consumers, and the associated data engineering pipelines coherently can be challenging, given the number of technologies, cloud platforms and programming languages that require specialization.

For example, let’s look at a multi-stream use case. Imagine the following situation in a global conglomerate, where

  • They have two major divisions which use two separate message brokers – Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK).
  • They acquired another smaller product company which uses Azure Event Hubs internally.
  • They also have a major source of data coming in the form of a continual stream of CSV or JSON files.

The diagram below depicts the multi-stream use case described above:

Fig 1 – Multiple stream Use Case

Here are some key challenges in processing all this data in such a scenario:

  • How do they integrate all these varied data sources and their technologies together?
  • How do they build data engineering systems in such a way that they are easy to scale and maintain? Do they hire "horses for courses", i.e. specialized talent for each separate technology?
  • How do they ensure that in the process of solving this technology conundrum, they don’t forget what the end goal is – making sense of the data and enabling decisions faster?
  • How do they ensure that both batch and streaming needs can be served by the same data processing system?

Through this blog, we will demonstrate how these problems can be solved in an easy and clean way through Delta Live Tables (DLT). Delta Live Tables on the Databricks Lakehouse Platform makes it simple to create and manage high-quality batch and streaming data pipelines.

Multi-stream use case

To demonstrate a multi-stream processing scenario through DLT, let’s imagine a healthcare domain use case. At the center of the use case is a patient. The patient has the following interactions with another entity, the hospital:

  • Gets vaccinated against medical conditions at a hospital
  • Gets tested for medical conditions at a hospital
  • Gets admitted into a hospital for certain medical conditions
  • Gets invoiced for treatment, through a claims process

These interactions are represented in the business use case diagram below.

Fig 2 – Business Use Case Diagram for a Patient’s interactions

The entities involved in these interactions can be represented in the entity relationship (ER) diagram below. Going forward, we will make ample use of the terms facts and dimensions from the data warehousing dimensional modeling lexicon. In the ER diagram, you can see that Patient and Hospital are dimension tables, while the data for Admissions, Vaccinations, Testing Records and Claims is represented in the form of fact tables.

Fig 3 – Entity Relationship Diagram for a Patient’s interactions

Let’s say that the fact data for Admissions, Vaccinations, Testing Records and Claims arrives through different message brokers. For this blog, we have selected popular ones in the industry, i.e. Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams and Azure Event Hubs. Our fourth way to exhibit a stream processing scenario is by ingesting a sequence of files incrementally into tables (Delta Lake in this case); we have selected Databricks Auto Loader for this purpose. With this information, the patient’s interactions diagram can now be updated with the relevant data sources:

Fig 4 – Business Use Case Diagram with data sources

Below are the data sources summarized in tabular format:

Fact            | Data source
----------------|------------------------------
Admissions      | Amazon MSK (Apache Kafka)
Testing Records | Azure Event Hubs
Vaccinations    | Amazon Kinesis Data Streams
Claims          | Incremental stream of files

In the next section, we will demonstrate how a single DLT pipeline can ingest data from the above streaming sources simultaneously and also perform ETL on it. We have used AWS infrastructure to set up the resources and pipeline for this blog, but the scenarios should be similarly configurable in other public clouds like Azure and GCP.

Multi-stream processing through DLT

The diagram below depicts what a typical DLT pipeline looks like. While ingestion and transformation of streaming data coming from diverse sources sums up the scope of this blog, there are additional features of DLT that also deserve attention. For example, data quality management is a key feature which helps teams process only the data that passes certain ‘expectations’; teams can then take corrective and preventive actions on the erroneous data (a short sketch follows the diagram below). Other benefits of DLT are managed checkpointing and enhanced autoscaling. You can read about these and more features in this article: Delta Live Tables concepts.

Fig 5 – Data Processing Pipeline with DLT
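
As an illustration of expectations, the sketch below shows how a hypothetical cleansing step could be declared; it is not part of the pipeline built in this blog. Rows with a missing patient_id are dropped, while rows with a missing event timestamp are only counted in the pipeline's data quality metrics:


import dlt

# Hypothetical cleansing table built on top of the raw admissions table defined later in this blog.
# expect_or_drop removes offending rows; expect only records the violations in the quality metrics.
@dlt.table(name="msk_admissions_tbl_clean",
           comment="Admissions rows that satisfy basic quality expectations")
@dlt.expect_or_drop("valid_patient_id", "patient_id IS NOT NULL")
@dlt.expect("has_event_timestamp", "msk_event_timestamp IS NOT NULL")
def msk_admissions_tbl_clean():
    return dlt.read_stream("msk_admissions_tbl_raw")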

Our multi-stream processing DLT pipeline (let’s call it dlt_multistream_consumer for simplicity) involves the following operations:

  1. Loading streaming data from the message brokers into a raw layer of Delta Lake tables.
  2. Parsing and applying a schema over the raw data to generate meaningful facts (fact tables) from the message payloads.
  3. Generating aggregated datasets for consumption by BI and analytics teams. This is done by joining various fact and dimension tables, and slicing/dicing the generated records.

A DLT pipeline can be configured to operate in either of the following modes (a trimmed settings sketch follows the list):

  • Continuous: A live cluster processes the data as soon as it arrives in a continuous manner. This is recommended for streaming scenarios where latency is paramount.
  • Triggered: A cluster is spun up at a desired schedule and it processes the data available at that point in time. Once there is no more data pending for processing (for a certain time duration threshold), the cluster is shut down. This mode is recommended when cost savings and throughput take precedence over latency.
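
The mode is part of the pipeline settings rather than the notebook code. Below is a minimal sketch of the settings JSON; the continuous flag switches between the two modes, while the name, target schema and notebook path are placeholders rather than the exact values used for this blog:


{
  "name": "dlt_multistream_consumer",
  "continuous": true,
  "target": "my_target_schema",
  "libraries": [
    { "notebook": { "path": "/Repos/project/msk_admissions_ingest" } }
  ]
}

Setting "continuous" to false (or omitting it) yields a triggered pipeline instead.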

For this blog, we are using the continuous mode for two reasons. The first is to demonstrate a ‘true’ stream processing scenario where the data is captured and processed in real time. Secondly, the Amazon Kinesis Data Streams connector for Apache Spark, at the time of publishing this blog, doesn’t support a batch mode of ingestion (i.e. Trigger.Once or Trigger.AvailableNow), which is what a triggered pipeline relies on. Support for this is coming soon, though.
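
For context, the "process whatever is available, then stop" behaviour that a triggered pipeline depends on is expressed in plain Structured Streaming (outside DLT) with an availableNow trigger. The sketch below uses the Kafka source, which supports it; the broker address, checkpoint path and table name are placeholders:


# Plain Structured Streaming (not DLT): read everything currently in the topic,
# write it to a Delta table once, then stop.
(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker-1:9092")
    .option("subscribe", "admission")
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/tmp/checkpoints/admissions_batch")
    .trigger(availableNow=True)
    .toTable("admissions_batch_style"))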

Another point to highlight is that, in a production setup, the raw data generally goes through many stages of processing and enrichment after ingestion. These stages cleanse, transform and aggregate the data for business use cases involving analytics and historical and predictive analysis. The progressive improvement in the structure and quality of data as it flows through each layer of the architecture is referred to as the medallion lakehouse architecture. While the medallion architecture is a best practice for mainstream use cases, to keep this blog simple we will skip most of the data transformation and move on to data analysis for the business use cases.

Now let’s look at how we can use DLT to ingest streaming data into Delta Lake from a multitude of streaming systems at once, and then analyze that data to generate meaningful insights.

Ingesting data from streams

In this section, we show how data is ingested from each of the streaming platforms.

Ingesting data from Amazon MSK

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data. Amazon MSK provides the control-plane operations, such as those for creating, updating, and deleting clusters.

We used an MSK cluster with the following configuration for this blog:

  • Cluster type – Provisioned
  • Total number of brokers – 2
  • Apache Kafka version – 2.5.1
  • Broker type – kafka.m5.large
  • Number of availability zones – 2
  • EBS storage volume per broker – 300 GiB
  • Encryption between clients and brokers:
    – TLS encryption – Enabled
    – Plaintext – Enabled

For authentication in the AWS environment, we have attached an instance profile to the DLT cluster. It references an IAM role having appropriate read/write permissions as configured in the role definition.

The pipeline dlt_multistream_consumer ingests the Hospital Admission dataset from the MSK topic admission into the Delta Lake table named msk_admissions_tbl_raw. The Kafka payload (binary, represented here as a string) for an Admissions fact looks like this:


{
    "admissions_id": "93c9b070-e636-4e66-b758-964b6d7ce9a1", 
    "hospital_id": 103658, 
    "patient_id": 141620, 
    "Timestamp": "2022-07-10T05:00:00.000+0000"
}

We used the following code in a Python notebook for ingestion and attached it to our DLT pipeline dlt_multistream_consumer.


import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import *


@dlt.create_table(
    name="msk_admissions_tbl_raw",
    comment="BRONZE TABLE FOR ADMISSIONS DATA FROM AWS MSK",
    table_properties={"quality": "bronze"}
)
def msk_stream():
    input_schema = StructType([
        StructField("admissions_id", StringType()),
        StructField("hospital_id", IntegerType()),
        StructField("patient_id", IntegerType()),
        StructField("Timestamp", TimestampType())
    ])
    topic = "admission"
    kafka_bootstrap_servers_plaintext = dbutils.secrets.get(
        scope="myscope_aj", key="MSK_kafka_bootstrap_servers_plaintext"
    )
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load()
        # The Kafka value is binary; cast it to string and parse the JSON payload
        .withColumn("admission_json", F.from_json(F.col("value").cast("string"), input_schema))
        # Keep the Kafka-generated event timestamp alongside the parsed fields
        .withColumn("msk_event_timestamp", F.col("timestamp"))
        .select("msk_event_timestamp", "admission_json.*")
    )

The pipeline dlt_multistream_consumer, when initiated, creates the table msk_admissions_tbl_raw inside the desired database / schema and populates the table with the data read from the admission topic. Then it continues to ingest additional data as and when new data gets written into that topic.

In addition to the four fields ingested from the Kafka topic, we have added a column msk_event_timestamp (derived from the event timestamp generated by Kafka) to the msk_admissions_tbl_raw table. The messages arrive in the value field of the Kafka payload in binary format; they need to be cast to string so that downstream processes can parse them.

Ingesting from Amazon Kinesis

Amazon Kinesis Data Streams makes it easy to collect, process, and analyze real-time streaming data. The data could be video, audio, application logs, website clickstreams, or IoT telemetry for machine learning, analytics, and other applications. For this blog, we used a Kinesis data stream with the following configuration:

  • Data Stream Name: test_vaccination_data
  • Capacity Mode: On Demand
  • Write Capacity: 200 MiB/second or 200,000 records/second (Default Maximum)
  • Read Capacity: 400 MiB/second (Default)

For better throughput, the Amazon Kinesis enhanced fan-out (EFO) feature of the Databricks Runtime Kinesis connector can be used in production scenarios.
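
A sketch of how EFO could be switched on is shown below. The consumerMode and consumerName option names are assumptions based on the Databricks Kinesis connector documentation and should be verified against the Databricks Runtime version in use; the pipeline in this blog uses the default polling mode instead.


# Hypothetical sketch: Kinesis source with enhanced fan-out enabled.
(spark.readStream
    .format("kinesis")
    .option("streamName", "test_vaccination_data")
    .option("region", "us-west-2")
    .option("initialPosition", "earliest")
    .option("consumerMode", "efo")               # assumed option: EFO instead of polling
    .option("consumerName", "dlt_multistream")   # assumed option: name of the registered EFO consumer
    .load())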

The stream test_vaccination_data, in our use case, carries the vaccination facts (events) for a patient. Below is a sample payload:


{
    "hospital_id": 102269,
    "patient_id": 101060,
    "vaccination_type": "hepatitis",
    "Timestamp": "2021-04-02 16:00:00"
}

The DLT code for consuming the data from the Kinesis data stream into the landing zone of the lakehouse is as follows:


import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import *

my_stream_name = "test_vaccination_data"
kinesis_region = "us-west-2"


@dlt.create_table(
    name="kinesis_vaccination_tbl_raw",
    comment="BRONZE TABLE FOR VACCINATION DATA FROM KINESIS",
    table_properties={"quality": "bronze"}
)
def kinesis_stream():
    input_schema = StructType([
        StructField("hospital_id", IntegerType()),
        StructField("patient_id", IntegerType()),
        StructField("vaccination_type", StringType()),
        StructField("Timestamp", TimestampType())
    ])

    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", my_stream_name)
        .option("initialPosition", "earliest")
        .option("region", kinesis_region)
        .load()
        # The Kinesis payload arrives in the binary 'data' field; cast it to string and parse the JSON
        .withColumn("value", F.from_json(F.col("data").cast("string"), input_schema))
        .withColumn("key", F.col("partitionKey").cast("string"))
        .select("key", "value.hospital_id", "value.patient_id", "value.vaccination_type", "value.Timestamp")
    )

The pipeline dlt_multistream_consumer populates the target table kinesis_vaccination_tbl_raw with the data available in the Kinesis data stream and checkpoints its progress to avoid re-reading the data. The messages arrive in the data field of the Kinesis payload in binary format; they need to be cast to string so that downstream processes can parse them.

Ingesting from Azure Event Hubs

Azure Event Hubs is a popular streaming platform offered by Microsoft. Event Hubs supports multiple protocols for reading and writing, such as AMQP, HTTPS, and Apache Kafka. For this blog, our DLT pipeline dlt_multistream_consumer uses the Kafka endpoint of Event Hubs to consume the messages, since the Kafka connector for Spark comes out of the box in Databricks Runtime without the need for any extra libraries. We are using the Standard pricing tier for our Event Hubs namespace with 2 throughput units.

Azure documentation describes namespaces as management containers for event hubs (or topics, in Kafka parlance). The event hub (aka topic) of interest to us is testing_records_stream, which carries events pertaining to the laboratory test results of a patient. Each message looks like the following:


{
  "testing_record_id":"e0a2e0a8-f0a6-4f1f-a080-974026c3606a",
  "hospital_id":104164,
  "patient_id":152430,
  "is_positive":"N",
  "Timestamp":"2021-07-12 17:00:00.000"
}

The DLT code for consuming messages from the testing_records_stream event hub looks very similar to the ingestion pattern with Amazon MSK (they both use the Kafka protocol, after all).


import dlt
import pyspark.sql.functions as F

# Event Hubs details (placeholders; in practice, read the connection string from a secret scope)
EH_CONN_STR = "my-event-hubs-connection-string"
EH_NAMESPACE = "my-event-hubs-namespace"
EH_KAFKA_TOPIC = "testing_records_stream"
EH_BOOTSTRAP_SERVERS = f"{EH_NAMESPACE}.servicebus.windows.net:9093"
EH_SASL = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="$ConnectionString" password="{EH_CONN_STR}";'
)

# Standard Kafka configuration options (similar to Structured Streaming)
kafka_options = {
    "kafka.bootstrap.servers": EH_BOOTSTRAP_SERVERS,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    "kafka.sasl.jaas.config": EH_SASL,
    "startingOffsets": "earliest",
    "subscribe": EH_KAFKA_TOPIC,
}


# Creates a DLT table from the Event Hubs Kafka endpoint
@dlt.table(name="eventhubs_testing_records_tbl_raw",
           comment="Reads raw messages from event hub stream for testing records of patients")
def read_eventhub():
    return (
        spark.readStream
        .format("kafka")
        .options(**kafka_options)
        .load()
        # The Kafka value is binary; keep a string copy for downstream parsing
        .withColumn("value_text", F.col("value").cast("string"))
    )

The DLT pipeline incrementally reads the testing records messages from Event Hubs and appends them as new rows to the eventhubs_testing_records_tbl_raw Delta Lake table. The messages arrive in the value field of the Kafka payload in binary format; they need to be cast to string so that downstream processes can parse them.

Ingesting files through Autoloader

Auto Loader is one of the most powerful and robust data file ingestion tools in the industry. It incrementally and efficiently processes new data files as they arrive in cloud storage without the need for any additional setup. As of today, it can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.

Auto Loader allows consumers to configure a Structured Streaming source called cloudFiles. Configuration options include cloud storage file paths, file filtering patterns, and file arrival notification options for queues such as Azure Queue Storage, AWS SQS, and AWS SNS (a small sketch of the notification option follows below). The DLT pipeline uses these options to automatically ingest new files as they arrive, with the option of also processing any pre-existing files in that directory. Auto Loader has support for both Python and SQL in DLT pipelines. Clients have used Auto Loader to process billions of files to migrate data or backfill a table, and it scales to support near real-time ingestion of millions of files per hour.
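
As a minimal sketch of the notification-based option mentioned above (our pipeline uses directory listing instead), the options below switch Auto Loader to file notification mode on AWS; Auto Loader then sets up and reads from the queue itself. The values are placeholders:


# Sketch only: Auto Loader in file notification mode instead of directory listing.
cloudfiles_notification_options = {
    "cloudFiles.format": "json",
    "cloudFiles.useNotifications": "true",   # consume file-arrival events instead of listing the directory
    "cloudFiles.region": "us-west-2"         # AWS region in which the notification resources are created
}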

In this blog, we ingest patient Claims data using Auto Loader; here is a sample record:


{
   "claim_id":"595be279-1ac3-4831-8a32-62d338c280ab",
   "admissions_id":"f6f17bf8-d735-4f75-91e4-e2c04e30ecec",
   "claim_timestamp":"2022-01-31T13:00:00.000Z",
   "total_invoiced_amount":32146.0,
   "patient_payable_amount":18174.0,
   "payer_covered_amount":13972.0
}

In the dlt_multistream_consumer pipeline, the JSON files for patient claims are ingested through Auto Loader as they arrive at the source location, as depicted in the code below:


import dlt
import pyspark.sql.functions as F

files_path = "dbfs:/mnt/multistream-dlt-blog/raw_data/claims"
schema_location = "dbfs:/mnt/multistream-dlt-blog/ss-checkpoints/autoloader"

cloudfiles_options = {
    "cloudFiles.format": "json",
    "cloudFiles.useNotifications": "false",           # directory listing mode
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.rescuedDataColumn": "_rescued_data",  # capture malformed or unexpected fields
    "cloudFiles.schemaHints": "timestamp timestamp",
    "cloudFiles.schemaLocation": schema_location,
    "cloudFiles.schemaEvolutionMode": "addNewColumns"
}


@dlt.table
def autoloader_filestream_claims_tbl_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudfiles_options)
        .load(files_path)
        .withColumn("source_file", F.input_file_name())
        .withColumn("ingestion_timestamp", F.current_timestamp())
    )

Analyzing ingested data to generate business insights

As mentioned earlier, for this blog, we have already ingested the patient datasets from four different sources – Amazon MSK, Amazon Kinesis Data Streams, Azure Event Hubs and Databricks Auto Loader. For brevity, we have limited the data transformation part to schema enforcement only and are moving forward to perform data analysis on these datasets to support business use cases.

We have created two business analytics use cases for this blog. The idea is to demonstrate how users can query the Delta Lake tables created by DLT to answer business questions.

Number of patients by state and age group who tested positive for COVID-19 after being vaccinated against it

The table gold_patients_by_state_age_group is created in DLT by joining the tables populated during ingestion. As new data arrives, the continuous DLT pipeline ingests it into the raw layer and keeps gold_patients_by_state_age_group up to date for analysis.


-- Create a temporary PATIENT view using the LIVE keyword so DLT can use it.
-- Views in DLT live within the pipeline's execution and are not accessible
-- from outside, e.g. via Databricks SQL.

-- Read the static patient dimension table from the health_analytics database
CREATE TEMPORARY LIVE VIEW patient AS
SELECT
  *
FROM
  health_analytics.patient;

-- Intermediate view to calculate each patient's age
CREATE TEMPORARY LIVE VIEW patients_by_state_age AS
SELECT
  count(*) AS patient_count,
  state,
  ceil(
    datediff(current_date(), cast(date_of_birth AS date)) / 365
  ) AS age
FROM
  LIVE.eventhubs_testing_records_tbl_raw AS testing_records_sample
  JOIN LIVE.kinesis_vaccination_tbl_raw AS vaccinations_sample
    ON testing_records_sample.patient_id = vaccinations_sample.patient_id
  JOIN LIVE.patient
    ON testing_records_sample.patient_id = patient.patient_id
WHERE
  testing_records_sample.`Timestamp` > vaccinations_sample.`Timestamp`
  AND vaccinations_sample.vaccination_type = 'covid-19'
  AND testing_records_sample.is_positive = 'Y'
GROUP BY
  patient.state,
  ceil(
    datediff(current_date(), cast(date_of_birth AS date)) / 365
  );

-- Bucket patients into age groups
CREATE TEMPORARY LIVE VIEW patients_by_state_age_group AS
SELECT
  patient_count,
  state,
  CASE
    WHEN age BETWEEN 0 AND 10 THEN '1-child'
    WHEN age BETWEEN 11 AND 25 THEN '2-youth'
    WHEN age BETWEEN 26 AND 60 THEN '3-adult'
    ELSE '4-senior'
  END AS age_group
FROM
  LIVE.patients_by_state_age;

-- GOLD table (medallion architecture) holding the aggregated data for analysis
CREATE OR REFRESH LIVE TABLE gold_patients_by_state_age_group AS
SELECT
  state,
  age_group,
  sum(patient_count) AS patient_count
FROM
  LIVE.patients_by_state_age_group
GROUP BY
  state,
  age_group
ORDER BY
  state,
  age_group;

Below is how the data looks when queried through the Databricks SQL query editor:

Simultaneous stream processing using DLT
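
For reference, a query along the following lines produces the result shown above; my_target_schema is a placeholder for the target database/schema configured in the pipeline settings:


SELECT state, age_group, patient_count
FROM my_target_schema.gold_patients_by_state_age_group
ORDER BY state, age_group;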

Number of Admissions month-wise per city and state

Number of Admissions per city and state per month is another use case which is supported by querying the gold_monthly_admissions_by_state_city table.


-- Create a temporary hospital view using the LIVE keyword so DLT can use it.
-- Here we read the static hospital dimension table from the health_analytics database.
CREATE TEMPORARY LIVE VIEW hospital AS
SELECT
  *
FROM
  health_analytics.hospital;

-- GOLD table (medallion architecture) holding the aggregated data for analysis.
-- The admission month is expressed in YYYYMM format; date_format() produces the
-- same zero-padded year-month string as concatenating the year and month manually.
CREATE OR REFRESH LIVE TABLE gold_monthly_admissions_by_state_city AS
SELECT
  count(*) AS patient_count,
  city,
  state,
  date_format(Timestamp, 'yyyyMM') AS YYYYMM
FROM
  LIVE.msk_admissions_tbl_raw admissions_source
  JOIN LIVE.hospital
    ON admissions_source.hospital_id = hospital.hospital_id
GROUP BY
  city,
  state,
  date_format(Timestamp, 'yyyyMM');

Below is how the data looks when queried through the Databricks SQL query editor:

Simultaneous stream processing using DLT

End-to-end view of the DLT pipeline

One of the key features of DLT is its ability to resolve all the table-level dependencies among the code artifacts included in the pipeline and display them as a lineage graph. This graph is ‘live’: as the ETL operations proceed, successful operations are shown in green and erroneous ones in red. A screenshot of our continuously running DLT pipeline is shown below:

Simultaneous stream processing using DLT

Conclusion

In this blog, we have shown how you can ingest and consume data from diverse streaming platforms across multiple clouds using a single Delta Live Tables pipeline. The ETL process runs continuously, processing data as soon as it arrives, and the transformed data is continuously available for analysis to answer your business questions. Such a solution greatly simplifies the overall technical architecture and makes it easier to maintain and scale. With Databricks introducing new features into DLT regularly, it is finding wide adoption for ETL workloads. Try Delta Live Tables today.


