Frosty Data Adventures: How Flink Thrived on the Iceberg of Information

Mehul Batra
9 min read · Aug 19, 2023


How storage (table formats) has evolved over time for managing and governing a real-time lakehouse at scale.

Introduction

In the age of data-driven decisions, the fusion of data lakes and warehouses has sparked the “lakehouse” trend. These seek to merge the best of both worlds: data lakes’ scalability and warehouses’ query efficiency. Table formats play a vital role in this evolution. Enter Apache Iceberg — a revolutionary table format reshaping data storage, management, and querying. In this article, we’ll uncover how Iceberg’s uniqueness can craft a real-time lakehouse using Apache Flink for processing and AWS Glue Catalog for metadata control.

Real-Time Lakehouse Architecture:

Apache Kafka is a distributed event streaming platform that has rapidly gained popularity as a fundamental component of modern data architectures. It excels in efficiently handling real-time data streams, enabling organizations to process, store, and distribute data seamlessly across various applications and systems. At its core, Kafka offers a publish-subscribe messaging model that caters to the demands of data-intensive, mission-critical applications.

What makes it so special?

Distributed and Scalable:
- Designed to handle massive amounts of data by distributing it across multiple brokers (servers) in a cluster. This architecture allows for easy scalability and fault tolerance, ensuring high availability and reliability.

Publish-Subscribe Model:
- Follows a publish-subscribe pattern where data producers publish messages to topics, and consumers subscribe to topics to receive and process the messages. This decoupled architecture facilitates the building of flexible and modular systems.

High Throughput and Low Latency:
- Kafka is built for speed. It achieves high throughput and low latency, allowing organizations to process and move large volumes of data swiftly without sacrificing performance.

Durability and Persistence:
- Data published to Kafka topics is durably stored and can be retained for a configurable period. This persistence ensures that data is not lost, making Kafka suitable for scenarios where data integrity is crucial (although making the end-to-end pipeline idempotent requires additional components beyond Kafka itself).

Fault Tolerance and Replication:
- Replicates data across multiple brokers in the cluster, ensuring data availability even if some brokers fail. This fault tolerance mechanism contributes to Kafka’s reliability.
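
To make the publish-subscribe model concrete, here is a minimal producer sketch that pushes the kind of e-commerce events used later in this article. The topic name ecomm_data_topic, the broker address, and the JSON payload are illustrative placeholders, and the snippet assumes the standard kafka-clients library is on the classpath.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class EcommEventProducer {
    public static void main(String[] args) {
        // Illustrative broker address; align it with your cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "server:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all"); // wait for all in-sync replicas, trading a little latency for durability

        String json = "{\"id\":1,\"unit_left\":42,\"amount\":19.99,"
                + "\"product_name\":\"mug\",\"category\":\"kitchen\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by id keeps updates for the same product in one partition, preserving their order
            producer.send(new ProducerRecord<>("ecomm_data_topic", "1", json));
            producer.flush();
        }
    }
}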

Apache Flink emerges as a versatile stream processing framework that complements Iceberg's capabilities.

What makes it so special?

Event-Time Processing:
- The event-time processing model aligns well with the time-travel feature of Iceberg, enabling efficient handling of out-of-order events.

Exactly-Once Semantics:
- Ensures exactly-once processing semantics, critical for maintaining data integrity in real-time scenarios.

Stateful Operations:
- Support for stateful operations simplifies tasks such as sessionization and complex event processing in real-time pipelines.

Upsert Mode:
- When a record arrives again with changed values, Flink updates the existing row in the Iceberg table so that only the latest version is kept.
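
To make the event-time point above concrete, here is a small sketch of attaching event-time semantics to a stream with a bounded-out-of-orderness watermark. The SaleEvent type and the 10-second bound are assumptions made purely for this example.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class EventTimeSetup {

    // Hypothetical event type carrying an epoch-millis timestamp field
    public static class SaleEvent {
        public long id;
        public long eventTimeMillis;
    }

    // Attach event-time timestamps and watermarks that tolerate 10 seconds of out-of-orderness
    public static DataStream<SaleEvent> withEventTime(DataStream<SaleEvent> sales) {
        return sales.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<SaleEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, recordTs) -> event.eventTimeMillis));
    }
}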

Apache Iceberg stands as an open table format tailored for handling extensive analytical datasets. This ingenious framework seamlessly integrates tables into prominent computing engines such as Spark, Trino, and Flink. It does so through a high-performance table format that mimics the functionality of SQL tables, offering a cohesive and efficient data manipulation experience.

What makes it so special?

Consistency Guarantees:
- It employs ACID (Atomicity, Consistency, Isolation, Durability) transactions, ensuring data consistency even during concurrent read and write operations.

Metadata filtering:
Iceberg uses two levels of metadata to track the files in a snapshot.
- Manifest files store a list of data files, along with each data file’s partition data and column-level stats
- A manifest list stores the snapshot’s list of manifests, along with the range of values for each partition field

MOR:
- The Merge-On-Read paradigm for data updates is native, making the small-files problem of frequent updates easier to tackle. With MoR, updates are written to a change log (delete file) and merged only when data is read from the lake via a query. Since there is less overhead in writing to a log file than in frequently creating new data files, the MoR approach outperforms CoW for streaming data use cases.

Scan planning:
- Planning in an Iceberg table fits on a single node because Iceberg's metadata can be used to prune metadata files that aren't needed, in addition to filtering data files that don't contain matching data.

Schema evolution:
- supports add, drop, update, or rename, with no side effects

Hidden partitioning:
- prevents user mistakes that cause silently incorrect results or extremely slow queries

Partition layout evolution:
- can update the layout of a table as data volume or query patterns change

Time travel:
- enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes

Version rollback:
- allows users to quickly correct problems by resetting tables to a good state
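
As a sketch of how time travel surfaces in this stack: the Iceberg Flink connector accepts read options such as 'snapshot-id' and 'as-of-timestamp' as SQL hints. The catalog settings below mirror the ones used later in this article, and the snapshot id and timestamp values are placeholders.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TimeTravelRead {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Glue-backed Iceberg catalog (same settings as the sink job later in this article)
        tableEnv.executeSql(
                "CREATE CATALOG my_catalog WITH (" +
                "'type'='iceberg', " +
                "'warehouse'='s3://my-table-data-bucket', " +
                "'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', " +
                "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')");

        // Reproduce a query against one specific snapshot (placeholder snapshot id)
        tableEnv.executeSql(
                "SELECT * FROM `my_catalog`.`my_schema`.`ecomm_data_sales_stats` " +
                "/*+ OPTIONS('snapshot-id'='1234567890123456789') */").print();

        // Or read the table as it was at a point in time (epoch milliseconds, placeholder value)
        tableEnv.executeSql(
                "SELECT * FROM `my_catalog`.`my_schema`.`ecomm_data_sales_stats` " +
                "/*+ OPTIONS('as-of-timestamp'='1692441600000') */").print();
    }
}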

AWS Glue Catalog enhances architectural elegance by serving as a centralized metadata repository and data catalog:

Schema Management:
- Glue Catalog provides schema definitions that align with Iceberg tables, ensuring consistency across the data pipeline.

Metadata Management:
- Metadata stored in Glue Catalog facilitates data discovery, lineage tracking, and metadata-driven operations.

Integration with Ecosystem:
- Glue Catalog seamlessly integrates with various AWS services, enhancing the versatility of the lakehouse architecture.

Governance:
- With the help of Lake Formation, you can define granular access controls on top of tables and schemas.

Identity and Access Management (IAM) roles play a pivotal role in securely managing access to various resources, including Amazon S3 buckets. IAM roles allow you to delegate permissions to entities such as AWS services, users, or applications, without the need to share long-term credentials. By crafting fine-grained IAM policies, you can enforce restrictions on S3 bucket access, ensuring data security and compliance.

Let’s get DEV started

Start the project by setting up the POM with the required dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>FlinkJavaHacks</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-avro-confluent-registry</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
<scope>compile</scope>
<!-- We need to exclude Zookeeper, Log4J and SLF4J to avoid conflicts with Flink's usage -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.2</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.16</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.16</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>2.19.17</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
<version>2.19.17</version>
</dependency>
<dependency>
<groupId>net.sourceforge.argparse4j</groupId>
<artifactId>argparse4j</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.4.1</version>
</dependency>

</dependencies>
</project>

Create an Iceberg Table via Athena with a MoR Strategy for Frequent Writes

CREATE TABLE my_schema.ecomm_data_sales_stats (
id bigint,
unit_left bigint,
amount double,
product_name string,
category string)
PARTITIONED BY (category)
LOCATION 's3://my-table-data-bucket/ecomm_data_sales_stats'
TBLPROPERTIES (
'table_type'='ICEBERG',
'write.object-storage.enabled'='true',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'write.delete.mode'='merge-on-read'
);

Merge-on-Read: Here is some context

With merge-on-read, the original file is not rewritten; instead, the changes are written to a new file. When the data is read, the changes are applied or merged onto the original data file to form the current state of the data during processing. This makes writing the changes much quicker, but it also means more work must be done when the data is read.

In Apache Iceberg tables, this pattern is implemented through the use of delete files that track updates to existing data files.

If you delete a row, it gets added to a delete file and reconciled on each subsequent read until the files undergo compaction, which rewrites all the data into new files that no longer need the delete file.

If you update a row, that row is tracked via a delete file so future reads ignore it in the old data file, and the updated row is added to a new data file. Again, once compaction runs, all the data will be in fewer data files and the delete files will no longer be needed.
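
Compaction is what eventually folds those delete files back into the data files. As one hedged sketch of triggering it from Flink, Iceberg ships a rewriteDataFiles action; the catalog, bucket, and table names below are the illustrative ones used throughout this article.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

public class CompactEcommTable {
    public static void main(String[] args) throws Exception {
        // Same Glue/S3 settings as the rest of the article; adjust them to your environment
        Map<String, String> props = new HashMap<>();
        props.put("warehouse", "s3://my-table-data-bucket");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "my_catalog", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog");

        try (TableLoader tableLoader = TableLoader.fromCatalog(
                catalogLoader, TableIdentifier.of("my_schema", "ecomm_data_sales_stats"))) {
            tableLoader.open();
            Table table = tableLoader.loadTable();

            // Bin-pack small files and apply accumulated delete files into fewer, larger data files
            Actions.forTable(table)
                    .rewriteDataFiles()
                    .execute();
        }
    }
}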

Helper Class to Initialize the Flink Streaming and Table Environments

import lombok.val;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Map;

public class SetupEnv {

// Tunables referenced below; adjust them (or wire them to CLI flags) to suit your job
private static final boolean ENABLE_HEAP_STATE_BACKEND = true;
private static final boolean ENABLE_CHECKPOINTING = true;
private static final boolean OBJECT_REUSE = true;
private static final long IDLE_STATE_TIMEOUT = 30; // minutes

public static StreamExecutionEnvironment getFlinkEnvironment() {
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// These are additional properties I have set for tuning; you can also go ahead with the defaults

if (ENABLE_HEAP_STATE_BACKEND) {
flinkEnv.setStateBackend(new HashMapStateBackend());
}
flinkEnv.setRestartStrategy(
RestartStrategies.exponentialDelayRestart(
Time.milliseconds(10),
Time.milliseconds(2000),

1.1, // exponential multiplier
Time.milliseconds(3000), // threshold duration to reset delay to its initial value
0.1 // jitter
));
// start checkpointing (interval in milliseconds; the sink job below overrides it)
if (ENABLE_CHECKPOINTING) {
flinkEnv.enableCheckpointing(60_000L);
}
// set mode to exactly-once (this is the default)
flinkEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
flinkEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
flinkEnv.getCheckpointConfig().enableUnalignedCheckpoints();
// checkpoint timeout
flinkEnv.getCheckpointConfig().setCheckpointTimeout(50000);
flinkEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
flinkEnv
.getCheckpointConfig()
.setTolerableCheckpointFailureNumber(1);

// enable externalized checkpoints which are retained
// after job cancellation
flinkEnv
.getCheckpointConfig()
.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
val executionConfig = flinkEnv.getConfig();
if (SetupEnv.OBJECT_REUSE) {
executionConfig.enableObjectReuse();
}
return flinkEnv;
}
public static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment flinkEnv) {
val tableEnv = StreamTableEnvironment.create(flinkEnv);
// Flink configuration
Configuration configuration = tableEnv.getConfig().getConfiguration();
tableEnv.getConfig().setIdleStateRetention(Duration.ofMinutes(IDLE_STATE_TIMEOUT));
// For performance Tuning
configuration.setString(
"table.exec.mini-batch.enabled",
"true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "4 s");
configuration.setString("table.exec.mini-batch.size", "4000");
configuration.setString(
"table.optimizer.agg-phase-strategy",
"TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
configuration.setString(
"table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
configuration.setString(
"table.exec.state.ttl", Long.toString(Duration.ofMinutes(IDLE_STATE_TIMEOUT).toMillis()));
configuration.setString("table.exec.source.idle-timeout", "4000");
return tableEnv;
}
// set the idle-timeout value to zero to disable it
public static StreamTableEnvironment getTableEnvironment(
StreamExecutionEnvironment flinkEnv, Map<String, String> extraProps, String timeZone) {
val tableEnv = getTableEnvironment(flinkEnv);
Configuration configuration = tableEnv.getConfig().getConfiguration();
tableEnv.getConfig().setLocalTimeZone(ZoneId.of(timeZone));
extraProps.forEach(configuration::setString);
return tableEnv;
}
}

Iceberg Sink Class

import configs.ArgParser;
import lombok.val;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import utils.SetupEnv;
import java.util.HashMap;
import java.util.Map;

public class IcebergSink {

public static void main(String[] args) throws Exception {

val ns = ArgParser.parseArgs(args);
val flinkEnv = SetupEnv.getFlinkEnvironment();
val timeZone = "Asia/Calcutta";
flinkEnv.setRestartStrategy(RestartStrategies.noRestart());
flinkEnv.getCheckpointConfig().setCheckpointInterval(600000L);
flinkEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(5L);
Map<String, String> extraProps = new HashMap<>();
extraProps.put("table.exec.source.idle-timeout", "5000");
val tableEnv = SetupEnv.getTableEnvironment(flinkEnv, extraProps, timeZone);
val statementSet = tableEnv.createStatementSet();

tableEnv.executeSql(
"CREATE CATALOG my_catalog WITH( \n" +
"'type'='iceberg', " +
"'warehouse'='s3://my-table-data-bucket', " +
"'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', " +
"'glue.skip-archive' = 'true' ," -- to not maintain history of glue table versions+
"'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')"
);

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS lakehouse_poc;");

tableEnv.executeSql(
"CREATE TABLE IF NOT EXISTS ecomm_data_topic( \n" +
" id BIGINT, \n" +
" unit_left BIGINT, \n" +
" amount DOUBLE, \n" +
" product_name STRING, \n" +
" category STRING \n" +
") WITH (\n" +
" 'connector' = 'kafka', -- using kafka connector\n" +
" 'topic' = '', -- kafka topic\n" +
" 'properties.bootstrap.servers' = 'server:9092', -- kafka broker address\n" +
" 'properties.group.id' = 'test-group',\n" +
" 'scan.startup.mode' = 'earliest-offset', \n" +
" 'format' = 'json', \n" +
" 'json.ignore-parse-errors' = 'true', --ignore invalid json messages\n" +
" 'scan.topic-partition-discovery.interval' = '60000'"
")");
// Declared so that flink can interact with glue
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("my_schema");

tableEnv.executeSql("CREATE TABLE IF NOT EXISTS `my_catalog`.`my_schema`.`ecomm_data_sales_stats`( \n" +
" id BIGINT UNIQUE,\n" +
" unit_left BIGINT, \n" +
" amount DOUBLE, \n" +
" product_name STRING, \n" +
" category STRING, \n" +
" PRIMARY KEY (id) NOT ENFORCED)" +
" with (" +
"'format-version'='2', 'write.upsert.enabled'='true')");
// Declaring back to default for flink
tableEnv.useCatalog("default_catalog");
tableEnv.useDatabase("default_database");

val insertIceberg = tableEnv.sqlQuery("SELECT id, unit_left, amount, " +
"product_name, " +
"category " +
"FROM ecomm_data_topic");
// 'write.upsert.enabled'='true' on the sink table (set above) runs the insert in upsert mode,
// so the row is updated in place whenever the same product id appears again
insertIceberg.executeInsert("`my_catalog`.`my_schema`.`ecomm_data_sales_stats`");

}

}
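
Once the job has taken a few checkpoints and committed snapshots, a quick sanity check is to read the table back from a separate Flink job or SQL session that registers the same catalog. This is only a sketch, assuming the catalog settings and column names used above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class VerifyIcebergSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same Glue-backed catalog definition as in IcebergSink above
        tableEnv.executeSql(
                "CREATE CATALOG my_catalog WITH (" +
                "'type'='iceberg', " +
                "'warehouse'='s3://my-table-data-bucket', " +
                "'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', " +
                "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')");

        // One row per category, rolled up from the latest upserted values
        tableEnv.executeSql(
                "SELECT category, COUNT(*) AS products, SUM(amount) AS total_amount " +
                "FROM `my_catalog`.`my_schema`.`ecomm_data_sales_stats` " +
                "GROUP BY category").print();
    }
}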

Conclusion

As data-driven applications demand real-time insights, the trio of Apache Iceberg, Apache Flink, and AWS Glue Catalog presents a compelling solution for building a robust real-time lakehouse architecture. By leveraging the strengths of each component, organizations can achieve efficient data processing, seamless schema evolution, and comprehensive metadata management. As you embark on your journey to build a real-time lakehouse, remember that these technologies not only provide technical prowess but also empower data engineers to architect solutions that drive meaningful business outcomes.

