Azure Cosmos DB
Azure Cosmos DB is a multi-model database that can be used as either a source or a destination in an ETLBox data flow. This article gives an overview of how to use the connector.
ETLBox.Azure.CosmosDb package
To use the Azure Cosmos DB connector, add the NuGet package ETLBox.Azure.CosmosDb to your project, e.g. with dotnet add package ETLBox.Azure.CosmosDb.
Shared example code
In the following examples, we use the POCOs Movie and Person:
```csharp
public class Person
{
    public DateTime? Born { get; set; }
    // Approximate age: current year minus birth year
    public int? Age => Born.HasValue ? DateTime.Now.Year - Born.Value.Year : null;
    public string Name { get; set; }
}

public class Movie
{
    public string Id { get; set; }
    public Person[] MainActors { get; set; }
    public string Name { get; set; }
    public Person Important { get; set; }
    public decimal? OpeningRevenue { get; set; }
    // Used as the partition key ("/block") in the examples below
    public string Block { get; set; } = "1";
}
```
We also need some methods that create demo data:
```csharp
public static IEnumerable<Movie> GenerateDefaultMovies() {
    Movie[] movies = new Movie[] {
        new Movie() {
            Id = "1",
            Name = "Harry Potter and the Sorcerer's Stone",
            MainActors = new [] {
                new Person() { Name = "Daniel Radcliffe", Born = new DateTime(1989, 7, 23) },
                new Person() { Name = "Rupert Grint" }
            },
            OpeningRevenue = 90294621.23m,
            Important = new Person() { Name = "Chris Columbus" }
        },
        new Movie() {
            Id = "2",
            Name = "Harry Potter and the Order of the Phoenix",
            MainActors = new [] {
                new Person() { Name = "Daniel Radcliffe", Born = new DateTime(1989, 7, 23) },
                new Person() { Name = "Emma Watson", Born = new DateTime(1990, 4, 15) }
            },
            Important = new Person() { Name = "Slawomir Idziak" }
        },
        new Movie() {
            Id = "3",
            Name = "Harry Potter and the Goblet of Fire",
            OpeningRevenue = 102685961
        },
    };
    return movies;
}

public static IEnumerable<Movie> GenerateMoreMovies() {
    Movie[] movies = new Movie[] {
        new Movie() {
            Id = "4",
            Name = "Harry Potter and the Chamber of Secrets",
            MainActors = new [] {
                new Person() { Name = "Daniel Radcliffe", Born = new DateTime(1989, 7, 23) },
                new Person() { Name = "Rupert Grint" }
            },
            Important = new Person() { Name = "Chris Columbus" },
            OpeningRevenue = 88357488,
            Block = "2"
        },
        new Movie() {
            Id = "5",
            Name = "Harry Potter and the Half-Blood Prince",
            Important = new Person() { Name = "David Yates" },
            OpeningRevenue = 77835727,
            Block = "2"
        },
        new Movie() {
            Id = "6",
            Name = "Harry Potter and the Prisoner of Azkaban",
            OpeningRevenue = 93687367,
            Block = "3"
        },
    };
    return movies;
}
```
Lastly, we need two helper methods: one fills a container with existing data, the other (re)creates a container. Filling a container is a small ETLBox data flow in itself; wrapping it in a method keeps the examples readable:
```csharp
public static void FillContainerWithMovies(string connectionString, string containerName, IEnumerable<Movie> data) {
    var source = new MemorySource<Movie>();
    source.Data = data;
    var dest = new CosmosDestination<Movie>() {
        ConnectionString = connectionString,
        DatabaseName = "etlbox",
        ContainerName = containerName
    };
    source.LinkTo(dest);
    Network.Execute(source);
}

public static void RecreateContainer(Database database, string containerName, string partitionKey = "id") {
    try {
        // GetContainer only returns a proxy; DeleteContainerAsync throws if the container does not exist
        var cont = database.GetContainer(containerName);
        cont.DeleteContainerAsync().GetAwaiter().GetResult();
    } catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound) {
        // Nothing to delete, e.g. on the first run
    }
    // The partition key path must name a property, so "/" alone is not valid
    database.CreateContainerIfNotExistsAsync(
        id: containerName,
        partitionKeyPath: "/" + partitionKey
    ).GetAwaiter().GetResult();
}
```
Cosmos DB source examples
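Let's start with examples that read data from Cosmos DB. All examples assume that a connectionString variable and a database variable (the Microsoft.Azure.Cosmos Database object passed to RecreateContainer) are already defined.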
Read all data
The following example reads all items from a container:
```csharp
string containerName = "Movies";
RecreateContainer(database, containerName, "block");
FillContainerWithMovies(connectionString, containerName, GenerateDefaultMovies());

var source = new CosmosSource<Movie>() {
    Sql = "SELECT * FROM Movies",
    ContainerName = containerName,
    ConnectionString = connectionString,
    DatabaseName = "etlbox"
};
var dest = new MemoryDestination<Movie>();
source.LinkTo(dest);
Network.Execute(source);
```
Read parameterized data
The next example reads only the movies with the ids 1, 2 and 3, using a parameterized query:
```csharp
string containerName = "Movies";
RecreateContainer(database, containerName, "block");
FillContainerWithMovies(connectionString, containerName, GenerateDefaultMovies().Union(GenerateMoreMovies()));

var source = new CosmosSource<Movie>() {
    QueryDefinition = new QueryDefinition("SELECT * FROM Movies m WHERE m.id IN (@p1,@p2,@p3)")
        .WithParameter("@p1", "1")
        .WithParameter("@p2", "2")
        .WithParameter("@p3", "3"),
    ContainerName = containerName,
    ConnectionString = connectionString,
    DatabaseName = "etlbox"
};
var dest = new MemoryDestination<Movie>();
source.LinkTo(dest);
Network.Execute(source);
```
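Hard-coding @p1 to @p3 works for a fixed number of ids. For a variable list, the placeholders can be generated instead. The following is a minimal sketch as a .NET 6+ top-level program; BuildInQuery is a hypothetical helper, not part of ETLBox or the Cosmos DB SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: appends "WHERE m.id IN (@p1,@p2,...)" to a base query
// and pairs each placeholder with the id it stands for.
(string Sql, List<(string Name, string Value)> Parameters) BuildInQuery(
    string baseQuery, IReadOnlyList<string> ids)
{
    if (ids.Count == 0)
        throw new ArgumentException("An IN clause needs at least one value.", nameof(ids));

    var parameters = ids
        .Select((id, index) => (Name: $"@p{index + 1}", Value: id))
        .ToList();
    string placeholders = string.Join(",", parameters.Select(p => p.Name));
    return ($"{baseQuery} WHERE m.id IN ({placeholders})", parameters);
}

var (sql, queryParams) = BuildInQuery("SELECT * FROM Movies m", new[] { "1", "2", "3" });
Console.WriteLine(sql); // SELECT * FROM Movies m WHERE m.id IN (@p1,@p2,@p3)
foreach (var (name, value) in queryParams)
    Console.WriteLine($"{name} = {value}");
```

With the Microsoft.Azure.Cosmos SDK, each pair could then be added to a QueryDefinition via WithParameter(name, value) in a loop, and the result assigned to the QueryDefinition property of the source as in the example above.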
Cosmos DB destination examples
The next step is to write data into a Cosmos DB container.
Bulk insert
By default, the destination simply inserts the data in batches, as a classic bulk insert:
```csharp
string containerName = "Movies";
RecreateContainer(database, containerName, "block");
var source = new MemorySource<Movie>();
source.Data = GenerateDefaultMovies();
var dest = new CosmosDestination<Movie>() {
    ConnectionString = connectionString,
    DatabaseName = "etlbox",
    ContainerName = containerName
};
source.LinkTo(dest);
Network.Execute(source);
```
Other bulk operations
Azure Cosmos DB also offers other bulk operations. With the SetItemActionOptions property of the destination, the operation can be chosen individually for each record.
Supported bulk operations are:
- Create
- Upsert
- Replace
- Delete
```csharp
string containerName = "Movies";
RecreateContainer(database, containerName, "block");
// Prefill the container with movies 1-3 (partition key "1")
FillContainerWithMovies(connectionString, containerName, GenerateDefaultMovies());

var source = new MemorySource<Movie>();
source.Data = GenerateDefaultMovies().Union(GenerateMoreMovies());
var dest = new CosmosDestination<Movie>() {
    ConnectionString = connectionString,
    DatabaseName = "etlbox",
    ContainerName = containerName
};
// Choose the bulk operation per record; Id and PartitionKey identify the target item
dest.SetItemActionOptions = (options, item) => {
    if (item.Id == "1")
        options.ItemAction = ItemAction.Replace;
    else if (item.Id == "2")
        options.ItemAction = ItemAction.Delete;
    else if (item.Id == "4")
        options.ItemAction = ItemAction.Upsert;
    else
        options.ItemAction = ItemAction.Create;
    options.Id = item.Id;
    options.PartitionKey = new PartitionKey(item.Block);
};
source.LinkTo(dest);
Network.Execute(source);
```
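The four operations follow the usual Cosmos DB item semantics: Create is rejected with 409 Conflict if an item with the same id already exists in that partition, Replace is rejected with 404 Not Found if it does not exist, Upsert creates or replaces, and Delete removes an existing item. The sketch below models these rules on an in-memory dictionary keyed by (partition key, id) and replays the per-record choices of the example above. It is neither ETLBox nor SDK code: the action names are plain strings standing in for the ItemAction values, and how ETLBox reports a rejected operation is not shown.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a container, keyed by (partition key, id).
// Prefilled like the example: movies 1-3 in partition "1".
var container = new Dictionary<(string Pk, string Id), string>
{
    [("1", "1")] = "movie 1",
    [("1", "2")] = "movie 2",
    [("1", "3")] = "movie 3",
};

// Simplified item semantics, returning the HTTP status code Cosmos DB answers with.
int Apply(string action, string pk, string id, string doc)
{
    var key = (pk, id);
    switch (action)
    {
        case "Create":
            if (container.ContainsKey(key)) return 409;  // Conflict: id already exists in this partition
            container[key] = doc;
            return 201;                                  // Created
        case "Replace":
            if (!container.ContainsKey(key)) return 404; // Not Found: nothing to replace
            container[key] = doc;
            return 200;                                  // OK
        case "Upsert":
            bool existed = container.ContainsKey(key);
            container[key] = doc;                        // create or overwrite
            return existed ? 200 : 201;
        case "Delete":
            return container.Remove(key) ? 204 : 404;    // No Content / Not Found
        default:
            throw new ArgumentException($"Unknown action '{action}'", nameof(action));
    }
}

// Replay the choices of the example: (id, partition key, action).
var results = new List<string>();
foreach (var (itemId, partition, act) in new[]
{
    ("1", "1", "Replace"), ("2", "1", "Delete"), ("3", "1", "Create"),
    ("4", "2", "Upsert"), ("5", "2", "Create"), ("6", "3", "Create"),
})
{
    int status = Apply(act, partition, itemId, $"movie {itemId}");
    results.Add($"{itemId}:{act}:{status}");
    Console.WriteLine($"id {itemId}: {act} -> {status}");
}
```

Replaying the choices this way also shows that the Create for id 3 collides with the prefilled item in partition "1".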
Using transactional batches
By default, data is inserted in bulk without an enclosing transaction. If you prefer to write data as part of a transactional batch, set the property UseTransactionBatch to true. The maximum batch size is then 100, which is the operation limit Cosmos DB imposes on a transactional batch.
All operations in a transactional batch must share the same partition key, so this works best if all data has the same partition key. If the incoming data has different partition keys, it is inserted in multiple batches, grouped by partition key.
Make sure to add a partition key for each record using the …
Enabling UseTransactionBatch decreases performance compared to the default bulk insert.
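Because one transactional batch can only target a single logical partition and holds at most 100 operations, records with mixed partition keys have to be split up. The planning step this implies can be sketched as a .NET 6+ top-level program (Enumerable.Chunk needs .NET 6). PlanBatches is a hypothetical helper that only groups ids; it is not ETLBox's internal implementation of UseTransactionBatch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Operation limit of a single Cosmos DB transactional batch.
const int MaxBatchSize = 100;

// Groups items by partition key, then cuts each group into chunks of at most MaxBatchSize.
List<(string PartitionKey, List<string> Ids)> PlanBatches(
    IEnumerable<(string Id, string PartitionKey)> items)
{
    var batches = new List<(string PartitionKey, List<string> Ids)>();
    foreach (var partition in items.GroupBy(i => i.PartitionKey))
    {
        foreach (var chunk in partition.Chunk(MaxBatchSize))
            batches.Add((partition.Key, chunk.Select(i => i.Id).ToList()));
    }
    return batches;
}

// 250 records with partition key "1" and 3 records with partition key "2".
var records = Enumerable.Range(1, 250).Select(n => (Id: n.ToString(), PartitionKey: "1"))
    .Concat(Enumerable.Range(251, 3).Select(n => (Id: n.ToString(), PartitionKey: "2")));

var plan = PlanBatches(records);
foreach (var batch in plan)
    Console.WriteLine($"partition {batch.PartitionKey}: {batch.Ids.Count} items");
// partition 1: 100 items
// partition 1: 100 items
// partition 1: 50 items
// partition 2: 3 items
```

Data that shares a single partition key therefore needs the fewest batches, which is why that case works best.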
Alternatives to a connection string
Besides ConnectionString, CosmosSource and CosmosDestination offer further properties for passing credentials:
- AzureKeyCredential: Optional. An Azure key credential used when opening the connection (works only in combination with ConnectionString).
- AzureTokenCredential: Optional. An Azure token credential used when opening the connection (works only in combination with ConnectionString).
- AccountEndpoint: Optional. An Azure Cosmos DB account endpoint used when opening the connection (works only in combination with AuthKeyOrResourceToken). ConnectionString, AzureKeyCredential and AzureTokenCredential are ignored in that case.
- AuthKeyOrResourceToken: Optional. An Azure authorization key or resource token used when opening the connection (works only in combination with AccountEndpoint). ConnectionString, AzureKeyCredential and AzureTokenCredential are ignored in that case.
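A Cosmos DB account connection string usually combines the account endpoint and the account key, in the form AccountEndpoint=https://<account>.documents.azure.com:443/;AccountKey=<key>;. If you want to switch to AccountEndpoint and AuthKeyOrResourceToken, both values can be taken from such a string. The following sketch is a .NET top-level program; ParseConnectionString is a hypothetical helper, not part of ETLBox, and the account and key are placeholders.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits "Key=Value;Key=Value;" into a case-insensitive dictionary.
// Only the first '=' of a segment separates key and value, since base64 keys often end in "=".
Dictionary<string, string> ParseConnectionString(string connectionString)
{
    var values = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    foreach (var segment in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
    {
        int separator = segment.IndexOf('=');
        if (separator <= 0)
            throw new FormatException($"Invalid segment '{segment}'.");
        values[segment[..separator].Trim()] = segment[(separator + 1)..].Trim();
    }
    return values;
}

// Placeholder values, not a real account or key.
var parsed = ParseConnectionString(
    "AccountEndpoint=https://myaccount.documents.azure.com:443/;AccountKey=c2VjcmV0a2V5==;");
string accountEndpoint = parsed["AccountEndpoint"];
string authKey = parsed["AccountKey"];
Console.WriteLine(accountEndpoint); // https://myaccount.documents.azure.com:443/
```

The two values would then be assigned to AccountEndpoint and AuthKeyOrResourceToken on the source or destination, leaving ConnectionString unset.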
Dynamic object support
Like all other data flow components, the Cosmos DB source and destination support the dynamic ExpandoObject. Simply use either CosmosSource or CosmosSource<ExpandoObject> instead of CosmosSource<T>. The same applies to CosmosDestination.
Example for the source:
```csharp
string containerName = "Movies";
RecreateContainer(database, containerName, "block");
FillContainerWithMovies(connectionString, containerName, GenerateDefaultMovies());

var source = new CosmosSource() {
    Sql = "SELECT * FROM Movies",
    ContainerName = containerName,
    ConnectionString = connectionString,
    DatabaseName = "etlbox"
};
var dest = new MemoryDestination();
source.LinkTo(dest);
Network.Execute(source);
```
Example for the destination:
```csharp
string containerName = "MoviesDynamic";
RecreateContainer(database, containerName, "block");
var source = new MemorySource();
// Not part of the shared code above: returns demo movies as ExpandoObject instances
source.Data = GenerateDefaultMoviesDynamic();
var dest = new CosmosDestination() {
    ConnectionString = connectionString,
    DatabaseName = "etlbox",
    ContainerName = containerName
};
source.LinkTo(dest);
Network.Execute(source);
```
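The destination example relies on GenerateDefaultMoviesDynamic(), which the shared code does not show. A hypothetical version might build each movie as an ExpandoObject and add its properties at runtime. The sketch below is a .NET top-level program that fills only a few of the Movie properties for brevity; it is an illustration, not the original helper.

```csharp
using System;
using System.Collections.Generic;
using System.Dynamic;

// Hypothetical stand-in for GenerateDefaultMoviesDynamic(): properties are added at runtime.
IEnumerable<ExpandoObject> GenerateDefaultMoviesDynamic()
{
    dynamic first = new ExpandoObject();
    first.Id = "1";
    first.Name = "Harry Potter and the Sorcerer's Stone";
    first.OpeningRevenue = 90294621.23m;
    first.Block = "1";

    dynamic second = new ExpandoObject();
    second.Id = "2";
    second.Name = "Harry Potter and the Order of the Phoenix";
    second.Block = "1"; // no OpeningRevenue: dynamic objects may differ in their properties

    return new List<ExpandoObject> { first, second };
}

var movies = new List<ExpandoObject>(GenerateDefaultMoviesDynamic());
foreach (dynamic movie in movies)
    Console.WriteLine($"{movie.Id}: {movie.Name}");
```

Unlike the Movie POCO, each ExpandoObject only carries the properties that were actually set on it.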