In almost all data warehousing scenarios, a common use case is to extract, move, or duplicate data from source RDBMS and to store them in some other target database or storage system for future analysis. When it comes to loading this data from source RDBMS to target, you have two methods.
In this blog, you will see how to perform an incremental fetch operation using the “QueryDataBaseTable” processor in the Syncfusion Data Integration Platform. That is, you will learn how to fetch a newly added or modified record from a source table or data store and move it to a target table or data store.
Install Syncfusion Data Integration Platform in your system.
Download and extract the pre-defined template from this link and follow the steps below to upload the template (XML file) into the Data Integration Platform.
1. From the templates tab, click on the Upload button.
Figure 1: Upload template.
Figure 2: View template in toolbar.
Figure 3: Overview.
You must have Microsoft SQL Server and the following table.
Source Table: InvestmentDetails – sample data.
Name | Id | InvestedAmount | LastModified |
Carla Adams | 15 | $55 | 3/8/2018 10:00 |
James Aguilar | 44 | $70 | 2/25/2018 13:35 |
Lili Alameda | 16 | $18 | 2/23/2018 15:00 |
Milton Albury | 6 | $700 | 2/19/2018 12:28 |
Paul Alcorn | 5 | $250 | 2/2/2018 9:05 |
Amy Alberts | 74 | $780 | 1/17/2018 11:50 |
Kim Akers | 93 | $620 | 1/08/2018 14:42 |
Figure 4: Source table
Now, you are going to incrementally fetch and move the data to the target table named UpdatedInvestments.
Processor | Comments |
QueryDatabaseTable | Checks for updated or newly added values and returns the data from the source table in AVRO format. |
ConvertAvroToJSON | Converts the incoming AVRO data into JSON data. |
ConvertJSONToSQL | Converts the incoming JSON data into SQL statements (insert/update query). |
PutSQL | Executes the SQL statements. |
Figure 5: Open configuration.
2. Go to the controller services tab.
Figure 6: Choose controller service
3. Enter the details for the following highlighted properties and enable the controller service.
To learn more about controller services, refer here.
You must use this processor to fetch the updated values from the given table based on the last modified date. Select the processor and double-click to open a configuration dialog. Specify the below properties in the configuration.
Table Name: InvestmentDetails
Columns to Return: Leave empty. If no columns are specified, all the columns will be returned from the source table.
Maximum-value-columns: LastModified
Figure 9: Configure source table details.
This processor will store the latest date value as the state value and filter the records based on the date stored. To view the state value, right-click on the processor and select view > state.
The component state dialog will appear and show the last modified date with column name and table name as keys. To clear the state value, select the “Clear state” option.
Figure 10: View component state.
On running, this processor returns the output in AVRO format.
Figure 11: View AVRO data.
Next, you have to use ConvertAvroToJSON processor to convert the incoming AVRO data into JSON format.
Under properties, set the JSON container options to “array.” AVRO data will be converted to an array of JSON objects.
Figure 12: Convert AVRO data into JSON data.
You can see the JSON output in the following screenshot.
Figure 13: View JSON data
You must convert the JSON input into relevant SQL Insert statements. For this operation, you need to use the ConvertJSONToSQL processor.
Under properties, set the statement type as either “INSERT” or “UPDATE,” and enter the target table name as “UpdatedInvestments” in the Table Name property.
Figure 14: Configure target table details.
Under the output queue, you can see the SQL statements as in the following screenshot.
Figure 15: View generated SQL query.
This processor is used to execute the incoming SQL statement. Just point out the relevant connection pool.
Figure 16: Choose SQL Server.
Now your data flow is ready.
You must schedule the overall data flow to run periodically and move the data from source to destination.
Open the Configuration dialog for the “UpdateDatabaseTable” processor, move to the scheduling tab, choose the “Timer driven” option, and set it for one minute. This will ensure that the data flow runs every minute, checking for new and updated records and moving them from the source to the destination.
Figure 17: Schedule data flow.
Please refer to the link below to learn more about scheduling:
https://help.syncfusion.com/data-integration/user-guide#scheduling-tab
Now, start all processors by clicking the “Start” button under the operate menu.
Figure 18: Execute the data flow.
If any values are inserted or updated (along with the LastModified date field) in the source table, they will be automatically moved to the target table as per the dataflow schedule.
You can try directly editing the source table along with the last modified column in the SQL server to see if the records are updated correctly.
Target table: UpdatedInvestments
Figure 19: Target table.
Hopefully this blog has shown you the steps involved in fetching data incrementally from source SQL tables and moving it to a target SQL table using the QueryDataBaseTable processor in the Syncfusion Data Integration Platform. This data flow will be very helpful when you want to perform data migration, data synchronization, and data transformation between SQL tables.
If you are new to the Data Integration Platform, it is highly recommended that you follow our Getting Started guide.
If you’re already a Syncfusion user, you can download the product setup here. If you’re not yet a Syncfusion user, you can download a free, 30-day trial here.
If you have any questions or require clarification about this feature, please let us know in the comments below. You can also contact us through our support forum or Direct-Trac. We are happy to assist you!
If you like this blog post, we think you’ll also like the following free e-books: