Hurtownia Danych

Dynamiczne generowanie pakietów SSIS

Obecnie gromadzimy, przechowujemy i przetwarzamy terabajty danych w różnych systemach źródłowych. W końcu przychodzi moment, gdy trzeba zmierzyć się z problemem ich integracji. Aleksandra Strusiewicz pokazuje, jak sobie z nim poradzić wykorzystując proces ETL i pakiet SSIS. 

W dzisiejszym świecie, gdzie na każdym kroku dane są zbierane, przechowywane i przetwarzane, prędzej czy później trzeba zmierzyć się z zagadnieniem ich integracji – posiadania w jednym miejscu danych z różnych systemów źródłowych, aby zapewnić jednolity widok danych. Tak powstała koncepcja procesu ETL. Jednym z narzędzi umożliwiających implementację tego procesu jest Microsoft SQL Server Integration Services (SSIS), gdzie użytkownik tworzy pakiety wraz z odpowiednimi komponentami i przepływami między nimi, korzystając z intuicyjnego interfejsu. W niniejszym artykule pokażę w jaki sposób ręcznie utworzyć taki prosty pakiet oraz jak można zautomatyzować ten proces, gdy chcemy przeładować dane z wielu tabel źródłowych.

Na początek spróbujmy ręcznie utworzyć i skonfigurować prosty pakiet. Zanim jednak przystąpimy do pracy, niezbędne jest przygotowanie środowiska deweloperskiego:

  1. Instalacja i uruchomienie programu SSDT-BI (SQL Server Data Tools for BI) w wybranej wersji (poniższy przykład przygotowano dla wersji 2015).
  2. Utworzenie projektu o typie „Integration Services Project” wraz z jednym, pustym pakietem o domyślnej nazwie „Package.dtsx”.

Należy ustawić odpowiednią wartość parametru „TargetServerVersion” we właściwościach projektu (w tym wypadku wybrano opcję „SQL Server 2016) oraz wyłączyć 64-bitowy tryb uruchamiania.

pakiet SSIS

konfiguracja pakietu SSIS

Po wykonaniu wymienionych czynności możemy przejść dalej i wykonać następujące kroki:

  1. Krok 1: Utworzyć połączenia do bazy źródłowej i docelowej
  2. Krok 2: Utworzyć zadanie ładowania danych
  3. Krok 3: Uruchomić przygotowany pakiet

A więc do dzieła.

KROK 1: Utworzyć połączenia do bazy źródłowej i docelowej

Najpierw musimy utworzyć i skonfigurować połączenie do źródłowej bazy danych, żeby wiedzieć skąd ładujemy dane – można to zrobić na poziomie pakietu lub całego projektu. W tym wypadku skorzystajmy z tej pierwszej możliwości i dodajmy nowe połączenie typu „OLE DB”, ponieważ naszym źródłem będzie baza danych Microsoft SQL Server:

database connection

konfiguracja bazy danych

connection manager

Po utworzeniu połączenia do bazy źródłowej warto jest zmienić domyślnie wygenerowaną nazwę – na przykład na „SOURCE”.

Skoro wiemy już skąd ładujemy dane, ustalmy dokąd mają zostać załadowane. W tym celu wystarczy wykonać te same czynności co dla połączenia do źródła (z racji tego, że naszą bazą docelową jest również baza Microsoft SQL Server), w definicji połączenia wskazując bazę docelową i zmieniając domyślnie wygenerowaną nazwę połączenia np. na „DESTINATION”.

KROK 2: Utworzenie zadania przeładowania danych

Po zdefiniowaniu połączeń możemy już określić, która tabela ma zostać przeładowana z bazy źródłowej do docelowej. Posłużymy się do tego komponentem „Data Flow Task”, które w prosty sposób umożliwia przeładowanie danych z jednej tabeli do drugiej:

zadanie przeładowania danych

Wystarczy tylko dodać i skonfigurować komponenty „OLE DB Source” oraz „OLE DB Destination”:

OLE DB source

OLE DB destination

OLE DB destination mappings

KROK 3: Uruchomić przygotowany pakiet

