Analytics on Azure Blog
Replicating Azure Cosmos DB into Azure Databricks using CDC

katiecummiskey
Nov 21, 2024

This blog was written in conjunction with David Poulet, Senior Solutions Architect at Databricks. 

 

Microsoft’s NoSQL database-as-a-service, Azure Cosmos DB, is a key platform in Azure for storing non-relational, transactional data and vectors for applications with high throughput and availability requirements. This data often holds valuable business insights, and the ability to analyze it at scale with Azure Databricks is a key requirement for many customers. Azure Cosmos DB is optimized for fast reads and writes of individual items. Like other data stores of this type, however, it is not optimized for analytical workloads, which makes it challenging to analyze the stored data in a performant and cost-effective way.

 

Microsoft’s solution to this problem is the Analytical Store, which stores a copy of the Azure Cosmos DB data in a columnar format and keeps it up-to-date. Until recently, however, this feature stored the data in a proprietary format in a hidden location that could only be accessed via Azure Synapse, and it was subject to a number of restrictions on the data structures and query types it could handle.

 

But there is now a flexible and open solution to this problem! Microsoft has a feature in Azure Data Factory that enables users to replicate the Azure Cosmos DB Analytical Store into their lakehouse in Delta format, automatically inserting/updating/deleting records as the source transactional database changes. The incremental nature of this offers significant cost savings vs pulling data directly from the transactional store and dealing with complex incremental ingestion logic in code. In this article, I’ll show how we can leverage this feature to create a simple process to continuously ingest operational data in Azure Cosmos DB into Azure Databricks’ powerful analytics and AI platform. 

Architecture Overview

The architecture we’ll discuss in this article uses the CDC capability for Azure Cosmos DB within Azure Data Factory to process changes in an Azure Cosmos DB container and then merge them into a Delta Lake table in the lakehouse. See the diagram below:

 

Azure Data Factory (ADF) will read a container from Azure Cosmos DB (via the analytical store) and periodically replicate any changes from that container into a Delta Lake table in Azure Databricks. This incremental replication process will operate on a schedule that is defined within ADF. 

 

There are a couple of possibilities for how we ingest these changes into Azure Databricks: we could move the data to a staging area, and ingest into Bronze from there using a workflow or Delta Live Tables, but for simplicity we’ll write directly to a table in the Bronze layer of our medallion architecture from ADF. 

 

Once the data is in our Bronze layer, standard Azure Databricks patterns can be used to cleanse and transform the data into Silver/Gold layers.

 

The bulk of the activities happen in Azure Data Factory, but there are some prerequisites. Before we can create the CDC pipeline it’s assumed the following already exist: 

 

  • An Azure Cosmos DB for NoSQL container, with Analytical Store enabled. 
  • An Azure Data Factory instance in which to create a CDC pipeline. 
  • An ADLS storage container to act as the target location for our Bronze table. 

 

With these in place, we can create the CDC pipeline from ADF.  

Setting Up The Pipeline

The feature in ADF that consumes the Azure Cosmos DB changes is in the Data Flows area, so we start by launching the ADF studio and creating a new data flow:

 

 

The new data flow needs a Source and a Sink. The Source will be our Azure Cosmos DB container and the Sink will be our Delta table in Bronze. 

 

 

First we’ll create and configure the Source to consume from our Azure Cosmos DB container. Click Add Source in the new data flow. In the source settings we have to set the Source Type to Inline and the Inline Dataset Type to Azure Cosmos DB for NoSQL. The Store Type should be set to Analytical. 

 

The Linked Service should be set to a linked service for Azure Cosmos DB that has been set up to connect to our source container. For details on how to create an ADF Linked Service, see the getting started documentation for Azure Cosmos DB.

 

In the Source Options for the Data Flow, there are some settings that are important to control the behavior of the reads from the source feed.

 

The Container name field is where we select the Azure Cosmos DB container we are interested in. In this example we have a container with some simple customer-related data. 

 

The Start from field lets us either synchronize all the data in the container from the start of its life, or sync only the changes from now on (or from a given timestamp). 

 

You have the option to capture intermediate updates if you want to maintain a history of all the changes, but we are just going to capture the latest state, so this is unselected. Capture Deletes ensures that items deleted from the source are also deleted in our Bronze table. Capture Transactional store TTLs means that items expired from the Azure Cosmos DB transactional store by the Time-To-Live function will also be deleted from our copy of the data. This is enabled by default, but many people may not want it: TTL is often used to reduce the size of the transactional store at the cost of losing historical data, and in the analytics world that historical data is often important. We’ll leave it at the default for now.
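To make the effect of these options a little more concrete, here is a small pure-Python toy model of which change events would reach the Sink. It is only a sketch of the semantics described above, not how ADF implements the change feed: the `op`/`doc` event shape, the `select_changes` function and the sample customers are all hypothetical.

```python
def select_changes(changes, capture_intermediate_updates=False, capture_deletes=True):
    """Toy model: decide which change events are forwarded to the sink.

    `changes` is an ordered list of dicts with hypothetical keys:
    `_rid` (item identifier), `op` ("upsert" or "delete") and `doc`.
    """
    if not capture_deletes:
        # Deletes in the source are ignored, so the item survives in the copy.
        changes = [c for c in changes if c["op"] != "delete"]
    if capture_intermediate_updates:
        # Keep the full history of every change.
        return list(changes)
    # Otherwise keep only the most recent event for each item.
    latest = {}
    for change in changes:
        latest.pop(change["_rid"], None)  # re-insert so order follows the last change
        latest[change["_rid"]] = change
    return list(latest.values())


# Hypothetical sample feed: r1 is updated twice, r2 is created and then deleted.
feed = [
    {"_rid": "r1", "op": "upsert", "doc": {"name": "Ada", "city": "Leeds"}},
    {"_rid": "r1", "op": "upsert", "doc": {"name": "Ada", "city": "York"}},
    {"_rid": "r2", "op": "upsert", "doc": {"name": "Bo", "city": "Bath"}},
    {"_rid": "r2", "op": "delete", "doc": None},
]

latest_only = select_changes(feed)
full_history = select_changes(feed, capture_intermediate_updates=True)
no_deletes = select_changes(feed, capture_deletes=False)
print(len(latest_only), len(full_history), len(no_deletes))
```

With the settings used in this article (latest state only, deletes captured), each item contributes at most one event per run, which is why the Bronze table ends up mirroring the current state of the container rather than its history.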

 

Next we’ll add a Sink to publish the change data to. Click the + button next to the source icon and search for the Sink option.

 

 

We then need to configure the Sink to point to our Bronze table in the lakehouse. 

 

In the Sink settings we select our incoming source stream (there is only one in this case, the one we just created). We again select Inline for Sink type, and this time the Inline dataset type is Delta. Once again the Linked service is an ADF linked service, which points to a blob container/folder that will store our Bronze table. You can read the documentation for creating an ADF blob linked service (or ADLS, either will work) on this page.

 

Next, the Settings page for our Sink has some important options to control the behavior of the table we are creating.

 

 

First we need to select the correct Folder path for the folder in the blob container that will store our Bronze table data. Here we have a simple folder called customer where ADF will put the Delta Lake files.

 

We also need to think about the Update method field. In this case we will allow Insert (to put new rows in the table as they are added in the source), Delete (to remove rows in the table as they are deleted in the source) and Update (updating existing rows to match changes in the source). To do this ADF needs a unique field in the source that it can match in the target table - so we select List of columns and put {_rid} in the column field. _rid is a system field in Azure Cosmos DB that uniquely identifies a data item.
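The combination of Insert, Update and Delete keyed on _rid behaves like a merge into the target table. The following pure-Python sketch illustrates that behavior on an in-memory dictionary. It is an illustration of the semantics only, not the code ADF runs, and the `apply_changes` function, the `op`/`doc` event shape and the sample rows are hypothetical.

```python
def apply_changes(table, changes, key="_rid"):
    """Merge a batch of change events into `table` (a dict keyed on `key`).

    Returns how many rows were inserted, updated and deleted.
    """
    counts = {"inserted": 0, "updated": 0, "deleted": 0}
    for change in changes:
        k = change[key]
        if change["op"] == "delete":
            # Delete: remove the matching row if it exists.
            if table.pop(k, None) is not None:
                counts["deleted"] += 1
        elif k in table:
            # Update: the key already exists, so replace the row.
            table[k] = change["doc"]
            counts["updated"] += 1
        else:
            # Insert: no row with this key yet.
            table[k] = change["doc"]
            counts["inserted"] += 1
    return counts


# Hypothetical current contents of the Bronze table.
bronze = {
    "r1": {"name": "Ada", "city": "Leeds"},
    "r2": {"name": "Bo", "city": "Bath"},
}
# Hypothetical batch of changes from one pipeline run.
batch = [
    {"_rid": "r1", "op": "upsert", "doc": {"name": "Ada", "city": "York"}},
    {"_rid": "r2", "op": "delete", "doc": None},
    {"_rid": "r3", "op": "upsert", "doc": {"name": "Cy", "city": "Hull"}},
]
counts = apply_changes(bronze, batch)
print(counts)
print(sorted(bronze))
```

The important point is that the key must be unique and stable. If two source items could share a key, an update for one would silently overwrite the other in the target.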

 

At this point we are actually ready to run this Data Flow to start syncing Azure Cosmos DB changes to our Bronze table. To do this we need to create a Pipeline in ADF to run the Data Flow defined above. 

 

In the ADF studio resources section, under Pipelines, create a new pipeline, and in that pipeline drag a single activity onto the pipeline edit canvas - a Data Flow activity.

 

 

Once we’ve created a pipeline with a Data Flow activity, we will edit the activity settings to trigger the CDC Data Flow we created above. Here all we need to do is select our data flow in the Data Flow drop-down.

 

 

Then, like all ADF pipelines, we need a trigger to start the pipeline and we’re ready to start ingesting data. From the pipeline editor menu select Add Trigger and then New/Edit - this will bring up the trigger menu below.

 

 

We’ll set our trigger to run on creation and then run every 5 minutes after that. This means that every 5 minutes the pipeline will get the latest changes from Azure Cosmos DB and push them into our Bronze table.
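For anyone who prefers to keep triggers in source control rather than clicking through the UI, the schedule above corresponds roughly to the JSON definition built below. This is a sketch based on my understanding of the ADF schedule trigger format, so check it against the JSON view of your own trigger in ADF studio. The trigger name, pipeline name and start time are hypothetical.

```python
import json
from datetime import datetime, timezone


def schedule_trigger(name, pipeline_name, start, interval_minutes=5):
    """Build a schedule trigger definition that runs a pipeline every N minutes."""
    return {
        "name": name,
        "properties": {
            "type": "ScheduleTrigger",
            "typeProperties": {
                "recurrence": {
                    "frequency": "Minute",
                    "interval": interval_minutes,
                    # Start time expressed in UTC.
                    "startTime": start.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                    "timeZone": "UTC",
                }
            },
            "pipelines": [
                {
                    "pipelineReference": {
                        "referenceName": pipeline_name,
                        "type": "PipelineReference",
                    }
                }
            ],
        },
    }


# Hypothetical names and start time, for illustration only.
trigger = schedule_trigger(
    name="cdc_every_5_minutes",
    pipeline_name="cosmos_cdc_pipeline",
    start=datetime(2024, 11, 21, 9, 0, tzinfo=timezone.utc),
)
print(json.dumps(trigger, indent=2))
```

A shorter interval reduces the lag between the source and the Bronze table, but each run has a cost, so the interval is a trade-off between freshness and spend.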

Using The Target Table

With the pipeline running, we should start to see data flowing into our target Delta Lake table. I have created a simple customer data set for this example, with three items in the container. After the pipeline has run, these items are pushed into a Delta Lake table in our target ADLS container.

 

In a notebook in Azure Databricks, we can load that Delta Lake table and see its contents:
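A minimal sketch of such a notebook cell is shown here. It assumes the workspace can already read the storage account, and the storage account name `mystorageacct` and container name `bronze` are hypothetical placeholders. The `customer` folder is the one we configured in the Sink. The helper functions are my own, and `spark` is the session that Azure Databricks predefines in every notebook.

```python
def delta_table_path(account, container, folder):
    """Build the abfss:// URI for a folder in an ADLS Gen2 container."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{folder.strip('/')}"


def load_bronze(spark, path):
    """Read the Delta Lake table that ADF maintains at `path`."""
    return spark.read.format("delta").load(path)


# Hypothetical storage account and container names.
path = delta_table_path("mystorageacct", "bronze", "customer")
print(path)

# In an Azure Databricks notebook, where `spark` and `display` are predefined:
# display(load_bronze(spark, path))
```

Because the table is read by path, no catalog object is needed at this stage.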

 

 

We can already access the data in the target Delta table from Azure Databricks. Each time the pipeline in ADF runs, it will update this table with whatever inserts/updates/deletes have happened in the source container. 

 

To really make the best use of this as a Bronze table in Azure Databricks, we’re going to create an external table in Unity Catalog to integrate this data with the rest of our UC resources and in this way make it securely accessible to all our Azure Databricks users.

 

First in the Catalog view in Azure Databricks we create a new external location:

 

 

Then we configure the external location to point to our target ADLS folder.

 

In the new external location dialog, we give the location a name, we select the storage credential that we’ll use to access the external container (in this case the managed identity that is assigned to my Azure Databricks workspace), and the URL to the actual storage container itself. Note that if you have not already done so you will have to ensure that the managed identity for your Azure Databricks workspace has been assigned the relevant permissions to access the storage container. For more information on configuring external locations in Azure Databricks see this documentation.
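If you would rather script this step than use the dialog, the same external location can be created with a SQL statement. The sketch below only builds the statement as a string. The location name, storage credential name and URL are hypothetical, and the storage credential must already exist in Unity Catalog.

```python
def create_external_location_sql(name, url, credential):
    """Build a CREATE EXTERNAL LOCATION statement for Unity Catalog."""
    return (
        f"CREATE EXTERNAL LOCATION IF NOT EXISTS `{name}`\n"
        f"URL '{url}'\n"
        f"WITH (STORAGE CREDENTIAL `{credential}`)"
    )


# Hypothetical names: replace with your own location, container and credential.
location_sql = create_external_location_sql(
    name="cdc_bronze_location",
    url="abfss://bronze@mystorageacct.dfs.core.windows.net/",
    credential="workspace_managed_identity",
)
print(location_sql)

# In an Azure Databricks notebook, where `spark` is predefined:
# spark.sql(location_sql)
```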

 

Finally we can create an external table over our target storage container location so that we can access the table in UC. Inside an Azure Databricks notebook we can do this very simply:
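A minimal sketch of such a cell follows. The catalog and schema names match the ones used in this article, while the table name `customer` and the storage URL are hypothetical. Because the location already contains a Delta Lake table, no column list is needed: the schema is read from the Delta log.

```python
def create_external_table_sql(catalog, schema, table, location):
    """Build a CREATE TABLE statement for an external Delta table."""
    return (
        f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table}\n"
        f"USING DELTA\n"
        f"LOCATION '{location}'"
    )


# Hypothetical table name and storage URL.
table_sql = create_external_table_sql(
    catalog="cdc_catalog",
    schema="cdc_demo",
    table="customer",
    location="abfss://bronze@mystorageacct.dfs.core.windows.net/customer",
)
print(table_sql)

# In an Azure Databricks notebook, where `spark` is predefined:
# spark.sql(table_sql)
# display(spark.table("cdc_catalog.cdc_demo.customer"))
```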

 

 

In the above example this creates the bronze table in the cdc_demo schema of my cdc_catalog catalog. Once this is done we can query this table like any other table in Unity Catalog, and view the data that’s being replicated from Azure Cosmos DB by our ADF pipeline. We can then continue to enrich, clean and merge this data downstream using standard Azure Databricks processes, for example as shown in the documentation here.

 

So we can see that with a simple pipeline in ADF, we have created a robust way of opening up our Azure Cosmos DB transactional data to whatever complex analytical processes we want to use in Azure Databricks without reading the transactional data store itself, thus reducing cost and “noisy neighbor” risks. 

Updated Nov 18, 2024
Version 1.0