Build a Customer 360 Solution with Fivetran and Delta Live Tables

The Databricks Lakehouse Platform is an open architecture that combines the best elements of data lakes and data warehouses. In this blog post, we’ll show you how to build a Customer 360 solution on the lakehouse, delivering data and insights that would typically take months of effort on legacy platforms. We will use Fivetran to ingest data from Salesforce and MySQL, then transform it using Delta Live Tables (DLT), a declarative ETL framework for building reliable, maintainable, and testable data processing pipelines. To implement a Customer 360 solution, you will need to track changes over time. We will show you how DLT seamlessly processes Change Data Capture (CDC) data, keeping the Customer 360 solution up to date.

All the code is available in this GitHub repository.

Our task: Build a unified view of multichannel customer interactions

Businesses frequently seek to comprehend the various ways in which their customers interact with their products. A clothing retailer, for example, wants to know when a customer browses their website, visits one of their store locations in person, or completes a transaction. This unified view of customer interactions, known as Customer 360, powers a slew of use cases ranging from personalized recommendations to customer segmentation. Let’s look at how the Databricks Lakehouse Platform provides tools and patterns that make this task much easier.

The medallion architecture logically organizes data in a lakehouse, aiming to incrementally and progressively improve the structure and quality of data as it flows through each layer of the architecture (from Bronze ⇒ Silver ⇒ Gold layer tables). To support a Customer 360 initiative, the data typically resides in a variety of source systems, from databases to marketing applications such as Adobe Analytics. The first step is to ingest these data types into the bronze layer using Fivetran. Once the data has landed in the lakehouse, Delta Live Tables will be used to transform and cleanse the data in the silver and gold layers. The simplicity of this solution allows you to get value fast without writing complicated code to build the ETL pipeline using familiar SQL or Python. The Databricks Lakehouse Platform handles all the operations, infrastructure and scale.

The following diagram shows how fresh data and insights will be ready for downstream consumers such as analysts and data scientists.

Reference architecture for Customer 360 Solution with Fivetran, Databricks and Delta Live Tables

Fivetran: Automatic Data Ingestion into the Lakehouse

Extracting data from various applications, databases, and other legacy systems is challenging: you must deal with APIs and protocols, changing schemas, retries, and more. Fivetran’s managed connectors enable users to fully automate data ingestion into the Databricks Lakehouse Platform from more than 200 sources. They provide:

  • A user-friendly UI for configuring and testing connectors.
  • Automatic schema management, including handling schema drift.
  • Dealing with API outages, rate limits, etc.
  • Full and incremental loads.

Securely connect to Fivetran with Databricks Partner Connect

Databricks Partner Connect lets administrators set up a connection to partners with a few clicks. Click Partner Connect in the left navigation bar and click on the Fivetran logo. Databricks will configure a trial account in Fivetran, set up authentication with Fivetran and create a SQL warehouse which Fivetran will use to ingest data into the lakehouse.

In Databricks Partner Connect, select Fivetran and enter the credentials.

Incrementally ingest data from Azure MySQL

Databases commonly hold transactional information such as customer orders and billing information, which we need for our task. We will use Fivetran’s MySQL connector to retrieve this data and ingest it into Delta Lake tables. Fivetran’s connector handles the initial sync and can be used to incrementally sync only updated rows, a must-have for large-scale database deployments.

Sign in to Fivetran through Databricks Partner Connect and click Destinations in the left navigation bar. Select the Databricks SQL warehouse that Partner Connect created for us and click Add Connector.

Select Azure MySQL from the data sources, and click Add Connector.

Connect to the database by providing connection details, which you can find in the Azure Portal. We will use Fivetran to sync incremental changes to Databricks by reading the MySQL binary log:

Enter credentials to connect Azure MySQL to Fivetran.

Next, let’s select the tables we want to sync to Databricks – in this case, we will sync transactions:

Select the tables to sync to Databricks.

Click Sync Now to start the sync:

View of Sync History chart on the status page of Fivetran dashboard.

Ingest customer data from Salesforce

Salesforce is a very popular Customer Relationship Management (CRM) platform. CRMs typically contain non-transactional customer data such as marketing touchpoints, sales opportunities, etc. This data will be very valuable to us as we build out our Customer 360 solution. Fivetran’s Salesforce connector makes it easy to load this data.

In Fivetran, select the SQL warehouse we created earlier as the destination and click Add Connector. Choose the Salesforce connector:

Select Salesforce from the list of data sources.

Fivetran lets us authenticate to Salesforce with a few clicks:

Enter credentials to connect Salesforce to Fivetran.

Next, choose the Salesforce objects you want to sync to Databricks. In this example, the Contact object holds information about the customer contacts associated with an account, so let’s sync that to Databricks:

Select the tables to sync to Databricks.

Click Sync Now to initiate the first sync of the data. Fivetran can also automatically schedule the sync. This fully managed connector automatically handles the initial load as well as incremental changes:

View of Sync History chart on the status page of Fivetran dashboard.

Review tables and columns in Databricks

We are almost ready to start transforming the incoming data. However, let’s review the schema first:

transactions: All the transactions a customer made, which will be processed incrementally. Records received from Fivetran are persisted into the bronze layer. The transactions table has 10 columns, including:

customer_id          transaction_date      id    amount  item_count  category
0033l00002iewZBAAY   08-01-2022 04:12:55   6294  813     10          utilities
0031N00001MZua7QAD   08-01-2022 01:32:10   0     738     4           entertainment

We can also see the change data capture fields that Fivetran generates and maintains:

_fivetran_id                  _fivetran_index  _fivetran_deleted  _fivetran_synced
d0twKAz5aoLiRjoV5kvlk2rHCeo   51               false              2022-08-08T06:02:09.896+0000
6J3sh+TBrLnLgxRPotIzf8dfqPo=  45               true               2022-08-08T06:02:09.846+0000

contact_info: The dimensional information of a customer, with 90+ fields (name, phone, email, title, and so on), which will also be ingested into the bronze layer.
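
Before transforming anything, we can sanity-check the synced bronze data directly from a Databricks SQL editor or notebook. A minimal sketch, assuming the Fivetran destination schemas retail_demo_salesforce and mysql_azure_banking_db that the DLT pipeline reads from later in this post:

-- Preview the Salesforce contacts synced by Fivetran, including one of its metadata columns
SELECT id, mailing_country, _fivetran_synced
FROM retail_demo_salesforce.contact
LIMIT 10;

-- Preview the MySQL transactions synced by Fivetran
SELECT customer_id, transaction_date, id, amount, item_count, category
FROM mysql_azure_banking_db.transactions
LIMIT 10;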

Transform data using Delta Live Tables

Now that we have the data, we will use Delta Live Tables to transform and clean it for the Customer 360 solution. We will use DLT’s declarative APIs to express data transformations, and DLT will automatically track the flow of data and lineage between the tables and views in our ETL pipeline. DLT tracks data quality using expectations, taking remedial action such as quarantining or dropping bad records so that bad data never flows downstream. We will also use DLT to create a Slowly Changing Dimension (SCD) Type 2 table. Lastly, we will let DLT intelligently scale our ETL infrastructure up or down – no need to tune clusters manually.

Define a Delta Live Table in a notebook

DLT pipelines can be defined in one or more notebooks. Log in to Databricks and create a notebook by clicking New in the left navigation bar and choosing Notebook. Set the notebook language to SQL (we could define the same pipeline in Python if we wanted to).

Create DLT SQL logic in Databricks notebook.

Let’s break down the DLT SQL logic below. When defining a DLT table, we use the special LIVE keyword, which lets DLT manage dependencies between tables and automate operations. Next, we ensure the correctness of the data with expectations, e.g., mailing_country must be 'United States'. Rows that fail this quality check are dropped. We use a table property to set metadata, and finally we select all the rows that pass the data quality checks into the table.


CREATE LIVE TABLE contact_data (
  CONSTRAINT `id should not be null` EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT `mailing_country should be US` EXPECT (mailing_country = 'United States') ON VIOLATION DROP ROW,
  CONSTRAINT `mailing_geocode_accuracy should be Address` EXPECT (mailing_geocode_accuracy = 'Address') ON VIOLATION DROP ROW
)
COMMENT "Bronze table holding contact data ingested from Salesforce through Fivetran on each sync"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT
  *
FROM
  retail_demo_salesforce.contact;
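
The expectations above drop bad rows outright. If you would rather keep them around for inspection, a common pattern is to route them into a separate quarantine table by inverting the quality conditions. A minimal sketch under that assumption (the contact_data_quarantine table is our own addition, not part of the pipeline above):

-- Quarantine table: rows that violate any of the bronze quality checks
CREATE LIVE TABLE contact_data_quarantine
COMMENT "Contact rows that fail the bronze data quality checks, kept for inspection"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT
  *
FROM
  retail_demo_salesforce.contact
WHERE
  id IS NULL
  OR mailing_country IS NULL OR mailing_country <> 'United States'
  OR mailing_geocode_accuracy IS NULL OR mailing_geocode_accuracy <> 'Address';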

Follow the same pattern to create the transactions_data table, adding a data quality expectation on item_count that keeps only rows with a positive item_count and drops the rest.


CREATE LIVE TABLE transactions_data (
  CONSTRAINT `item_count should be positive value` EXPECT (item_count > 0) ON VIOLATION DROP ROW
)
COMMENT "Bronze table holding transaction data ingested from MySQL through Fivetran on each sync"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT
  *
FROM
  mysql_azure_banking_db.transactions;

Historical change data tracking with APPLY CHANGES INTO

Now, let’s do something more interesting. Customer contact information can change – for example, a customer mailing address would change every time the customer moves. Let’s track the changes in an easy-to-query SCD type 2 table using the APPLY CHANGES INTO keyword. If you are unfamiliar with this concept, you can read more about it in an earlier blog.

To track data changes, we will create a STREAMING LIVE TABLE. A streaming live table processes only the data that has been added since the last pipeline update. APPLY CHANGES INTO is where the CDC data processing magic happens. Since we are using a streaming live table, we select from the stream of changes to the contact_data table – note how we use LIVE as the special namespace for contact_data, since DLT maintains the tables and the relationships between them. We also instruct DLT to apply deletion logic instead of an upsert when Fivetran indicates a record has been deleted. With SEQUENCE BY we can seamlessly handle change events that arrive out of order: SEQUENCE BY uses the column that specifies the logical order of CDC events in the source data. Finally, we tell DLT to store the data as an SCD Type 2 table.


CREATE STREAMING LIVE TABLE silver_contacts;

APPLY CHANGES INTO LIVE.silver_contacts
FROM stream(LIVE.contact_data)
KEYS (id)
APPLY AS DELETE WHEN is_deleted = "true"
SEQUENCE BY _fivetran_synced
COLUMNS * EXCEPT (is_deleted, _fivetran_synced)
STORED AS SCD TYPE 2;
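
Once the pipeline runs and publishes its tables to a target database (we use Customer360_Database in the monitoring queries later in this post), the SCD Type 2 table carries __START_AT and __END_AT columns that mark the validity window of each record version. A quick, hedged sketch for pulling only the current version of each contact:

-- Current contact records: the latest version of each row has a NULL __END_AT
SELECT id, mailing_country, __START_AT, __END_AT
FROM Customer360_Database.silver_contacts
WHERE __END_AT IS NULL;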

Analytics-ready gold tables

Creating the gold tables with DLT is pretty straightforward – simply select the columns needed, with a few aggregations, as shown below:


CREATE LIVE TABLE customer_360
COMMENT "Join contact data with transaction data and materialize a live table"
TBLPROPERTIES ("quality" = "gold")
AS SELECT contact.*,
 transactions._fivetran_id,
 transactions.operation,
 transactions.customer_id,
 transactions.transaction_date,
 transactions.id as transaction_id,
 transactions.operation_date,
 transactions.amount,
 transactions.category,
 transactions.item_count,
 transactions._fivetran_index,
 transactions._fivetran_deleted
FROM LIVE.transactions_data as transactions
LEFT JOIN LIVE.silver_contacts as contact ON contact.id = transactions.customer_id;

CREATE LIVE TABLE categorized_transactions
COMMENT "Join contact data with transaction data and materialize a gold live table with aggregations"
TBLPROPERTIES ("quality" = "gold")
AS SELECT
 account_id,
 first_name,
 last_name,
 sum(amount) as total_expense,
 transaction_date,
 category
FROM LIVE.customer_360
GROUP BY
 account_id,
 first_name,
 last_name,
 transaction_date,
 category;
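
With the gold tables published, analysts can query them directly from Databricks SQL. As an illustrative sketch (again assuming the Customer360_Database target database), here is a simple spend breakdown on top of categorized_transactions:

-- Total spend per customer and category, highest spenders first
SELECT
  first_name,
  last_name,
  category,
  SUM(total_expense) AS total_spend
FROM Customer360_Database.categorized_transactions
GROUP BY first_name, last_name, category
ORDER BY total_spend DESC
LIMIT 20;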

Run DLT for the first time

Now DLT is ready to run for the first time. To create a DLT pipeline, click Workflows in the left navigation bar, click Delta Live Tables, and then click Create Pipeline.

To create a DLT pipeline click Workflows in the navigation bar and select Delta Live Tables.

We give our pipeline a name, “Customer 360”, and choose the notebook we defined earlier under Notebook libraries:

Add configurations and parameters required for creating your pipeline.

We need to specify the target database name in order to publish tables to the Databricks metastore. Once the pipeline is created, click Start to run it for the first time. If you have set up everything correctly, you should see the DAG of data transformations we defined in the notebook.

View of the completed run from the created DLT pipeline, demonstrating the lineage of published tables.

You can view these published tables by clicking Data in the left navigation bar, and search for the database name you added in the Target field under DLT pipeline settings.

On the left navigation bar in Azure Databricks, all the published tables are accessible from “Data”, which is highlighted in the red box.

Data quality and data pipeline monitoring with Databricks SQL

DLT captures events of the pipeline run in logs. These events include data quality checks, pipeline runtime statistics, and overall pipeline progress. Now that we have successfully developed our data pipeline, let’s use Databricks SQL to build a data quality monitoring dashboard on top of this rich metadata. This screenshot shows the finished product:

Screenshot of the data quality monitoring dashboard built from the DLT pipeline metadata.

DLT stores metadata in the pipeline’s storage location. We can create a table to query pipeline event logs that are stored in this location. Click SQL in the left navigation bar and paste the following query. Replace ${storage_location} with the storage location you set when you created your pipeline, or the default storage location dbfs:/pipelines.


CREATE OR REPLACE TABLE Customer360_Database.pipeline_logs
AS SELECT * FROM delta.`${storage_location}/system/events`;

SELECT * FROM Customer360_Database.pipeline_logs
ORDER BY timestamp;

To test if we can query the metadata, run this SQL query to find the version of Databricks Runtime (DBR) that DLT used:


SELECT details:create_update:runtime_version:dbr_version 
FROM Customer360_Database.pipeline_logs 
WHERE event_type = 'create_update' 
LIMIT 1;
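
The event log also records the lineage between datasets. A sketch of a lineage query, assuming the flow_definition events that DLT writes to the event log:

-- Input/output datasets for each flow defined in the pipeline
SELECT
  details:flow_definition.output_dataset AS output_dataset,
  details:flow_definition.input_datasets AS input_datasets
FROM
  Customer360_Database.pipeline_logs
WHERE
  event_type = 'flow_definition';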

As an example, we can query the cluster utilization and backlog metrics that DLT captured for a given pipeline update with this SQL query (replace ${latest_update_id} with that update’s id):


SELECT
  timestamp,
  Double(details:cluster_utilization.num_executors) as current_num_executors,
  Double(details:cluster_utilization.avg_num_task_slots) as avg_num_task_slots,
  Double(details:cluster_utilization.avg_task_slot_utilization) as avg_task_slot_utilization,
  Double(details:cluster_utilization.avg_num_queued_tasks) as queue_size,
  Double(details:flow_progress.metrics.backlog_bytes) as backlog
FROM
  Customer360_Database.pipeline_logs
WHERE
  event_type IN ('cluster_utilization', 'flow_progress')
  AND origin.update_id = '${latest_update_id}'
ORDER BY timestamp ASC;
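
The event log also captures the outcome of every expectation we declared in the pipeline. The following query is a sketch based on the data_quality payload that DLT writes to flow_progress events; a query along these lines can feed the data quality visuals in a dashboard like the one shown earlier:

-- Aggregate passing and failing record counts per expectation
SELECT
  row_expectations.dataset AS dataset,
  row_expectations.name AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) AS row_expectations
    FROM
      Customer360_Database.pipeline_logs
    WHERE
      event_type = 'flow_progress'
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;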

Conclusion

We built a Customer 360 solution in this blog post using transactional data from a MySQL database and customer information from Salesforce. First, we described how to use Fivetran to ingest data into the lakehouse; then we transformed and cleansed that data using Delta Live Tables. Finally, we showed how DLT lets data teams apply data quality expectations and monitor quality over time. The Databricks Lakehouse Platform enables organizations to build powerful Customer 360 applications that are simple to create, manage, and scale.

To start building your data applications on Databricks, read more about Fivetran and Delta Live Tables and check out the code and sample queries we used to produce the dashboard in this GitHub repo.


