Airflow and Prefect are probably the most popular schedulers in 2021. They are both more data-aware than traditional orchestration software. This article describes an additional service architecture that makes data dependency the enabling pattern for effective & efficient orchestration in complex big data environments, which may span multiple data centers, cloud vendors, and hybrid topologies.
It’s common to schedule a data flow/DAG with a cron expression such as
# “0 0 2,14 ? * *” : every day at 2AM and 2PM
# “0 0 */6 ? * *” : every 6 hours (at 00:00, 06:00, 12:00, 18:00)
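For reference, here is a minimal Airflow sketch of such a fixed schedule, using the equivalent 5-field cron form; the dag_id, task, and command are hypothetical placeholders:

```python
# Minimal Airflow DAG driven purely by wall-clock time; the dag_id, task, and
# bash command below are hypothetical placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_revenue_flow",
    schedule_interval="0 2,14 * * *",  # every day at 2AM and 2PM
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    BashOperator(task_id="run_etl", bash_command="echo 'run the ETL job here'")
```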
Yet the flow owners sometimes notice that the upstream/input tables are not updated by the expected time. The stale data may not fail the flow directly, but the output data quality score and downstream metrics will eventually reveal the problem and trigger painful backfills. The owner may then defer the flow by 1~2 hours, hoping the problem will not occur again.
Some schedulers (except Azkaban) also support flow-to-upstream-flow dependencies, e.g. flow_xyz depends on the completion of flow_abc for the same scheduled DATE, so that flow_xyz can consume the output datasets generated by flow_abc.
Apparently, we are missing the vocabulary for the obviously important entities: table/view/partition/watermark. Even in the era of big data and cloud data lakes, we still have to express & translate the data objects and their corresponding dependencies into the above 2 styles. Needless to say, this is cumbersome, unnatural, unscalable, and hard to visualize, manage, or understand.
Therefore, data dependency should be plugged into the orchestration steps:
1. Wait for time: a cron expression only defines the earliest time when a flow can be launched. Such a fixed setting is actually optional, because a flow can now purely react to an event
2. Wait for input data:
# The partitioned tables must already have the expected partitions populated
# The snapshot/dimension tables must have increments/deltas beyond the expected high watermark
3. Optionally wait for the result of the data quality checks for the selected input tables.
4. Wait for resources: if all the above 3 conditions are satisfied, the orchestrator can check which cluster/queue/pod has enough capacity (or is the least busy), and assign the flow to that particular computation resource. If all the resources are busy or temporarily offline, the flow keeps waiting
5. Launch flow in execution mode
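To make the 5 steps concrete, here is a hypothetical orchestrator-side sketch; every helper name (state_store, cmdb, the flow attributes, etc.) is an assumption for illustration, not an existing scheduler API:

```python
# Hypothetical orchestrator loop for steps 1-5; every helper name here is an
# assumption, not an existing scheduler API.
import time


def try_launch(flow, state_store, cmdb):
    if time.time() < flow.earliest_start_ts:                        # 1. wait for time (optional)
        return False
    if not state_store.partitions_ready(flow.input_partitions):     # 2a. partitioned inputs
        return False
    if not state_store.watermark_reached(flow.input_watermarks):    # 2b. delta/snapshot inputs
        return False
    if flow.require_dq and not state_store.dq_passed(flow.input_partitions):  # 3. DQ checks
        return False
    target = cmdb.pick_least_busy_cluster(flow.resource_profile)    # 4. wait for resources
    if target is None:
        return False
    target.launch(flow)                                             # 5. launch flow in execution mode
    return True
```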
For data-intensive flows, steps 2~3 are really important to safeguard the timing and quality, and to prevent operational waste & burden. Airflow has a set of poke-based (a.k.a. pull-based) sensors to wait for various types of triggering events (file, partition, event, …). If the condition is not satisfied yet, the sensor will poke again after sleeping for a few minutes or seconds. This can easily lead to scalability issues; for example, if 500 flows are waiting for a few recent partitions of a popular Hive table, the Hive Metastore and its underlying storage can be a little bit overwhelmed. Smart Sensor is an early-access feature in Airflow 2.0 to mitigate this problem.
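For contrast, here is a sketch of the poke pattern as a custom Airflow sensor; the metastore lookup callable is a hypothetical stand-in for a real Hive Metastore client:

```python
# Sketch of Airflow's poke-based (pull-based) sensor pattern.
from airflow.sensors.base import BaseSensorOperator


class PartitionReadySensor(BaseSensorOperator):
    """Returns True once the expected partition shows up; otherwise Airflow
    sleeps for poke_interval seconds and calls poke() again. 500 waiting flows
    mean 500 of these lookups repeatedly hitting the metastore."""

    def __init__(self, table, partition, metastore_lookup, **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.partition = partition
        self.metastore_lookup = metastore_lookup  # hypothetical callable hitting the Hive Metastore

    def poke(self, context):
        return self.metastore_lookup(self.table, self.partition)
```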
Actually, most of the sensor functionality should be moved into a service, as explained in the following architecture diagram:
There are 3 main components that work together:
1. Global state store: in addition to the state data points from multiple data centers (e.g. cluster/host, flow/job execution id, start/end timestamp, status, parameters, …), it keeps the data lineage info, which consists of partitions & high watermarks for both input and output datasets. If proper instrumentation is in place, basic record counts, min/max, snapshot ids, and batch ids can also be logged for each output dataset.
2. Configurations (CMDB): besides the config files in git, artifact repositories, related databases, and LDAP, there are also a lot of observability data points attached to this component. Resource usage and allocation choices are directly leveraged by the orchestrator.
3. Data catalog: rather than an information-oriented catalog, this is an operation-oriented catalog that reflects the real-time status of partitions, snapshots, schemas, versions, replication status, … so that computation engines and cost-based optimizers can effectively execute the data processing and data management jobs.
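As a rough illustration of what the global state store could record for each dataset window, here is a hypothetical event schema; the field names are assumptions for illustration, not a published spec:

```python
# Hypothetical schema for the records kept in the global state store.
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataCompletenessEvent:
    dataset: str              # e.g. "kafka_topic_foo_raw"
    window_start: str         # ISO timestamp, inclusive
    window_end: str           # ISO timestamp, exclusive
    grain: str                # "5min" | "10min" | "hour" | "day"
    data_center: str          # which site produced/holds the data
    record_count: int
    completeness_pct: float   # vs. the upstream source (e.g. Kafka offsets)
    dq_status: Optional[str] = None   # None | "passed" | "failed"
    snapshot_id: Optional[str] = None
    batch_id: Optional[str] = None
```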
A data dependency service can provide much better efficiency and scalability than an implementation as a trigger job or a client-side poke/pull task. The centralized, shareable dependency metadata can serve multiple use cases beyond triggering dependent flows. Let’s look at some patterns:
Comprehensive rollup for time-series and region/org hierarchies.
- Ingestion jobs keep creating small files in the data lake. {Timestamp + record count} statistics are frequently compared against the source (e.g. Kafka). Once the stats of the ingested files are ≥ 99.995% of the source for the same 5-minute or 10-minute period, a data completeness event/trigger can be generated (and stored in the state store) to effectively trigger those mini-batch flows.
When such 5- or 10-minute windows accumulate to an hourly boundary, another data completeness event will be generated to trigger the hourly flows. The same event will also trigger the hourly compaction job to merge the small files and sort the records for much better performance and storage efficiency.
Once 24 hourly partitions are compacted, another data completeness event will be generated (even though there is no actual daily compaction job at all) to trigger all the daily batch flows. Since the compaction workload is distributed across each hour, the memory utilization and SLA are way better than a true daily compaction, and the downstream flows barely notice any difference.
- For global companies, using US east/west coast midnight as the daily batch cutoff time seems suboptimal. A better approach is to define several regional midnights (aligned with the “region” of the geolocation of data centers or cloud providers), for example, [UTC+8, UTC+5, UTC, UTC-8] as 4 regions. Each region has its own midnight and daily batch window. Once all 4 regions’ data are processed, the daily partition is considered complete, and global metrics can be calculated for that date. This hierarchy can naturally further break down the processing window to country or data center level, so that the daily processing workload is reasonably spread across multiple smaller units. It seems more complex than a single batch at US/Pacific midnight, but it offers much better SLA and granular recoverability.
The same pattern can be applied to compositions of multiple business units or multiple external partners too. In short, we can imagine that a daily partition will contain several subpartitions, and each subpartition can be populated and backfilled at its own cadence. The rollup logic will generate the data completeness event to trigger downstream flows only if all the subpartitions are ready for that given date (which is a batch date instead of a rigid UTC date or PST date); a sketch of this rollup appears after this list.
- Data synchronization among multiple data centers is often tricky. The 2 popular yet suboptimal choices are:
# dual or triple ingestion: each data center has its own independent ingestion pipelines and tracks its own completeness. Downstream flows are also orchestrated with little or no awareness of the other data centers, therefore failover and load balancing are barely available when one data center goes down (or has a severe data issue).
# primary + backup: all ingestion happens in one data center, and there is a process to replicate data files to the backup data center. Since the replication scope is not associated with the semantic boundaries of a partition or delta/snapshot, even with a high-bandwidth network, the backup site remains an eventually consistent destination with random lag and an unpredictable SLA, hence most flows can’t be effectively scheduled to execute unless a big delay buffer is built into the schedule.
With clear data trigger events for interval-based partitions and watermark-based deltas/snapshots, the replication tasks can be precisely triggered with an accurate list of files/directories to replicate (and then verify & publish). The partition- and snapshot-level SLA is now well under control.
Because the flows in the other data centers (primary or backup) can also clearly wait for the corresponding data dependencies, they will be effectively launched once the replication of the relevant partitions/snapshots is finished.
This can enable a multi-data-center and multi-storage-cluster topology with true and timely load balancing.
- Once the data trigger is generated, it can then invoke cache refresh/invalidation, kick off a materialized view refresh, or recalculate the pre-aggregated metrics for the impacted date, region, and hierarchy.
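The rollup referenced above could be sketched roughly as follows; the 99.995% threshold mirrors the narrative, while the function and field names are assumptions for illustration:

```python
# Sketch of the hierarchy rollup inside the state store: a coarser completeness
# event (hour, day, region, global) is emitted only when every child window is
# complete. Function and field names are illustrative assumptions.
def maybe_emit_parent_event(child_events, expected_child_windows, parent_window, emit):
    """child_events: completeness events already recorded for the child windows,
    e.g. twelve 5-minute slots for an hour, 24 hours for a day, or 4 regional
    daily partitions for a global daily partition.
    expected_child_windows: the set of child window_start values that must exist."""
    completed = {
        e.window_start for e in child_events if e.completeness_pct >= 99.995
    }
    if expected_child_windows <= completed:  # every expected child is accounted for
        emit(parent_window)  # e.g. post "hour [15:00~16:00) complete for topic Foo"
```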
A centralized smart collect/crawl service.
- This is a much more efficient way to keep the data completeness metadata and basic statistics up to date. 95% of metadata & file system inspections and stats evaluations will hit the state store instead of being amplified to the underlying systems.
- Especially when the dependency is not yet satisfied, the dependent flows will poke the sensors every few minutes. If any rollup hierarchy logic is involved, the overhead is even more significant.
With the rapid growth of data flows, data dependency really deserves its own dedicated service.
Here is a sample story with more realistic details:
- After a series of PUT/MULTIPART UPLOAD actions for a specific S3/ABS location, the record count and Kafka offset for each part have been sent to the Global State Store
- The State Store aggregates the part-level info into a time-series metric and compares it against the upstream/Kafka completeness/watermark. The data completeness % is calculated with these 2 streams of info combined; if 99.995% has been reached for the [15:58 ~ 16:08] time slot, multiple new state transition events will be posted in the State Store, such as:
* Hour [15:00~16:00) ingestion has completed for topic Foo
* 10Min [15:50~16:00) ingestion has completed for topic Foo
* 5Min [15:55~16:00) ingestion has completed for topic Foo
* 5Min [16:00~16:05) ingestion has completed for topic Foo
- The state transition events could trigger a few different flows:
* An hourly ML flow to transform hour 15’s raw data into a feature set and append it after hour 14
* A near-real-time metric flow to incrementally generate metrics between 15:50~16:00
* A pre-aggregation process to recompute & materialize PV/UV/DAU/bot for hour 00~15
* A microservice to update the metadata in the data exploration & analytics UI so users can choose hour 15 (from a selection box/dropdown)
* A data profiling job to compute the stats for hour 15, followed by a data quality check job to evaluate a few constraints & assertions based on those stats.
* A compaction job to optimize the small files for hour 15 and 14 (if too many late-arrival files have landed)
- The completion of the above flows/jobs will be translated into a new set of data states with clear time boundaries
- The new state transition events would trigger more flows.
Since the hierarchy-based (time, geo, data center, BU) aggregation is handled by the state store, any flow can use a very clear contract to express the time window of its data payload. Additional constraints about data quality expectations, data center, previous time bucket, compute resource requirements, etc., can also be attached to the trigger declaration.
Simply put, the flow or task will first wait for upstream data content > wait for data quality checks > wait for resources > execute > report critical output data stats back to the state store.
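A hypothetical trigger declaration for one downstream flow might look like the following; none of these keys belong to a real scheduler, they simply spell out the contract described above:

```python
# Hypothetical data-trigger declaration; keys and values are illustrative only.
hourly_feature_flow_trigger = {
    "flow": "hourly_ml_feature_flow",
    "wait_for_data": [
        {"dataset": "topic_foo_raw", "grain": "hour", "window": "scheduled_hour"},
    ],
    "wait_for_dq": {"dataset": "topic_foo_raw", "must_pass": True},
    "wait_for_resources": {"min_vcores": 64, "preferred_region": "UTC-8"},
    "on_success": "report_output_stats_to_state_store",
}
```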
Here is another story for a critical dataset with high quality requirements:
- The preliminary data profiling job detects an anomaly for hour 15; it then triggers a more comprehensive data quality job to validate a couple of business-domain-specific metrics/aggregates. The metrics go beyond the pre-defined thresholds, so the data quality job generates a state transition event to mark hour 15’s data as invalid.
- The State Store applies the time-series propagation logic (sketched after this list) and then posts multiple new state transition events, such as:
* Hour [15:00~16:00) ingestion has completed for topic Foo but failed data quality
* 10Min [15:00~15:10, 15:10~15:20, … 15:50~16:00) ingestion has completed for topic Foo with DQ issue
* 5Min [15:00~15:05, 15:05~15:10, … 15:55~16:00) ingestion has completed for topic Foo with DQ issue
* Hour [15:00~16:00) pre-aggregation result is complete with DQ issue
- The state transition events could trigger a few different actions
* Any new flows to compute intra-day (multi-hour) metrics will not be triggered
* Any new flows with “ignore-data-quality-issue” flag = true will continue to run for hour 15
* A microservice to mark hour 15 as gray in the data catalog
- A backfill of hour 15 is later executed manually. The backfill job resets the state of hour 15 from invalid to backfilled
* It triggers the data profiling and data quality check job again
* Once the quality metrics fall within the thresholds, the DQ issue flag is removed from the hourly, 10-min, and 5-min windows accordingly
* The downstream processes have their pre-defined attributes to decide whether they need to reprocess the data or not
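The propagation logic referenced above could be sketched like this; all of the state-store methods here are assumptions for illustration:

```python
# Sketch of the DQ propagation applied when hour 15 fails its quality check;
# every state-store method here is hypothetical.
def propagate_dq_failure(state_store, dataset, hour_window):
    # Flag the hourly window and every finer-grained child window (10-min, 5-min).
    state_store.set_dq_status(dataset, hour_window, "failed")
    for child in state_store.child_windows(dataset, hour_window):
        state_store.set_dq_status(dataset, child, "failed")
    # Downstream triggers consult this flag: flows without the
    # "ignore-data-quality-issue" flag simply do not fire for these windows
    # until a backfill resets the status and the DQ check passes again.
```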
The Global State Store is mutable, with predefined data quality and backfill transition rules in place. We need to keep it as serverless as AWS Step Functions, with rich & practical data semantics that work more efficiently for an ever-expanding data lake.
Will the data dependency orchestration lead to a completely flowless (or DAG-less) landscape? IMHO, the sweet spot is to construct relatively simple (and rerunnable/idempotent) flows in Airflow/Prefect/dbt, then specify a set of data triggers in the dependent/downstream flows. The main benefit of keeping a DAG for job-level dependencies is to avoid leaking the not-so-useful data dependencies between intermediate/temporary datasets into the global state store. The edges/dependencies among the 5~20 jobs within a single flow are very deterministic and quick to evaluate at runtime. Unlike a full-fledged provenance tracking use case, only the data lineage used by (exposed to) the other flows needs to be managed in the global state store for cost-effective orchestration.
In the above illustration, a complex pipeline 007 is refactored into 3 simpler flows (each is easier to test and deploy). Flow 02 depends on datasets [2, X], and Flow 03 depends on [F, X, Y]. If dataset F is produced by a different pipeline later, Flow 03 can transparently pick up the correct external dependency as long as the dataset name and partition key remain the same for F. This makes the whole topology both dynamic (evolvable) and manageable.
A few references in line with this design:
# Dagster’s Concept with Asset, Sensor, Tick, and Trigger
# Datacoral’s Event-Driven Orchestration Framework
# Temporal’s Microservices Orchestration Platform (Uber’s Cadence)
# Cask Data Application Platform (Cloud Data Fusion) triggers
(* Disclaimer: The views expressed in this article are those of the author and do not reflect any policy or position of the employers of the author.)