Improve Ingest Latency and Query Efficiency of Data Lake — Partition and Index
Data Lakes continue to offer better cost performance as new features and integration patterns arrive. With both the Snowflake and Databricks summits held in 2024–06, there has been a lot of hype about Iceberg, Polaris, and Unity Catalog lately. Yet there are some important technical aspects and design patterns beyond the buzzwords.
For the standalone platforms, in addition to Kafka Connect, Dataflow, Datastream, and custom pipelines in Flink/Spark, there are also platforms like SeaTunnel and Decodable which can generate Flink and/or Spark code for both batch and stream ingestion. Confluent offers Streaming Data Pipelines, and Databricks converted Arcion into LakeFlow.
On the other hand, the embedded jobs inside databases offer easier integration but limited flexibility:
- StarRocks has Routine Load for Kafka, and Flink CDC from MySQL
- RisingWave has connectors for Kafka, Kinesis, and direct Debezium sources (without any message queue/bus)
Change Data Capture (CDC) ships the binlog/WAL/change streams/logical replication from OLTP to the lakehouse. The deduplicate (merge) operation based on the primary key (PK) is a major performance and efficiency concern. The Merge-On-Read (MOR) strategy trades overhead at query time for simple and quick writes. The common tactic is to combine it with “Deletion Vectors/Bitmaps” to hide the older versions of the same PK. But some records, especially poorly modeled NoSQL documents, can be modified 30~300 times per second, and such “hot” records put heavy pressure on MOR.
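To make the MOR trade-off concrete, here is a minimal toy sketch in Python. It is not how any particular engine implements deletion vectors; the `MorTable` class, its in-memory `latest` PK lookup, and the sample keys are all assumptions for illustration. Writes only append and flag the previous version, while every read has to filter out the flagged positions.

```python
from dataclasses import dataclass, field


@dataclass
class MorTable:
    """Toy merge-on-read table: append-only rows plus a deletion bitmap."""
    rows: list = field(default_factory=list)    # (pk, payload) in arrival order
    deleted: set = field(default_factory=set)   # row positions hidden from readers
    latest: dict = field(default_factory=dict)  # pk -> position of the live version

    def upsert(self, pk, payload):
        # Write path: never rewrite old data, only flag the previous version.
        old = self.latest.get(pk)
        if old is not None:
            self.deleted.add(old)
        self.rows.append((pk, payload))
        self.latest[pk] = len(self.rows) - 1

    def scan(self):
        # Read path: pay the cost here by skipping every flagged position.
        return [row for pos, row in enumerate(self.rows) if pos not in self.deleted]


table = MorTable()
table.upsert("order-1", {"status": "new"})
table.upsert("order-2", {"status": "new"})
for i in range(300):  # a "hot" key rewritten many times
    table.upsert("order-1", {"status": f"v{i}"})

live = table.scan()
print(len(table.rows), len(table.deleted), len(live))
```

In this toy run, two live records are buried under hundreds of stale versions of one hot key, which is the pressure on MOR that the paragraph above describes.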
- Paimon uses LSM + Bucket (* see blog1 and blog2 from Giannis Polyzos) to support quick DML operations on a table with a Primary Key. This is one of the main reasons that Alibaba/Ververica and Flink struggled with Iceberg and then decided to create Paimon as its replacement for streaming compute.
- StarRocks implements a real Primary Key table to guarantee uniqueness at write time. It also supports “Partial Updates” on individual columns (for wide/flat tables). This feature is so useful for critical transaction/dimension tables, but I still don’t know when the other OLAP engines will modernize their storage design to catch up.
- Hudi also has the primary key concept to speed up DML via _hoodie_record_key within each partition, but it does not enforce uniqueness like StarRocks [* sincere thanks to Bhavani for pointing out that if we use ComplexKeyGenerator together with “recordkey.field”, “precombine.field”, and “payload.class” (all with the “hoodie.datasource.write.” prefix), Hudi can enforce uniqueness].
- The Snowpipe Streaming API in Snowflake can leverage a Channel to frequently (every 5~300 seconds) append records to a staging table with very little overhead. It then uses Streams to capture/mark the newly ingested records. Typically, a frequent Stored Procedure is scheduled to pick up these records, transform them lightly (schematize with proper data types), and then append them to a “delta” table WITH DUPLICATES. Since frequent COW/MOW is very expensive in Snowflake, the Snowflake way is to create a “deferred merge view” that applies MOR between the “delta” and “target” tables with 2 deduplicate subqueries: the first compacts within the “delta”, and the second deduplicates against the “target”. Finally, schedule another infrequent Stored Procedure (every couple of hours) to upsert from the “delta” into the “target” table.
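The two deduplicate steps of such a “deferred merge view” can be sketched in plain Python. This is only an illustration of the logic, not Snowflake SQL; the record fields (`pk`, `seq`, `op`, `val`), the `D` delete marker, and the function names are assumptions of mine.

```python
def compact_delta(delta):
    """Step 1: keep only the newest change per PK inside the delta table."""
    newest = {}
    for rec in delta:
        cur = newest.get(rec["pk"])
        if cur is None or rec["seq"] >= cur["seq"]:
            newest[rec["pk"]] = rec
    return newest


def deferred_merge_view(target, delta):
    """Step 2: target rows not superseded by the delta, plus the compacted delta."""
    newest = compact_delta(delta)
    view = {row["pk"]: row["val"] for row in target if row["pk"] not in newest}
    for pk, rec in newest.items():
        if rec["op"] != "D":  # assumed delete marker; deleted keys stay hidden
            view[pk] = rec["val"]
    return dict(sorted(view.items()))


target = [{"pk": 1, "val": "a"}, {"pk": 2, "val": "b"}, {"pk": 3, "val": "c"}]
delta = [
    {"pk": 2, "seq": 10, "op": "U", "val": "b1"},
    {"pk": 2, "seq": 11, "op": "U", "val": "b2"},  # duplicate PK, newer version
    {"pk": 3, "seq": 12, "op": "D", "val": None},
    {"pk": 4, "seq": 13, "op": "I", "val": "d"},
]

view = deferred_merge_view(target, delta)
print(view)
```

Every query against the view repeats both steps, so the infrequent upsert from “delta” into “target” is what keeps the delta small enough for this to stay cheap.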
Secondary Indices and Materialized Views are the async by-products of the tables in lakehouse. ClickHouse, Materialize, RisingWave, Lance, and StarRocks have serious support for various types of indices to accelerate queries already.
Hudi has record_index, inmemory, simple, bloom and bucket indices as well.
Iceberg Puffin might support indices in the future.
HyperSpace was an indexing project for Delta that is now inactive. I haven’t heard about any plan to support indices in Delta, even after Rahul joined Databricks.
Merge-On-Write (Copy-On-Write) is quite difficult and expensive with Object Storage. For databases which have long-running stateful daemons, it is also possible to introduce advanced optimizations, such as an in-memory buffer/channel and/or a local RocksDB, in order to:
- Shard, deduplicate, and compact the CDC stream based on PK;
- Merge-On-Read with the committed Iceberg/Delta files;
- Incrementally update metrics/aggregates in memory and then update the materialized views (based on the window or interval).
The optimization section (marked in red in the above diagram) is not stateless, so it is more complicated to implement. Right now, I am not aware of any database that is actively considering it. If you know of any company or project of this kind, please kindly leave a comment here.
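As a rough illustration of what such a stateful daemon would hold, here is a minimal in-memory sketch in Python. It is my own toy model, not an existing product: the `CdcBuffer` class, the CRC32 sharding, the event fields, and the per-region sum are all hypothetical, and a real implementation would also need durability (for example a local RocksDB) and a merge with the committed files.

```python
import zlib
from collections import defaultdict


class CdcBuffer:
    """Toy stateful buffer: shard by PK, keep the latest row, maintain a running sum."""

    def __init__(self, num_shards=4):
        self.shards = [{} for _ in range(num_shards)]    # pk -> latest row, per shard
        self.dirty = [set() for _ in range(num_shards)]  # PKs changed since last flush
        self.totals = defaultdict(int)                   # region -> sum(amount)

    def _shard(self, pk):
        return zlib.crc32(str(pk).encode()) % len(self.shards)

    def apply(self, event):
        i = self._shard(event["pk"])
        old = self.shards[i].get(event["pk"])
        if old is not None:
            self.totals[old["region"]] -= old["amount"]  # retract the old version
        if event["op"] == "D":
            self.shards[i].pop(event["pk"], None)
        else:
            self.shards[i][event["pk"]] = event
            self.totals[event["region"]] += event["amount"]
        self.dirty[i].add(event["pk"])

    def flush(self):
        """Emit one compacted row (or tombstone) per changed PK."""
        batch = []
        for state, dirty in zip(self.shards, self.dirty):
            for pk in sorted(dirty):
                batch.append(state.get(pk, {"pk": pk, "op": "D"}))
            dirty.clear()
        return batch


buf = CdcBuffer()
for ev in [
    {"pk": 1, "op": "U", "region": "eu", "amount": 10},
    {"pk": 1, "op": "U", "region": "eu", "amount": 15},
    {"pk": 2, "op": "U", "region": "us", "amount": 7},
    {"pk": 2, "op": "D"},
    {"pk": 3, "op": "U", "region": "us", "amount": 5},
]:
    buf.apply(ev)

batch = buf.flush()
print(dict(buf.totals), len(batch))
```

The buffer keeps its per-PK state after a flush so that later updates can still retract the old contribution from the aggregate, which is exactly why this part cannot be stateless.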
Hudi and Paimon format specs continue to emphasize ingest speed (including direct sourcing from WAL/binlog without Kafka) and (complex) storage layout optimization for MERGE, because Uber and Alibaba are much closer to e-commerce and financial transaction data than Netflix and Databricks are. On the other side, Iceberg and Delta have much wider adoption and deeper vendor/platform support from the big players, even though IMHO they still need to learn and embrace some of the key technical designs from Hudi or Paimon. Of course, Hudi and Paimon have a steeper learning curve (more fine-grained parameters for engineers), while RisingWave and StarRocks hide the implementation details and offer a more straightforward database + SQL programming interface.
More capabilities and features (including the typical database ones, such as index, partition, and materialized view) are being added to Data Lake frameworks; HTAP (hybrid transactional and analytical processing) databases, such as SingleStore, TiDB, and OceanBase, continue to scale to the hundreds-of-TB to single-PB level. The demand for a low-latency yet cost-effective Operational Data Store will push technology innovation and standards forward in the next 3~4 years. Pure in-memory OLAP solutions are facing obvious resistance due to their high costs and size limitations. It will be quite exciting to see how traditional database technologies (especially index, partition, and LSM) are translated to (or tailored for) cloud-native infrastructure such as object storage, state stores, and k8s (or pre-warmed VM fleets).
(* Disclaimer: The views expressed in this article are those of the author and do not reflect any policy or position of the employers of the author.)
References
- Open Data Foundations across Hudi, Iceberg and Delta (by Kyle Weller)
- Feature Comparison — Hudi vs Delta Lake vs Iceberg (from OneHouse or XTable)
- Enrich (join + aggregate) CDC in RisingWave and publish to StarRocks in realtime
- Window/Interval/Temporal Join in RisingWave
Data Type wishlist for the future open standard across formats & engines:
- Decimal256: the 38-digit Decimal128 is not sufficient for financial data, and especially not for crypto
- Int96/Int128: the same reason as above
- Unsigned Int32/Int64
- Variant
- n-dimensional matrix (Tensor)
- IVF vector index
- UUID (there are different ways to represent a UUID)
- Timestamp with fractional seconds (int64 + unit, fixed(20,9), int64 + int32, and string; too many serialization formats)
- IPv4/IPv6/CIDR
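
The Decimal128 limit in the wishlist above is easy to check with plain integer arithmetic. The sketch below is my own illustration: the helper names are hypothetical, and the sample values (a 256-bit unsigned maximum, as used by EVM-style chains, and a token supply with 18 decimals) are assumptions chosen to show where 38 digits run out.

```python
DECIMAL128_MAX_DIGITS = 38  # maximum precision of a 128-bit decimal


def digits(n: int) -> int:
    """Number of base-10 digits needed to store the integer n."""
    return len(str(abs(n)))


def fits_decimal128(n: int) -> bool:
    return digits(n) <= DECIMAL128_MAX_DIGITS


uint256_max = 2**256 - 1            # largest 256-bit unsigned integer
int128_max = 2**127 - 1             # largest signed 128-bit integer
token_supply = 10**9 * 10**18       # 1 billion tokens stored with 18 decimals

for name, value in [
    ("uint256_max", uint256_max),
    ("int128_max", int128_max),
    ("token_supply", token_supply),
]:
    print(name, digits(value), fits_decimal128(value))
```

Note that even the full signed 128-bit integer range needs 39 digits, one more than Decimal128 can carry, which is part of why Int128 sits on the same wishlist.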