Databricks AutoLoader with Spark Structured Streaming using Delta

We were using Spark Structured Streaming to read and write streaming data, and we were facing an issue where some source records never made it to the target.

Let me jump straight to the scenario and explain how AutoLoader helped us overcome it.

Source data: Data from TOS arrives in an Oracle DB, from where Oracle GoldenGate picks it up and writes it to the ADLS RAW location. The data is segregated by terminal.

Catch: Once OGG (Oracle GoldenGate) starts receiving the data, it starts writing it to ADLS in Avro format. However, there are two cases to consider, taking table INV_UNIT as an example.

Case 1: Data for table INV_UNIT arrives continuously from TOS for more than 30 minutes

TOS sends data to the Oracle DB, and from there OGG keeps sending data to ADLS. The write is in-stream: OGG keeps writing to the same file for 30 minutes, and once the 30-minute window closes, OGG creates a new Avro file and starts dumping data into that file.

Case 2: Data for table INV_UNIT arrives from TOS intermittently

TOS sends data to Oracle, and from there OGG keeps sending data to ADLS. The data does not arrive continuously; let's say table INV_UNIT is updated on Oracle every 10 minutes and the same changes are pushed through OGG to ADLS. OGG will still keep the window open for 30 minutes and will write these 10-minute interval batches to the same file. Once the 30 minutes are over, OGG creates a new file in ADLS.

The current solution implemented: We implemented Spark Structured Streaming. A read stream, together with checkpointing, processes only the incremental file data, and the incremental data is written into Delta tables on the cleansed layer using a merge operation that updates existing records and inserts new ones (a sketch of this merge step follows the code below).

Code:

from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Infer the schema from the existing Avro files, then stream-read incrementally
ppath = "/data/latest/INV_UNIT/"
schema = spark.read.format("avro").load(ppath).schema
df1 = spark.readStream.schema(schema).format("avro").load(ppath)

df1.writeStream.trigger(once=True) \
.option("checkpointLocation", "/staging/checkpoint/INV_UNIT/") \
.format("delta") \
.outputMode("append") \
.partitionBy("date_part") \
.start("/staging/data/INV_UNIT/") \
.awaitTermination()
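
The snippet above only lands the incremental data in the staging Delta path; the merge into the cleansed layer described earlier is a separate step and is not shown. A minimal sketch of that merge, assuming a hypothetical key column unit_id and a hypothetical cleansed-layer path, could look like this:

from delta.tables import DeltaTable

# Hypothetical key column and cleansed-layer path; adjust to the real schema
staging_df = spark.read.format("delta").load("/staging/data/INV_UNIT/")
cleansed_tbl = DeltaTable.forPath(spark, "/cleansed/data/INV_UNIT/")

(cleansed_tbl.alias("t")
    .merge(staging_df.alias("s"), "t.unit_id = s.unit_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())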

The issue with the current solution implemented:

Given the catch described above about how OGG writes data to ADLS, here is what we observed. Jobs were scheduled to run every hour, starting at 12:00 AM, on a daily basis. Let's take the following scenarios into consideration:

1. Job starts at 1:00 AM and the new Avro file was created at 12:25 AM: This is the best case. OGG finishes writing to this file by 12:55 AM, so when the job triggers at 1:00 AM it picks up the file in its entirety and all records flow into the target Delta table in the cleansed layer.

2. Job starts at 1:00 AM and the new Avro file was created at 12:45 AM: OGG will keep writing to this file until 1:15 AM. As per the schedule, our read-stream job triggers at 1:00 AM. Let's say OGG wrote 15 records by 1:00 AM and another 25 records between 1:00 AM and 1:15 AM. Spark reads only the 15 records, processes them through to the cleansed layer, and checkpoints the file, recording in its metadata that the file has been processed.

This drops the remaining 25 records, which eventually causes data mismatches and many other impacts in the daily and snapshot dashboards that are maintained on a day-to-day basis.

The solution implemented to mitigate this issue:

AutoLoader is an optimized file source that provides a seamless way for data teams to load raw data at low cost and latency with minimal DevOps effort. You just need to provide a source directory path and start a streaming job.

AutoLoader incrementally and efficiently processes new data files as they arrive in Azure Blob storage and Azure Data Lake Storage Gen1 and Gen2.

AutoLoader provides a Structured Streaming source called cloudFiles that can automatically set up file notification services which subscribe to file events from the input directory and process new files as they arrive. Given an input directory path on cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing the existing files in that directory.

AutoLoader works with DBFS paths as well as direct paths to the data source.
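
For example, the same source directory can be referenced through a DBFS mount or through a direct abfss:// URI; the mount name and storage account below are assumptions, and schema is the schema inferred from the existing Avro files as in the snippets:

# DBFS mount path (assuming the RAW container is mounted at /mnt/raw)
mounted_path = "/mnt/raw/data/latest/INV_UNIT/"
# Direct ADLS Gen2 path (placeholder storage account)
direct_path = "abfss://raw@<storage_account>.dfs.core.windows.net/data/latest/INV_UNIT/"

df_from_mount = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "avro") \
.schema(schema) \
.load(mounted_path)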

Improved code:

raw_path = "/data/latest/INV_UNIT/"
schema = spark.read.format("avro").load(raw_path).schema
raw_df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "avro") \
.option("cloudFiles.connectionString", conn_string) \
.option("cloudFiles.resourceGroup", resource_group) \
.option("cloudFiles.subscriptionId", subscription_id) \
.option("cloudFiles.tenantId", tenant_id) \
.option("cloudFiles.clientId", client_id) \
.option("cloudFiles.clientSecret", client_secret) \
.option("cloudFiles.includeExistingFiles", "true") \
.option("cloudFiles.allowOverwrites", "true") \
.schema(schema) \
.load(raw_path)

raw_df.writeStream.trigger(once=True) \
.option("checkpointLocation", "/AutoLoader/staging/checkpoint/INV_UNIT/") \
.format("delta") \
.outputMode("append") \
.partitionBy("date_part") \
.start("/AutoLoader/staging/data/INV_UNIT/") \
.awaitTermination()

.option("cloudFiles.connectionString", conn_string): The connection string for your storage account.

.option("cloudFiles.resourceGroup", resource_group): The name of the resource group in which the above storage account resides.

.option("cloudFiles.subscriptionId", subscription_id): The subscription ID under which the above resource group resides.

.option("cloudFiles.tenantId", tenant_id): The tenant ID of the Azure account under which the development is carried out.

.option("cloudFiles.clientId", client_id): The client ID of the service principal (SPN).

.option("cloudFiles.clientSecret", client_secret): The secret (password) for the SPN provided above.
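
In practice these values would not be hard-coded in the notebook. One common approach, assuming a Databricks secret scope (the scope and key names below are hypothetical), is to read them with dbutils.secrets:

# Hypothetical secret scope and key names; replace with your own
secret_scope = "adls-autoloader"
conn_string = dbutils.secrets.get(scope=secret_scope, key="storage-connection-string")
subscription_id = dbutils.secrets.get(scope=secret_scope, key="subscription-id")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="tenant-id")
client_id = dbutils.secrets.get(scope=secret_scope, key="spn-client-id")
client_secret = dbutils.secrets.get(scope=secret_scope, key="spn-client-secret")
resource_group = "rg-data-platform"  # plain configuration value, not a secret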

By enabling .option("cloudFiles.allowOverwrites", "true") on the read stream, everything is taken care of internally by AutoLoader: no matter when a file is overwritten, if it was updated after the job already processed it, AutoLoader tracks the change and considers the file again in a later batch. In the end, no data is missed.
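
One thing to keep in mind: when an overwritten file is picked up again, the records that were already processed can be read a second time, so an append-only sink would receive them twice. A way to keep the downstream table idempotent is to do the merge inside foreachBatch, so re-processed records simply update existing rows. A minimal sketch, reusing the hypothetical unit_id key and cleansed path from earlier (the checkpoint path here is also an assumption):

from delta.tables import DeltaTable

def upsert_to_cleansed(micro_batch_df, batch_id):
    # Merge each micro-batch on the (assumed) key so re-processed records
    # update existing rows instead of being appended again
    cleansed_tbl = DeltaTable.forPath(spark, "/cleansed/data/INV_UNIT/")
    (cleansed_tbl.alias("t")
        .merge(micro_batch_df.alias("s"), "t.unit_id = s.unit_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

raw_df.writeStream.trigger(once=True) \
.option("checkpointLocation", "/AutoLoader/cleansed/checkpoint/INV_UNIT/") \
.foreachBatch(upsert_to_cleansed) \
.outputMode("update") \
.start() \
.awaitTermination()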

If you liked the article, please share it with your colleagues who could benefit from this solution.


#Azure #Databricks #spark #AutoLoader #Delta
