Sink data to Redis in a Squirrel way

Mehul Batra
6 min read · Mar 26, 2023

Apache Flink and Redis are two powerful tools that can be used together to build real-time data processing pipelines that can handle large volumes of data. Flink provides a highly scalable and fault-tolerant platform for processing data streams, while Redis provides a high-performance, in-memory database that can be used to store and query data. In this article, we’ll explore how Flink can be used to call Redis using async functions and show how this can be used to push data to Redis in a non-blocking way.

Tale of Redis

IC: Redis Explained Infographic(https://architecturenotes.co/redis/)

Redis: More Than Just a Cache

Redis is a powerful NoSQL in-memory data structure store that has become a go-to tool for developers. While it’s often thought of as just a cache, Redis is much more than that. It can function as a database, message broker, and cache all in one.

One of Redis’ strengths is its versatility. It supports various data types, including Strings, Lists, Sets, Sorted Sets, Hashes, Streams, HyperLogLogs, and Bitmaps. Redis also offers geospatial indexes and radius queries, making it a valuable tool for location-based applications.

Redis’ features extend beyond its data model. It has built-in replication, Lua scripting, and transactions, and can automatically partition data with Redis Cluster. Additionally, Redis provides high availability through Redis Sentinel.

Note: This article focuses on Redis Cluster mode.

IC: Redis Cluster Mode(https://architecturenotes.co/redis/)

Redis Cluster uses algorithmic sharding: every key maps to one of 16,384 hash slots, and the slot determines which shard holds the key. This also simplifies adding new instances, because slots can be moved between shards. The nodes gossip with each other to track the cluster's health, and if a primary node becomes unresponsive, one of its replicas can be promoted to keep the cluster healthy. For a robust setup, it is essential to have an odd number of primary nodes and two replicas, which avoids the split-brain phenomenon (where the cluster cannot agree on which node to promote and ends up with a split decision).
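To make the hash-slot idea concrete, here is a minimal sketch of how a key can be mapped to a slot, following the rule in the Redis Cluster specification (CRC16 of the key, modulo 16384, with optional `{hash tag}` handling). The class and method names are made up for illustration; in a real application the client library does this for you.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: HashSlotSketch, crc16 and slotFor are hypothetical names.
class HashSlotSketch {

    static final int SLOT_COUNT = 16384;

    // CRC16 (XMODEM variant: polynomial 0x1021, initial value 0), as used by Redis Cluster.
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the value within 16 bits
            }
        }
        return crc;
    }

    // If the key contains a non-empty {tag}, only the tag is hashed,
    // so related keys can be forced onto the same slot.
    static int slotFor(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"Electronics", "Furniture", "Fashion", "Food"}) {
            System.out.println(key + " -> slot " + slotFor(key));
        }
    }
}
```

Because the slot depends only on the key, any client can compute where a key lives without asking a coordinator, which is what lets the cluster add shards by reassigning slot ranges.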

To talk to Redis Cluster, we will use Lettuce, an asynchronous Redis client for Java.

Tale of Flink

IC: Flink Highlevel (https://flink.apache.org/)

Apache Flink is an open-source, unified stream- and batch-processing framework designed for real-time, high-throughput, fault-tolerant data processing. It supports complex event processing and stateful computations over bounded and unbounded streams. What makes it fast is that it keeps working state in memory and checkpoints local state asynchronously.

The Hero of the Story

IC: Flink 1.16 Release Docs

Asynchronous interaction with databases is a game-changer for stream processing applications. With this approach, a single function instance can handle multiple requests at once, allowing for concurrent responses and a significant boost in throughput. By overlapping the waiting time with other requests and responses, the processing pipeline becomes much more efficient.
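The effect of overlapping waiting time can be seen in a small, Flink-free sketch. It assumes a fake remote call, `fakeSet`, that replies after a fixed latency; this is a hypothetical stand-in and not a real Redis request. All requests are issued first and the replies are collected afterwards, so the total time stays close to one round trip instead of one round trip per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Illustrative only: AsyncOverlapSketch, fakeSet and sendAll are hypothetical names.
class AsyncOverlapSketch {

    // Stand-in for a remote SET: completes with "OK" after latencyMs without blocking the caller.
    static CompletableFuture<String> fakeSet(String key, String value, long latencyMs) {
        Executor delayed = CompletableFuture.delayedExecutor(latencyMs, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "OK", delayed);
    }

    // Issues every request up front, then waits for all replies.
    static List<String> sendAll(int requests, long latencyMs) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            inFlight.add(fakeSet("category-" + i, String.valueOf(i), latencyMs));
        }
        List<String> replies = new ArrayList<>();
        for (CompletableFuture<String> future : inFlight) {
            replies.add(future.join());
        }
        return replies;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> replies = sendAll(100, 100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // Sequential blocking calls would need roughly 100 x 100 ms here.
        System.out.println(replies.size() + " replies in " + elapsedMs + " ms");
    }
}
```

Flink's async I/O operator applies the same idea per operator instance, with a cap on how many requests may be in flight at once.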

As an example, we will take e-commerce data and calculate the number of sales for each category over a 24-hour sliding window that slides every 30 seconds, then sink the result to Redis so that a downstream service can look it up quickly.
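One consequence of this window choice is worth spelling out: with a 24-hour size and a 30-second slide, every event belongs to 24 h / 30 s = 2,880 overlapping windows. The sketch below enumerates the window start times for one timestamp. It assumes millisecond timestamps, a zero window offset, and non-negative values; the class and method names are hypothetical, and the arithmetic is meant to mirror how a sliding window assigner works rather than to reproduce Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: SlidingWindowSketch and windowStarts are hypothetical names.
class SlidingWindowSketch {

    // Returns the start of every window [start, start + sizeMs) that contains timestampMs,
    // most recent window first. Assumes timestampMs >= 0 and no window offset.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs); // align down to the slide
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // 24 hours
        long slide = 30L * 1000;          // 30 seconds
        long eventTime = 1679832334L * 1000; // first sample row, read as epoch seconds

        List<Long> starts = windowStarts(eventTime, size, slide);
        System.out.println("windows per event: " + starts.size()); // prints 2880
        System.out.println("most recent window starts at: " + starts.get(0));
    }
}
```

In practice this means each category emits a fresh 24-hour count every 30 seconds, which is what keeps the value in Redis current, at the cost of more window state than a tumbling window would need.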

Sample Dataset



Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536
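The `Ecomm` model class used by the job is not shown in this article. A minimal sketch of what it might look like is given below, together with a small helper that turns one sample row into an object. Everything here is an assumption made for illustration: the field names, the `fromCsvLine` helper, and the reading of `TimeStamp` as epoch seconds that are converted to milliseconds. The actual job reads from Kafka through its own deserializer and may look quite different.

```java
// Illustrative only: a hypothetical shape for the Ecomm model.
class Ecomm {

    private final String category;
    private final long eventTimestamp; // epoch milliseconds

    Ecomm(String category, long eventTimestamp) {
        this.category = category;
        this.eventTimestamp = eventTimestamp;
    }

    String getCategory() {
        return category;
    }

    long getEventTimestamp() {
        return eventTimestamp;
    }

    // Hypothetical helper: parses one "Category,epochSeconds" row of the sample dataset.
    static Ecomm fromCsvLine(String line) {
        String[] parts = line.split(",", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected 'Category,TimeStamp' but got: " + line);
        }
        long epochSeconds = Long.parseLong(parts[1].trim());
        return new Ecomm(parts[0].trim(), epochSeconds * 1000L); // seconds -> milliseconds
    }

    public static void main(String[] args) {
        Ecomm event = fromCsvLine("Electronics,1679832334");
        System.out.println(event.getCategory() + " @ " + event.getEventTimestamp());
    }
}
```

The conversion to milliseconds matters because Flink's event-time machinery works with millisecond timestamps.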

Flink Kafka Consumer Class

package Aysnc_kafka_redis;

import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class FlinkAsyncRedis {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Ecommdeserialize jsonde = new Ecommdeserialize();

        KafkaSource<Ecomm> source = KafkaSource.<Ecomm>builder()
                .setTopics("{dummytopic}")
                .setBootstrapServers("{dummybootstrap}")
                .setGroupId("test_flink")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonde)
                .build();

        // Event-time windows only fire when watermarks advance, so the strategy is attached to the source.
        // Flink expects epoch milliseconds here; convert first if the field holds epoch seconds.
        WatermarkStrategy<Ecomm> watermarks = WatermarkStrategy
                .<Ecomm>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // tolerate events up to 10 s out of order
                .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimestamp());

        DataStream<Ecomm> orderData = env.fromSource(source, watermarks, "Kafka Source");

        SingleOutputStreamOperator<Tuple3<String, Long, Long>> aggregatedData = orderData
                .keyBy(Ecomm::getCategory)
                .window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(30)))
                // an anonymous class (rather than a lambda) keeps the output type visible to Flink
                .apply(new WindowFunction<Ecomm, Tuple3<String, Long, Long>, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<Ecomm> input,
                                      Collector<Tuple3<String, Long, Long>> out) {
                        long count = 0;
                        for (Ecomm event : input) {
                            count++; // increment the count for each event in the window
                        }
                        // output the category, window end time, and count
                        out.collect(new Tuple3<>(key, window.getEnd(), count));
                    }
                });

        // call the async I/O operator to sink data to Redis in an unordered way
        SingleOutputStreamOperator<String> sinkResults = AsyncDataStream.unorderedWait(
                aggregatedData,
                new RedisSink("{redisClusterUrl}"),
                1000, // timeout: how long an asynchronous operation may take before it is considered failed
                TimeUnit.MILLISECONDS,
                100); // capacity: how many asynchronous requests may be in progress at the same time

        sinkResults.print(); // print the Redis SET response returned for every key

        env.execute("RedisAsyncSink"); // the job shows up on the cluster under this name
    }
}

Redis Set Key Async I/O Operator

package AsyncIO;

import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

public class RedisSink extends RichAsyncFunction<Tuple3<String, Long, Long>, String> {

    private final String redisUrl;

    // Lettuce objects are not serializable, so they are created in open() on each task
    private transient RedisClusterClient client = null;
    private transient StatefulRedisClusterConnection<String, String> clusterConnection = null;
    private transient RedisAdvancedClusterAsyncCommands<String, String> asyncCall = null;

    public RedisSink(String redisUrl) {
        this.redisUrl = redisUrl;
    }

    // method executes any operator-specific initialization
    @Override
    public void open(Configuration parameters) {
        if (client == null) {
            client = RedisClusterClient.create(redisUrl);
        }
        if (clusterConnection == null) {
            clusterConnection = client.connect();
        }
        if (asyncCall == null) {
            asyncCall = clusterConnection.async();
        }
    }

    // core logic: set the key in Redis over the async connection and return the result via ResultFuture
    @Override
    public void asyncInvoke(Tuple3<String, Long, Long> stream, ResultFuture<String> resultFuture) {

        String productKey = stream.t1(); // category
        System.out.println("RedisKey:" + productKey); // for logging
        String count = stream.t3().toString(); // count for the window
        System.out.println("Redisvalue:" + count); // for logging
        RedisFuture<String> setResult = asyncCall.set(productKey, count);

        setResult.whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.out.println("Callback from redis failed:" + throwable);
                // emit nothing for this record so the job keeps running;
                // resultFuture.completeExceptionally(throwable) would fail the job instead
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }

    // method closes what was opened during initialization to free any resources
    // held by the operator (e.g. open network connections, io streams)
    @Override
    public void close() throws Exception {
        if (clusterConnection != null) {
            clusterConnection.close();
        }
        if (client != null) {
            client.shutdown();
        }
    }
}

Use cases:

  • The data streamed to Redis can be used by a data science model to look up the categories that are selling frequently during the sale season and recommend more products from them.
  • It can be used to show charts and numbers as sale statistics on the webpage, encouraging users to buy more.

Key Takeaways:

  • Flink provides a highly scalable and fault-tolerant platform for processing data streams, while Redis provides a high-performance, in-memory database that can be used to store and query data.
  • Asynchronous programming can be used to improve the performance of data processing pipelines by allowing non-blocking calls to external systems like Redis.
  • Combining the two can help build a culture of real-time, data-driven decisions.

Reference:

https://architecturenotes.co/redis/

https://www.baeldung.com/java-redis-lettuce

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/asyncio/