Tak skonfigurowany pakiet wystarczy uruchomić i sprawdzić czy dane zostały załadowane do bazy docelowej.

data flow

SQL query

I gotowe!

Jak widać, powyższe czynności nie są skomplikowane, ale konieczne jest wykonanie ich dla każdej przeładowywanej tabeli źródłowej. Dlatego, gdy mamy wiele obiektów, z których chcemy pobrać dane, warto poświęcić czas na przygotowanie mechanizmu dynamicznego generowania pakietów SSIS, co obejmuje:

  1. Przygotowanie szablonu pakietu, na podstawie którego będą dynamicznie generowane pakiety – parametryzacja statycznego pakietu, który utworzyliśmy ręcznie,
  2. Utworzenie nowego pakietu-generatora, który:
    • Pobierze listę tabel do przeładowania z tabeli źródłowej do docelowej,
    • Dla każdej tabeli wykona skrypt, który nadpisze nazwę tabeli w konfiguracji szablonu, dynamicznie odświeży mapowanie kolumn między komponentem źródłowym i docelowym, a następnie wykona wygenerowany pakiet.

ETAP 1: Przygotowanie szablonu pakietu

Najpierw zmodyfikujmy utworzony przez nas pakiet „LoadData.dtsx”, wykonując następujące czynności:

  1. Dodajmy zmienną pakietową o nazwie „TableName”, którą zainicjalizujemy wartością z nazwą tabeli źródłowej:

dodanie zmiennej pakietowej

  1. Skonfigurujmy komponenty „OLE DB Source” i „OLE DB Destination”, by korzystały z wartości podanej w zmiennej:

OLE DB source

OLE DB destination

ETAP 2: Utworzenie pakietu-generatora

Mając tak przygotowany pakiet, który użyjemy jako szablon, możemy utworzyć pakiet do dynamicznego generowania pakietów. Należy umieścić w nim następujące elementy:

  1. Komponent typu „Execute SQL Task”, który pobierze nam listę tabel do przeładowania z bazy źródłowej do docelowej (w tym miejscu przyjmijmy założenie, że tabele istnieją w docelowej bazie danych):

Execute SQL Task

Execute SQL Task Editor

  1. Komponent typu „Foreach Loop Container” – pętlę do przetworzenia wszystkich elementów:

Foreach Loop Container

Foreach Loop Container

  1. Komponent typu „Script Task”, który wykona proces generowania i uruchamiania pakietu:

Script Task

