Simple Kafka Producer and Consumer using Java

📨 Producing and Consuming Messages in Apache Kafka using Java

Apache Kafka is a high-performance, distributed event streaming platform that enables applications to publish and subscribe to data streams in real time.
In this post, we’ll walk through how to set up Kafka locally, and create a simple Java producer and consumer to exchange messages.


⚙️ Step 1 — Download Apache Kafka

Download the latest stable version of Apache Kafka from the official website:

👉 https://kafka.apache.org/downloads

After downloading, extract the .tgz file:

tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

🚀 Step 2 — Start Zookeeper and Kafka Server

Older Kafka versions require Zookeeper to manage broker metadata (newer versions can instead run in KRaft mode without it).
Start both the Zookeeper and Kafka services using the default configuration files:

# Start Zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka broker
./bin/kafka-server-start.sh config/server.properties

✅ Once both are running, Kafka is ready to accept producer and consumer connections.
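If you want to confirm from code that the broker is accepting connections before running the examples, a simple TCP check is enough. This is a minimal sketch assuming the default broker port 9092; note it only verifies the port is open, not the broker's health:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Quick TCP-level check that a Kafka broker is listening. */
public class BrokerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean up = isReachable("localhost", 9092, 2000);
        System.out.println(up ? "Kafka broker is reachable" : "Kafka broker is not reachable");
    }
}
```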


📦 Step 3 — Create a Maven Project

Create a new Java Maven project and add the following dependency to your pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.1.0</version>
</dependency>

💡 Note: This example uses an older Kafka client (0.10.x).
For newer versions, use the org.apache.kafka:kafka-clients artifact (0.11 and later); the overall logic remains similar.
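For comparison, here is how the equivalent configuration looks with the modern kafka-clients API. The property-building below is plain `java.util.Properties`; the commented lines sketch how those properties would be passed to `KafkaProducer`/`KafkaConsumer` (from org.apache.kafka.clients, not compiled here):

```java
import java.util.Properties;

/** Builds configuration for the modern kafka-clients producer and consumer. */
public class ModernKafkaConfig {

    /** Producer settings for the post-0.11 kafka-clients API. */
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // replaces metadata.broker.list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Consumer settings; the broker address replaces the old zookeeper.connect. */
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // replaces autoOffsetReset=smallest
        return props;
    }

    // With kafka-clients on the classpath, usage would be roughly:
    //   Producer<String, String> p = new KafkaProducer<>(producerProps("localhost:9092"));
    //   p.send(new ProducerRecord<>("order", "Sending Customer Order, please process"));
    //   Consumer<String, String> c = new KafkaConsumer<>(consumerProps("localhost:9092", "vinod"));
    //   c.subscribe(Collections.singletonList("order"));

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092"));
        System.out.println(consumerProps("localhost:9092", "vinod"));
    }
}
```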


🧠 Step 4 — Write Java Code for Producer and Consumer

Below is a simple example that sends a message to a Kafka topic (order) and then consumes it.

🧩 MyKafkaConsumer.java

package com.vinod.test;

import java.util.*;

import kafka.consumer.*;
import kafka.producer.*;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Simple example to produce and consume messages using Apache Kafka.
 *
 * <p>This example demonstrates:
 * <ul>
 *   <li>Creating a Kafka producer to send messages to a topic.</li>
 *   <li>Creating a Kafka consumer to read messages from that topic.</li>
 * </ul>
 *
 * @author vinod
 */
public class MyKafkaConsumer {

    private ConsumerConnector consumer;

    /**
     * Creates a Kafka producer and sends a message.
     */
    public void testProducer() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<>(producerConfig);

        KeyedMessage<String, String> message =
                new KeyedMessage<>("order", "Sending Customer Order, please process");
        producer.send(message);
        System.out.println("✅ Message sent successfully to 'order' topic!");
    }

    /**
     * Creates a Kafka consumer and reads messages from the topic.
     */
    public void testConsumer() {
        String topic = "order";

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "vinod");
        props.put("zookeeper.session.timeout.ms", "5000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String message = new String(it.next().message());
                System.out.println("📩 Message from topic '" + topic + "': " + message);
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        MyKafkaConsumer app = new MyKafkaConsumer();
        app.testProducer(); // Send message
        app.testConsumer(); // Read message
    }
}

🧾 Step 5 — Example Output

When you run the program, the following output will appear on your console:

✅ Message sent successfully to 'order' topic!
📩 Message from topic 'order': Sending Customer Order, please process

📘 Step 6 — Download Full Example

You can download or clone the working example from GitHub:

🔗 https://github.com/kkvinodkumaran/kafka


🧠 How It Works — High-Level Flow

Here’s the conceptual flow of how this example operates:

[Producer] → (Message) → [Kafka Broker] → (Stored in topic: order) → [Consumer]
  • Producer connects to the broker and publishes the message.

  • Broker stores the message in the specified topic partition.

  • Consumer subscribes to that topic and retrieves messages sequentially.
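The flow above can be sketched with an in-memory analogy, using a BlockingQueue to stand in for the broker's topic log. This is only a conceptual model — a real broker persists messages durably and serves many independent consumer groups:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** In-memory analogy of the producer -> broker -> consumer flow. */
public class FlowSketch {

    public static String roundTrip(String payload) throws InterruptedException {
        // The "broker": a bounded, ordered buffer standing in for the 'order' topic.
        BlockingQueue<String> topic = new ArrayBlockingQueue<>(16);

        // Producer side: publish the message to the topic.
        topic.put(payload);

        // Consumer side: read messages sequentially, in arrival order.
        return topic.take();
    }

    public static void main(String[] args) throws InterruptedException {
        String received = roundTrip("Sending Customer Order, please process");
        System.out.println("Consumed: " + received);
    }
}
```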


🧩 Key Takeaways

  • Producer: Sends data to Kafka topics

  • Consumer: Reads data from Kafka topics

  • Broker: Kafka server that stores and delivers messages

  • Topic: Named channel for message streams

  • Zookeeper: Manages broker metadata (for older Kafka versions)


Reference: Kafka

Apache Camel Kafka Component Example

The Apache Camel API has a built-in Kafka component, which makes it very simple to create producers and consumers and process messages. Here is a simple Kafka producer and consumer example using Apache Camel and Kafka.

Steps

1) Download Apache Kafka from https://kafka.apache.org/downloads


2) Extract the archive and start Zookeeper and Kafka


 ./zookeeper-server-start.sh ../config/zookeeper.properties

 ./kafka-server-start.sh ../config/server.properties


3) Create a Maven project and add the dependencies below


<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>2.16.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
        <version>2.16.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jetty</artifactId>
        <version>2.16.3</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.6.6</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.6</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-test</artifactId>
        <version>2.17.3</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>


4) Create a Camel main class that registers the Camel routes and their processors.


package com.vinod.test;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.main.Main;

public class CamelKafkaConsumerTest {

    public static void main(String... args) throws Exception {
        Main main = new Main();
        main.enableHangupSupport();
        main.addRouteBuilder(new MyCamelJettyBuilder());
        main.run(args);
    }
}

class MyCamelJettyBuilder extends RouteBuilder {

    String topicName = "topic=test";
    String kafkaServer = "kafka:localhost:9092";
    String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
    String autoOffsetOption = "autoOffsetReset=smallest";
    String groupId = "groupId=testingvinod";

    // Endpoint URI for producing to Kafka: broker + topic + serializer settings.
    String toKafka = kafkaServer + "?" + topicName + "&" + zooKeeperHost + "&" + serializerClass;

    // Endpoint URI for consuming: same settings plus offset reset and consumer group.
    String fromKafka = toKafka + "&" + autoOffsetOption + "&" + groupId;

    public void configure() {
        // Expose an HTTP endpoint and forward each request body to the Kafka topic.
        from("jetty:http://localhost:8182/mytestservice").process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                String message = exchange.getIn().getBody(String.class);
                exchange.getIn().setBody(message);
                exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                exchange.getIn().setHeader(KafkaConstants.KEY, "1");
            }
        }).to(toKafka);

        // Consume from the same Kafka topic and print each message.
        from(fromKafka).process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                if (exchange.getIn() != null) {
                    Message message = exchange.getIn();
                    String data = message.getBody(String.class);
                    System.out.println("Data = " + data);
                }
            }
        });
    }
}
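For readability, the endpoint URIs assembled by string concatenation in MyCamelJettyBuilder expand to the values below (written out in full from the fields in the route builder):

```java
/** The Camel endpoint URIs built in MyCamelJettyBuilder, written out in full. */
public class EndpointUris {

    public static final String TO_KAFKA =
        "kafka:localhost:9092?topic=test"
        + "&zookeeperHost=localhost&zookeeperPort=2181"
        + "&serializerClass=kafka.serializer.StringEncoder";

    public static final String FROM_KAFKA =
        TO_KAFKA + "&autoOffsetReset=smallest&groupId=testingvinod";

    public static void main(String[] args) {
        System.out.println("to:   " + TO_KAFKA);
        System.out.println("from: " + FROM_KAFKA);
    }
}
```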

In this example we expose a Jetty endpoint to supply the input message instead of a static message. Once the program above is started, use any REST client to test it.


5) Send a message to the Kafka topic using our service: http://localhost:8182/mytestservice
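Instead of a GUI REST client, a plain-JDK HTTP call works too. This is a minimal sketch assuming the Camel route above is running on localhost:8182; the helper returns -1 if the service is not reachable:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Sends a test message to the Jetty endpoint exposed by the Camel route. */
public class ServiceTester {

    /** POSTs the body to the URL; returns the HTTP status, or -1 if unreachable. */
    public static int post(String url, String body) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setConnectTimeout(2000);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } catch (IOException e) {
            return -1; // service not running or not reachable
        }
    }

    public static void main(String[] args) {
        int status = post("http://localhost:8182/mytestservice", "Test my message by Vinod");
        System.out.println(status == -1 ? "Service not reachable" : "HTTP status: " + status);
    }
}
```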



Now we can see the message we sent and consumed from Kafka in the console:

Data = Test my message by Vinod


6) Download Example

https://github.com/kkvinodkumaran/camel


Reference: Apache Camel, Kafka
