Cached Row Transformation
CachedRowTransformation extends RowTransformation by adding an in-memory cache of previously processed rows. This enables row-level logic that depends on historical context, such as deduplication, rolling calculations, or key-based stateful transformations.
Overview
- Transformation Type: Non-blocking
- Execution Mode: Row-by-row processing with access to a row cache
- Buffers: One input buffer
- Cache Support: Built-in in-memory cache with customizable key management
Each input row is processed using a user-defined transformation function that also receives a reference to a CachedData&lt;T&gt; object. This cache holds the most recent records and allows inspection of previously processed rows.
Usage
Basic Example
public class MyRow {
    public int Col1 { get; set; }
    public string Col2 { get; set; }
}

var source = new MemorySource<MyRow>();
for (int i = 0; i < 10; i++)
    source.DataAsList.Add(new MyRow() { Col1 = i, Col2 = $"Test{i}" });

var rowTrans = new CachedRowTransformation<MyRow>();
rowTrans.MaxCacheSize = 3;
rowTrans.TransformationFunc = (row, cache) => {
    // Use cached rows for logic (e.g., cumulative sum)
    return new MyRow() {
        Col1 = cache.List.Sum(r => r.Col1) // Sum values in the cache
    };
};

var dest = new MemoryDestination<MyRow>();
source.LinkTo(rowTrans).LinkTo(dest);
Network.Execute(source);
In this example:
- The last 3 processed rows are stored in the cache.
- For each new row, the transformation calculates the sum of all Col1 values from the cached rows.
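The mechanics behind this example can be sketched in a language-neutral way: a bounded FIFO cache that holds the most recent rows, with each row passing through a transformation that can read the cache. The following Python sketch mirrors the behavior described above; all names here are illustrative, not part of the library API.

```python
from collections import deque

def run_cached_transformation(rows, transform, max_cache_size):
    cache = deque(maxlen=max_cache_size)  # oldest entries are evicted first
    results = []
    for row in rows:
        cache.append(row)                 # default: row enters the cache first
        results.append(transform(row, list(cache)))
    return results

source = [{"Col1": i, "Col2": f"Test{i}"} for i in range(10)]
result = run_cached_transformation(
    source,
    lambda row, cached: {"Col1": sum(r["Col1"] for r in cached)},
    max_cache_size=3,
)
# First row: cache holds only Col1 = 0, so the sum is 0.
# Last row: cache holds Col1 = 7, 8, 9, so the sum is 24.
```

Note that with the default fill order, the current row is already in the cache when the transformation runs, so it is included in the sum.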
Configuration Options
TransformationFunc
The transformation function is mandatory. It processes each input row and receives a second argument, an instance of CachedData&lt;T&gt;, which provides access to all currently cached rows.
This allows you to base transformation logic on previously seen rows:
rowTrans.TransformationFunc = (row, cache) => {
    // Access the list of previously cached rows
    var total = cache.List.Sum(r => r.Col1);

    // Return a new row or a modified input
    return new MyRow {
        Col1 = total, // Use cache-based calculation
        Col2 = row.Col2
    };
};
The cache.List property gives access to all cached rows. If a key-based cache is configured, cache.Items provides indexed access via dictionary lookup.
MaxCacheSize
Defines the maximum number of rows stored in the cache. The default is 10,000.
var rowTrans = new CachedRowTransformation<MyRow>();
rowTrans.MaxCacheSize = 100;
CacheManager
You can provide a custom cache manager by implementing ICacheManager&lt;TInput, TCache&gt;. This enables, for example, key-based caching or deduplication.
var rowTrans = new CachedRowTransformation<MyRow>();
rowTrans.CacheManager = new MemoryCache<MyRow, MyRow> {
    CacheKeySelector = row => row.Col1,
    InputKeySelector = row => row.Col1
};
FillCacheAfterTransformation
By default, each input row is added to the cache before the transformation function is executed. If you want to add it only after processing, set:
rowTrans.FillCacheAfterTransformation = true;
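The fill order matters whenever the transformation inspects the cache for the current row's key: with the default fill-before behavior, a key-based lookup will always find the current row itself, while fill-after exposes only earlier rows. A minimal Python sketch of that difference (illustrative names, not the library API):

```python
def flag_duplicates(keys, fill_after):
    cache = set()
    flagged = []
    for k in keys:
        if not fill_after:
            cache.add(k)     # default: cache is filled before the check runs
        if k in cache:
            flagged.append(k)
        if fill_after:
            cache.add(k)     # fill only after the row has been processed
    return flagged

keys = [1, 2, 1, 3, 2]
# Fill-before: every row finds itself in the cache, so all five are flagged.
# Fill-after: only the genuine repeats (1 and 2) are flagged.
```

This is why duplicate detection, as in the next example, relies on filling the cache after the transformation.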
Example: Deduplication Using Key-Based Cache
This example shows how to configure the CachedRowTransformation to deduplicate rows based on a specific column using a custom cache manager.
public class MyRow {
    public int Col1 { get; set; }
    public string Col2 { get; set; }
}

var source = new MemorySource<MyRow>();
var row1 = new MyRow() { Col1 = 1, Col2 = "A" };
var row2 = new MyRow() { Col1 = 2, Col2 = "B" };
var row3 = new MyRow() { Col1 = 3, Col2 = "C" };
source.DataAsList.Add(row1);
source.DataAsList.Add(row2);
source.DataAsList.Add(row3);
source.DataAsList.Add(row2); // duplicate
source.DataAsList.Add(row1); // duplicate
source.DataAsList.Add(row3); // duplicate
var rowTrans = new CachedRowTransformation<MyRow>();
rowTrans.MaxCacheSize = 10;
// Fill the cache only after processing, so the duplicate check below
// sees only earlier rows (otherwise every row would find itself)
rowTrans.FillCacheAfterTransformation = true;
rowTrans.CacheManager = new MemoryCache<MyRow, MyRow> {
    CacheKeySelector = row => row.Col1, // Key used to store in cache
    InputKeySelector = row => row.Col1  // Key used to check for duplicates
};
rowTrans.TransformationFunc = (row, cache) => {
    // The cache keeps only one row per unique Col1
    if (cache.Items.ContainsKey(row.Col1)) {
        // Duplicate found, optionally filter or transform
        Console.WriteLine($"Duplicate detected: Col1 = {row.Col1}");
    }
    return row;
};
var dest = new MemoryDestination<MyRow>();
source.LinkTo(rowTrans).LinkTo(dest);
Network.Execute(source);
- CacheKeySelector defines how each row is stored in the cache.
- InputKeySelector defines how incoming rows are compared.
- This ensures only one row per unique Col1 value is retained in the cache at any time.
- You can implement conditional filtering or logging within the transformation function for duplicate handling.
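The two-selector design can be sketched as a small keyed cache: rows are stored under the key produced by the cache-side selector, and incoming rows are looked up via the input-side selector (useful when the input and cache types differ). This Python sketch reflects the semantics described above; class and method names are illustrative, not the library API.

```python
class KeyedCache:
    def __init__(self, cache_key_selector, input_key_selector):
        self.cache_key = cache_key_selector
        self.input_key = input_key_selector
        self.items = {}                        # at most one entry per key

    def contains(self, row):
        return self.input_key(row) in self.items

    def add(self, row):
        self.items[self.cache_key(row)] = row  # later rows overwrite the entry

cache = KeyedCache(lambda r: r["Col1"], lambda r: r["Col1"])
unique_rows = []
for row in [{"Col1": 1}, {"Col1": 2}, {"Col1": 1}, {"Col1": 3}, {"Col1": 2}]:
    if cache.contains(row):
        continue                               # drop the duplicate
    cache.add(row)
    unique_rows.append(row)
# unique_rows keeps one row per Col1 value: 1, 2, 3.
```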
Integration with ExpandoObject
Like other non-generic transformations, CachedRowTransformation defaults to ExpandoObject if no type is specified:
var trans = new CachedRowTransformation();
trans.TransformationFunc = (row, cache) => {
    dynamic dyn = row;
    dyn.Sum = cache.List.Count; // Attach a dynamic property holding the current cache size
    return row;
};
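The dynamic-row idea maps naturally to dictionaries: like ExpandoObject rows, a plain dict lets the transformation attach columns at runtime without declaring a row class up front. A small Python sketch of that pattern (the pipeline wiring here is illustrative, not the library API):

```python
cache = []
output = []
for row in [{"Col1": i} for i in range(3)]:
    cache.append(row)          # row enters the cache before the transformation
    row["Sum"] = len(cache)    # add a new "column" on the fly
    output.append(row)
# Each row gains a Sum column equal to the cache size at that point: 1, 2, 3.
```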