Properties prop = new Properties(); prop.put("bootstrap.servers", "kafka01:9092,kafka01:9092,kafka01:9092"); prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop); producer.send(new ProducerRecord<>("topicname", "key", "value")); producer.send(new ProducerRecord<>("topicname", "value")); producer.close();
producer.send(new ProducerRecord<>("simple", "value"));
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value")); try { RecordMetadata meta = f.get(); // 블로킹 } catch (ExecutionException ex) { }
producer.send(new ProducerRecord<>("simple", "value"), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception ex) { } });
1
로 지정