Script Task


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Windows.Forms;
#endregion
public void Main()
{
//Variables declaration
Microsoft.SqlServer.Dts.Runtime.Application generator;
Microsoft.SqlServer.Dts.Runtime.Package package;
Microsoft.SqlServer.Dts.Pipeline.Wrapper.MainPipe dft;
Microsoft.SqlServer.Dts.Runtime.TaskHost th;
Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSComponentMetaData100 sourceComponent;
Microsoft.SqlServer.Dts.Pipeline.Wrapper.CManagedComponentWrapper sourceComponentInstance;
Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSComponentMetaData100 destinationComponent;
Microsoft.SqlServer.Dts.Pipeline.Wrapper.CManagedComponentWrapper destinationComponentInstance;
//Variables initialization
generator = new Microsoft.SqlServer.Dts.Runtime.Application();
package = generator.LoadPackage("D:\\ETL\\LoadData.dtsx", null);
//Package parameter set
package.Parameters["TableName"].Value = Dts.Variables["User::TableName"].Value.ToString();
//Data Flow Task configuration
Executable e = package.Executables[0];
th = e as Microsoft.SqlServer.Dts.Runtime.TaskHost;
dft = th.InnerObject as Microsoft.SqlServer.Dts.Pipeline.Wrapper.MainPipe;
//Source Component configuration
sourceComponent = dft.ComponentMetaDataCollection["OLE DB Source"];
sourceComponentInstance = sourceComponent.Instantiate();
if (sourceComponent.RuntimeConnectionCollection.Count > 0)
{
sourceComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections["SOURCE"]);
sourceComponent.RuntimeConnectionCollection[0].ConnectionManagerID = 
package.Connections["SOURCE"].ID;
}
sourceComponentInstance.AcquireConnections(null);
sourceComponentInstance.ReinitializeMetaData();
sourceComponentInstance.ReleaseConnections();
//Destination Component configuration
destinationComponent = dft.ComponentMetaDataCollection["OLE DB Destination"];
destinationComponentInstance = destinationComponent.Instantiate();
if (destinationComponent.RuntimeConnectionCollection.Count > 0)
{
destinationComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections["DESTINATION"]);
destinationComponent.RuntimeConnectionCollection[0].ConnectionManagerID = package.Connections["DESTINATION"].ID;
}
destinationComponentInstance.AcquireConnections(null);
destinationComponentInstance.ReinitializeMetaData();
destinationComponentInstance.ReleaseConnections();
//Columns mapping
IDTSOutputColumnCollection100 SourceColumns = sourceComponent.OutputCollection[0].OutputColumnCollection;
IDTSInput100 DestinationInputCollection = destinationComponent.InputCollection[0];
IDTSVirtualInput100 DestinationVirtualInput = DestinationInputCollection.GetVirtualInput();
foreach (IDTSVirtualInputColumn100 column in DestinationVirtualInput.VirtualInputColumnCollection)
{
destinationComponentInstance.SetUsageType(
destinationComponent.InputCollection[0].ID, 
DestinationVirtualInput, 
column.LineageID, 
DTSUsageType.UT_READONLY
);
}
for (int i = 0; i < DestinationInputCollection.InputColumnCollection.Count; i++)
{
string dstColumnName = DestinationInputCollection.InputColumnCollection[i].Name;
if (
!DestinationInputCollection.ID.Equals(null) 
&& !DestinationInputCollection.InputColumnCollection[dstColumnName].ID.Equals(null) 
&& !DestinationInputCollection.ExternalMetadataColumnCollection[dstColumnName].ID.Equals(null)
)
{
destinationComponentInstance.MapInputColumn(
DestinationInputCollection.ID,
DestinationInputCollection.InputColumnCollection[dstColumnName].ID,                                                DestinationInputCollection.ExternalMetadataColumnCollection[dstColumnName].ID
);
}
}
//Package execution
package.Execute();
package.Dispose();
Dts.TaskResult = (int)ScriptResults.Success;
}

Tak przygotowany pakiet wystarczy uruchomić i sprawdzić, czy dane zostały załadowane do bazy docelowej:

uruchomienie pakietu

U mnie działa.

Oczywiście, pokazany scenariusz dynamicznego generowania pakietów jest bardzo prosty. Pobieraliśmy dane tylko z jednego źródła do jednej bazy docelowej oraz przyjęliśmy założenie, że tabele już istnieją w bazie docelowej. A teraz wyobraźmy sobie, że przy pomocy takiego generatora możemy w ramach jednego procesu:

  • obsługiwać obiekty z różnych baz źródłowych, mało tego – nawet z różnych platform bazodanowych i plików płaskich,
  • ładować dane wielowątkowo, aby zoptymalizować czas pobierania danych ze źródeł,
  • decydować o trybie ładowania poszczególnych obiektów – czy za każdym razem mają być pobrane wszystkie dane (ładowanie pełne) czy tylko ich podzbiór (ładowanie przyrostowe),
  • automatycznie generować tabele w bazie docelowej zgodne ze strukturą tabeli źródłowej oraz skompresowane.

A wszystko to w prosty sposób konfigurowalne przez użytkownika z możliwością śledzenia poszczególnych etapów wykonania. Brzmi ciekawie? Wszystko to, co opisano powyżej, zostało zaimplementowane w narzędziu APN_ETL. Skontaktuj się z nami i dowiedz się więcej.

Biogram autora zostanie zaktualizowany niebawem.
Zobacz profil
×
Biogram autora zostanie zaktualizowany niebawem.
Zobacz profil
Latest Posts
  • zapis RSS do bazy
  • Dynamiczne generowanie pakietów SSIS