CosmosDB — .NET SDK v3 — a change feed

Michal Molka
3 min read · Oct 13, 2023


When you want to export data from a CosmosDB container, you can do it in various ways: Azure Data Factory, Databricks, etc. Or… you can use a built-in feature called the change feed, which can stream changes out of the container in no time. To use it, we need to write some C# code. As you will see, you can handle all the changes however you want through the code. In this case we save all incoming changes into another container, but, as I said, the sky is the limit: you can do whatever you want with the feed.

Like usual, a GitHub link to the source code is placed at the end of the article.

The first step is to add the Microsoft.Azure.Cosmos NuGet package. Then initialize all the necessary variables.

using Microsoft.Azure.Cosmos;

string endpointUrl = "your-cosmos-db-endpoint-url";
string authorizationKey = "your-cosmos-db-primary-key";

CosmosClient cosmosClient = new CosmosClient(endpointUrl, authorizationKey);
Database database = cosmosClient.GetDatabase("cosmos_db_iowa");
Container sourceContainer = database.GetContainer("cosmos_c_iowa");
Container leaseContainer = database.GetContainer("cosmos_c_lease");
Container backUpContainer = database.GetContainer("cosmos_c_iowa_backup00");

We need three containers here:

  • sourceContainer — where we take data from,
  • backUpContainer — where we place data in,
  • leaseContainer — keeps a change feed state, plays the checkpoint role.

The source and the destination containers have to have the same settings (partition key, ids, etc.). The lease container should have the default partition key path: /partitionKey. Yes, that is literally the path name.
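If the lease container does not exist yet, it can be created with that exact partition key path. A minimal sketch using the SDK's CreateContainerIfNotExistsAsync (the 400 RU/s throughput value is an assumption; the container name matches the code above):

```csharp
// Create the lease container with the /partitionKey partition key path
// if it does not already exist.
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(
    new ContainerProperties(id: "cosmos_c_lease", partitionKeyPath: "/partitionKey"),
    throughput: 400);
```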

We need two functions:

async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    ChangeFeedProcessor changeFeedProcessor = sourceContainer
        .GetChangeFeedProcessorBuilder<IowaSales>("cosmosBackup", HandleChangesAsync)
        .WithInstanceName("cosmosBackup-01")
        .WithLeaseContainer(leaseContainer)
        .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    return changeFeedProcessor;
}

StartChangeFeedProcessorAsync builds and starts a change feed processor that monitors the source container and catches all the changes. When changes are detected, the processor hands them over to the HandleChangesAsync delegate.
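The IowaSales document type is not shown in the article. Based on the two properties the handler below uses (id and CountyName), a minimal shape could look like this; the property names beyond those two are assumptions:

```csharp
// Minimal document model matching the properties the handler uses.
// The lowercase "id" maps to the CosmosDB document id field.
public class IowaSales
{
    public string id { get; set; }
    public string CountyName { get; set; }
    // ...remaining sale fields omitted
}
```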

async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<IowaSales> changes,
    CancellationToken cancellationToken)
{
    foreach (IowaSales item in changes)
    {
        await backUpContainer.CreateItemAsync(
            item,
            partitionKey: new PartitionKey(item.CountyName)
        );
        Console.WriteLine($"Item added to backup container: {item.id}");
        await Task.Delay(10);
    }
}

Here you define how changes are handled and where the data should go. In this example, every document is saved into the cosmos_c_iowa_backup00 container. Be aware that the change feed catches both inserts and updates. For inserts there is nothing to add: the code works like a charm and writes a new document into the backup container. When a document is updated in the source, the change feed catches that too, and from the feed's perspective the update looks like a new document. So what does the code do? It tries to create the document again, and it fails, because a document with an identical id already exists at the destination. What to do? One option is to add a field to the document that indicates whether it was created or edited in the source container; the program can then check this field and react accordingly.
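Another way to handle updates, not used in the article's code but part of the same SDK, is to replace CreateItemAsync with UpsertItemAsync, which inserts the document when its id is new and replaces the existing document otherwise, so the conflict never occurs:

```csharp
// UpsertItemAsync inserts new documents and replaces existing ones,
// so updates arriving through the change feed no longer cause conflicts.
await backUpContainer.UpsertItemAsync(
    item,
    partitionKey: new PartitionKey(item.CountyName),
    cancellationToken: cancellationToken);
```

The trade-off is that an upsert silently overwrites the backup copy, so if you need to keep both the original and the edited version, the extra created/edited field approach described above is the better fit.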

The change feed is started by:

ChangeFeedProcessor processor = await StartChangeFeedProcessorAsync();
Console.ReadKey();
await processor.StopAsync();

And it prints a console line every time a new document lands in the source container.

Here is the app: GitHub
