There are many ways to ingest the data in standard file formats from cloud storage to Delta Lake, but is there a way to ingest data from 100s of files within few seconds as soon as it lands in a storage account folder? I have recently come across a Customer with requirements to ingest ~1500 files every few minutes and enable real time analytics on the data within 2 minutes of file creation. Auto Loader is a streaming pattern that efficiently processes and ensures data recency without having to setup job triggers or scheduling manually. One could argue that event triggers in Azure Data Factory can do the same, of course it can but can ADF ingest 100’s of files every few seconds, perhaps not as fast. Auto Loader is fast, easy to setup and it can manage the state information on what files arrived.
Let’s look at the use case where we have a legacy system pumping sensor data in the form of csv files every few minutes, this data needs to be ingested as soon as it arrives and run the aggregations/threshold breach queries to identify any malfunctions in those machine sensors so that the analysts can immediately take corrective actions. Ideally, the application should have been a streaming source sending events to event hubs, but then if it’s a streaming application we would be leveraging Azure Stream Analytics or Structured Streaming for analytics.
So, In a nutshell
- We have 100s of csv files generated every few minutes with each 50MB / ~500K records. In short, we are dealing with billions of records and ~2 TB data every day
- Ingestion, Aggregations queries on the incoming files to be completed within 2 minutes
- Move the Data to Azure Synapse Analytics to enable datawarehousing and complex analytical processing and to enable PBI dashboards.
Auto Loader supports two modes for detecting new files in a directory
- Directory Listing mode – Identifies new files by parallel listing of the input directory. Not a scalable option over time when the number of files increase in the input directory. I have observed some files missed out in the input stream, so not a recommended option.
- File Notification mode – The new structured streaming source, called “cloudFiles”, will automatically set up file notification services (Azure Event Grid and Queue Storage services) that subscribe file events from the input directory and process new files as they arrive, with the option of also processing existing files in that directory. File notification mode is more performant and scalable for large input directories.
Get started with configuring environment to use Auto Loader
- Get the connection string of the cloud storage (Azure Data Lake) from Azure portal or follow instruction here
- Set up a Key Vault in Azure to store secrets and connection strings. Follow instructions here
- Create an AAD app and service principal and assign this app Contributor access to the Azure Storage account. Follow instructions here
In this example, we will use file notification mode and python notebooks
Csv file structure in Azure Data Lake
We have a typical file structure having Sensor Number, Parameter Names and its values.
Identify new files and Ingest to Delta Tables
In the below code snippet,
- We are setting permissions to access the input directory in ADLS
- Using Auto Loader “CloudFiles” source to start the stream and write data to Delta tables
from pyspark.sql.functions import * from pyspark.sql import * from pyspark.sql.types import StringType, IntegerType, StructType, StructField, TimestampType, DoubleType; #Establish Permissions to access Azure Data Lake spark.conf.set("fs.azure.account.key.<YOURADLSACCOUNTNAME>.dfs.core.windows.net", "<YOUR_ADLS_STORAGE_ACCOUNT_KEY>") #InputDirectory and Checkpoint Location SourceFilePath = "abfss://<ADLS_CONTAINER_NAME>@<YOURADLSACCOUNTNAME>.dfs.core.windows.net/Raw" CheckpointPath = "abfss://<ADLS_CONTAINER_NAME>@<YOURADLSACCOUNTNAME>.dfs.core.windows.net/StreamCheckpoint" #Define Schema for the Incoming files schema = StructType([StructField('SensorNumber', StringType(), True), StructField('DateTime', TimestampType(), True), StructField('Unitvalue', DoubleType(), True), StructField('ParameterName', StringType(), True), StructField('ParameterValue', DoubleType(), True), StructField('ParameterUnit', StringType(), True), StructField('SourceName', StringType(), True), StructField('Status', StringType(), True)]) #Read the new files in the Input Directory #Get the connection string of ADLS, RG Name, Subscripton ID, Service Principal details readquery = (spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") .option("header", "true") .option("cloudFiles.useNotifications" , "true") .option("cloudFiles.includeExistingFiles", "true") .option("cloudFiles.connectionString", "YOUR_ADLS_ACCOUNT_CONNECTIONSTRING") .option("cloudFiles.resourceGroup", "<YOUR_RESOURCEGROUP_NAME>") .option("cloudFiles.subscriptionId", "<YOUR_SUBSCRIPTION_ID>") .option("cloudFiles.tenantId", "<APP_TENANT_ID>") .option("cloudFiles.clientId","<APP_CLIENT_ID>") .option("cloudFiles.clientSecret","<APP_CLIENT_SECRET>") #.option("cloudFiles.queueName","<QUEUE_NAME>") .schema(schema) .load(SourceFilePath) ) #Write the stream to a Delta Table writequery = (readquery .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", CheckpointPath) .table("<YOUR_DELTA_TABLENAME>"))
Write the stream data to Azure Synapse Analytics
Here we are using the dataframes created in above code and writing the batches to Azure Synapse SQL Pool.
blobStorage = "<YOUR_BLOBSTORE_ACCOUNTNAME>.blob.core.windows.net" ; blobContainer = "temp" ; blobAccessKey = "<YOUR_STORAGE_ACCOUNTKEY>" tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs" acntInfo = "fs.azure.account.key."+ blobStorage sc._jsc.hadoopConfiguration().set(acntInfo, blobAccessKey) dwDatabase = "<YOUR_SYNAPSE_DWNAME>" dwServer = "<YOUR_SYNAPSE_SERVERNAME>" dwUser = "<YOUR_SQLLOGIN_USERNAME>" dwPass = "<YOUR_SQLLOGIN_PASSWORD>" dwJdbcPort = "1433" sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass def writeToSQLWarehouse(df, epochId): df.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("forward_spark_azure_storage_credentials", "true").option("dbtable", "<YOUR_TABLENAME>").mode('overwrite').option("tempdir", tempDir).save() writetoSynapseDW = (readquery .writeStream .foreachBatch(writeToSQLWarehouse) .outputMode("update") .start()
Run Aggregate queries on micro batches
Simple aggregations to show parameter values at hourly basis and writing the results to a Delta table
def BasicAggregations(microBatchOutputDF, batchId): # Set the dataframe to view name microBatchOutputDF.createOrReplaceTempView("microbatch") microBatchOutputDF._jdf.sparkSession().sql(""" INSERT INTO BasicAggregations SELECT SensorNumber AS AssetNumber,ParameterName,SUM(ParameterValue) AS ParameterValue, hour(current_timestamp) AS BatchHour FROM microbatch GROUP BY SensorNumber,ParameterName,hour(current_timestamp) """) Aggregatesquery = (readquery .writeStream .foreachBatch(BasicAggregations) .outputMode("update") .start()
To Summarize, we have successfully :
- Set up Auto Loader to identify new files in a given input directory
- Ingest data within few seconds to Delta Lake
- Move the data to Azure Synapse Analytics to enable complex analytical distributed processing
- Run aggregate queries on the stream data as soon as the data arrives to enable analysts to look for malfunctions in the sensor data.
In short, without having to set up any additional infrastructure, Auto Loader automatically creates Azure Event Grid and Storage Queue Services and allows you to rapidly ingest files into Delta Lake, DW without the complexity required using traditional/legacy pattern