Database Source

The DbSource component reads data from a relational database and streams it into an ETL process. It supports reading from tables, executing SQL queries, and calling stored procedures while efficiently handling large datasets.

Creating a DbSource

A DbSource requires a connection manager and either a table name or an SQL query.

Reading from a Table

var conn = new SqlConnectionManager("Data Source=.;Initial Catalog=ETL;Integrated Security=SSPI;");
var source = new DbSource<MyRow>() {
  ConnectionManager = conn,
  TableName = "SourceTable"
};

For this example, assume the database table has the following definition:

CREATE TABLE SourceTable (
    Id INT NOT NULL,
    Value VARCHAR(50) NULL
);

And a matching POCO:

public class MyRow {
    public int Id { get; set; }
    public string Value { get; set; }
}

Using SQL Queries

Instead of specifying a table, DbSource can execute a SQL query to retrieve data.

var conn = new SqlConnectionManager("Data Source=.;Initial Catalog=ETL;Integrated Security=SSPI;");
var source = new DbSource<MyRow>() {
    ConnectionManager = conn,
    Sql = @"SELECT s.Id, j.JoinedValue as Value
            FROM SourceTable s
            INNER JOIN JoinTable j
            ON s.Id = j.Id"
};

Query Parameters

Parameterized queries help avoid SQL injection and improve query performance.

var conn = new SqlConnectionManager("Data Source=.;Initial Catalog=ETL;Integrated Security=SSPI;");
var source = new DbSource<MyRow>(conn) {
    ConnectionManager = conn,
    Sql = @"SELECT s.Id, j.JoinedValue as Value
            FROM SourceTable s
            INNER JOIN JoinTable j
            ON s.Id = j.Id
            WHERE s.Id > @minId",
    SqlParameter = new[] {
        new QueryParameter("minId", "INT", 10)
    }
};

Using SELECT * in an SQL query may lead to issues, as column names cannot always be determined automatically. While it might work in some cases, it is recommended to explicitly specify column names (e.g. SELECT s.Id, j.JoinedValue as Value ) to ensure proper mapping and metadata detection.

Using Stored Procedures

DbSource supports executing stored procedures.

var conn = new SqlConnectionManager("Data Source=.;Initial Catalog=ETL;Integrated Security=SSPI;");
var source = new DbSource<MyRow>() {
    ConnectionManager = conn,
    Sql = "EXEC MyStoredProcedure @param1, @param2",
    TableDefinition = TableDefinition.FromProcedureName(conn, "MyStoredProcedure"),
    SqlParameter = new[] {
        new QueryParameter("param1", "INT", 5),
        new QueryParameter("param2", "VARCHAR(50)", "Test")
    }
};

Depending on the database, a TableDefinition must be provided when using a stored procedure. This ensures that metadata such as column names and data types are correctly identified.

Working with Dynamic Objects

When no strongly-typed object is specified, DbSource returns dynamic ExpandoObject instances.

var source = new DbSource(conn, "SourceTable");

which is a short version for

var source = new DbSource<ExpandoObject>(conn, "SourceTable");

Streaming and Flow Control

DbSource reads data as a stream, processing rows one by one. As long as the connected components can handle incoming data, it continues reading from the database without loading all records into memory. If downstream components slow down, DbSource will also reduce its reading speed, ensuring controlled data flow. It never reads more rows than the MaxBufferSize of its output buffer, optimizing memory usage and preventing excessive resource consumption.

diagram

Automatic Metadata Retrieval

When using a table name, DbSource automatically retrieves metadata, including column names, data types, primary keys, and identity columns. If using an SQL query, column names are extracted when possible. If metadata cannot be determined, a TableDefinition can be provided manually.

var source = new DbSource<MyRow>(conn, "SourceTable") {
    TableDefinition = new TableDefinition("SourceTable", new List<TableColumn> {
        new TableColumn("Id", "INT", allowNulls: false, isPrimaryKey: true),
        new TableColumn("Value", "VARCHAR(50)", allowNulls: true)
    })
};

In most cases, only column names are required in the TableDefinition. However, providing full metadata, including data types and constraints, ensures better compatibility and functionality.

How Metadata is Retrieved

  • When a table name is provided, DbSource queries the database system tables to fetch metadata, ensuring column names, types, identity properties, and constraints are accurately detected.
  • When an SQL query is used, DbSource attempts to parse column names from the query. Explicitly defined column names (SELECT Id, Value FROM SourceTable) can be extracted, whereas wildcard selections (SELECT * FROM SourceTable) and stored procedures do not provide sufficient metadata.

Fallback Mechanisms

If metadata cannot be fully determined, DbSource follows these fallback steps:

  1. If a TableDefinition is explicitly set, it takes precedence over any automatic detection.
  2. If a TableName is provided, but column names cannot be inferred from the query, DbSource retrieves metadata directly from the table.
  3. If neither a TableDefinition nor a table name is available, DbSource checks whether the underlying ADO.NET connector can provide column names.

Column Mapping

When the database column names differ from the object property names, column mapping is required.

Column Mapping for POCOs

Use the DbColumnMap attribute to define column mappings.

public class MyRow {
    [DbColumnMap("DB_Id")]
    public int Id { get; set; }

    [DbColumnMap("DB_Value")]
    public string Value { get; set; }
}
var source = new DbSource<MyRow>(conn, "SourceTable");

This ensures that values from the DB_Id column in the database are mapped to the Id property in the object.

You could also ignore columns using attributes in POCO classes by applying DbColumnMap with IgnoreColumn = true.

[DbColumnMap(IgnoreColumn = true)]
public string Value { get; set; }

In this case, Value would be ignored when reading from the database.

Manual Column Mapping

