Shared functionalities
Stream connectors allow you to read data that is stored in a common format, either from a file or via http.
Supported types
ETLBox currently supports the following data storage types:
- Csv
- Json
- Xml
- Excel
- Text
- Parquet
This article covers functionalities that are shared by all of these data types.
There is a connector package for each data type that must be included together with the core package to read the data. E.g. for Json you need to add the ETLBox.Json package.
Note
CustomSource and CustomDestination: these custom connectors allow you to define your own parsing logic.
All these different storage types can be read either from a flat file (e.g. a .csv file on your local machine or a network share) or via a http web service endpoint. So you could read a csv file from a network storage located at //Volumes/user/data.csv, but you could also read csv files from a REST endpoint at https://test.com/getcsv.
All these types (except Excel) can also be written into a file or sent to a http endpoint. E.g. you can write a text file into your local Windows folder at c:/data/text.csv or send it as a POST to https://test.com/postcsv.
Once the data is read from one of these types into a data flow, all transformations that ETLBox offers can be used to transform the data. These connectors make it easy to transfer an xml file into a database table or to send an excel sheet as json to a web service.
Streaming data
All data that is read or written with these connectors supports streaming. Streaming for a source means that data will be sent into the data flow while it is still being read from the source. Imagine you receive a json response from a web service that contains a lot of data: ETLBox will start reading the data from the input stream of the web response, and every element that has already been received will be sent to the connected components immediately. So while the source server is still transferring data, ETLBox will already start processing the data it has received.
If your source is sending data faster than your data flow can process it, each connector will use an internal buffer to store the already received data. The buffer size can be adjusted with the MaxBufferSize property available on each connector.
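A minimal sketch (the endpoint is a placeholder, the POCO MyRow defined here is reused in the following examples, and exact constructor overloads may differ between ETLBox versions):

```csharp
public class MyRow
{
    public int Id { get; set; }
    public string Value { get; set; }
}

var source = new JsonSource<MyRow>("https://test.com/getjson", ResourceType.Http)
{
    // Pause reading when 5000 rows are buffered and resume
    // once the connected components have consumed some of them.
    MaxBufferSize = 5000
};
```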
Resource Type and Web Requests
All streaming sources and destinations in this article can be set to work either with a file or with data from a web service. If you want to access a file on your drive or a network share, set the ResourceType property to ResourceType.File. This is the default value for all connectors (Csv, Excel, Text, Xml and Json).
Another option is ResourceType.Http, which allows you to read/write data from a http endpoint. Instead of a file name, just provide a url. Furthermore, the components also have a HttpClient property and, for sources, a HttpRequestMessage property that can be used to configure the http request, e.g. to add authentication, change the request type from GET to POST or to use https instead.
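As an illustration, this sketch adds an authorization header to the request of a CsvSource and changes the request type to POST (the token and endpoint are placeholders; it assumes the HttpRequestMessage property is pre-initialized by the component):

```csharp
var source = new CsvSource<MyRow>("https://test.com/getcsv", ResourceType.Http);

// Change the request type from GET to POST and add authentication.
source.HttpRequestMessage.Method = HttpMethod.Post;
source.HttpRequestMessage.Headers.Authorization =
    new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", "<your-token>");
```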
The third option is ResourceType.AzureBlob, which lets you access data from Azure Blob Storage. Use the AzureBlobStorage property to define additional connection settings. The simplest way is to set a value for AzureBlobStorage.ConnectionString and AzureBlobStorage.ContainerName. But you can also use other authentication methods, e.g. by providing an existing BlobClient.
ResourceType File
The simplest approach is to write your data into a file, either on your local storage or on a network share.
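For example, a CsvDestination writing into a local file could look like this (a minimal sketch; the file name is a placeholder, and ResourceType.File is set explicitly only for clarity, since it is the default):

```csharp
var dest = new CsvDestination<MyRow>("c:/data/output.csv")
{
    ResourceType = ResourceType.File // default value, shown for clarity
};
```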
ResourceType Http
If you want to write data into a web service instead of a file, your code for a JsonDestination could look like this:
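A sketch of such a destination (the endpoint is a placeholder; as mentioned above, the data is sent as a POST request, and the HttpClient property is assumed to be pre-initialized by the component):

```csharp
var dest = new JsonDestination<MyRow>("https://test.com/postcsv", ResourceType.Http);

// The HttpClient property can be adjusted if the endpoint needs
// special settings, e.g. a longer timeout.
dest.HttpClient.Timeout = TimeSpan.FromSeconds(30);
```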
The other way round, if you want to read data for instance from a REST endpoint, you would define your source as shown below. The example also overrides the default http client so that the source accepts encoded (compressed) data streams:
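(A sketch; the endpoint is a placeholder, and it assumes the HttpClient property accepts any preconfigured client.)

```csharp
var source = new CsvSource<MyRow>("https://test.com/getcsv", ResourceType.Http);

// Replace the default http client with one that automatically
// decompresses gzip and deflate encoded response streams.
source.HttpClient = new HttpClient(new HttpClientHandler()
{
    AutomaticDecompression = System.Net.DecompressionMethods.GZip
                           | System.Net.DecompressionMethods.Deflate
});
```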
MIME multipart responses
If you receive data as a MIME multipart response, you can use the UseMulitpartContent function to specify the content type that you would like to process in your source.
E.g. if you receive a response like this:
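(An illustrative, made-up response with one json part and one csv part:)

```
HTTP/1.1 200 OK
Content-Type: multipart/mixed; boundary="boundary42"

--boundary42
Content-Type: application/json

{ "info": "some metadata" }
--boundary42
Content-Type: text/csv

Id;Value
1;Test
--boundary42--
```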
You could specify to only use the part with the content type text/csv:
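A sketch of this configuration; the exact signature of UseMulitpartContent is an assumption here (taking the desired content type as a string):

```csharp
var source = new CsvSource<MyRow>("https://test.com/getcsv", ResourceType.Http);

// Only the part with content type text/csv is parsed;
// all other parts of the multipart response are ignored.
source.UseMulitpartContent("text/csv");
```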
ResourceType AzureBlob
If your csv data is stored in Azure Blob storage, you can retrieve it like this:
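A sketch, assuming the AzureBlobStorage property is pre-initialized by the component (connection string and container name are placeholders):

```csharp
var source = new CsvSource<MyRow>("data.csv")
{
    ResourceType = ResourceType.AzureBlob
};
source.AzureBlobStorage.ConnectionString = "<your-connection-string>";
source.AzureBlobStorage.ContainerName = "flatfiles";
```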
The other way round, you can write data into Azure Blob storage:
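(Again a sketch with placeholder values:)

```csharp
var dest = new CsvDestination<MyRow>("output.csv")
{
    ResourceType = ResourceType.AzureBlob
};
dest.AzureBlobStorage.ConnectionString = "<your-connection-string>";
dest.AzureBlobStorage.ContainerName = "flatfiles";
```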
Shared properties
Paging
A common scenario is that you need to make multiple calls to a http endpoint in order to receive all data. E.g. your first request to an endpoint will be a GET request to http://test.com/data?page=1. To receive all data, you will need to increase the page number and repeat the request until you don't get any data in the response.
This can be accomplished in ETLBox by using the GetNextUri and HasNextUri properties.
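A sketch of this pattern; it assumes that both properties are funcs receiving the stream meta data of the current request, and the stop condition (a fixed page limit) is purely illustrative - a real implementation would typically inspect the meta data, e.g. the number of rows received:

```csharp
int page = 1;
var source = new JsonSource<MyRow>("http://test.com/data?page=1", ResourceType.Http);

// Illustrative stop condition: request at most 10 pages.
source.HasNextUri = meta => page < 10;
// Build the uri of the next request by increasing the page number.
source.GetNextUri = meta => $"http://test.com/data?page={++page}";
```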
The GetNextUri / HasNextUri pattern is also available on destinations. HasNextUri determines if a different destination name should be used for the current data row; if yes, GetNextUri is called to determine the new uri. Both functions give you access to the currently processed row.
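For example, a destination could start a new output file after a fixed number of rows. This sketch assumes that on destinations both funcs receive the stream meta data and the current row, and that the meta data exposes a progress count; the exact signatures may differ:

```csharp
int fileNo = 1;
var dest = new CsvDestination<MyRow>("output_1.csv");

// Switch to a new file after every 1000 written rows (illustrative condition).
dest.HasNextUri = (meta, row) => meta.ProgressCount % 1000 == 0;
dest.GetNextUri = (meta, row) => $"output_{++fileNo}.csv";
```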
Skipping rows at beginning
Each streaming source can skip rows at the beginning of the stream. These rows will be simply ignored and not counted as progress. Set the property SkipRows to a value greater than 0.
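E.g., to ignore the first two lines of a file (a minimal sketch):

```csharp
var source = new CsvSource<MyRow>("data.csv")
{
    SkipRows = 2 // the first two lines of the stream are ignored
};
```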
Encoding
For sources and destinations that deal with text (so every streaming source except Excel) you can define the encoding that is used. The default encoding is UTF-8; if you need a different one, overwrite the Encoding property.
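E.g., to read a Latin-1 encoded file (a minimal sketch):

```csharp
var source = new CsvSource<MyRow>("data.csv")
{
    Encoding = System.Text.Encoding.GetEncoding("ISO-8859-1")
};
```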
Pass existing Stream
By default, all streaming sources and destinations will create a StreamReader or StreamWriter based on the existing configuration of the component. E.g. if you pass a file name to a CsvSource, the component will create a StreamReader that reads from a newly created FileStream on that file.
If you already have an existing stream and want to pass it to the source or destination, you can use the CreateStreamReader (for sources) or CreateStreamWriter (for destinations) function. This function allows you to inject the creation of the StreamReader/StreamWriter into the component.
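A sketch of injecting a reader for an in-memory stream; it assumes CreateStreamReader is a func that receives the configured uri and returns the StreamReader to use:

```csharp
var existingStream = new MemoryStream(
    System.Text.Encoding.UTF8.GetBytes("Id;Value\n1;Test"));

var source = new CsvSource<MyRow>()
{
    // Ignore the uri and read from the already existing stream instead.
    CreateStreamReader = uri => new StreamReader(existingStream)
};
```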
Modify or enrich rows directly after reading
Each streaming source offers the RowModificationAction. This action (Action<TOutput, StreamMetaData>) allows you to modify a record directly after it is read from the source and before it is sent to the next component. While the processing of data in other components can happen asynchronously, the execution of this action is synchronous for the source and always occurs after a record was read and before the next record is read. (Reading a row and invoking this action take place in the same thread.)
One usage example would be to store the uri used for the current record (when combined with GetNextUri/HasNextUri).
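A sketch of this; the property on StreamMetaData that exposes the current uri is an assumption (here called RequestUri) and may be named differently, and MyRow would need an additional SourceUri property:

```csharp
var source = new JsonSource<MyRow>("http://test.com/data?page=1", ResourceType.Http);

source.RowModificationAction = (row, meta) =>
{
    // Enrich each record with the uri it was read from
    // (RequestUri is a hypothetical property name).
    row.SourceUri = meta.RequestUri;
};
```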