Merge destination This article contains example code that shows the usage of the DbMerge component.
On this page Setup# Using a different database# The following examples are using SqlServer as destination database. In order to use SQLServer, we use the SqlConnectionManager
in combination with a connection string for SQLServer. If you prefer to use a different database, simple replace the connection manager (and the connection string) with your database of choice. E.g. if you want to use Postgres, you can simply use the PostgresConnectionManager
instead.
Shared method# Throughout the following, all code examples will use the methods PrintTable(..)
which prints the content of a table onto the console output, as well as the PrintDeltaTable(..)
which shows the content of a list produced by the DbMerge which contains information about the executed operations on the database.
private void PrintTable ( IConnectionManager connMan , string tableName ) {
Console . WriteLine ( "Database table:" );
var dest = new DataFrameDestination ();
var source = new DbSource ( connMan , tableName );
source . LinkTo ( dest );
Network . Execute ( source );
Console . WriteLine ( dest . DataFrame . ToString ());
}
private void PrintDeltaTable ( DataFrame dataFrame ) {
Console . WriteLine ( "Calculated delta:" );
dataFrame . Columns . Remove ( "ChangeDate" );
dataFrame . Columns . Remove ( "ChangeAction" );
Console . WriteLine ( dataFrame . ToString ());
}
Basic Merge# The following example shows the basic merge oeprations using strongly typed objects. The matching between column names and properties in the object is based on name comparison. The DbColumnMap
attribute can be used to map the property to a different column.
Full Merge Mode# The following examples show the basic merge operations (Insert, Update, Delete and Exists) using MergeMode.Full
. We are using attributes to define which column are used as id, for comparison and for updates.
private void CreateTableTwoPrimaryKeys ( IConnectionManager connMan , string tableName ) {
DropTableTask . DropIfExists ( connMan , tableName );
TableDefinition td = new TableDefinition ( tableName
, new List < TableColumn >() {
new TableColumn ( "Id1" , "INT" , allowNulls : false , isPrimaryKey : true ),
new TableColumn ( "Id2" , "INT" , allowNulls : false , isPrimaryKey : true ),
new TableColumn ( "Value1" , "VARCHAR(100)" , allowNulls : true ),
new TableColumn ( "Value2" , "INT" , allowNulls : false ),
new TableColumn ( "Value3" , "VARCHAR(100)" , allowNulls : true ),
});
CreateTableTask . Create ( connMan , td );
}
public class MyRow : MergeableRow {
[IdColumn]
public int Id1 { get ; set ; }
[IdColumn]
[DbColumnMap("Id2")]
public int OtherId { get ; set ; }
[CompareColumn]
[UpdateColumn]
public string Value1 { get ; set ; }
[CompareColumn]
public int Value2 { get ; set ; }
[UpdateColumn]
public string Value3 { get ; set ; }
public string ChangeText => ChangeAction ?. ToString ();
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "BasicMerge" ;
CreateTableTwoPrimaryKeys ( connMan , tableName );
SqlTask . ExecuteNonQuery ( connMan , $@"
INSERT INTO {tableName} (Id1, Id2, Value1, Value2, Value3)
VALUES (1,1, 'Exists', 1, 'NoChange'),
(2,2, 'Update', 0, null),
(3,3, 'Delete', 0, 'XX')
" );
var source = new MemorySource < MyRow >();
source . DataAsList . Add ( new MyRow () { Id1 = 1 , OtherId = 1 , Value1 = "Exists" , Value2 = 1 , Value3 = "X" });
source . DataAsList . Add ( new MyRow () { Id1 = 2 , OtherId = 2 , Value1 = "Update" , Value2 = 2 , Value3 = "Y" });
source . DataAsList . Add ( new MyRow () { Id1 = 3 , OtherId = 1 , Value1 = "Insert" , Value2 = 3 , Value3 = "Z" });
var dest = new DbMerge < MyRow >( connMan , tableName );
dest . MergeMode = MergeMode . Full ;
var delta = new DataFrameDestination < MyRow >();
source . LinkTo ( dest );
dest . LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
//Output
/*
Database table:
Id1 Id2 Value1 Value2 Value3
1 1 Exists 1 NoChange
2 2 Update 0 Y
3 1 Insert 3 Z
Calculated delta:
Id1 OtherId Value1 Value2 Value3 ChangeText
1 1 Exists 1 X Exists
3 1 Insert 3 Z Insert
2 2 Update 2 Y Update
3 3 Delete 0 XX Delete
*/
Only the IdColumns are mandatory. Defining the Compare/Update (and Delete) columns is optional. If no compare or udpate columns are set, all other properties that match with a database column will automatically used as an compare and/or update column.
Delta Merge Mode# When using the delta mode, Delete are not detected automatically. A delete operation is triggered by a record that has a DeleteColumn
defined - if the property value is equal to the defined target value in the DeleteColumn, the record is then delete from the target table.
public class MyDeltaRow : MergeableRow {
[IdColumn]
public int Id1 { get ; set ; }
[IdColumn]
[DbColumnMap("Id2")]
public int OtherId { get ; set ; }
[CompareColumn]
[UpdateColumn]
public string Value1 { get ; set ; }
[CompareColumn]
public int Value2 { get ; set ; }
[UpdateColumn]
public string Value3 { get ; set ; }
[DeleteColumn(true)]
public bool Delete { get ; set ; }
public string ChangeText => ChangeAction ?. ToString ();
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "BasicMerge" ;
CreateTableTwoPrimaryKeys ( connMan , tableName );
SqlTask . ExecuteNonQuery ( connMan , $@"
INSERT INTO {tableName} (Id1, Id2, Value1, Value2, Value3)
VALUES (1,1, 'Exists', 1, 'NoChange'),
(2,2, 'Update', 0, null),
(3,3, 'Delete', 0, 'XX')
" );
var source = new MemorySource < MyDeltaRow >();
source . DataAsList . Add ( new MyDeltaRow () { Id1 = 1 , OtherId = 1 , Value1 = "Exists" , Value2 = 1 , Value3 = "X" });
source . DataAsList . Add ( new MyDeltaRow () { Id1 = 2 , OtherId = 2 , Value1 = "Update" , Value2 = 2 , Value3 = "Y" });
source . DataAsList . Add ( new MyDeltaRow () { Id1 = 3 , OtherId = 1 , Value1 = "Insert" , Value2 = 3 , Value3 = "Z" });
source . DataAsList . Add ( new MyDeltaRow () { Id1 = 3 , OtherId = 3 , Delete = true });
var dest = new DbMerge < MyDeltaRow >( connMan , tableName );
dest . MergeMode = MergeMode . Delta ;
var delta = new DataFrameDestination < MyDeltaRow >();
source . LinkTo ( dest );
dest . LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
//Output
/*
Database table:
Id1 Id2 Value1 Value2 Value3
1 1 Exists 1 NoChange
2 2 Update 0 Y
3 1 Insert 3 Z
Calculated delta:
Id1 OtherId Value1 Value2 Value3 Delete ChangeText
1 1 Exists 1 X False Exists
3 3 null 0 null True Delete
2 2 Update 2 Y False Update
3 1 Insert 3 Z False Insert
*/
Performance considerations
The Delta mode is the default merge mode for the DbMerge
due to performance reason. When using MergeMode.Full
, the DbMerge needs to compare all id values of the target with the id values of all processed records to determine which records can be deleted. This is a time-consuming operation, that will also need enough memory for big amounts of data.
Inserting with Identity values# The DbMerge
will also work if the identity column is auto generated.
private void CreateTableWithIdentity ( IConnectionManager connMan , string tableName ) {
DropTableTask . DropIfExists ( connMan , tableName );
TableDefinition td = new TableDefinition ( tableName
, new List < TableColumn >() {
new TableColumn ( "AutoId" , "INT" , allowNulls : false , isPrimaryKey : true , isIdentity : true ),
new TableColumn ( "Value1" , "VARCHAR(100)" , allowNulls : true ),
new TableColumn ( "Value2" , "INT" , allowNulls : false ),
new TableColumn ( "Value3" , "VARCHAR(100)" , allowNulls : true )
});
CreateTableTask . Create ( connMan , td );
}
public class MyAutoIdRow : MergeableRow {
private string combined ;
[DbColumnMap("Value3")]
public string Combined {
get => combined ?? Value1 + "_" + Value2 ;
set => combined = value ;
}
public string Value1 { get ; set ; }
public int Value2 { get ; set ; }
[IdColumn]
public int AutoId { get ; set ; }
public string ChangeText => ChangeAction ?. ToString ();
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "AutoIdColumn" ;
CreateTableWithIdentity ( connMan , tableName );
SqlTask . ExecuteNonQuery ( connMan , $@"
INSERT INTO {tableName} (Value1, Value2, Value3)
VALUES ('Exists', 1, 'Exists_1'),
('WillUpdate', 2, null),
('Delete', 3, 'XX')
" );
var source = new MemorySource < MyAutoIdRow >();
source . DataAsList . Add ( new MyAutoIdRow () { AutoId = 1 , Value1 = "Exists" , Value2 = 1 });
source . DataAsList . Add ( new MyAutoIdRow () { AutoId = 2 , Value1 = "Update" , Value2 = 2 });
source . DataAsList . Add ( new MyAutoIdRow () { AutoId = 4 , Value1 = "New" , Value2 = 4 });
source . DataAsList . Add ( new MyAutoIdRow () { AutoId = 100 , Value1 = "AlsoNew" , Value2 = 100 });
var dest = new DbMerge < MyAutoIdRow >( connMan , tableName );
dest . MergeMode = MergeMode . Full ;
var delta = new DataFrameDestination < MyAutoIdRow >();
source . LinkTo ( dest );
dest . LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
//Output
/*
Database table:
AutoId Value1 Value2 Value3
1 Exists 1 Exists_1
2 Update 2 Update_2
4 New 4 New_4
5 AlsoNew 100 AlsoNew_100
Calculated delta:
Combined Value1 Value2 AutoId ChangeText
Exists_1 Exists 1 1 Exists
New_4 New 4 4 Insert
AlsoNew_100AlsoNew 100 100 Insert
Update_2 Update 2 2 Update
XX Delete 3 3 Delete
*/
Partial Cache and Eviction Policies# When using the DbMerge, internally a LookupTransformation
is used to retrieve the data from the destination table. By default, the CacheMode.Full
in the Lookup - this means that all data from the target table is loaded into memory before any recors are processed.
So when the first record arrives at the DbMerge
, the component will load all rows from the database table into memory, in order to determine the corresponding merge operations. This step can be very time and memory-consuming for large destination tables.
Using the CacheMode.Partial
will only load records into memory when they are needed. This may reduce memory consumption a lot, but will also slow down the DbMerge, as there will me many subsequent (batch) reads on the target table. It is recommended to use id columns that are part of an index.
In order to reduce the amount of rows to be read from the source, you can use different eviction policies. A full overview of all policies are found in the documentation for the LookupTransformation
.
The configuration of the partial cache mode is very simple:
var dest = new DbMerge < MyDeltaRow >( connMan , tableName );
dest . MergeMode = MergeMode . Delta ;
dest . CacheMode = CacheMode . Partial ;
dest . EvictionPolicy = CacheEvictionPolicy . LeastFrequentlyUsed ;
dest . MaxCacheSize = 5000 ;
If you want to measure the performance of the DbMerge
in your environment, have a look at the DbMerge Performance Recipe .
DbMerge With Dynamic Object# The DbMerge also supports the dynamic ExpandoObject. You can assign the id, update, compare and delete columns using the IdColumns
, UpdateColumns
, CompareColumns
and DeleteColumns
properties.
private void CreateTableSinglePrimaryKey ( IConnectionManager connMan , string tableName ) {
DropTableTask . DropIfExists ( connMan , tableName );
TableDefinition td = new TableDefinition ( tableName
, new List < TableColumn >() {
new TableColumn ( "Id" , "INT" , allowNulls : false , isPrimaryKey : true ),
new TableColumn ( "Value1" , "VARCHAR(100)" , allowNulls : true ),
new TableColumn ( "Value2" , "INT" , allowNulls : false ),
});
CreateTableTask . Create ( connMan , td );
}
public dynamic CreateDynamicRow ( int id , string value1 , int value2 ) {
dynamic result = new ExpandoObject ();
result . Id = id ;
result . Value1 = value1 ;
result . Value2 = value2 ;
return result ;
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "WithDynamic" ;
CreateTableSinglePrimaryKey ( connMan , tableName );
SqlTask . ExecuteNonQuery ( connMan , $@"
INSERT INTO {tableName} (Id, Value1, Value2)
VALUES (1, 'Exists', 1),
(2,'WillUpdate', 0),
(3, 'WillDelete', -1)
" );
var source = new MemorySource ();
source . DataAsList . Add ( CreateDynamicRow ( 1 , "Exists" , 1 ));
source . DataAsList . Add ( CreateDynamicRow ( 2 , "Update" , 2 ));
source . DataAsList . Add ( CreateDynamicRow ( 3 , "Delete" , - 1 ));
source . DataAsList . Add ( CreateDynamicRow ( 4 , "Insert" , 4 ));
var dest = new DbMerge ( connMan , tableName );
dest . MergeMode = MergeMode . Delta ;
dest . IdColumns = new [] {
new IdColumn () { IdPropertyName = "Id" }
};
dest . CompareColumns = new [] {
new CompareColumn () { ComparePropertyName = "Value1" },
new CompareColumn () { ComparePropertyName = "Value2" },
};
dest . UpdateColumns = new [] {
new UpdateColumn () { UpdatePropertyName = "Value1" },
new UpdateColumn () { UpdatePropertyName = "Value2" },
};
dest . DeleteColumns = new [] {
new DeleteColumn () { DeleteOnMatchValue = - 1 , DeletePropertyName = "Value2" }
};
var changeAction = new RowTransformation ();
changeAction . TransformationFunc = row => {
dynamic r = row ;
r . ChangeActionText = (( ChangeAction ) r . ChangeAction ). ToString ();
return row ;
};
var delta = new DataFrameDestination ();
source . LinkTo ( dest );
dest . LinkTo ( changeAction ). LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
//Output
/*
Database table:
Id Value1 Value2
1 Exists 1
2 Update 2
4 Insert 4
Calculated delta:
Id Value1 Value2 ChangeActionText
1 Exists 1 Exists
4 Insert 4 Insert
3 Delete -1 Delete
2 Update 2 Update
*/
Automatic Duplicate Detection# If the property FindDuplicates
is set to true
, all rows which id columns where already processed previously are automatically filtered out.
public class MyDuplicateRow : MergeableRow {
public int Id1 { get ; set ; }
public int? Id2 { get ; set ; }
public string CompareUpdateValue { get ; set ; }
public string ChangeText => ChangeAction ?. ToString ();
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "FindDuplicates" ;
var source = new MemorySource < MyDuplicateRow >();
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 1 , Id2 = 1 , CompareUpdateValue = "Exists" });
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 2 , Id2 = 2 , CompareUpdateValue = "Test2_Update1" });
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 1 , Id2 = 1 , CompareUpdateValue = "Exists" });
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 3 , Id2 = 3 , CompareUpdateValue = "Test3_Insert1" });
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 2 , Id2 = 2 , CompareUpdateValue = "Test2_Update2" });
source . DataAsList . Add ( new MyDuplicateRow () { Id1 = 3 , Id2 = 3 , CompareUpdateValue = "Test3_Insert2" });
DropTableTask . DropIfExists ( connMan , tableName );
CreateTableTask . CreateIfNotExists ( connMan , tableName ,
new List < TableColumn >()
{
new TableColumn ( "Id1" , "INT" , allowNulls : false , isPrimaryKey : true ),
new TableColumn ( "Id2" , "INT" , allowNulls : false , isPrimaryKey : true ),
new TableColumn ( "CompareUpdateValue" , "VARCHAR(100)" , true )
});
SqlTask . ExecuteNonQuery ( connMan ,
$"INSERT INTO {tableName} VALUES(1,1,'Exists')" );
SqlTask . ExecuteNonQuery ( connMan ,
$"INSERT INTO {tableName} VALUES(2,2,'Update')" );
SqlTask . ExecuteNonQuery ( connMan ,
$"INSERT INTO {tableName} VALUES(4,4,'Delete')" );
var dest = new DbMerge < MyDuplicateRow >( connMan , tableName );
dest . MergeMode = MergeMode . Full ;
dest . FindDuplicates = true ;
dest . IdColumns = new [] {
new IdColumn () { IdPropertyName = "Id1" },
new IdColumn () { IdPropertyName = "Id2"
}
};
var delta = new DataFrameDestination < MyDuplicateRow >();
source . LinkTo ( dest ). LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
/* Output
Database table:
Id1 Id2 CompareUpdateValue
1 1 Exists
2 2 Test2_Update1
3 3 Test3_Insert1
Calculated delta:
Id1 Id2 CompareUpdateValue ChangeText
1 1 Exists Exists
1 1 Exists Duplicate
2 2 Test2_Update2 Duplicate
3 3 Test3_Insert2 Duplicate
3 3 Test3_Insert1 Insert
2 2 Test2_Update1 Update
4 4 Delete Delete
*/
Value Generated Columns# The DbMerge
is also able to retrieve existing data from so called value generated columns, e.g. a computed column or a column with a default value if no value is provided.
public class RowWithGenerated : MergeableRow {
public string Computed { get ; set ; }
public int? Id { get ; set ; }
public string DefString { get ; set ; }
public int? DefInt { get ; set ; }
public string Value { get ; set ; }
}
private static void CreateTableWithDefaultAndComputed ( IConnectionManager conn , string tableName ) {
DropTableTask . DropIfExists ( conn , tableName );
var columns = new List < TableColumn >()
{
new TableColumn ( "Id" , "INT" , allowNulls : false , isPrimaryKey : true , isIdentity : true ),
new TableColumn ( "DefString" , "VARCHAR(100)" , allowNulls : false ) { DefaultValue = "'A'" },
new TableColumn ( "DefInt" , "INT" , allowNulls : true ) { DefaultValue = "3" },
new TableColumn ( "Computed" , "VARCHAR(100)" , allowNulls : false ) { ComputedColumn = $"{conn.QB}DefString{conn.QE}" },
new TableColumn ( "Value" , "VARCHAR(100)" , allowNulls : true )
};
TableDefinition def = new TableDefinition ( tableName , columns );
def . CreateTable ( conn );
}
private static void InsertDestinationData ( IConnectionManager conn , string tableName ) {
ObjectNameDescriptor TN = new ObjectNameDescriptor ( tableName , conn . QB , conn . QE );
SqlTask . ExecuteNonQuery ( conn ,
$"INSERT INTO {tableName} " +
$"(DefString,DefInt,Value) " +
$"VALUES('DefString1',3,'Test1')" );
SqlTask . ExecuteNonQuery ( conn ,
$"INSERT INTO {tableName} " +
$"(DefInt, Value) " +
$"VALUES(10,'ToUpdate')" );
SqlTask . ExecuteNonQuery ( conn ,
$"INSERT INTO {tableName} " +
$"(DefInt, Value) " +
$"VALUES(10,'ToUpdate2')" );
SqlTask . ExecuteNonQuery ( conn ,
$"INSERT INTO {tableName} " +
$"(Value) " +
$"VALUES('ToDelete')" );
}
var connMan = new SqlConnectionManager ( ConnectionString );
string tableName = "ValueGeneratedColumns" ;
var source = new MemorySource < RowWithGenerated >();
source . DataAsList . Add ( new RowWithGenerated () { Id = 1 , DefInt = 3 , Computed = "DefString1" , Value = "Test1" });
source . DataAsList . Add ( new RowWithGenerated () { Id = 2 , DefString = "DefString2" , Value = "Test2" });
source . DataAsList . Add ( new RowWithGenerated () { Id = 3 , DefInt = 2 , Value = null });
source . DataAsList . Add ( new RowWithGenerated () { Id = null , Value = "A" });
CreateTableWithDefaultAndComputed ( connMan , tableName );
InsertDestinationData ( connMan , tableName );
var dest = new DbMerge < RowWithGenerated >( connMan , tableName );
dest . MergeMode = MergeMode . Full ;
dest . ValueGeneratedColumns = new List < ValueGenerationColumn >() {
new ValueGenerationColumn ( ValueGenerationEvent . OnAddOrUpdate ) { ValueGenerationPropertyName = "Id" },
new ValueGenerationColumn ( ValueGenerationEvent . OnAddOrUpdate ) { ValueGenerationPropertyName = "DefString" },
new ValueGenerationColumn ( ValueGenerationEvent . OnAddOrUpdate ) { ValueGenerationPropertyName = "DefInt" },
new ValueGenerationColumn ( ValueGenerationEvent . OnAddOrUpdate ) { ValueGenerationPropertyName = "Computed" },
};
dest . ColumnMapping = new List < DbColumnMap >() {
new DbColumnMap () { PropertyName = "DefString" , IgnoreColumn = true },
};
dest . IdColumns = new List < IdColumn >() {
new IdColumn () { IdPropertyName = "Id" }
};
var delta = new DataFrameDestination < RowWithGenerated >();
source . LinkTo ( dest ). LinkTo ( delta );
Network . Execute ( source );
PrintTable ( connMan , tableName );
PrintDeltaTable ( delta . DataFrame );
/* Output:
Database table:
Id DefString DefInt Computed Value
1 DefString13 DefString1Test1
2 A null A Test2
3 A 2 A null
5 A null A A
Calculated delta:
Computed Id DefString DefInt Value
DefString11 null 3 Test1
A 2 A null Test2
A 3 A 2 null
A 5 A null A
A 4 null 3 ToDelete
*/