apache-kafka Producer/Consumer in Java SimpleProducer (kafka >= 0.9)


Example

Configuration and initialization

First, create a Maven project and add the following dependency to your pom.xml:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

The producer is initialized using a Properties object. There are lots of properties allowing you to fine-tune the producer behavior. Below is the minimal configuration needed:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

The bootstrap.servers property is an initial list of one or more brokers that the producer uses to discover the rest of the cluster. The serializer properties tell Kafka how the message key and value should be encoded. Here, we will send string messages. Although not required, setting a client.id is always recommended: it lets you easily correlate requests on the broker with the client instance that made them.
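As a small sketch of the same minimal configuration, the helper below builds the Properties from a broker list and a client id. It only uses java.util.Properties, so it runs without a broker. The class name MinimalProducerConfig, the method build and the host names are illustrative assumptions, not part of the Kafka API; bootstrap.servers accepts a comma-separated list of host:port pairs.

```java
import java.util.Properties;

public class MinimalProducerConfig {

    // Hypothetical helper: builds the minimal producer configuration described above.
    public static Properties build(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        // Comma-separated host:port pairs, only used to discover the rest of the cluster.
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are sent as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Optional, but it makes requests easy to trace on the broker side.
        props.put("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        // Host names and client id are placeholders.
        Properties props = build("broker1:9092,broker2:9092", "simple-producer-01");
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + " = " + props.getProperty(name)));
    }
}
```

The resulting object can be passed as-is to the KafkaProducer constructor.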

Other interesting properties are:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

You can control the durability of messages written to Kafka through the acks setting. The default value of “1” requires an explicit acknowledgement from the partition leader that the write succeeded. The strongest guarantee that Kafka provides is with acks=all, which guarantees that not only did the partition leader accept the write, but it was successfully replicated to all of the in-sync replicas. You can also use a value of “0” to maximize throughput, but you will have no guarantee that the message was successfully written to the broker’s log since the broker does not even send a response in this case.
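To make the three levels easier to compare, here is a minimal sketch that maps each durability level to its acks value. The enum Durability and its constant names are made up for illustration; only the acks key and the values "0", "1" and "all" come from Kafka.

```java
import java.util.Properties;

public class AcksSketch {

    // Hypothetical enum: the names are illustrative, the acks values are the real ones.
    public enum Durability {
        NO_RESPONSE("0"),   // broker sends no response, highest throughput
        LEADER_ONLY("1"),   // the partition leader acknowledges the write
        ALL_IN_SYNC("all"); // the leader and all in-sync replicas have the write

        final String acks;

        Durability(String acks) {
            this.acks = acks;
        }
    }

    // Copies the chosen level into the producer configuration.
    public static Properties withDurability(Properties props, Durability level) {
        props.put("acks", level.acks);
        return props;
    }

    public static void main(String[] args) {
        for (Durability level : Durability.values()) {
            Properties props = withDurability(new Properties(), level);
            System.out.println(level + " -> acks=" + props.getProperty("acks"));
        }
    }
}
```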

retries (0 by default in the 0.9 Java client; more recent clients default to a large value) determines whether the producer tries to resend a message after a failure. Note that with retries > 0, message reordering may occur, since a retried write may only succeed after a later write has already succeeded.
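If ordering matters and you still want retries, one common approach is to allow a single in-flight request per connection, at some cost in throughput. The sketch below sets that combination. The class and method names, the retry count of 3 and the backoff of 100 ms are arbitrary illustrative choices; retries, retry.backoff.ms and max.in.flight.requests.per.connection are producer settings.

```java
import java.util.Properties;

public class OrderedRetriesSketch {

    // Hypothetical helper: enables retries without allowing reordering.
    public static Properties orderedRetries(Properties props) {
        // Retry a failed send up to 3 times (illustrative value).
        props.put("retries", 3);
        // Wait 100 ms between attempts (illustrative value).
        props.put("retry.backoff.ms", 100);
        // With one in-flight request, a retry cannot be overtaken by a later write.
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }

    public static void main(String[] args) {
        Properties props = orderedRetries(new Properties());
        System.out.println("retries = " + props.get("retries"));
        System.out.println("max.in.flight.requests.per.connection = "
                + props.get("max.in.flight.requests.per.connection"));
    }
}
```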

Kafka producers attempt to collect sent messages into batches to improve throughput. With the Java client, you can use batch.size to control the maximum size in bytes of each message batch. To give more time for batches to fill, you can use linger.ms to have the producer delay sending. Finally, compression can be enabled with the compression.type setting.
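To get a feel for what batch.size means, the sketch below groups record sizes into batches of at most batch.size bytes. This is a deliberately simplified model, not the producer's actual accumulator: it ignores per-record overhead, compression, partitions and linger.ms timing, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchingSketch {

    // Simplified model: groups record sizes (in bytes) into batches of at most batchSize bytes.
    // A record larger than batchSize ends up alone in its own batch.
    public static List<List<Integer>> pack(int[] recordSizes, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int used = 0;
        for (int size : recordSizes) {
            // Start a new batch when the record does not fit in the current one.
            if (!current.isEmpty() && used + size > batchSize) {
                batches.add(current);
                current = new ArrayList<>();
                used = 0;
            }
            current.add(size);
            used += size;
        }
        // Keep the last, possibly partial, batch.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        int[] sizes = new int[40];
        Arrays.fill(sizes, 1000); // 40 records of 1000 bytes each
        List<List<Integer>> batches = pack(sizes, 16384);
        System.out.println(batches.size() + " batches, the first holds "
                + batches.get(0).size() + " records");
    }
}
```

In the real producer, a partial batch like the last one is what linger.ms gives extra time to fill before it is sent.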

Use buffer.memory to limit the total memory that is available to the Java client for collecting unsent messages. When this limit is hit, the producer will block on additional sends for as long as max.block.ms before raising an exception. Additionally, to avoid keeping records queued indefinitely, you can set a timeout using request.timeout.ms.
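A minimal sketch of these limits, assuming illustrative values (a 32 MiB buffer, at most 5 seconds of blocking and a 30 second request timeout). The class and method names are hypothetical; the batch count is only a rough upper bound that ignores any overhead.

```java
import java.util.Properties;

public class BufferLimitsSketch {

    // Hypothetical helper: applies the memory and timeout limits discussed above.
    public static Properties withLimits(Properties props, long bufferBytes,
                                        long maxBlockMs, int requestTimeoutMs) {
        // Total memory available for records waiting to be sent.
        props.put("buffer.memory", bufferBytes);
        // How long send() may block when the buffer is full before raising an exception.
        props.put("max.block.ms", maxBlockMs);
        // Timeout applied to requests, so records are not kept queued indefinitely.
        props.put("request.timeout.ms", requestTimeoutMs);
        return props;
    }

    // Rough upper bound on how many full batches fit in the buffer.
    public static long fullBatches(long bufferBytes, int batchSize) {
        return bufferBytes / batchSize;
    }

    public static void main(String[] args) {
        Properties props = withLimits(new Properties(), 33554432L, 5000L, 30000);
        System.out.println(props.get("buffer.memory") + " bytes of buffer hold about "
                + fullBatches(33554432L, 16384) + " batches of 16384 bytes");
    }
}
```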

The complete list of properties is available in the producer configuration section of the official Kafka documentation. I suggest reading this article from Confluent for more details.

Sending messages

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

The result of send is a RecordMetadata specifying the partition the record was sent to and the offset it was assigned. Since the send call is asynchronous it returns a Future for the RecordMetadata that will be assigned to this record. To consult the metadata, you can either call get(), which will block until the request completes or use a callback.

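// synchronous call: get() blocks until the request completes
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda: metadata is null when error is set
// (the parameter must not reuse the name of the local variable above)
producer.send( message, ( metadata, error ) -> {
    if( error != null ) error.printStackTrace();
    else System.out.println( metadata );
} );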
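The difference between blocking on the Future and using a callback can be tried without a broker. The sketch below imitates the shape of send() with a CompletableFuture from the JDK. Everything in it (FakeMetadata, FakeCallback, fakeSend) is a stand-in invented for illustration and is not the Kafka API; it only mirrors the "return immediately, complete later" behaviour described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AsyncSendSketch {

    // Stand-in for RecordMetadata (not the Kafka class).
    public static final class FakeMetadata {
        public final int partition;
        public final long offset;

        FakeMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return "partition=" + partition + ", offset=" + offset;
        }
    }

    // Stand-in for the callback: exactly one of metadata or error is non-null.
    public interface FakeCallback {
        void onCompletion(FakeMetadata metadata, Exception error);
    }

    // Stand-in for producer.send(): returns immediately, completes on another thread.
    // The value is ignored here; the offset is supplied by the caller for the demo.
    public static Future<FakeMetadata> fakeSend(String value, long offset, FakeCallback callback) {
        CompletableFuture<FakeMetadata> future =
                CompletableFuture.supplyAsync(() -> new FakeMetadata(0, offset));
        if (callback == null) {
            return future;
        }
        // The returned future completes only after the callback has run.
        return future.whenComplete((metadata, error) ->
                callback.onCompletion(metadata, error == null ? null : new Exception(error)));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Synchronous style: get() blocks until the result is available.
        FakeMetadata metadata = fakeSend("hello", 42L, null).get();
        System.out.println("sync: " + metadata);

        // Callback style: the lambda runs when the send completes.
        Future<FakeMetadata> pending = fakeSend("world", 43L, (md, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println("callback: " + md);
            }
        });
        pending.get(); // only here so the demo does not exit before the callback runs
    }
}
```

With the real producer, calling get() after every send gives up batching, so the callback style is usually the better fit when throughput matters.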

The code

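import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer{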
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this: close() waits for pending sends to complete
    }
}