
Building a Reactive Event Streaming Architecture

This article outlines a solution for streaming events from Kafka, forwarding them to Redis using its Stream API, and reading individual streams from Redis through its streaming API. The added complexity in this scenario is the need to stream events from an HTTP endpoint using Server-Sent Events (SSE) while ensuring that only events relevant to a particular client ID are processed and sent.

Problem Statement

Many companies have an existing Kafka infrastructure where events are being produced. Our goal is to set up a system that subscribes to Kafka messages but only processes events relevant to a specific client ID. These filtered events should be forwarded to Redis using its Stream API. Additionally, we need to establish an HTTP endpoint for Server-Sent Events (SSE) that allows the specified client to receive real-time event updates.

Solution Architecture Overview

The architecture consists of the following components:

  • Kafka: A distributed event streaming platform that lets you publish and subscribe to streams of records (events).
  • Spring Boot: A framework for building Java applications. We’ll use it to create the Kafka consumer and the Redis Stream producer. The application subscribes to Kafka messages, filters events based on the client ID, and forwards relevant events to Redis Streams.
  • Redis: A high-performance, in-memory data store. We’ll use its Streams feature to handle event streams; it stores the streamed events using the Streams API.
  • Docker: A containerization platform. We’ll use Docker and Docker Compose to create containers for Kafka, Redis, and our Spring Boot application, giving us a local proof of concept (POC).
  • HTTP Server-Sent Events (SSE) endpoint: Provides real-time event updates to the client, filtering events based on the client ID.

Redis Streams 

Redis Streams is a Redis feature that provides a way to handle real-time data streams across a variety of use cases. Here are some scenarios where you might want to use Redis Streams (a short Java sketch of the basic add/read operations follows the list):

  • Real-Time Event Processing: Redis Streams is excellent for processing and storing real-time events. You can use it for things like logging, monitoring, tracking user activities, or any use case that involves handling a continuous stream of events.
  • Task Queues: If you need a reliable and distributed task queue, Redis Streams can be a great choice. It lets you push tasks into a stream and have multiple consumers process those tasks concurrently.
  • Activity Feeds: If you’re building a social network or any application that requires activity feeds, Redis Streams can efficiently handle the feed data, ensuring fast access and scalability.
  • Message Brokering: Redis Streams can serve as a lightweight message broker for microservices or other distributed systems. It can handle message routing and ensure that messages are delivered to consumers.
  • Real-Time Analytics: When you need to analyze data in real time, Redis Streams can be useful for storing the incoming data and then processing and aggregating it using Redis capabilities.
  • IoT Data Ingestion: If you’re dealing with data from Internet of Things (IoT) devices, Redis Streams can handle the high-throughput, real-time nature of the data those devices generate.
  • Logging and Audit Trails: Redis Streams can be used to store logs or audit trails in real time, making it easy to analyze and troubleshoot issues.
  • Stream Processing: If you need to process a continuous stream of data in a specific order (for example, financial transactions or sensor readings), Redis Streams can help you manage the data in the order it was received.
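
As a point of reference, here is a minimal sketch of appending to and reading from a stream with Spring Data Redis’s reactive API, independent of the rest of this article’s code. The stream key and field name are made up for illustration only:

import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Map;

public class RedisStreamsSketch {

	private final ReactiveStringRedisTemplate redisTemplate;

	public RedisStreamsSketch(ReactiveStringRedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	// XADD: append a map of fields to the stream "demo.stream".
	public Mono<String> append(String payload) {
		return redisTemplate.opsForStream()
				.add("demo.stream", Map.of("payload", payload))
				.map(recordId -> recordId.getValue());
	}

	// XREAD: read everything currently in the stream, starting from the beginning.
	public Flux<MapRecord<String, Object, Object>> readAll() {
		return redisTemplate.opsForStream().read(StreamOffset.fromStart("demo.stream"));
	}
}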

Prerequisites

  • Docker and Docker Compose are installed.
  • A basic understanding of Spring Boot, Redis Streams, and Kafka.
  • Java 17 or higher.
  • An HTTP client of your choice. I used [httpie].

Of course, you can get the code here.

Steps

1. Set Up Docker Compose for the Backend Infrastructure and the Spring Boot App

Create a `docker-compose.yml` file to define the services:

version: '3.8'
services:
   zookeeper:
      image: confluentinc/cp-zookeeper:latest
      environment:
         ZOOKEEPER_CLIENT_PORT: 2181
         ZOOKEEPER_TICK_TIME: 2000
      ports:
         - 22181:2181

      networks:
         node_net:
            ipv4_address: 172.28.1.81

   kafka:
      image: confluentinc/cp-kafka:latest
      depends_on:
         - zookeeper
      ports:
         - 29092:29092
         - 9092:9092
         - 9093:9093
      environment:
         KAFKA_BROKER_ID: 1
         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,EXTERNAL://172.28.1.93:9093
         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT
         KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

      networks:
         node_net:
            ipv4_address: 172.28.1.93

   cache:
      image: redis:6.2-alpine
      #image: redis:5.0.3-alpine
      restart: always
      ports:
         - '6379:6379'
      #command: redis-server --save 20 1 --loglevel warning --requirepass eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81
      command: redis-server /usr/local/etc/redis/redis.conf --loglevel verbose --save 20 1
      volumes:
         - cache:/data
         - ./redis.conf:/usr/local/etc/redis/redis.conf
         - $PWD/redis-data:/var/lib/redis

         #environment:
         # - REDIS_REPLICATION_MODE=master

      networks:
         node_net:
            ipv4_address: 172.28.1.79

volumes:
   cache:
      driver: local

networks:
   node_net:
      ipam:
         driver: default
         config:
            - subnet: 172.28.0.0/16

Create the YAML file for the application, `sse-demo.yml`:

model: "3.8"
companies:
  sse-demo:
    picture: "sse/spring-sse-demo:newest"
    ports:
      - "8080:8080"
      #- "51000-52000:51000-52000"

    env_file:
      - native.env

    surroundings:
      - REDIS_REPLICATION_MODE=grasp
      - SPRING_PROFILES_ACTIVE=default
      - REDIS_HOST=172.28.1.79
      - REDIS_PORT=6379
      - KAFKA_BOOTSTRAP_SERVERS=172.28.1.93:9093

    networks:
      node_net:
        ipv4_address: 172.28.1.12

networks:
  node_net:
    exterior:
      identify: docker_node_net

2. Create the Spring Boot Application

Create a Spring Boot application with the required dependencies:

git checkout 

In `pom.xml`, add the required dependencies for Kafka and Redis integration.
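
The article doesn’t list the dependencies themselves; assuming the reactive stack used in the service code below, the relevant entries would look roughly like this (the actual `pom.xml` in the repository is authoritative, and versions are managed by the Spring Boot parent and its imported BOMs):

<dependencies>
    <!-- Reactive web stack, used for the SSE endpoint -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Reactive Redis client, used for the Streams API -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    <!-- Reactive Kafka consumer (KafkaReceiver) -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
    </dependency>
    <!-- Producing side (e.g., the test event generator) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>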

3. Implement the Kafka Consumer and Redis Stream Producer

Create a Kafka consumer that listens to Kafka events and sends them to Redis Streams. This component also consumes the Redis streams on behalf of the HTTP clients:

package com.taptech.sse.event;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taptech.sse.utils.DurationSupplier;
import com.taptech.sse.utils.ObjectMapperFactory;
import com.taptech.sse.config.SSEProperties;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

public class DefaultEventReceiverService implements EventReceiverService {
	private static final Logger logger = LoggerFactory.getLogger(DefaultEventReceiverService.class);

	public final static String TEST_STREAM = "test.stream";
	public final static String TEST_STREAM_KEY = "test.stream.key";
	public final static String TEST_STREAM_VALUE = "test.stream.value";
	private static final String EMPTY_STR = "";
	public static final String CLIENT_STREAM_STARTED = "client.stream.started";
	public static final String HASH_PREFIX = "hash.";

	private static ObjectMapper objectMapper = ObjectMapperFactory.createObjectMapper(ObjectMapperFactory.Scope.SINGLETON);

	ReactiveStringRedisTemplate redisTemplate;
	KafkaReceiver<String, String> kafkaReceiver;
	SSEProperties sseProperties;
	StreamReadOptions streamReadOptions;

	public DefaultEventReceiverService(ReactiveStringRedisTemplate redisTemplate, KafkaReceiver<String, String> kafkaReceiver,
									   SSEProperties sseProperties) {
		this.redisTemplate = redisTemplate;
		this.kafkaReceiver = kafkaReceiver;
		this.sseProperties = sseProperties;
		this.streamReadOptions = StreamReadOptions.empty().autoAcknowledge()
				.block(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS));
	}

	static final Function<String, String> calculateHashKey = str -> new StringBuilder(HASH_PREFIX).append(str).toString();

	@PostConstruct
	public void init() {
		this.redisTemplate.opsForValue().append(TEST_STREAM_KEY, TEST_STREAM_VALUE).subscribe();
	}

	@EventListener(ApplicationStartedEvent.class)
	public Disposable startKafkaConsumer() {
		logger.info("############# Starting Kafka listener.....");
		return kafkaReceiver.receive()
				.doOnError(error -> logger.error("Error receiving event, will retry", error))
				.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(sseProperties.getTopicRetryDelaySeconds())))
				.doOnNext(record -> logger.info("Received event: key {}", record.key()))
				.filterWhen(record -> checkIfStreamBeingAccessed(record))
				.concatMap(this::handleEvent)
				.subscribe(record -> record.receiverOffset().acknowledge());
	}

	Mono<Boolean> checkIfStreamBeingAccessed(ReceiverRecord<String, String> record) {
		return this.redisTemplate.opsForHash().hasKey(calculateHashKey.apply(record.key()), CLIENT_STREAM_STARTED)
				.doOnNext(val -> logger.info("key => {}'s stream is being accessed {}", record.key(), val));
	}

	public Mono<ReceiverRecord<String, String>> handleEvent(ReceiverRecord<String, String> record) {
		return Mono.just(record)
				.flatMap(this::produce)
				.doOnError(ex -> logger.warn("Error processing event: key {}", record.key(), ex))
				.onErrorResume(ex -> Mono.empty())
				.doOnNext(rec -> logger.debug("Successfully processed event: key {}", record.key()))
				.then(Mono.just(record));
	}

	public Mono<Tuple2<RecordId, ReceiverRecord<String, String>>> produce(ReceiverRecord<String, String> recRecord) {
		ObjectRecord<String, String> record = StreamRecords.newRecord()
				.ofObject(recRecord.value())
				.withStreamKey(recRecord.key());
		return this.redisTemplate.opsForStream().add(record)
				.map(recId -> Tuples.of(recId, recRecord));
	}

	Function<ObjectRecord<String, String>, NotificationEvent> convertToNotificationEvent() {
		return (record) -> {
			NotificationEvent event = null;
			try {
				event = objectMapper.readValue(record.getValue(), NotificationEvent.class);
			} catch (JsonProcessingException e) {
				e.printStackTrace();
				event = new NotificationEvent();
			}
			return event;
		};
	}

	private Mono<String> createGroup(String workspaceId) {
		return redisTemplate.getConnectionFactory().getReactiveConnection().streamCommands()
				.xGroupCreate(ByteBuffer.wrap(workspaceId.getBytes()), workspaceId, ReadOffset.from("0-0"), true)
				.doOnError((error) -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Could not create group.", error);
					}
				})
				.map(okStr -> workspaceId)
				.onErrorResume((error) -> Mono.just(workspaceId));
	}

	private Flux<NotificationEvent> findClientNotificationEvents(Consumer consumer, StreamOffset<String> streamOffset, DurationSupplier booleanSupplier) {
		return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset)
				.map(convertToNotificationEvent())
				.repeat(booleanSupplier);
	}

	public Flux<NotificationEvent> consume(final String clientId) {
		return Flux.from(createGroup(clientId))
				.flatMap(id -> addIdToStream(clientId))
				.map(id -> Tuples.of(StreamOffset.create(clientId, ReadOffset.lastConsumed()),
						Consumer.from(clientId, clientId),
						new DurationSupplier(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS), LocalDateTime.now())))
				.flatMap(tuple3 -> findClientNotificationEvents(tuple3.getT2(), tuple3.getT1(), tuple3.getT3()));
	}

	private Mono<String> addIdToStream(String id) {
		return this.redisTemplate.opsForHash().put(calculateHashKey.apply(id), CLIENT_STREAM_STARTED, Boolean.TRUE.toString()).map(val -> id);
	}

	public Flux<Boolean> deleteWorkspaceStream(String workspaceId) {
		StreamOffset<String> streamOffset = StreamOffset.create(workspaceId, ReadOffset.lastConsumed());
		StreamReadOptions streamReadOptions = StreamReadOptions.empty().noack();
		Consumer consumer = Consumer.from(workspaceId, workspaceId);

		return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset)
				.flatMap(objRecord -> this.redisTemplate.opsForStream().delete(workspaceId, objRecord.getId()).map(val -> objRecord))
				.flatMap(objRecord -> this.redisTemplate.opsForHash().delete(workspaceId));
	}

	@Override
	public Flux<String> consumeString(String clientId) {
		return this.redisTemplate.opsForStream().read(String.class, StreamOffset.latest(clientId)).map(ObjectRecord::getValue);
	}
}

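The HTTP side of the picture is not shown in the article. A minimal WebFlux controller that exposes the `consume` method above as an SSE endpoint could look like the sketch below; the class name, the `/events` request mapping, and the assumption that `consume(clientId)` is declared on the `EventReceiverService` interface are all illustrative, not the repository’s actual controller:

package com.taptech.sse.web;

import com.taptech.sse.event.EventReceiverService;
import com.taptech.sse.event.NotificationEvent;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class SseController {

	private final EventReceiverService eventReceiverService;

	public SseController(EventReceiverService eventReceiverService) {
		this.eventReceiverService = eventReceiverService;
	}

	// Streams NotificationEvents for a single clientId as Server-Sent Events.
	@GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<ServerSentEvent<NotificationEvent>> events(@RequestParam("clientId") String clientId) {
		return eventReceiverService.consume(clientId)
				.map(event -> ServerSentEvent.builder(event).build());
	}
}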

4. Configure Services in Spring Boot

Configure the application properties so the app can reach Redis and Kafka (a sketch of the corresponding KafkaReceiver bean follows the properties):

cache.provider.name=redis
cache.host=${REDIS_HOST:localhost}
cache.port=${REDIS_PORT:6379}
cache.password=password


# Producer properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group_id
spring.kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9093}


# Common Kafka Properties
auto.create.topics.enable=true
sse.client-hold-seconds=${SSE_CLIENT_HOLD_SECONDS:120}

logging.level.root=INFO
logging.level.com.taptech.sse=DEBUG
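
The reactive consumer also needs a `KafkaReceiver<String, String>` bean. The article doesn’t show that configuration, but under the properties above it would look roughly like this; the topic name and group id below are assumptions:

package com.taptech.sse.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

	@Bean
	public KafkaReceiver<String, String> kafkaReceiver(
			@Value("${spring.kafka.bootstrap.servers}") String bootstrapServers) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sse-demo-group");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

		// Subscribe to the topic the events are produced on (name assumed here).
		ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
				.subscription(Collections.singleton("sse-demo-topic"));
		return KafkaReceiver.create(options);
	}
}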

5. Build the Docker Image for the Spring Boot App

Using the `kubernetes-maven-plugin` from JKube, create the image for your Spring Boot application:

./mvnw clean package -Dmaven.test.skip=true k8s:build
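
For `k8s:build` to produce the `sse/spring-sse-demo:latest` image referenced in `sse-demo.yml`, the JKube plugin has to be declared in `pom.xml`. A minimal sketch is below; the use of the `jkube.generator.name` property to name the image is an assumption (the repository may configure this differently), and a concrete plugin version still needs to be supplied:

<properties>
    <!-- Image name must match the one referenced in sse-demo.yml -->
    <jkube.generator.name>sse/spring-sse-demo:latest</jkube.generator.name>
</properties>

<build>
    <plugins>
        <plugin>
            <groupId>org.eclipse.jkube</groupId>
            <artifactId>kubernetes-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>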

6. Start the Services and Run the Application

From the `src/test/resources/docker` directory, start the backend services first and then start the app; example commands are shown below.
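
The exact commands are not reproduced here; assuming the two compose files above are used from that directory, something along these lines should work:

# Start Zookeeper, Kafka, and Redis
docker-compose -f docker-compose.yml up -d

# Start the Spring Boot app (it joins the external node_net network created above)
docker-compose -f sse-demo.yml up -d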

7. Connect to a Stream Using One of the IDs in the client-ids.json File 

http --stream GET  clientId==dd07bd51-1ab0-4e69-a0ff-f625fa9e7fc0

8. Generate Some Events

You can do an HTTP POST to /generateNE:

http POST /generateNE

After this, watch as your HTTP client receives an event for the clientId it is subscribed to.
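
The `/generateNE` endpoint belongs to the demo repository and isn’t listed in the article; conceptually it just publishes test NotificationEvents to the Kafka topic, keyed by client ID, roughly like the sketch below. The controller, topic name, sample ID, and use of KafkaTemplate are assumptions for illustration:

package com.taptech.sse.web;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.taptech.sse.event.NotificationEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class EventGeneratorController {

	private final KafkaTemplate<String, String> kafkaTemplate;
	private final ObjectMapper objectMapper = new ObjectMapper();

	public EventGeneratorController(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// Publishes a test event keyed by a client ID so the Kafka listener
	// can route it to the matching Redis stream.
	@PostMapping("/generateNE")
	public Mono<String> generate() throws Exception {
		String clientId = "dd07bd51-1ab0-4e69-a0ff-f625fa9e7fc0"; // sample ID from client-ids.json
		String payload = objectMapper.writeValueAsString(new NotificationEvent());
		kafkaTemplate.send("sse-demo-topic", clientId, payload);
		return Mono.just("generated");
	}
}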

Discussion

Why would you use Kafka and Redis? Doesn’t Kafka offer this on its own? Many companies have invested in Kafka as a backend message provider between their systems, but Kafka by itself doesn’t handle message selection very easily. 

Message selection isn’t a feature Kafka provides natively, for a few reasons:

  • Data Size and Latency: Kafka is designed for high-throughput, low-latency message processing. Its architecture focuses on distributing messages to numerous consumers quickly. Introducing message selection based on arbitrary conditions can slow down overall processing and introduce latency, which works against Kafka’s primary design goals.
  • Idempotency: Kafka relies on the concept of idempotent producers and consumers. This means that if a consumer or producer retries a message due to a failure, it shouldn’t result in duplicate processing. Introducing selective message retrieval would complicate this idempotency guarantee, potentially leading to unintended duplicate processing.
  • Consumer Offset Tracking: Kafka maintains consumer offsets, allowing consumers to keep track of the last processed message. If message selection is introduced, offsets become less straightforward, as some messages might be skipped based on selection criteria.
  • Decoupled Architecture: Kafka is designed to decouple producers from consumers. Producers are unaware of consumer behavior, and consumers can independently decide what messages they want to consume. Message selection would break this decoupling, as producers would need to know which messages to produce based on specific consumer needs.
  • Consumer Flexibility: Kafka consumers can be highly flexible in terms of message processing. They can be designed to filter, transform, and aggregate messages based on their own criteria. Introducing message selection at the Kafka level would limit this flexibility and make the system less adaptable to changing consumer requirements.
  • Scaling and Parallelism: Kafka’s scalability and parallelism benefits come from the ability to distribute messages across multiple partitions and allow multiple consumers to process messages in parallel. Selective message retrieval would complicate this parallelism, making it harder to distribute work efficiently.

While Kafka itself doesn’t provide native message selection features, it’s essential to design the consumers to handle message filtering and selection when needed. Consumers can be designed to filter and process messages based on specific criteria, ensuring that only relevant messages are processed within the consumer application. This approach allows Kafka to maintain its core design principles while still providing the flexibility needed for various message-processing scenarios.

Kafka couldn’t fundamentally solve the problem in an easy way, which led to pushing the messages into another persistent store that could easily select based on known criteria. This requirement led to the decision to use Redis and to push messages directly to Redis.

A decision was made to limit the events being pushed into Redis based on whether there was a client actually expecting a message. If there were no clients, the Kafka messages were filtered out:

.filterWhen(record -> checkIfStreamBeingAccessed(record))

The client registers its ID so that the Kafka listener will push the matching events to the Redis stream: 

.flatMap(id -> addIdToStream(clientId))

Conclusion

By following the steps outlined in this document, we have successfully implemented an event streaming architecture that takes events from Kafka, filters them based on a specific client ID, and forwards the relevant events to Redis using its Stream API. The SSE endpoint allows clients to receive real-time event updates tailored to their respective client IDs. This solution provides an efficient and scalable way to handle event streaming for targeted clients.

