Advanced Streaming on Databricks — Multiplexing with Databricks Workflows
Author: Cody Austin Davis
Date: 04/09/2023
Git Repo for Article
In a previously released article, we reviewed how to use Delta Live Tables to dynamically “fan out” many streaming pipelines from a single data source without needing to hard-code each pipeline. This use case, often called multiplexing, is extremely common across industries yet difficult to do well without a tool like the Delta Live Tables meta-programming framework, which works seamlessly with Structured Streaming to dynamically create and run many pipelines without coding each from scratch. This is part of what makes DLT so powerful for implementing simple and advanced data engineering use cases in one technology.
However, many data engineers may want even more control. Especially for larger or more complex use cases, data engineers may require more flexibility over aspects of the pipeline such as:
- Load Balancing — not all streams will be the same, so engineers need to handle the possibility of skew across data volumes and resource allocation (cluster number and size). They may also need to group certain streams together for some other business reason like logging or access control.
- Infinite Scale — these types of use cases can get out of hand quickly, generating hundreds or thousands of output streams for the events flowing through them. Imagine streaming data for a global retailer with 10,000 stores where each store needs its own isolated data pipeline. That is a lot of independent pipelines that all need their own resources, logging, and access control. Currently, Delta Live Tables can only run one pipeline on one cluster (for now ;) ), so anything over 10–20 streams gets tough and can create challenging resource management problems.
- Data and Pipeline Isolation — engineers may want to use a single process to create thousands of streaming pipelines, but they also want to be able to isolate the data across tables and databases, and even control row/column level ACLs differently for each pipeline.
- Clean Production Logging — logging and tracking the health of this many dynamically generated pipelines can be a monster task, so engineers need a way of scaling this out while keeping the above flexibility.
In this article, we will review a design pattern for accomplishing this use case. We will walk through the architecture, the considerations for adapting it to your own use case, and the steps involved in building out the end-to-end pipeline. The repo with the code is linked at the top of the article.
First, let’s review the business problem together. Imagine we own a large retail grocery store chain. We have thousands of stores throughout the country, and each store sells thousands of items. As the parent company, we want each store to be able to create and self-serve its own analytics and reports. To do that, we need to somehow create independent tables for each store that only that store’s managers have access to. We also need to make sure these pipelines deliver data consistently and reliably across all stores from our core system. Making things more challenging, business is booming and we are adding stores every day. To really scale out our inventory analytics platform, we can’t rely on manually adding each new store as a consumer of our data service. That would be a full-time job on its own, so instead we need to know that whenever a new store comes online, it will automatically get its own reporting environment and data flow. Now we start to see how powerful this use case can be: it applies to IoT devices, banking data, market intelligence data, retail, tooling, healthcare, and just about any industry.
So how do we manage this much complexity at scale? Databricks Jobs and Structured Streaming together make this a breeze.
Now, let’s review the high level steps for accomplishing this use case:
1: Define the logic of a single event: this could be a store, a sensor measurement, a log type, anything. In our example above, we need to define the pipeline logic as if we were building it for one store. Using Structured Streaming, we can ensure this task automatically tracks state for the given event using a dynamically created checkpoint. An example is provided below. This job should also dictate, via a configuration parameter, which user or group gets access to the specific event table.
2: Parameterize the nuances for each event: if different events have different logic, try to parameterize it as input to the pipeline via dbutils widgets, configuration objects loaded at runtime, or environment variables. Don’t forget to parameterize the event identifier itself so the notebook knows which event it is dealing with. The simplest example is parameterizing the name and location of the resulting output table given the event name.
3: Define the “monitoring function”: in a separate notebook or code file, define the logic that will identify all the distinct event types. This will be used to incrementally keep track of the jobs we need to create. For example, if each event is a subdirectory in an S3 bucket, write a pattern-matching function to quickly list all distinct folders that represent events. You can also make this the output of a live app, a manual configuration, or a queue. An example is shown below.
4: Define the “controller function”: this process will use the distinct events to create a fleet of jobs that process the tasks for all available events. This process is responsible for load balancing (allocating events to jobs, tasks, and cluster combinations), creating the jobs (or updating them if they already exist), triggering them (or setting the schedule), and recording the mapping of events to job ids so it does not re-create existing jobs. Load balancing includes deciding how many events each job handles, how many tasks run per cluster in a job, the size of each cluster, and of course how many jobs will exist. The balancing itself can be simple round-robin or any custom technique that is relevant for the events themselves (a simplified round-robin sketch is shown right after this list).
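To make the load-balancing idea concrete, here is a minimal, hedged sketch of the round-robin grouping in plain Python. The event names and parameter values are made up for illustration; the actual implementation later in this article does the same grouping with Spark DataFrames.
## Hypothetical illustration only: chunk events into jobs, and tasks within a job onto shared clusters
events = [f"{i:05d}" for i in range(1, 26)]   ## 25 pretend event ids
tasks_per_job = 10                            ## streams handled by one job
tasks_per_cluster = 5                         ## streams sharing one job cluster

## Round-robin: fill each job with up to tasks_per_job events
jobs = [events[i:i + tasks_per_job] for i in range(0, len(events), tasks_per_job)]

for job_index, job_events in enumerate(jobs, start=1):
    ## Within a job, spread tasks across shared job clusters
    clusters = [job_events[i:i + tasks_per_cluster]
                for i in range(0, len(job_events), tasks_per_cluster)]
    print(f"parent_iot_stream_{job_index}: {len(job_events)} tasks across {len(clusters)} clusters")
The real controller shown later performs the same grouping with row_number and ceil over a Spark DataFrame so it scales with the number of events.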
At the end of this exercise, you should have a controller process that orchestrates a fleet of jobs that each maintain their own state, can all be tracked and monitored in the same manner as any other pipeline, and can manage access to the created child pipelines easily from a single primary notebook.
There is a lot going on here, so we will break it down with an example. If you want to re-create the example, simply import the attached Git repo, fill out your Databricks token and workspace URL in the variables, and run the notebook!
Now let’s dig in.
First, let’s accomplish Step 1 by creating a streaming pipeline for the logic of a single event type as such:
# DBTITLE 1,Read Stream
input_df = (spark
    .readStream
    .format("text")
    .option("multiLine", "true")
    .option("pathGlobFilter", filter_regex_string)       ## filter files down to this event only
    .load(root_input_path)
    .withColumn("inputFileName", input_file_name())      ## capture the source file for lineage
)
# DBTITLE 1,Transformation Logic on any events (can be conditional on event)
transformed_df = (input_df
    .withColumn("EventName", lit(child_task_name))
    .selectExpr("value:id::integer AS Id",
                "EventName",
                "value:user_id::integer AS UserId",
                "value:device_id::integer AS DeviceId",
                "value:num_steps::decimal AS NumberOfSteps",
                "value:miles_walked::decimal AS MilesWalked",
                "value:calories_burnt::decimal AS Calories",
                "value:timestamp::timestamp AS EventTimestamp",
                "current_timestamp() AS IngestionTimestamp",
                "InputFileName")
)
This code creates a simple streaming pipeline that reads the raw JSON data (as text) and does some light data modeling and data type conversion. Notice the .option("pathGlobFilter") line: this is the first piece of Step 2, parameterizing our single job to fit any event type.
Next, we implement Step 2 by creating parameters for our child task that define the input root path, the parent job name, and the child task name representing the specific event identifier. These parameters are then used to create a dynamic checkpoint folder for the passed-in event, along with the custom filtering logic so that the stream focuses only on the event type it cares about. This section also creates the target database and the table that will store the events and the jobs they are tied to, so the pipeline knows which events it does not need to re-create jobs for.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# DBTITLE 1,Step 1: Logic to get unique list of events/sub directories that separate the different streams
# Design considerations
# Ideally the writer of the raw data will separate out event types by folder so you can use globPathFilters to create separate streams
# If ALL events are in one data source, all streams will stream from 1 table and then will be filtered for that event in the stream. To avoid many file listings of the same file, enable useNotifications = true in autoloader
# DBTITLE 1,Define Params
dbutils.widgets.text("Input Root Path", "")
dbutils.widgets.text("Parent Job Name", "")
dbutils.widgets.text("Child Task Name", "")
# DBTITLE 1, Get Params
root_input_path = dbutils.widgets.get("Input Root Path")
parent_job_name = dbutils.widgets.get("Parent Job Name")
child_task_name = dbutils.widgets.get("Child Task Name")
print(f"Root input path: {root_input_path}")
print(f"Parent Job Name: {parent_job_name}")
print(f"Event Task Name: {child_task_name}")
# DBTITLE 1, Define Dynamic Checkpoint Path
## Each stream needs its own checkpoint; we can dynamically define one for each event/table we want to create and test out
checkpoint_path = f"dbfs:/checkpoints/cody.davis@databricks.com/{parent_job_name}/{child_task_name}/"
# DBTITLE 1, Target Location Definitions
spark.sql("""CREATE DATABASE IF NOT EXISTS iot_multiplexing_demo""")
# COMMAND ----------
## Create Job Storage Table
spark.sql("""CREATE TABLE IF NOT EXISTS iot_multiplexing_demo.job_orchestration_configs
(
JobCreationSetId BIGINT GENERATED BY DEFAULT AS IDENTITY,
ParentJobName STRING,
JobName STRING,
event_names ARRAY<STRING>,
InputRootPath STRING,
JobCreationResult STRING
)
""")
# DBTITLE 1, Use Whatever custom event filtering logic is needed
filter_regex_string = "part-" + child_task_name + "*.json*"
print(filter_regex_string)
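To round out the picture, here is a hedged sketch of the write side of the child stream, using the dynamic checkpoint defined above. The target table naming convention and the trigger choice are assumptions for illustration, not taken from the repo.
## Sketch only: write the transformed stream to a per-event Delta table using the dynamic checkpoint
target_table = f"iot_multiplexing_demo.iot_stream_{child_task_name}"   ## assumed naming convention

(transformed_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)   ## one checkpoint per parent job + child task
    .trigger(availableNow=True)                      ## or .trigger(processingTime="1 minute") for always-on
    .toTable(target_table)
)
Because the checkpoint path embeds the parent job name and child task name, each event’s stream keeps its own independent state.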
Now we have a child job template that can be used to create a streaming pipeline for each child event type. Next, we need to create the parent job to orchestrate it all.
In our parent job, we first perform Step 3: defining the monitoring function. For example:
# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC ## Controller notebook
# MAGIC
# MAGIC Identifies and Orchestrates the sub jobs
root_file_source_location = "dbfs:/databricks-datasets/iot-stream/data-device/"
import re
from pyspark.sql.functions import *
from pyspark.sql import Window, WindowSpec
# DBTITLE 1,Logic to get event Name
## This can be manual and loaded from a config, or parsed out from data/metadata
@udf("string")
def get_event_name(input_string):
    return re.sub("part-", "", input_string.split(".")[0])
# DBTITLE 1,Define Load Balancing Parameters
tasks_per_cluster = 5
tasks_per_job = 10
# DBTITLE 1,Step 1: Logic to get unique list of events/sub directories that separate the different streams
# Design considerations
# Ideally the writer of the raw data will separate out event types by folder so you can use globPathFilters to create separate streams
# If ALL events are in one data source, all streams will stream from 1 table and then will be filtered for that event in the stream. To avoid many file listings of the same file, enable useNotifications = true in autoloader
df_active_event_jobs = spark.sql("""SELECT JobName, ParentJobName, explode(event_names) AS event_name
                                    FROM iot_multiplexing_demo.job_orchestration_configs""")

events_df = (spark.createDataFrame(dbutils.fs.ls(root_file_source_location))
    .withColumn("event_name", get_event_name(col("name")))
    .select("event_name")
    .distinct().alias("new_events")
    .join(df_active_event_jobs.alias("active_events"), on="event_name", how="left_anti") ## anti join = WHERE NOT IN, but faster
)
display(events_df)
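As a side note, this discovery step does not have to be a batch job. Below is a hedged sketch of how it could be wrapped in an Auto Loader plus foreachBatch loop so new events are picked up continuously. The build_jobs_for_events helper and the checkpoint/schema locations are hypothetical placeholders, not part of the repo.
## Sketch only: continuously watch the raw source and hand brand-new events to the job builder
def discover_new_events(microbatch_df, batch_id):
    known_events = (spark.table("iot_multiplexing_demo.job_orchestration_configs")
                    .selectExpr("explode(event_names) AS event_name"))
    new_events = (microbatch_df
                  .select(get_event_name(element_at(split(input_file_name(), "/"), -1)).alias("event_name"))
                  .distinct()
                  .join(known_events, on="event_name", how="left_anti"))
    ## build_jobs_for_events(new_events)   ## hypothetical: run the load balancing + job creation shown below

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "dbfs:/checkpoints/iot_multiplexing_demo/event_monitor/schema/")
    .load(root_file_source_location)
    .writeStream
    .foreachBatch(discover_new_events)
    .option("checkpointLocation", "dbfs:/checkpoints/iot_multiplexing_demo/event_monitor/")
    .trigger(processingTime="10 minutes")
    .start()
)
With something like this in place, the controller effectively becomes an always-on service rather than a scheduled batch job.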
The discovery logic above is a simple example, but as long as you end up with a DataFrame that represents all the distinct events, you’ve done it right. Note that we read from the logging table we created in the previous step so that this run does not re-create jobs for events that are already assigned to a job. As sketched above, this process can even be wrapped inside a Structured Streaming foreachBatch pipeline that continuously monitors for new events. Now that we have the events, we move on to the hardest part: the load-balancing and orchestration parent job. The next code block allocates events to child jobs and tasks and ties them all to this parent job, as follows:
# DBTITLE 1,Parent Job Params
input_root_path = "dbfs:/databricks-datasets/iot-stream/data-device/"
parent_job_name = "parent_iot_stream"

events_balanced = (events_df
    .withColumn("ParentJobName", lit(parent_job_name))
    .withColumn("NumJobs", row_number().over(Window.orderBy(lit("1"))))   ## number each event
    .withColumn("JobGroup", ceil(col("NumJobs") / lit(tasks_per_job)))    ## Grouping tasks into Job Groups
    .withColumn("JobName", concat(col("ParentJobName"), col("JobGroup")))
    .groupBy(col("ParentJobName"), col("JobName"))
    .agg(collect_list(col("event_name")).alias("event_names"))
    .withColumn("InputRootPath", lit(input_root_path))
)
# COMMAND ----------
# DBTITLE 1,Show Balanced Workload of Jobs
display(events_balanced)
Now that we have allocated our events to their associated child jobs, all we have to do now is Step 4 — define the controller function. To do this, we write a user defined function to create/update and run each job! The code works as follows:
- Note: replace <your_user_email>, <dbx_token>, and <workspace_url> with your own values to run this job in your own environment.
# DBTITLE 1,Udf To Kick Off A Job In Parallel
import re
import requests
import json
import math

round_up = math.ceil   ## ceiling helper used for cluster/task allocation

@udf("string")
def build_streaming_job(job_name, input_root_path, parent_job_name, event_names, tasks_per_cluster):
    ## tasks_per_cluster controls how many streaming tasks share each job cluster
    full_job_name = parent_job_name + "_" + job_name

    ## First create the job clusters based on tasks_per_cluster
    num_cluster_to_make = round_up(len(event_names) / tasks_per_cluster)
    clusters_to_create = [f"Job_Cluster_{str(i + 1)}" for i in range(0, num_cluster_to_make)]

    ## Optional: decide how many streams go onto each cluster or just group by job
    ## (in this example there will be 10 streams per job, spread across the created clusters)
    tasks = [{
        "task_key": f"event_{event}",
        "notebook_task": {
            "notebook_path": "/Repos/<user_name>/edw-best-practices/Advanced Notebooks/Multi-plexing with Autoloader/Option 1: Actually Multi-plexing tables on write/Child Job Template",
            "base_parameters": {
                "Input Root Path": "dbfs:/databricks-datasets/iot-stream/data-device/",
                "Parent Job Name": parent_job_name,
                "Child Task Name": f"event_{event}"
            },
            "source": "WORKSPACE"
        },
        "job_cluster_key": str(clusters_to_create[round_up((i + 1) / tasks_per_cluster) - 1]),
        "timeout_seconds": 0,
        "email_notifications": {}
    } for i, event in enumerate(event_names)
    ]

    ## Use the Jobs API to create a job for each grouping
    job_req = {
        "name": full_job_name,
        "email_notifications": {
            "no_alert_for_skipped_runs": "false"
        },
        "webhook_notifications": {},
        "timeout_seconds": 0,
        "schedule": {
            "quartz_cron_expression": "0 0 0 * * ?",
            "timezone_id": "UTC",
            "pause_status": "UNPAUSED"
        },
        "max_concurrent_runs": 1,
        "tasks": tasks,
        "job_clusters": [
            {
                "job_cluster_key": cluster,
                "new_cluster": {
                    "cluster_name": "",
                    "spark_version": "12.2.x-scala2.12",
                    "aws_attributes": {
                        "first_on_demand": 1,
                        "availability": "SPOT_WITH_FALLBACK",
                        "zone_id": "us-west-2a",
                        "spot_bid_price_percent": 100,
                        "ebs_volume_count": 0
                    },
                    "node_type_id": "i3.xlarge",
                    "enable_elastic_disk": "false",
                    "data_security_mode": "SINGLE_USER",
                    "runtime_engine": "STANDARD",
                    "num_workers": 2
                }
            } for cluster in clusters_to_create
        ],
        "tags": {
            parent_job_name: ""
        },
        "format": "MULTI_TASK"
    }

    job_json = json.dumps(job_req)

    ## Get this from a secret or param
    dbx_token = "<dbx_token>"
    headers_auth = {"Authorization": f"Bearer {dbx_token}"}
    uri = "https://<workspace_host>/api/2.1/jobs/create"

    endp_resp = requests.post(uri, data=job_json, headers=headers_auth).json()

    ## For demo purposes this just creates a job; in PROD, add code to update an existing job for the job group if it already exists
    return json.dumps(endp_resp)
# COMMAND ----------
# DBTITLE 1,Define parallel execution
build_jobs_df = (events_balanced
    .withColumn("JobCreationResult", build_streaming_job(col("JobName"), col("InputRootPath"), col("ParentJobName"), col("event_names"), lit(tasks_per_cluster)))
)
# COMMAND ----------
# DBTITLE 1,Call action with collect or display or save
build_jobs_df.write.format("delta").mode("append").saveAsTable("iot_multiplexing_demo.job_orchestration_configs")
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC SELECT * FROM iot_multiplexing_demo.job_orchestration_configs;
Output Saved To Delta:
Inside the UDF, we implement all the logic we want for creating each job: job owners, notification destinations, triggers, and so on. There are a few things happening inside this function. First, the events are assigned to the job. Next, the events are allocated across a number of clusters inside the job so that no single cluster gets overloaded, using the tasks_per_cluster parameter defined above. Then we simply call that UDF on our load-balanced DataFrame, once per job. The final result is a collection of jobs, all with a parent job name, a tag that unifies them (pro tip: tags are great for organization and observability), and a child job name, that all run uninterrupted. You should see the jobs in the Jobs tab of your Databricks workspace!
Created Jobs:
Job Configuration:
Job Run Example:
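One production gap called out in the UDF comment is updating a job that already exists instead of blindly creating a new one. Here is a hedged sketch of how that could look with the Jobs 2.1 API; the list/reset calls and the helper name are assumptions used to illustrate the pattern, not code from the repo.
## Sketch only: create the job if it is new, otherwise reset (overwrite) the existing job's settings
import requests, json

def create_or_reset_job(full_job_name, job_req, dbx_token, workspace_host):
    headers = {"Authorization": f"Bearer {dbx_token}"}
    base = f"https://{workspace_host}/api/2.1/jobs"

    ## Look up any existing job with this exact name
    existing = (requests.get(f"{base}/list", headers=headers, params={"name": full_job_name})
                .json()
                .get("jobs", []))

    if existing:
        ## Overwrite the existing job definition instead of creating a duplicate
        payload = {"job_id": existing[0]["job_id"], "new_settings": job_req}
        return requests.post(f"{base}/reset", headers=headers, data=json.dumps(payload)).json()

    return requests.post(f"{base}/create", headers=headers, data=json.dumps(job_req)).json()
Dropping something like this into the UDF in place of the direct /jobs/create call would make the controller safe to re-run.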
That is how you get started streaming many pipelines from a single data source using Databricks Jobs and Structured Streaming. The possibilities from here are endless, so try making it your own by adding table ACLs, group/user access, notifications, and even dashboard refreshes!
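For example, if each event’s table follows the naming convention sketched earlier, per-store access could be granted from the same controller. This is a hedged sketch assuming table access control (or Unity Catalog) is enabled and that a group exists per store; the group naming convention is made up for illustration.
## Sketch only: grant each store's analyst group read access to its own table
for event in ["00001", "00002"]:   ## in practice, loop over the discovered event names
    spark.sql(f"""
        GRANT SELECT ON TABLE iot_multiplexing_demo.iot_stream_event_{event}
        TO `store_{event}_analysts`
    """)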