Properties prop = new Properties(); prop.put("bootstrap.servers", "localhost:9092"); prop.put("group.id", "group1"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop); consumer.subscribe(Collections.singleton("simple")); // 토픽 구동 while(조건){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record: records) { Systme.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset()); } } consumer.close();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { ... 처리 } try { consumer.commitSync(); } catch(Exception ex) { // 커밋 실패시 에러 발생 }
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { ... 처리 } consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop); consumer.subscribe(Collections.singleton("simple")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 익셉션 발생 ... records 처리 try { consumer.commitAsync(); } catch (Exception e) { e.printStackTrace(); } } } catch (Exception ex) { ... } finally { consumer.close(); }