Consumer

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("simple"));  // subscribe to the topic

while (condition) {  // condition: placeholder for the loop's run condition
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
  }
}

consumer.close();

Commits and offsets

When there is no committed offset
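
When a consumer group has no committed offset for a partition, the consumer's starting position is decided by the `auto.offset.reset` setting: `earliest` starts from the oldest available record, `latest` (the default) starts from records produced after the consumer joins, and `none` makes the consumer throw an exception. The following is a minimal sketch of setting it; the class and method names (`OffsetResetConfig`, `consumerProps`) are hypothetical and used only for illustration.

```java
import java.util.Properties;

public class OffsetResetConfig {
    // Builds consumer properties with an explicit auto.offset.reset policy.
    // Accepted values: "earliest", "latest" (the default), "none".
    public static Properties consumerProps(String resetPolicy) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("group.id", "group1");
        prop.put("auto.offset.reset", resetPolicy);
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = consumerProps("earliest");
        System.out.println(prop.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

The resulting `Properties` can be passed to the `KafkaConsumer` constructor in the same way as `prop` in the Consumer example, after adding the deserializer settings.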

Consumer settings

Auto commit / manual commit
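
The commit mode is selected with `enable.auto.commit`. When it is `true` (the default), offsets are committed in the background at the interval given by `auto.commit.interval.ms` (default 5000 ms). When it is `false`, the application has to call `commitSync()` or `commitAsync()` itself. A minimal sketch of both configurations follows; the class and method names (`CommitModeConfig`, `autoCommit`, `manualCommit`) are hypothetical.

```java
import java.util.Properties;

public class CommitModeConfig {
    // Settings shared by both commit modes.
    private static Properties base() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("group.id", "group1");
        return prop;
    }

    // Auto commit: the consumer commits offsets periodically on its own.
    public static Properties autoCommit(long intervalMs) {
        Properties prop = base();
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", String.valueOf(intervalMs));
        return prop;
    }

    // Manual commit: the application calls commitSync()/commitAsync().
    public static Properties manualCommit() {
        Properties prop = base();
        prop.put("enable.auto.commit", "false");
        return prop;
    }
}
```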

Manual commit: synchronous / asynchronous commit

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  // ... process the record
}
try {
  consumer.commitSync();  // blocks until the commit succeeds or fails
} catch (Exception ex) {
  // thrown when the commit fails
}

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  // ... process the record
}
consumer.commitAsync(); // does not block; overload: commitAsync(OffsetCommitCallback callback)

Reprocessing and ordering
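
If a commit fails or a rebalance happens before the commit, records that were already handled can be delivered again, so processing usually has to tolerate duplicates. One possible approach, sketched here as an assumption rather than a prescribed method, is to remember the highest offset handled per partition and skip anything at or below it. This relies on records within a single partition arriving in offset order. The class `RedeliveryFilter` and its method are hypothetical; the state is in memory only and keyed by partition number, so it assumes a single topic and does not survive a restart.

```java
import java.util.HashMap;
import java.util.Map;

public class RedeliveryFilter {
    // Highest offset already accepted, keyed by partition number.
    private final Map<Integer, Long> lastAccepted = new HashMap<>();

    // Returns true and records the offset if it is new for the partition;
    // returns false if the offset was already seen (a redelivered record).
    public boolean markIfNew(int partition, long offset) {
        Long last = lastAccepted.get(partition);
        if (last != null && offset <= last) {
            return false;
        }
        lastAccepted.put(partition, offset);
        return true;
    }
}
```

Inside the poll loop this would be used as `if (filter.markIfNew(record.partition(), record.offset())) { ... }`, so that a redelivered record is skipped instead of being processed twice.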

Session timeout, heartbeat, and maximum poll interval
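
Three settings control when a consumer is considered dead and its partitions are reassigned. `session.timeout.ms` is how long the group coordinator waits without a heartbeat before removing the consumer, `heartbeat.interval.ms` is how often heartbeats are sent, and `max.poll.interval.ms` is the longest allowed gap between two `poll()` calls. A minimal sketch of setting them together follows; the class and method names are hypothetical, and deriving the heartbeat interval as one third of the session timeout reflects a common recommendation, not a requirement.

```java
import java.util.Properties;

public class LivenessConfig {
    // Builds the liveness-related consumer settings.
    public static Properties livenessProps(int sessionTimeoutMs, int maxPollIntervalMs) {
        Properties prop = new Properties();
        prop.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Assumption: heartbeat interval set to 1/3 of the session timeout.
        prop.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        // If processing one batch takes longer than this, the consumer leaves the group.
        prop.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return prop;
    }
}
```

For example, `livenessProps(30000, 300000)` yields a 30 s session timeout, a 10 s heartbeat interval, and a 5 min maximum poll interval; these values are illustrative only.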

Shutdown handling

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("simple"));

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // throws WakeupException once wakeup() has been called
    // ... process records
    try {
      consumer.commitAsync();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
} catch (Exception ex) {
  // ...
} finally {
  consumer.close();
}

Caution: KafkaConsumer is not thread-safe. Apart from wakeup(), do not call its methods from more than one thread.
