Kafka Offset
Offset 이란?
Partition 의 특정 Consumer 가 메세지를 읽어들인 위치를 나타냄. Consumer 가 메세지 수신 처리를 재개할 때 어떤 메세지 부터 가져 가야 하는 지 알 수 있음.
Offset Topic
Kafka 에는 Consumer 의 offset 을 저장 하는 topic 이 존재함. (v0.9 release)
- __consumer_offsets
- 일반 토픽처럼 partitioning, replication 되어 있음
offsets.topic.replication.factor=1 (default=1)
Consumer 가 Kafka에 현재까지 읽은 메세지의 offset 정보를 알려주는 것을 commit 이라 한다.
- 일반 토픽처럼 partitioning, replication 되어 있음
Offset Option
- enable.auto.commit (default=true)
- 특정 주기 마다 자동으로 commit 하는 설정
- auto.commit.interval.ms (default=5000ms)
- 커밋 주기
- kafka 로 부터 메세지를 읽어 올때 이 주기와 맞으면 offset 정보를 commit 한다.
- enable.auto.commit = false 일 경우 이 옵션은 무시된다.
- auto commit 일 경우 장애 발생 타이밍에 따라 commit 된 메세지 처리가 완료 되지 않거나
- 메세지 처리 누락 발생
- 메세지 처리가 완료 되었지만 offset commit 이 이루어 지지 않은 경우가 발생 할 수 있음
- 동일 메세지 중복 처리
- auto.offset.reset
- Consumer 의 offset commit 정보가 존재 않거나 해당 offset 이 유효하지 않을 경우
- latest : 가장 새로운(마지막) offset부터 (Default)
- earliest : 가장 오래된(처음) offset부터
- none : 해당 consumer group이 가져가고자 하는 topic의 consumer offset정보가 없으면 exception을 발생시킴(seek 명령어로 명시적으로 offset 재지정)
- Consumer 의 offset commit 정보가 존재 않거나 해당 offset 이 유효하지 않을 경우
- Spring Kafka Configuration
spring: kafka: consumer: auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 5000ms
Offset Commit
- Commit 방법
- commitSync()
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println(record.value()); try { consumer.commitSync(); } catch (CommitFailedException e) { System.err.println("commit failed"); } }
- 신뢰도가 가장 높음
- poll 메서드에서 반환된 마지막 오프셋을 커밋
- 커밋이 성공하거나 재시도가 불가능한 에러가 생길 때까지 commitSync() 를 재시도
- commitSync() 호출전에 리밸런싱이 시작된다면 중복 처리됨
- 브로커가 커밋 요청에 응답할 때 까지 기다려야 하기 때문에 Consumer 처리량에 제한이 생김
- commitAsync()
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println(record.value()); consumer.commitAsync(); }
- 커밋 요청을 전송하고 처리를 계속 진행
- callback 전달 가능
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf(record.value()); consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) System.err.println("Commit failed"); else System.out.println("Commit succeeded"); if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); }
- commitSync()
- Spring Kafka AckMode
- enable-auto-commit=true 일 경우 이옵션은 무시된다.
- 모두 commitAsync() 을 실행 시킨다.
spring: kafka: listener: ack-mode: RECORD ack-time: 5000ms ack-count: 100
- RECORD
- 레코드가 처리 될 때 마다 실행
- BATCH (Default)
- 다음 poll 한 레코드가 모두 수행되고 다음 poll 이 수행되기 전에 배치로 실행
- TIME
- akc-time 설정 시간 단위
- COUNT
- 설정된 처리 횟수 후
- COUNT_TIME
- ack-time 또는 ack-count 둘중 하나가 만족될때
- MANUAL
- ack 가 수동으로 발생했을 경우. 하지만 뒤에서 배치로 실행
- MANUAL_IMMEDIATE
- Acknowledgment.acknowledge() 가 실행될 때 즉시 처리 ```java @Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
@Override @KafkaListener(topics = “product.event”, groupId = “finding-consumer-group”) public void messageListener(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) { LOGGER.info(“Received Message Consumer : “ + consumerRecord); acknowledgment.acknowledge(); } } ```
리밸런싱 & 특정 offset 부터 읽기
- ConsumerRebalanceListener interface 구현
- onPartitionsRevoked(Collection
partitions) - 리밸런싱이 시작되기 전, 그리고 컨슈머가 메세지 소비를 중단한 후 호출
- offset 를 커밋 해야 하는 곳 (저장)
- onPartitionsAssigned(Collection
partitions) - 파티션이 브로커에게 재할당된 후, 그리고 컨슈머가 파티션을 새로 할당 받아 메세지 소비를 시작하기 전에 호출
- consumer.seek() 를 이용하여 장애 발생 offset 부터 읽음
- onPartitionsRevoked(Collection
- consumer.subscriber(topics, Listener구현체)
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener { private Consumer<?,?> consumer; public SaveOffsetsOnRebalance(Consumer<?,?> consumer) { this.consumer = consumer; } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // save the offsets in an external store using some custom code not described here for(TopicPartition partition: partitions) saveOffsetInExternalStore(consumer.position(partition)); } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // read the offsets from an external store using some custom code not described here for(TopicPartition partition: partitions) consumer.seek(partition, readOffsetFromExternalStore(partition)); } } // Consumer consumer.subscribe(topics, SaveOffsetsOnRebalance);
- spring kafka
- Rebalancing Listener : https://docs.spring.io/spring-kafka/reference/html/#rebalance-listeners
- Seek : https://docs.spring.io/spring-kafka/reference/html/#seek
참고
- Spark Offset Management
댓글남기기