
Wednesday, June 22, 2011

SQL Bulk Copy

In an earlier post I talked about a generic SSIS package for staging data using SQL Bulk Copy. I have been using it extensively without fail . . . until just recently.

I got a new set of files from a new business unit that we were bringing into our data warehouse solution. Of their 20 files, 14 loaded without issue and the remaining 6 failed with an error similar to the one below.


System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. ---> System.InvalidOperationException: The given value of type String from the data source cannot be converted to type int of the specified target column.


I kept going back to the developers who created the extract process and telling them that they were doing something wrong with their encoding or delimiters, but neither we nor they could actually find the issue. Until now.

Turns out the issue is related to SQL Bulk Copy, numeric columns, and null values. Apparently SQL Bulk Copy does not, by default, keep nulls. Instead, it converts them to empty strings. Well, as I'm sure you know, SQL Server doesn't like it when you try to load empty strings, or any strings for that matter, into a numerically typed column.

There are a couple of solutions.

  1. Make sure to replace all nulls with zeros when creating flat files with numeric data that will be consumed by BCP (see the sketch after this list).
  2. Use the KeepNulls option when executing BCP. In my SSIS solution I changed my SqlBulkCopy object creation to look like this . . .
Dim bc As SqlBulkCopy = New SqlBulkCopy(dbConn, SqlBulkCopyOptions.KeepNulls, bulkCopyTransaction)
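
Going back to the first option: if the extract files themselves happen to be produced by .NET code, the null handling can be done as each row is written out. Below is a minimal, hypothetical sketch of that idea; the method, the numeric-type check, and the delimiter handling are all assumptions for illustration, not part of my solution.

Imports System.Data
Imports System.IO

Module ExtractWriterSketch

    'Write one DataRow as a delimited line, substituting zeros for null numeric values
    Public Sub WriteRow(ByVal writer As StreamWriter, ByVal row As DataRow, ByVal delimiter As String)
        Dim values(row.Table.Columns.Count - 1) As String

        For i As Integer = 0 To row.Table.Columns.Count - 1
            Dim col As DataColumn = row.Table.Columns(i)

            If row.IsNull(col) AndAlso IsNumericColumn(col) Then
                values(i) = "0"                  'Replace null numerics with zero
            ElseIf row.IsNull(col) Then
                values(i) = String.Empty         'Leave other nulls empty
            Else
                values(i) = row(col).ToString()
            End If
        Next

        writer.WriteLine(String.Join(delimiter, values))
    End Sub

    'Crude numeric-type check, for illustration only
    Private Function IsNumericColumn(ByVal col As DataColumn) As Boolean
        Return col.DataType Is GetType(Integer) _
            OrElse col.DataType Is GetType(Long) _
            OrElse col.DataType Is GetType(Decimal) _
            OrElse col.DataType Is GetType(Double)
    End Function

End Module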

Monday, March 28, 2011

A One-Package, Generic SSIS Staging Process


I recently worked on a project that was flat file and staging intensive. By intensive I mean we had essentially 1.5 ETL developers and over 50 flat files to stage. We also had a very aggressive timeline. Two data marts with source data coming from 5 totally disparate and disconnected (hence the flat files) subsidiaries/source systems all over the world (think currency conversion, translation, master data management, etc.) and only three and a half months to get it done.

Was I going to create a package to stage each flat file? Ummm . . . no! So what was I to do? I was going to create a single, metadata-driven package that would loop through a data set and populate my staging tables one by one. Below is a screenshot of all the components necessary to make this work. There is no Data Flow.


Before going into the details there are a few things to keep in mind.

  1. This solution is based on text files. It can easily be tailored to read from relational sources, and with a little more tweaking, from non-relational sources as well (a sketch of the relational-source variation appears after this list).
  2. What makes this design possible is a Script Task that alleviates the need for custom data flows for each extract file/table to be staged.
  3. The solution assumes that the staging tables can hold more than one day's/load's worth of data and therefore adds an ExtractFileID column to each staging table and to the source data as it is being loaded.
  4. For the Script Task to work as-is, it is necessary to create a staging table for each data source that will be staged. Each staging table must meet the following criteria:
    • The staging table must contain column names that match the data source column names exactly.
    • The staging table columns must have data types that are appropriate for the incoming data. If they are character data types they must be large enough that they will not truncate any incoming data.  
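As for the relational-source variation mentioned in point 1, here is a hypothetical sketch of how it might look: instead of building a DataTable from a flat file, a data reader is streamed straight into SqlBulkCopy. The connection strings, query, and method name are made up for illustration.

Imports System.Data.SqlClient

Module RelationalSourceSketch

    'Stage the results of a source query by streaming a data reader into SqlBulkCopy
    Public Sub StageFromQuery(ByVal sourceConnString As String, _
                              ByVal stageConnString As String, _
                              ByVal sourceQuery As String, _
                              ByVal stageTableName As String)

        Using sourceConn As New SqlConnection(sourceConnString)
            Using stageConn As New SqlConnection(stageConnString)
                sourceConn.Open()
                stageConn.Open()

                Dim cmd As New SqlCommand(sourceQuery, sourceConn)

                Using reader As SqlDataReader = cmd.ExecuteReader()
                    Using bc As New SqlBulkCopy(stageConn)
                        bc.DestinationTableName = stageTableName

                        'WriteToServer accepts an IDataReader, so no DataTable is needed
                        bc.WriteToServer(reader)
                    End Using
                End Using
            End Using
        End Using
    End Sub

End Module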

The metadata that drives the solution might be specific to your project, so I won't go into it in this post. What you'll need to do is create the metadata tables necessary to build a data set that can be used by a Foreach Loop container that will loop through each file or table that needs to be staged.

However, you can test the Script Task by hard-coding a single file's metadata within the Script Task. That is exactly what I did to create the script. Once the script was working as expected I created the necessary metadata tables and data and passed the values into the script to make it more dynamic. Below is the method that does the bulk of the work (no pun intended). The entire script can be downloaded here.



Private Sub BulkCopyDelimitedFile(ByVal batchSize As Integer, _
            ByVal columnDelimiter As String, _
            ByVal extractFileID As Int32, _
            ByVal extractFileFullPath As String, _
            ByVal stageTableName As String)

        Dim SqlConnectionManager As Microsoft.SqlServer.Dts.Runtime.ConnectionManager

        'Set a connection manager object equal to the connection manager named "Stage"
        SqlConnectionManager = Dts.Connections("Stage")

        'Since the "Stage" connection manager is of type OLEDB we need to modify it to make it compatible with an ADO.NET connection
        Dim dbConn As SqlConnection = New SqlConnection(SqlConnectionManager.ConnectionString.Replace("Provider=SQLNCLI10.1;", ""))

        'Create a new StreamReader object and pass in the path of the file to be read in
        Dim sr As StreamReader = New StreamReader(extractFileFullPath)

        'Read in the first line of the file
        Dim line As String = sr.ReadLine()

        'Create an array of strings and fill it with each column within the extract file by using the Split function
        Dim strArray As String() = line.Split(columnDelimiter)

        'Create a DataTable object
        Dim dt As DataTable = New DataTable()

        'Create a DataRow object
        Dim row As DataRow

        'Open the database connection
        dbConn.Open()

        'Create a SQLTransaction object in which to execute the Bulk Copy process so that if it fails, all writes are rolled back
        Dim bulkCopyTransaction As SqlTransaction = dbConn.BeginTransaction()

        'Instantiate our SqlBulkCopy object and set appropriate properties
        Dim bc As SqlBulkCopy = New SqlBulkCopy(dbConn, SqlBulkCopyOptions.Default, bulkCopyTransaction)

        'Set the BulkCopy destination table name equal to the staging table name provided by the Foreach Loop Container
        bc.DestinationTableName = stageTableName

        'Set the BulkCopy batch size equal to the size contained in the metadata provided by the Foreach Loop Container
        bc.BatchSize = batchSize

        'For each column that was found in the extract file
        For Each columnName As String In strArray
            'Add a column in the data table
            dt.Columns.Add(New DataColumn(columnName))

            'Add a column mapping in the SqlBulkCopy object
            bc.ColumnMappings.Add(columnName, columnName)
        Next

        'Add the ExtractFileID column to the data table since it doesn't exist in the extract file and wasn't added in the previous For Each loop
        dt.Columns.Add(New DataColumn("ExtractFileID", System.Type.GetType("System.String")))

        'Add the ExtractFileID column mapping since it doesn't exist in the extract file and wasn't added in the previous For Each loop
        bc.ColumnMappings.Add("ExtractFileID", "ExtractFileID")

        'Move to the first data row in the extract file (the line after the header)
        line = sr.ReadLine()

        Dim rowCount As Integer

        'Loop through all rows in the extract file
        While Not line = String.Empty
            rowCount += 1

            'Create a new data table row to store the extract file's data in
            row = dt.NewRow()

            'Add all column values to the row
            row.ItemArray = line.Split(columnDelimiter)

            'Add the ExtractFileId 
            row("ExtractFileID") = extractFileID

            'Add the newly created row to the data table
            dt.Rows.Add(row)

            'Move to the next row in the extract file
            line = sr.ReadLine()
        End While
        

        Try
            'Write the data table to the staging table
            bc.WriteToServer(dt)

            'If successful, commit the transaction
            bulkCopyTransaction.Commit()
        Catch ex As Exception
            'If an exception occurs, rollback the transaction
            bulkCopyTransaction.Rollback()

            'Set the Task result to Failure
            Dts.TaskResult = ScriptResults.Failure


            'Re-throw the exception, preserving the original stack trace
            Throw
        End Try

        'Close the database connection
        dbConn.Close()

        'Close the BulkCopy object
        bc.Close()

        'Set the SSIS metadata variable equal to the number of rows loaded
        Dts.Variables("ExtractActualRowCount").Value = rowCount
    End Sub
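
As mentioned above, I first got the script working by hard-coding a single file's metadata. A minimal sketch of a Main method that does that might look like the one below; the path, delimiter, batch size, and table name are made-up values, and in the finished package they are read from the Foreach Loop container's variables instead.

    Public Sub Main()
        Try
            'Hard-coded metadata for a single test file (made-up values); in the
            'finished package these come from the Foreach Loop via SSIS variables
            BulkCopyDelimitedFile(batchSize:=10000, _
                                  columnDelimiter:="|", _
                                  extractFileID:=1, _
                                  extractFileFullPath:="C:\Extracts\SampleExtract.txt", _
                                  stageTableName:="dbo.stg_SampleExtract")

            Dts.TaskResult = ScriptResults.Success
        Catch ex As Exception
            'BulkCopyDelimitedFile has already rolled back and flagged the failure
            Dts.TaskResult = ScriptResults.Failure
        End Try
    End Sub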


Download script