Cached Batch Transformation

The CachedBatchTransformation is a specialized transformation that allows batch-wise processing of rows with access to previously seen records via a memory cache. It extends the BatchTransformation and adds the ability to reference earlier input rows, which is especially useful for identifying duplicates, detecting patterns, or comparing the current batch with past entries.

Overview

Use the CachedBatchTransformation when your batch-processing logic requires access to previously seen records. It extends BatchTransformation with an in-memory cache, so you can perform operations such as deduplication, temporal checks, or incremental comparisons without loading the entire dataset into memory. The component processes data in batches and supports cache size limits and custom matching strategies.

Buffering

CachedBatchTransformation is a blocking transformation. It has:

  • An input buffer, storing incoming rows until a batch is full.
  • An output buffer, storing processed output rows before sending them downstream.
  • An internal cache, holding previously seen rows up to a configurable size.

It starts processing only after either:

  • The number of input rows reaches the specified BatchSize, or
  • The upstream source completes execution.

The MaxCacheSize property controls how many previously processed rows are retained in memory. The cache is updated automatically as new rows are processed, and older entries are evicted once the limit is reached.
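To picture the size limit, the following standalone sketch models a bounded cache as a first-in, first-out buffer. This is plain C# for illustration only; BoundedCache is a hypothetical type, and the assumption of oldest-first eviction is for demonstration purposes. The actual eviction is handled internally by the configured CacheManager:

using System.Collections.Generic;

// Illustrative sketch only, not the ETLBox implementation:
// a bounded cache that drops its oldest entry once the limit is exceeded.
public class BoundedCache<T>
{
    private readonly Queue<T> _entries = new Queue<T>();

    public int MaxCacheSize { get; set; } = 10;

    public IEnumerable<T> List => _entries;

    public void Add(T item)
    {
        _entries.Enqueue(item);
        while (_entries.Count > MaxCacheSize)
            _entries.Dequeue(); // evict the oldest entry once the limit is exceeded
    }
}

A practical consequence of any bounded cache: if a duplicate arrives after its original has already been evicted, it will no longer be detected. Choose MaxCacheSize with the expected distance between duplicates in mind.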

Example: Avoiding Duplicate Rows Using a Cache

Here’s an example that filters out previously processed records based on the Id property:

public class MyRow {
    public int Id { get; set; }
    public string Value { get; set; }
}

var source = new MemorySource<MyRow>();
source.DataAsList = new List<MyRow> {
    new MyRow() { Id = 1, Value = "A" },
    new MyRow() { Id = 2, Value = "B" },
    new MyRow() { Id = 1, Value = "A" }, // Duplicate
    new MyRow() { Id = 3, Value = "C" }
};

var batch = new CachedBatchTransformation<MyRow>();
batch.BatchSize = 2;
batch.MaxCacheSize = 10;
batch.BatchTransformationFunc = (rows, cache) => {
    return rows
        .Where(row => !cache.List.Any(cached => cached.Id == row.Id))
        .ToArray();
};

var dest = new MemoryDestination<MyRow>();

source.LinkTo(batch).LinkTo(dest);
Network.Execute(source);

foreach (var row in dest.Data)
    Console.WriteLine($"{row.Id} - {row.Value}");

// Output:
// 1 - A
// 2 - B
// 3 - C

Cache Behavior

You can explicitly configure how rows are identified in the cache by setting a CacheManager. The default is a MemoryCache with a key selector that uses the object reference.

To compare rows by property (e.g., Id), configure the CacheManager:

batch.CacheManager = new MemoryCache<MyRow, MyRow>();
batch.CacheManager.CacheKeySelector = row => row.Id;
batch.CacheManager.InputKeySelector = row => row.Id;

This ensures cache lookups are based on the Id value rather than object identity.
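The distinction matters because two object instances holding identical data are still different references. A quick plain-C# illustration, reusing the MyRow class from the example above:

var a = new MyRow { Id = 1, Value = "A" };
var b = new MyRow { Id = 1, Value = "A" }; // same data, different instance

Console.WriteLine(ReferenceEquals(a, b)); // False: reference identity misses the duplicate
Console.WriteLine(a.Id == b.Id);          // True: an Id-based key matches it

With the default reference-based selector, row b would not be recognized as a duplicate of row a; with the Id-based selectors configured above, it would.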

Dynamic Object Support

CachedBatchTransformation also supports dynamic rows such as ExpandoObject. This is especially useful when processing data from loosely structured sources like JSON, CSV, or APIs.

Example:

var source = new MemorySource();
dynamic row1 = new ExpandoObject(); row1.Id = 1; row1.Value = "X";
dynamic row2 = new ExpandoObject(); row2.Id = 2; row2.Value = "Y";
dynamic row3 = new ExpandoObject(); row3.Id = 1; row3.Value = "X"; // Duplicate
source.DataAsList.Add(row1);
source.DataAsList.Add(row2);
source.DataAsList.Add(row3);

var batch = new CachedBatchTransformation();
batch.BatchSize = 2;
batch.MaxCacheSize = 5;
batch.BatchTransformationFunc = (rows, cache) =>
{
    return rows
        .Where(r => !cache.List.Any(c => ((dynamic)c).Id == ((dynamic)r).Id))
        .ToArray();
};

var dest = new MemoryDestination();
source.LinkTo(batch).LinkTo(dest);
Network.Execute(source);

foreach (dynamic row in dest.Data)
    Console.WriteLine($"{row.Id} - {row.Value}");

// Output:
// 1 - X
// 2 - Y

The cache handles ExpandoObject instances like any other class, provided you use appropriate dynamic access in your logic.