EzyKafka Introduction

1. Introduce EzyKafka

EzyKafka (Easy going to Kafka Interaction) is a framework for interacting with Apache Kafka. It supports:
  1. Publishing and consuming messages
  2. Serializing and deserializing message data between byte arrays and POJOs
  3. Annotation-driven configuration

2. Structure of EzyKafka

  1. EzyKafkaProxy: Manages producers, consumers and data serializers.
  2. EzyKafkaProducer: Sends messages.
  3. EzyKafkaConsumer: Receives and consumes messages.

3. Install EzyKafka

1. To use EzyKafka, add the following dependency:
<dependency>
    <groupId>com.tvd12</groupId>
    <artifactId>ezymq-kafka</artifactId>
    <version>1.2.3</version>
</dependency>
The latest version can be found in the Maven Central repository.
You can configure EzyKafka like this:
EzyKafkaProxy kafkaProxy = EzyKafkaProxy.builder()
    .scan("com.tvd12.ezymq.example.kafka")
    .build();
2. Declare the producers and consumers in your configuration file like this:
# for application.yaml
kafka:
  producers:
    hello-world:
      topic: hello-world
  consumers:
    hello-world:
      topic: hello-world
# for application.properties
kafka.producers.hello-world.topic=hello-world
kafka.consumers.hello-world.topic=hello-world
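The properties form flattens the YAML nesting into dotted keys of the shape kafka.<producers|consumers>.<name>.topic. The sketch below only illustrates that key layout with the standard java.util.Properties class. The class TopicPropertiesSketch and its methods are hypothetical names made up for this example, and this is not how EzyKafka itself loads the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of the key layout; not part of EzyKafka.
public final class TopicPropertiesSketch {

    // Same two entries as the application.properties fragment above.
    public static final String CONFIG =
        "kafka.producers.hello-world.topic=hello-world\n" +
        "kafka.consumers.hello-world.topic=hello-world\n";

    // Parses properties text into a java.util.Properties object.
    public static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // kind is "producers" or "consumers"; name is the entry name, e.g. "hello-world".
    public static String topicOf(Properties properties, String kind, String name) {
        return properties.getProperty("kafka." + kind + "." + name + ".topic");
    }

    public static void main(String[] args) {
        Properties properties = parse(CONFIG);
        System.out.println(topicOf(properties, "producers", "hello-world"));
        System.out.println(topicOf(properties, "consumers", "hello-world"));
    }
}
```

Looking up a name that is not declared returns null, which is a cheap way to spot a mismatch between the name used in code and the name used in the file.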

4. Example

Let’s say we need to consume a message that asks for the sum of two integers. We can do this with EzyKafka by following the steps below.
4.1 Create message class
We need to create a class with two fields, a and b:
package com.tvd12.ezymq.kafka.test.request;

import com.tvd12.ezyfox.message.annotation.EzyMessage;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@EzyMessage
@AllArgsConstructor
@NoArgsConstructor
public class SumRequest {
    private int a;
    private int b;
}
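To make the "byte array to POJO" mapping concrete, here is a minimal sketch of a round trip for a request with two int fields. It is only an illustration: it uses the JDK's DataOutputStream and DataInputStream, a plain stand-in class instead of the Lombok-based SumRequest, and a hypothetical class name SumRequestBytesSketch. The actual codec and wire format that EzyKafka uses are not described here and may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only: this is not EzyKafka's codec or wire format.
public final class SumRequestBytesSketch {

    // Plain stand-in for the Lombok-based SumRequest above.
    public static final class SumRequest {
        public final int a;
        public final int b;

        public SumRequest(int a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    // Writes the two fields as two big-endian 4-byte integers.
    public static byte[] serialize(SumRequest request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(request.a);
            out.writeInt(request.b);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    public static SumRequest deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int a = in.readInt();
            int b = in.readInt();
            return new SumRequest(a, b);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new SumRequest(1, 2));
        SumRequest restored = deserialize(data);
        // prints "8 bytes, a=1, b=2"
        System.out.println(data.length + " bytes, a=" + restored.a + ", b=" + restored.b);
    }
}
```

With EzyKafka you do not write this code yourself; annotating the class with @EzyMessage is what marks it for the framework's own serialization.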
4.2 Create message handler class
Now we need to create a handler class to process the message:
package com.tvd12.ezymq.kafka.test.handler;

import com.tvd12.ezymq.kafka.annotation.EzyKafkaHandler;
import com.tvd12.ezymq.kafka.handler.EzyKafkaMessageHandler;
import com.tvd12.ezymq.kafka.test.request.SumRequest;

@EzyKafkaHandler(
    topic = "test",
    command = "sum"
)
public class SumRequestHandler implements EzyKafkaMessageHandler<SumRequest> {

    @Override
    public void process(SumRequest message) throws Exception {
        int result = message.getA() + message.getB();
        System.out.println("sum result: " + result);
    }
}
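The handler above is bound to a topic ("test") and a command ("sum"). One way to picture what such a binding means is a lookup table keyed by topic and command, sketched below in plain Java. This is an assumption made for illustration, not a description of EzyKafka's internals: the class CommandDispatchSketch, its methods, and the int[] payload are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of routing by topic and command; not EzyKafka's internals.
public final class CommandDispatchSketch {

    private final Map<String, Consumer<int[]>> handlers = new HashMap<>();

    // Combines topic and command into one lookup key.
    private static String key(String topic, String command) {
        return topic + "/" + command;
    }

    public void register(String topic, String command, Consumer<int[]> handler) {
        handlers.put(key(topic, command), handler);
    }

    // Returns true when a handler was found and invoked, false otherwise.
    public boolean dispatch(String topic, String command, int[] payload) {
        Consumer<int[]> handler = handlers.get(key(topic, command));
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        CommandDispatchSketch dispatcher = new CommandDispatchSketch();
        dispatcher.register("test", "sum",
            payload -> System.out.println("sum result: " + (payload[0] + payload[1])));
        // prints "sum result: 3"
        dispatcher.dispatch("test", "sum", new int[] {1, 2});
    }
}
```

The practical point is that the command passed when sending a message has to match the command declared on the handler, otherwise no handler is selected for it.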
4.3 Create a program for producer
We need to create a class with a main method to start a producer:
package com.tvd12.ezymq.kafka.test;

import com.tvd12.ezyfox.util.EzyThreads;
import com.tvd12.ezymq.kafka.EzyKafkaProducer;
import com.tvd12.ezymq.kafka.EzyKafkaProxy;
import com.tvd12.ezymq.kafka.test.request.SumRequest;

public final class SumProducerProgram {

    public static void main(String[] args) {
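        // Build the proxy, scanning the package that contains the message classes.
        final EzyKafkaProxy kafkaContext = EzyKafkaProxy
            .builder()
            .scan("com.tvd12.ezymq.kafka.test")
            .build();
        // "test" is the producer name declared in the configuration file.
        final EzyKafkaProducer producer = kafkaContext.getProducer("test");
        // "sum" is the command that SumRequestHandler is bound to.
        producer.send("sum", new SumRequest(1, 2));
        // Keep the process running so it does not exit right after sending.
        while (true) {
            EzyThreads.sleep(3);
        }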
    }
}
With a configuration like this:
# for application.yaml
kafka:
  producers:
    test:
      topic: test
      bootstrap.servers: localhost:9092
      client.id: KafkaProducerExample
4.4 Create a program for consumer
We need to create a class with a main method to start the consumers:
package com.tvd12.ezymq.kafka.test;

import com.tvd12.ezymq.kafka.EzyKafkaProxy;

public final class SumConsumerProgram {

    public static void main(String[] args) {
        EzyKafkaProxy
            .builder()
            .scan("com.tvd12.ezymq.kafka.test")
            .build();
    }
}
With a configuration like this:
# for application.yaml
kafka:
  consumers:
    test:
      topic: test
      group.id: KafkaConsumerExample
4.5 Run the program
Now run SumConsumerProgram and SumProducerProgram separately. The SumConsumerProgram log will show:
sum result: 3

Next

You can see how to configure