I recently worked on a project that was flat-file and staging intensive. By intensive I mean we had essentially 1.5 ETL developers and over 50 flat files to stage. We also had a very aggressive timeline: two data marts, with source data coming from five totally disparate and disconnected (hence the flat files) subsidiaries/source systems all over the world (think currency conversion, translation, master data management, etc.), and only three and a half months to get it done.
Was I going to create a package to stage each flat file? Ummm . . . no! So what was I to do? I was going to create a single, metadata-driven package that would loop through a data set and populate my staging tables one by one. Below is a screenshot of all the components necessary to make this work. There is no Data Flow.
Before going into the details there are a few things to keep in mind.
- This solution is based on text files. It can easily be tailored to read from relational sources and, with a little more tweaking, from non-relational sources as well.
- What makes this design possible is a Script Task that alleviates the need for custom data flows for each extract file/table to be staged.
- The solution assumes that the staging tables can hold more than one day's/load's worth of data and therefore adds an ExtractFileID column to each staging table and to the source data as it is being loaded.
- For the Script Task to work as-is, it is necessary to create a staging table for each data source that will be staged. Each staging table must meet the following criteria:
- The staging table must contain column names that match the data source column names exactly (a quick way to verify this is sketched after this list).
- The staging table columns must have data types that are appropriate for the incoming data. If they are character data types they must be large enough that they will not truncate any incoming data.
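Because every column mapping must line up for the bulk copy to succeed, you could add a small pre-flight check to the Script Task that verifies a staging table against a file's header before loading. The helper below is purely my illustration, not part of the original script; it assumes the same ADO.NET-style connection string used by the load method, a single-character delimiter, and a staging table name without a schema prefix.

'Hypothetical helper, not part of the original script: returns True when every header
'column in the extract file (plus ExtractFileID) exists in the staging table.
'Requires Imports System.IO, System.Data.SqlClient, and System.Collections.Generic.
Private Function HeaderMatchesStageTable(ByVal extractFileFullPath As String, _
                                         ByVal columnDelimiter As String, _
                                         ByVal stageTableName As String, _
                                         ByVal connectionString As String) As Boolean
    'Read only the header row from the extract file (assumes a single-character delimiter)
    Dim headerColumns As String()
    Using sr As StreamReader = New StreamReader(extractFileFullPath)
        headerColumns = sr.ReadLine().Split(CChar(columnDelimiter))
    End Using
    'Collect the staging table's column names (assumes no schema prefix in stageTableName)
    Dim tableColumns As New List(Of String)
    Using conn As New SqlConnection(connectionString)
        conn.Open()
        Using cmd As New SqlCommand( _
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = @table", conn)
            cmd.Parameters.AddWithValue("@table", stageTableName)
            Using rdr As SqlDataReader = cmd.ExecuteReader()
                While rdr.Read()
                    tableColumns.Add(rdr.GetString(0))
                End While
            End Using
        End Using
    End Using
    'Per the criteria above, names must match exactly, so the comparison is case-sensitive
    For Each columnName As String In headerColumns
        If Not tableColumns.Contains(columnName) Then Return False
    Next
    Return tableColumns.Contains("ExtractFileID")
End Function

Calling this before the load and failing the task early would surface schema drift before any rows are read.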
The metadata that drives the solution might be specific to your project, so I won't go into it in this post. What you'll need to do is create the metadata tables necessary to produce a data set that can be used by a Foreach Loop Container, which will loop through each file or table that needs to be staged (for example, one row per file carrying its full path, column delimiter, staging table name, extract file ID, and batch size, which are exactly the parameters the method below accepts).
However, you can test the Script Task by hard-coding a single file's metadata within the Script Task. That is exactly what I did to create the script. Once the script was working as expected I created the necessary metadata tables and data and passed the values into the script to make it more dynamic. Below is the method that does the bulk of the work (no pun intended). The entire script can be downloaded here.
'Note: the script needs Imports System.Data, System.Data.SqlClient, and System.IO at the top of the file.
Private Sub BulkCopyDelimitedFile(ByVal batchSize As Integer, _
                                  ByVal columnDelimiter As String, _
                                  ByVal extractFileID As Int32, _
                                  ByVal extractFileFullPath As String, _
                                  ByVal stageTableName As String)
    Dim SqlConnectionManager As Microsoft.SqlServer.Dts.Runtime.ConnectionManager
    'Set a connection manager object equal to the connection manager named "Stage"
    SqlConnectionManager = Dts.Connections("Stage")
    'Since the "Stage" connection manager is of type OLE DB, strip the provider attribute so the connection string is compatible with ADO.NET
    Dim dbConn As SqlConnection = New SqlConnection(SqlConnectionManager.ConnectionString.Replace("Provider=SQLNCLI10.1;", ""))
    'Create a new StreamReader object and pass in the path of the file to be read
    Dim sr As StreamReader = New StreamReader(extractFileFullPath)
    'Read in the first line of the file (the header row)
    Dim line As String = sr.ReadLine()
    'Create an array of strings and fill it with each column name in the extract file by using the Split function (assumes a single-character delimiter)
    Dim strArray As String() = line.Split(CChar(columnDelimiter))
    'Create a DataTable object
    Dim dt As DataTable = New DataTable()
    'Create a DataRow object
    Dim row As DataRow
    'Open the database connection
    dbConn.Open()
    'Create a SqlTransaction object in which to execute the bulk copy so that if it fails, all writes are rolled back
    Dim bulkCopyTransaction As SqlTransaction = dbConn.BeginTransaction()
    'Instantiate our SqlBulkCopy object and set the appropriate properties
    Dim bc As SqlBulkCopy = New SqlBulkCopy(dbConn, SqlBulkCopyOptions.Default, bulkCopyTransaction)
    'Set the BulkCopy destination table name equal to the staging table name provided by the Foreach Loop Container
    bc.DestinationTableName = stageTableName
    'Set the BulkCopy batch size equal to the size contained in the metadata provided by the Foreach Loop Container
    bc.BatchSize = batchSize
    'For each column that was found in the extract file
    For Each columnName As String In strArray
        'Add a column to the data table
        dt.Columns.Add(New DataColumn(columnName))
        'Add a column mapping to the SqlBulkCopy object
        bc.ColumnMappings.Add(columnName, columnName)
    Next
    'Add the ExtractFileID column to the data table since it doesn't exist in the extract file and wasn't added in the previous For Each loop
    dt.Columns.Add(New DataColumn("ExtractFileID", System.Type.GetType("System.String")))
    'Add the ExtractFileID column mapping for the same reason
    bc.ColumnMappings.Add("ExtractFileID", "ExtractFileID")
    'Move to the first row in the extract file after the header
    line = sr.ReadLine()
    Dim rowCount As Integer
    'Loop through all rows in the extract file
    While Not line = String.Empty
        rowCount += 1
        'Create a new data table row to store the extract file's data in
        row = dt.NewRow()
        'Add all column values to the row
        row.ItemArray = line.Split(CChar(columnDelimiter))
        'Add the ExtractFileID
        row("ExtractFileID") = extractFileID
        'Add the newly created row to the data table
        dt.Rows.Add(row)
        'Move to the next row in the extract file
        line = sr.ReadLine()
    End While
    'Done reading; close the file
    sr.Close()
    Try
        'Write the data table to the staging table
        bc.WriteToServer(dt)
        'If successful, commit the transaction
        bulkCopyTransaction.Commit()
    Catch ex As Exception
        'If an exception occurs, roll back the transaction
        bulkCopyTransaction.Rollback()
        'Set the task result to Failure
        Dts.TaskResult = ScriptResults.Failure
        'Rethrow so the error is reported; a bare Throw preserves the original stack trace
        Throw
    Finally
        'Close the BulkCopy object and the database connection whether or not the load succeeded
        bc.Close()
        dbConn.Close()
    End Try
    'Set the SSIS metadata variable equal to the number of rows loaded
    Dts.Variables("ExtractActualRowCount").Value = rowCount
End Sub
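For completeness, here is a minimal sketch of the Main method that glues this together: the Foreach Loop Container maps each column of the metadata data set to an SSIS variable, and Main passes those values into BulkCopyDelimitedFile. The variable names below (other than ExtractActualRowCount, which the method above sets) are assumptions; yours will depend on your metadata tables.

Public Sub Main()
    'Hypothetical variable names: map each one to a column of the metadata data set
    'on the Foreach Loop Container's Variable Mappings page
    Dim batchSize As Integer = CInt(Dts.Variables("ExtractBatchSize").Value)
    Dim columnDelimiter As String = CStr(Dts.Variables("ExtractColumnDelimiter").Value)
    Dim extractFileID As Int32 = CInt(Dts.Variables("ExtractFileID").Value)
    Dim extractFileFullPath As String = CStr(Dts.Variables("ExtractFileFullPath").Value)
    Dim stageTableName As String = CStr(Dts.Variables("ExtractStageTableName").Value)
    'Stage the file; BulkCopyDelimitedFile fails the task and rethrows on error
    BulkCopyDelimitedFile(batchSize, columnDelimiter, extractFileID, extractFileFullPath, stageTableName)
    Dts.TaskResult = ScriptResults.Success
End Sub

To test with a single file's hard-coded metadata, as described above, you would simply replace the Dts.Variables reads with literal values.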
I added a few updates.
1) The script would fail if the text file didn't have any data rows. This is now resolved by a change in the loop structure.
2) The Catch block did not have a Throw in it resulting in unreported errors. A Throw has been added to resolve this.
The changes have been made in the blog text as well as in the downloadable source code file.
Really nice solution; I have literally this exact problem to solve, but on an even larger scale. Thank you.
Nice, but have you seen: http://www.cozyroc.com/ssis/dynamic-data-flow
Yes, but it appears to be ~$2400 per server...
Or $399 per year for two licences. Not a bad price considering the amount of extra functions you can also include, such as parallel processing, lookups, connections to everything... :)