First, create a Maven project and add the following dependency to your pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>
The producer is initialized using a Properties object.
There are lots of properties allowing you to fine-tune the producer behavior. Below is the minimal configuration needed:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
The bootstrap.servers property is an initial list of one or more brokers that the producer uses to discover the rest of the cluster. The serializer properties tell Kafka how the message key and value should be encoded. Here, we will send string messages.
Although not required, setting a client.id is always recommended: it allows you to easily correlate requests on the broker with the client instance that made them.
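To keep the minimal configuration reusable, one option is a small helper that joins a broker list into the comma-separated host:port string that bootstrap.servers expects and appends an instance id to the client.id. The class ProducerConfigSketch, its method minimalProducerProps and the broker names broker1/broker2 are hypothetical illustrations; only the property keys and the serializer class name come from the configuration above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the Kafka client API.
class ProducerConfigSketch {

    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /** Builds the minimal string-producer configuration from a broker list and an instance id. */
    static Properties minimalProducerProps(List<String> brokers, String appName, String instanceId) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one bootstrap broker is required");
        }
        Properties props = new Properties();
        // bootstrap.servers takes a comma-separated list of host:port pairs
        props.put("bootstrap.servers", String.join(",", brokers));
        // keys and values are both encoded with the string serializer
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        // a distinct client.id per instance makes broker-side logs easier to correlate
        props.put("client.id", appName + "-" + instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = minimalProducerProps(
                Arrays.asList("broker1:9092", "broker2:9092"), "simple-producer", "01");
        System.out.println(props.getProperty("bootstrap.servers")); // broker1:9092,broker2:9092
        System.out.println(props.getProperty("client.id"));         // simple-producer-01
    }
}
```

The returned Properties object can then be handed to new KafkaProducer<>( props ) exactly like the hand-built one.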
Other interesting properties are:
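props.put("acks", "all");              // wait for all in-sync replicas
props.put("retries", 0);               // do not resend failed requests
props.put("batch.size", 16384);        // max bytes per batch (16 KB)
props.put("linger.ms", 1);             // wait up to 1 ms for a batch to fill
props.put("buffer.memory", 33554432);  // 32 MB for records not yet sent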
You can control the durability of messages written to Kafka through the acks setting. The default value of “1” (in the 0.9 client used here) requires an explicit acknowledgement from the partition leader that the write succeeded. The strongest guarantee that Kafka provides is with acks=all, which guarantees not only that the partition leader accepted the write, but also that it was successfully replicated to all of the in-sync replicas. You can also use a value of “0” to maximize throughput, but you will have no guarantee that the message was successfully written to the broker’s log, since the broker does not even send a response in this case.
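To make the three acks levels concrete, here is a deliberately simplified model of when the producer would treat a send as acknowledged. It is an assumption for illustration, not Kafka's broker logic: it ignores timeouts, min.insync.replicas and changes to the in-sync replica set, and the class AcksModel is hypothetical.

```java
// Simplified, hypothetical model of the acks setting; not Kafka's actual implementation.
class AcksModel {

    /**
     * @param acks             "0", "1" or "all" ("-1" is accepted as a synonym for "all")
     * @param leaderWrote      whether the partition leader appended the record to its log
     * @param followersCaughtUp number of in-sync followers that have replicated the record
     * @param inSyncFollowers  total number of in-sync followers (ISR size minus the leader)
     * @return true if the producer would consider the send complete
     */
    static boolean acknowledged(String acks, boolean leaderWrote,
                                int followersCaughtUp, int inSyncFollowers) {
        switch (acks) {
            case "0":
                // fire-and-forget: no response is awaited, so the send is complete immediately
                return true;
            case "1":
                // only the leader's local write is required
                return leaderWrote;
            case "all":
            case "-1":
                // the leader and every in-sync follower must have the record
                return leaderWrote && followersCaughtUp >= inSyncFollowers;
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        // the leader wrote the record, but only 1 of 2 in-sync followers has it so far
        System.out.println(acknowledged("0", true, 1, 2));   // true
        System.out.println(acknowledged("1", true, 1, 2));   // true
        System.out.println(acknowledged("all", true, 1, 2)); // false
    }
}
```

The point of the model is the ordering of the guarantees: every state that satisfies acks=all also satisfies acks=1, but not the other way around.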
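The retries setting (default 0 in the 0.9 client) determines whether the producer tries to resend a message after a failure. Note that with retries > 0, message reordering may occur, since a retry may succeed after a subsequent write has already succeeded. Setting max.in.flight.requests.per.connection to 1 avoids this, at the cost of throughput.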
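The reordering risk can be shown with a toy simulation. It is a hypothetical model, not the client's actual sender loop: batches A and B are sent, A's first attempt fails with a transient error and is retried, and every other attempt succeeds. With two requests in flight, B lands in the log before A's retry; with only one in flight, order is preserved. The class RetryReorderingDemo and its "fails exactly once" rule are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation of retries with several in-flight requests; names and rules are hypothetical.
class RetryReorderingDemo {

    /** Returns the order in which batches end up in the broker log. */
    static List<String> brokerLog(List<String> batches, String failsOnce, int maxInFlight) {
        List<String> log = new ArrayList<>();
        List<String> pendingRetries = new ArrayList<>();
        Set<String> alreadyFailed = new HashSet<>();
        int next = 0;
        while (next < batches.size() || !pendingRetries.isEmpty()) {
            // retried batches are resent first, then new batches fill the free in-flight slots
            List<String> inFlight = new ArrayList<>(pendingRetries);
            pendingRetries.clear();
            while (inFlight.size() < maxInFlight && next < batches.size()) {
                inFlight.add(batches.get(next++));
            }
            for (String batch : inFlight) {
                if (batch.equals(failsOnce) && alreadyFailed.add(batch)) {
                    pendingRetries.add(batch); // first attempt fails: schedule a retry
                } else {
                    log.add(batch);            // broker appends the batch to the log
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("A", "B");
        System.out.println(brokerLog(batches, "A", 2)); // [B, A]
        System.out.println(brokerLog(batches, "A", 1)); // [A, B]
    }
}
```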
Kafka producers attempt to collect sent messages into batches to improve throughput. With the Java client, you can use batch.size to control the maximum size in bytes of each message batch. To give batches more time to fill, you can use linger.ms to have the producer wait up to that many milliseconds before sending. Finally, compression can be enabled with the compression.type setting.
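As a rough mental model of batch.size, the sketch below greedily packs record sizes into batches whose total stays within a byte limit. This is a simplification and an assumption: the real accumulator batches per partition, adds per-record overhead, and also sends a batch when linger.ms expires even if it is not full. The class BatchingSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical size-based batching model; ignores partitions, record overhead and linger.ms.
class BatchingSketch {

    /** Greedily packs record sizes (in bytes) into batches of at most batchSize bytes. */
    static List<List<Integer>> batches(List<Integer> recordSizes, int batchSize) {
        List<List<Integer>> result = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int size : recordSizes) {
            // close the current batch when the next record would not fit
            if (!current.isEmpty() && currentBytes + size > batchSize) {
                result.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // in this model, a record larger than batchSize simply gets a batch of its own
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // ten 100-byte records with a 250-byte limit end up as five batches of two
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            sizes.add(100);
        }
        System.out.println(batches(sizes, 250).size());           // 5
        System.out.println(batches(Arrays.asList(300, 50), 250)); // [[300], [50]]
    }
}
```

A larger batch.size means fewer, larger requests; linger.ms trades a little latency for the chance to fill them.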
Use buffer.memory to limit the total memory that is available to the Java client for collecting unsent messages. When this limit is hit, the producer will block on additional sends for as long as max.block.ms before raising an exception. Additionally, to avoid keeping records queued indefinitely, you can set a timeout using request.timeout.ms.
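The interplay between buffer.memory and max.block.ms behaves like a bounded pool of bytes: a send that needs more space than is currently free waits up to max.block.ms and then fails. The sketch below models that with a java.util.concurrent.Semaphore. It is a simplified assumption, not the client's actual buffer pool, and it throws the JDK's TimeoutException rather than Kafka's own exception type; the class BoundedBufferSketch is hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of buffer.memory + max.block.ms using a semaphore of byte permits.
class BoundedBufferSketch {

    private final Semaphore freeBytes;
    private final long maxBlockMs;

    BoundedBufferSketch(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for a record, blocking for at most maxBlockMs. */
    void reserve(int recordBytes) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("failed to allocate " + recordBytes
                    + " bytes within " + maxBlockMs + " ms");
        }
    }

    /** Frees space, as happens once a batch has been sent and acknowledged. */
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    /** Convenience wrapper: true if the reservation succeeded before the timeout. */
    static boolean reserveSucceeds(BoundedBufferSketch buffer, int bytes) {
        try {
            buffer.reserve(bytes);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(100, 50);
        System.out.println(reserveSucceeds(buffer, 80)); // true: 20 bytes left
        System.out.println(reserveSucceeds(buffer, 30)); // false, after blocking about 50 ms
        buffer.release(80);
        System.out.println(reserveSucceeds(buffer, 30)); // true
    }
}
```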
The complete list of properties is available here. I suggest reading this article from Confluent for more details.
The send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
The result of send is a RecordMetadata specifying the partition the record was sent to and the offset it was assigned. Since the send call is asynchronous, it returns a Future for the RecordMetadata that will be assigned to this record. To consult the metadata, you can either call get(), which blocks until the request completes, or use a callback.
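// synchronous call: get() blocks until the broker responds
RecordMetadata recordMetadata = producer.send( message ).get();
// asynchronous call with a callback (lambda); metadata is null if the send failed
producer.send( message, ( metadata, exception ) -> {
    if( exception != null ) exception.printStackTrace();
    else System.out.println( metadata );
} );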
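Putting it all together:
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer{
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";
        Producer<String, String> producer = new KafkaProducer<>( props );
        for( int i = 0; i < 10; i++ ){
            // no key is set, so the partitioner chooses the partition for each record
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            // asynchronous: the record is buffered here, not yet acknowledged by the broker
            producer.send( message );
            System.out.println("message sent.");
        }
        // close() waits for buffered records to be sent, then releases resources; don't forget this
        producer.close();
    }
}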