Incrementally Load Data From SAP ECC Using Azure ADF

In the previous blog post, we discussed the initial load from SAP ECC into the SQL database using Azure Data Factory (ADF).
The next step is to load data from the source system incrementally, so that each run captures only what has changed. Strictly speaking, this calls for Change Data Capture (CDC), which, at the time of writing, we were not able to implement with ADF against SAP ECC.
When developing ETL pipelines, especially for transactional datasets, I tend to think simple is best: few moving parts and few integrations.

Logic

The pipeline that we have now is fine for an initial download. For future downloads, however, we only want to synchronize the modified products. For this, we can use the LastModified field of the Products entity set as a filter in the OData query. The OData URL then looks like this:

http://sapprd:8000/sap/opu/odata/sap/EPM_REF_APPS_PROD_MAN_SRV/Products?$filter=LastModified%20gt%20datetime%272020-01-01T00:00:00%27
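If you want to generate such a URL outside of ADF (for example, to test the filter from a script), the encoding can be sketched in Python as follows. This is only an illustration: the helper name `build_delta_url` is hypothetical, and the sketch assumes the service accepts the OData V2 `datetime'...'` literal exactly as in the URL above.

```python
from datetime import datetime
from urllib.parse import quote

# Service URL taken from the example above.
BASE_URL = "http://sapprd:8000/sap/opu/odata/sap/EPM_REF_APPS_PROD_MAN_SRV/Products"


def build_delta_url(last_sync: datetime, base_url: str = BASE_URL) -> str:
    """Build an OData URL that returns products modified after last_sync.

    Hypothetical helper, not part of ADF or SAP.
    """
    # OData V2 datetime literal: datetime'YYYY-MM-DDTHH:MM:SS'
    literal = f"datetime'{last_sync.strftime('%Y-%m-%dT%H:%M:%S')}'"
    filter_expr = f"LastModified gt {literal}"
    # Percent-encode spaces (%20) and quotes (%27); keep ':' readable.
    return f"{base_url}?$filter={quote(filter_expr, safe=':')}"


print(build_delta_url(datetime(2020, 1, 1)))
```

Keeping the colon unencoded is only a readability choice so that the result matches the URL shown above.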

To test the URL, you can change a product via the Fiori app.

Fiori App Output

Result of the OData call:

OData Response

To extract only the delta on each run, we need to keep track of the date of the last synchronization. We’ll save this date in a separate table. The process flow is then as follows:

  1. Retrieve the date of the last delta extraction
  2. Use this date in the filter to extract the latest changes
  3. Update the date of the last delta extraction
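The three steps above can be sketched in a few lines of Python. This is a simplified illustration, not the ADF pipeline itself: an in-memory SQLite database stands in for the SQL database, the `SOURCE` list and `fake_extract` function are hypothetical stand-ins for the SAP OData call, and the sketch assumes the new watermark is the time at which the run started.

```python
import sqlite3
from datetime import datetime


def run_delta_sync(conn, table_name, extract_changes, run_started):
    """Run one delta cycle and return the extracted rows."""
    # 1. Retrieve the date of the last delta extraction.
    row = conn.execute(
        "SELECT WatermarkValue FROM watermarktable WHERE TableName = ?",
        (table_name,),
    ).fetchone()
    last_sync = datetime.fromisoformat(row[0])

    # 2. Use this date in the filter to extract the latest changes.
    changes = extract_changes(last_sync)

    # 3. Update the date of the last delta extraction.
    conn.execute(
        "UPDATE watermarktable SET WatermarkValue = ? WHERE TableName = ?",
        (run_started.isoformat(), table_name),
    )
    conn.commit()
    return changes


# Hypothetical source data standing in for the SAP Products entity set.
SOURCE = [
    {"id": "HT-1000", "LastModified": datetime(2016, 5, 1)},
    {"id": "HT-1022", "LastModified": datetime(2020, 3, 15)},
]


def fake_extract(last_sync):
    # Same condition as the OData filter: LastModified gt <last_sync>
    return [p for p in SOURCE if p["LastModified"] > last_sync]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE watermarktable (TableName TEXT, WatermarkValue TEXT)")
conn.execute("INSERT INTO watermarktable VALUES ('NPLProducts', '2017-01-01T00:00:00')")

first = run_delta_sync(conn, "NPLProducts", fake_extract, datetime(2020, 4, 1))
second = run_delta_sync(conn, "NPLProducts", fake_extract, datetime(2020, 4, 2))
print([p["id"] for p in first])   # rows changed since the initial watermark
print([p["id"] for p in second])  # nothing changed since the first run
```

Taking the watermark from the start of the run rather than from its end is a deliberate choice in this sketch: a record modified while the extraction is running is then picked up again by the next run instead of being missed.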

For more information on delta patterns, see "Incrementally load data from a source data store to a destination data store" in the Azure Data Factory documentation.

Setup

Watermark Table

Create a new table to contain the last synchronization dates.

CREATE TABLE [dbo].[watermarktable](
	[TableName] [varchar](255) NULL,
	[WatermarkValue] [datetime] NULL
) ON [PRIMARY]

Note that I included a TableName field. This allows me to keep track of the delta sync per table (or per object, if you want).

Initialize the watermark table

INSERT INTO watermarktable values('NPLProducts', '2017-01-01T00:00:00.000');

Update Watermark procedure

We’ll also need a procedure to update the watermark table.

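CREATE PROCEDURE [dbo].[sp_write_watermark] @LastModifiedtime datetime, @TableName varchar(255)
AS
BEGIN
    -- Move the watermark for the given table forward to the new sync date.
    -- @TableName is varchar(255) to match the TableName column.
    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
    WHERE [TableName] = @TableName
END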

Product Type Definition

CREATE TYPE NPLProductsType As TABLE(
	id nvarchar(25) not null,
	currencycode nvarchar(3),
	stockquantity int,
	name nvarchar(50),
	description text,
	subcategoryid nvarchar(50),
	subcategoryname nvarchar(50),
	maincategoryid nvarchar(50),
	maincategoryname nvarchar(50),
	supplierid nvarchar(50),
	suppliername nvarchar(50),
	lastmodified date,
	price decimal,
	quantityunit nvarchar(10),
	measureunit nvarchar(10)
);

Insert or Update Product procedure

We also need a procedure to insert new products or update existing ones.

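-- Drop the old version only if it exists, so the script can be re-run.
DROP PROCEDURE IF EXISTS spOverwriteProducts
GO

-- CREATE PROCEDURE must be the first statement in its batch, hence the GO above.
CREATE PROCEDURE spOverwriteProducts @Products [dbo].[NPLProductsType] READONLY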
AS
BEGIN
  MERGE [dbo].[NPLProducts] AS target
  USING @Products AS source
  ON (target.id = source.id)
  WHEN MATCHED THEN
    UPDATE SET id = source.id,
				 currencycode  = source.currencycode,
				 stockquantity = source.stockquantity,
				 name = source.name,
				 description = source.description,
				 subcategoryid = source.subcategoryid,
				 subcategoryname = source.subcategoryname,
				 maincategoryid = source.maincategoryid,
				 maincategoryname = source.maincategoryname,
				 supplierid = source.supplierid,
				 suppliername = source.suppliername,
				 lastmodified = source.lastmodified,
				 price = source.price,
				 quantityunit = source.quantityunit,
				 measureunit = source.measureunit
  WHEN NOT MATCHED THEN
    INSERT (	
				id,
				currencycode,
				stockquantity,
				name,
				description,
				subcategoryid,
				subcategoryname,
				maincategoryid,
				maincategoryname,
				supplierid,
				suppliername,
				lastmodified,
				price,
				quantityunit,
				measureunit
			)
		VALUES (
				source.id,
				source.currencycode,
				source.stockquantity,
				source.name,
				source.description,
				source.subcategoryid,
				source.subcategoryname,
				source.maincategoryid,
				source.maincategoryname,
				source.supplierid,
				source.suppliername,
				source.lastmodified,
				source.price,
				source.quantityunit,
				source.measureunit
			);
END
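To make the behaviour of the MERGE statement concrete, here is a minimal Python sketch of the same insert-or-update logic. It is only an analogy: a plain dictionary keyed by id stands in for the NPLProducts table, the function name `merge_products` is hypothetical, and the sample rows are made up for illustration.

```python
def merge_products(target, source_rows):
    """Apply source_rows to target, keyed by product id.

    Mirrors MERGE: WHEN MATCHED -> update, WHEN NOT MATCHED -> insert.
    Returns a tuple (inserted, updated).
    """
    inserted = updated = 0
    for row in source_rows:
        if row["id"] in target:
            # WHEN MATCHED THEN UPDATE
            target[row["id"]].update(row)
            updated += 1
        else:
            # WHEN NOT MATCHED THEN INSERT
            target[row["id"]] = dict(row)
            inserted += 1
    return inserted, updated


# Made-up sample data: one existing product and one new product.
table = {"HT-1000": {"id": "HT-1000", "name": "Notebook Basic 15", "price": 956}}
delta = [
    {"id": "HT-1000", "name": "Notebook Basic 15", "price": 899},
    {"id": "HT-Test", "name": "Test1", "price": 956},
]

counts = merge_products(table, delta)
print(counts)
print(sorted(table))
```

Because rows are matched on id, running the same delta twice leaves the table unchanged after the first run, which is what makes re-running a delta load safe.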

A script to test the stored procedure:

/*Test the stored procedure*/
Declare @ProductsList NPLProductsType
Insert @ProductsList ( id, currencycode, stockquantity, name, description, subcategoryid, subcategoryname, maincategoryid, maincategoryname, supplierid, suppliername, lastmodified, price, quantityunit, measureunit)
Values ('1- HT-Test', 'USD', 149, 'Test1 - Notebook Basic 15', 'Test Description 1', 'NoteBooks', 'Notebooks', 'ComputerSystems', 'ComputerSystems', '100000000', 'SAP', '2018-10-15T19:16:37.1892050', 956, 'EA', 'each' ),
       ('2- HT-Test', 'USD', 155, 'Test2 - Notebook Basic 15', 'test Description 2', 'NoteBooks', 'Notebooks', 'ComputerSystems', 'ComputerSystems', '100000000', 'SAP', '2018-10-15T19:16:37.1892050', 956, 'EA', 'each' );
	   
exec spOverWriteProducts @ProductsList;
select * from NPLProducts;

Update Pipeline

Now we can incorporate these elements into the pipeline.

We’ll need some additional actions to retrieve and update the watermark.

Updated Pipeline

The first step is a lookup to retrieve the date of the last delta extraction. We also need to create a dataset for the watermark table. Since I’m using the same Azure SQL Database, I can reuse my connection to this database.

Watermark_SQL Dataset

In the copy step, we need to update the source to include the date filter. In the sink, we need to execute the stored procedure spOverwriteProducts to insert or update the products. In the Query parameter of the source, add:

$filter=LastModified%20gt%20datetime%27@{activity('LookupWaterMark').output.firstRow.WaterMarkValue}%27

Copy Activity Source

Copy Activity Sink

In the last action, we need to update the watermark table by calling the sp_write_watermark procedure.

Update Watermark Table

Testing

You can now test the pipeline. Depending on how you initialized the watermark table, the first run will do an initial download once more. Future runs should only download deltas.

Pipeline Run output

We can verify the result in the SQL database as well:

SELECT [ID], [description] FROM [dbo].[NPLProducts] WHERE ID='HT-1022';

SQL database output showing the delta loaded correctly

With this, we have achieved the required result. Although this logic can be used in production systems, it is highly recommended to optimize it to better suit the business use case.
Thanks for reading.
