Incremental Fetch Using Data Integration Platform

In almost any data warehousing scenario, a common use case is to extract, move, or duplicate data from a source RDBMS and store it in some other target database or storage system for later analysis. When it comes to loading this data from the source RDBMS into the target, you have two methods:

  • Full Load – Moves all the rows from the source table to the target, even the rows that have already been processed before.
  • Incremental Fetch – Fetches only the records that have not been processed yet and moves them into the target system. In this walkthrough, the QueryDatabaseTable processor is used to bring the data in from the source table incrementally, as sketched below.
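The difference between the two approaches can be written as plain SQL. The table and column names below are placeholders, and QueryDatabaseTable builds and tracks the incremental query for you, so this is only an illustration:

-- Full load: every run re-reads the entire source table.
SELECT * FROM SourceTable;

-- Incremental fetch: each run reads only the rows changed since the previous run.
-- The threshold is written as a literal here; the processor stores and updates
-- this maximum value automatically.
SELECT * FROM SourceTable WHERE LastModified > '2018-03-08 10:00:00';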

In this blog, you will see how to perform an incremental fetch operation using the QueryDatabaseTable processor in the Syncfusion Data Integration Platform. That is, you will learn how to fetch newly added or modified records from a source table or data store and move them to a target table or data store.

Set up an environment

Install Syncfusion Data Integration Platform in your system.

Template

Download and extract the pre-defined template from this link and follow the steps below to upload the template (XML file) into the Data Integration Platform.

1. From the templates tab, click on the Upload button.

Figure 1: Upload template.

2. Choose the downloaded template file (IncrementalUpdateusingQueryDataBase.XML) and upload it. Once the upload is successful, you can find the template in the templates list.

Figure 2: View template in toolbar.

3. Now drag and drop the template into the data integration work area. You can see the data flow in the application as follows.

Figure 3: Overview.

You must have Microsoft SQL Server and the following table.

Source Table: InvestmentDetails – sample data.

Name          | Id | InvestedAmount | LastModified
Carla Adams   | 15 | $55  | 3/8/2018 10:00
James Aguilar | 44 | $70  | 2/25/2018 13:35
Lili Alameda  | 16 | $18  | 2/23/2018 15:00
Milton Albury | 6  | $700 | 2/19/2018 12:28
Paul Alcorn   | 5  | $250 | 2/2/2018 9:05
Amy Alberts   | 74 | $780 | 1/17/2018 11:50
Kim Akers     | 93 | $620 | 1/08/2018 14:42

Figure 4: Source table.

Now, you are going to incrementally fetch and move the data to the target table named UpdatedInvestments.
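If you want to follow along, the two tables can be created with something like the following. This is a minimal sketch; the column types are assumptions, so adjust them to match your own schema.

CREATE TABLE InvestmentDetails (
    Name           NVARCHAR(100),
    Id             INT,
    InvestedAmount MONEY,
    LastModified   DATETIME
);

-- The target table mirrors the source schema.
CREATE TABLE UpdatedInvestments (
    Name           NVARCHAR(100),
    Id             INT,
    InvestedAmount MONEY,
    LastModified   DATETIME
);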

List of processors used in this sample

Processor          | Comments
QueryDatabaseTable | Checks for updated or newly added values and returns the data from the source table in AVRO format.
ConvertAvroToJSON  | Converts the incoming AVRO data into JSON data.
ConvertJSONToSQL   | Converts the incoming JSON data into SQL statements (insert/update query).
PutSQL             | Executes the SQL statements.

Configure and enable SQL controller service

1. Select the processor group and click on the configuration icon from the Operate panel.

Figure 5: Open configuration.

2. Go to the controller services tab.

Figure 6: Choose controller service.

3. Enter the details for the following highlighted properties (example values are given below the figures) and enable the controller service.

Figure 7: Edit SQL Server configuration.

Figure 8: Enable the controller service.
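For a SQL Server source, the connection pool values typically look something like the following. The property names can vary slightly between versions, and the URL, database name, and driver location are illustrative, so substitute your own details:

Database Connection URL: jdbc:sqlserver://localhost:1433;databaseName=InvestmentsDB
Database Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver
Database Driver Location(s): path to the SQL Server JDBC driver JAR
Database User / Password: credentials of an account that can read the source table and write to the target table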

To learn more about controller services, refer here.

QueryDatabaseTable Processor

You must use this processor to fetch the updated values from the given table based on the last modified date. Double-click the processor to open its configuration dialog and specify the following properties.

Table Name: InvestmentDetails

Columns to Return: Leave empty. If no columns are specified, all the columns will be returned from the source table.

Maximum-value Columns: LastModified

Figure 9: Configure source table details.

This processor will store the latest date value as the state value and filter the records based on the date stored. To view the state value, right-click on the processor and select view > state.

The component state dialog will appear and show the last modified date with column name and table name as keys. To clear the state value, select the “Clear state” option.

Figure 10: View component state.
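With the sample data above, the stored state holds the latest LastModified value, so each subsequent run effectively issues a query of the following form (the literal is shown only for illustration; the processor substitutes the stored state value itself). Clearing the state makes the next run fetch the whole table again.

SELECT Name, Id, InvestedAmount, LastModified
FROM InvestmentDetails
WHERE LastModified > '2018-03-08 10:00:00';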

On running, this processor returns the output in AVRO format.

Figure 11: View AVRO data.

ConvertAvroToJSON processor

Next, use the ConvertAvroToJSON processor to convert the incoming AVRO data into JSON format.

Under properties, set the JSON container options property to “array” so that the AVRO data is converted to an array of JSON objects.


Figure 12: Convert AVRO data into JSON data.

You can see the JSON output in the following screenshot.

Figure 13: View JSON data.

ConvertJSONToSQL processor

Next, the JSON input must be converted into the corresponding SQL INSERT statements. For this operation, use the ConvertJSONToSQL processor.

Under properties, set the statement type as either “INSERT” or “UPDATE,” and enter the target table name as “UpdatedInvestments” in the Table Name property.


Figure 14: Configure target table details.

Under the output queue, you can see the SQL statements as in the following screenshot.


Figure 15: View generated SQL query.
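The generated statements take roughly the following form. ConvertJSONToSQL actually emits parameterized statements and carries the values along as flow file attributes, but with the values filled in they are equivalent to:

INSERT INTO UpdatedInvestments (Name, Id, InvestedAmount, LastModified)
VALUES ('Carla Adams', 15, 55, '2018-03-08 10:00:00');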

PutSQL processor

This processor executes the incoming SQL statements. Just point it to the relevant database connection pool.


Figure 16: Choose SQL Server.

Now your data flow is ready.

Processor scheduling

You must schedule the overall data flow to run periodically and move the data from source to destination.

Open the configuration dialog for the QueryDatabaseTable processor, move to the Scheduling tab, choose the “Timer driven” option, and set the run schedule to one minute. This ensures that the data flow runs every minute, checking for new and updated records and moving them from the source to the destination.

Figure 17: Schedule data flow.

Please refer to the link below to learn more about scheduling:

https://help.syncfusion.com/data-integration/user-guide#scheduling-tab

Now, start all the processors by clicking the “Start” button in the Operate panel.

Figure 18: Execute the data flow.

If any values are inserted or updated in the source table (along with the LastModified date field), they will automatically be moved to the target table according to the data flow schedule.

You can try editing the source table directly in SQL Server, updating the LastModified column as well, to check that the records are moved correctly.
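For example, a couple of statements like the following (the values are made up) exercise both the insert and update paths; within a minute or so, both rows should appear in UpdatedInvestments:

-- Add a brand-new record.
INSERT INTO InvestmentDetails (Name, Id, InvestedAmount, LastModified)
VALUES ('John Doe', 101, 150, GETDATE());

-- Modify an existing record and bump its LastModified value so the
-- QueryDatabaseTable processor picks it up on the next run.
UPDATE InvestmentDetails
SET InvestedAmount = 300, LastModified = GETDATE()
WHERE Id = 5;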

Target table: UpdatedInvestments


Figure 19: Target table.

Conclusion

Hopefully, this blog has shown you the steps involved in incrementally fetching data from a source SQL table and moving it to a target SQL table using the QueryDatabaseTable processor in the Syncfusion Data Integration Platform. This data flow will be very helpful when you want to perform data migration, data synchronization, or data transformation between SQL tables.

If you are new to the Data Integration Platform, it is highly recommended that you follow our Getting Started guide.

If you’re already a Syncfusion user, you can download the product setup here. If you’re not yet a Syncfusion user, you can download a free, 30-day trial here.

If you have any questions or require clarification about this feature, please let us know in the comments below. You can also contact us through our support forum or Direct-Trac. We are happy to assist you!


Meet the Author

Suresh Ram Chandrasekaran