By default, ETLBox automatically matches properties to database columns based on their names. However, if column names differ or explicit control is needed, manual column mapping can be used. This option is available for both POCOs and ExpandoObject, allowing you to override or define mappings as needed.

Example: Mapping for POCOs

public class MyRow {
    public int Id { get; set; }
    public string Value { get; set; }
}

var source = new DbSource<MyRow>(conn, "SourceTable");
source.ColumnMapping = new[] {
    new DbColumnMap { DbColumnName = "DB_Id", PropertyName = "Id" },
    new DbColumnMap { DbColumnName = "DB_Value", PropertyName = "Value" }
};

If ColumnMapping is set, it overrides any attribute-based mappings (DbColumnMap attributes on the class will be ignored).

Specific columns can be ignored by setting IgnoreColumn = true.

new DbColumnMap { PropertyName="Value", IgnoreColumn = true }

Example: Mapping for ExpandoObject

For dynamic objects, mappings must be explicitly provided.

var source = new DbSource(conn, "SourceTable");
source.ColumnMapping = new[] {
    new DbColumnMap { DbColumnName = "DB_Id", PropertyName = "Id" },
    new DbColumnMap { DbColumnName = "DB_Value", PropertyName = "Value" }
};

Column Converters

Column converters allow modifying data immediately after reading from the database, ensuring transformations occur within the same task before passing data to downstream components.

Example: Removing Characters from a String

var source = new DbSource<MyRow>(conn, "SourceTable");
source.ColumnConverters = new[] {
    new ColumnConverter { ColumnName = "Value", ConversionFunc = val => ((string)val).Replace(",", "") }
};

In this example, any commas in the Value column are removed before the data leaves DbSource. This ensures that all modifications are applied immediately, preventing inconsistencies when multiple components process the data concurrently.

Column Name Resolver

The column resolver modifies column names dynamically before mapping them to object properties. This is useful when dealing with databases that have different naming conventions, such as inconsistent casing or special characters.

Assume the following database table definition:

CREATE TABLE SourceTable (
    "col_Id" INT NOT NULL,
    "col_Value" VARCHAR(100) NOT NULL
);

And a matching POCO:

public class MyRow {
    public int Id { get; set; }
    public string Value { get; set; }
}

By default, DbSource will try to match column names exactly. However, since the database uses different naming conventions, mapping will fail unless a ColumnToPropertyNamesResolver is used.

var source = new DbSource<MyRow>(conn, "SourceTable");
source.ColumnToPropertyNamesResolver = columnName => columnName.Replace("col_", "");

With this resolver, col_Id will map to Id and col_Value will map to Value. This approach avoids the need for manual column mappings when dealing with databases that use prefixes, different casing, or other variations in column naming.

Bulk Selection

Bulk selection is useful when you need to retrieve specific records from a table based on a set of key values. Unlike using a WHERE clause in an SQL query, which is static and predefined, bulk selection allows for dynamic filtering at runtime. This is particularly helpful when working with lists of keys that are not known beforehand or need to be processed in batches.

Assume the following table definition:

CREATE TABLE SourceTable (
    Id INT NOT NULL PRIMARY KEY,
    Value VARCHAR(100) NOT NULL
);

And a corresponding POCO:

public class MyRow {
    [IdColumn]
    public int Id { get; set; }
    public string Value { get; set; }
}

Instead of retrieving all rows, only records matching the provided key values are fetched.

var source = new DbSource<MyRow>(conn, "SourceTable") {
    SelectMode = SelectOperation.BulkSelect,
    FilterRows = new[] {
        new MyRow { Id = 1 },
        new MyRow { Id = 3 }
    }
};

This retrieves only the rows where Id = 1 or Id = 3, reducing unnecessary data transfer and memory usage.

Why Not Just Use a WHERE Clause?

A WHERE clause can be used for filtering, but it requires hardcoding the condition in the SQL query. If the filtering criteria are dynamic, constructing a query string manually with multiple values (WHERE Id IN (1, 3, 5, ...)) can be inefficient, especially for large lists. Some databases also impose a limit on the number of values that can be passed in an IN clause.

Handling Large Key Lists

Bulk selection is optimized for handling large datasets by processing keys in batches. The BulkSelectBatchSize property determines how many keys are sent per query.

source.BulkSelectBatchSize = 500;

This groups queries into batches of 500 keys per request, ensuring efficient execution and avoiding query length limits imposed by some databases.

Error Handling

If an error occurs while reading a record, DbSource automatically redirects the faulty record to an error flow if LinkErrorTo is used. Errors do not halt execution unless explicitly configured to do so.

When an error occurs, the record is wrapped inside an ETLBoxError. The original data is stored in the RecordAsJson property, ensuring that the problematic record is preserved and can be analyzed later.

Example: Capturing Errors

var source = new DbSource<MyRow>(conn, "SourceTable");
var errorDest = new MemoryDestination<ETLBoxError>();

source.LinkErrorTo(errorDest);
Network.Execute(source);

foreach (var error in errorDest.Data) {
    Console.WriteLine($"Error: {error.ErrorText}");
    Console.WriteLine($"Faulty Record: {error.RecordAsJson}");
}

Example Data Flow

The following example reads data from a database table and loads it into a memory destination. This is useful for debugging, as it allows you to inspect the extracted data before processing it further.

var conn = new SqlConnectionManager("Data Source=.;Initial Catalog=ETL;Integrated Security=SSPI;");
var source = new DbSource<MyRow>(conn, "SourceTable") {
    Limit = 3 // Read only the first 3 rows
};
var dest = new MemoryDestination<MyRow>();

source.LinkTo(dest);
Network.Execute(source);

Console.WriteLine($"Loaded {dest.Data.Count} rows!");