본문

Kafka - Consumer

반응형

Kafka Cluster 구성

 

 

1. Consumer

1.1 Consumer
레코드를 polling하는 애플리케이션

 

a) 데이터를 가져가는(polling) 주체

b) commit을 통해 읽은 consumer offsetㅇㄹ 카프카에 기록

c) Java kafka-client 제공

d) 데이터 저장 방식

- File System (.cxv, .log, .tsv)

- Object Storage (S3, Minio)

- Hadoop (Hdfs, Hive)

- RDBNS (Oracle, CouchDB)

- 기타 저장소들 (Elasticsearch, influxDB)

 

Sample Code

 

a) 1개의 토픽만 구독

consumer.subscribe(Arrays.asList("TOPIC_NAME"));

 

b) n개의 토픽 구독

정규 표현식을 통한 토픽 구독

consumer.subscribe(Pattern.compile("TOPI.*"));

 

c) 특정 토픽의 파티션 구독

key를 포함한 Record를 consume할 때, 특정 파티션을 할당하고 싶다면

consumer.assign(Collections.singleton(new TopicPartition("TOPIC_NAME", 1)));

 

1.2 Consumer options
a) 필수옵션

- bootstrap.servers: 카프카 클러스터에 연결하기 위한 브로커 목록

- groip.id: 컨슈머 그룹 id

- key.deserializer: 메시지 키 역직렬화에 사용되는 클래스

- value.deserializer: 메시지 값을 역직렬화 하는데 사용되는 클래스

 

b) 선택옵션 - default값 존재

- enable.auto.commit: 자동 오프셋 커밋 여부

- auto.commit.interval.ms: 자동 오프셋 커밋일 때 interval 시간

- auto.offset.reset: 신규 컨슈머그룹일 때, 읽을 파티션의 오프셋 위치

- client.id: 클라이언트 식별값

- max.poll.records: poll() 메서드 호출로 반환되는 레코드의 최대 개수

- session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 시간

 

1.3 Consumer commit

a) enable.auto.commit=true

- 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit

- commit 관련 코드를 작성할 필요없음. 편리함

- 속도가 가장 빠름

- 중복 또는 유실이 발생할 수 있음

중복/유실을 허용하지 않는곳(은행, 카드사 등)에서는 사용하면 안됨!

→ 일부 데이터가 중복/유실되도 상관없는 곳(센서 정보, GPS 등)에서 사용

중복 처리되는 메시지들

a) 중복 처리가 야기할 수 있는 문제점

- 커머스의 장바구니 시스템에서 중복 이슈 발생

→ 나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김

 

- 카드사의 결제 시스템에서 중복 이슈 발생

→ 편의점에서 아이스크림을 결제했는데 2번 결제됨

 

- 택배사 SMS 발송 시스템엫서 중복 이슈 발생

→ 집에 도착했다는 SMS가 2번 발송

 

b) 데이터 중복을 막을 수 있는 방법

- 오토 커밋을 사용하되, 컨슈머가 죽지않도록 Keep Monitoring 한다.

불가능. 서버/애플리케이션은 언젠가 죽을 수 있다. ex) 배포 시

 

- 오토 커밋을 사용하지 않는다.

Kafka consumer의 commitSync(), commitAsync() 사용

 

유실되는 메시지들

 

b) enable.auto.commit=false

b.1) commitSync(): 동기 커밋

동기 커밋

- ConsumerRecord 처리 순서를 보장함

- 가장 느림 (커밋이 완료될 때까지 block)

- poll() 메서드로 변환된 ConsumerRecord의 마지막 offset을 커밋

- Map<TopicPartition, OffsetAndMetadata>을 통해 오프셋 지정 커밋 가능

지정 커밋

 

b.2) commitAsync() 비동기 커밋

비동기 커밋

- 동기 커밋보다 빠름

- 중복이 발생할 수 있음

→ 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때

 

- ConsumerRecord 처리 순서를 보장하지 못함

→ 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한

비동기 + 동기 커밋

1.4 Consumer rebalance

a) 리벨런스: 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상

- 리벨런스를 하는 동안 일시적으로 메시지를 가져올 수 없음

- 리벨런스 발생시 데이터 유실/중복 발생 가능성 있음

→ commitSync() 또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지

 

- 리벨런스 발생 원인

→ consumer.close() 호출시 또는 consumer의 세션이 끊어졌을 때

 

b) Consumer rebalance listener

- 리벨런스 발생에 따른 offset commit

- 리벨런스 시간 측정을 통한 컨슈머 모니터링

 

1.5 Consumer wakeup

컨슈머를 정상적으로 종료시키기위해 wakeup 메소드를 사용한다.

레코드를 poll하는도중 장애가 발생하더라도, 데이터 중복/유실을 막기위해 wakeup 메소드를 이용하여 예외처리를 한다.

exception이 발생하면 마지막에 처리하던것을 commitSync()하고 consumer.close()를 이용하여 안전하게 종료하게 된다.

이를통해 리밸런스가 발생하는것을 브로커에 명시적으로 알려줄 수 있다.

 

wakeup()을 통한 graceful shytdown 필수!

- SIGTERM을 통힌 shutdown signal로 kill하여 처리한 데이터 커밋 필요

- SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 중복/유실 발생

 

1.5.1 정상동작 예시

a) poll() 호출: record 100개 반환: offset 101번~200번

b) records loop 구문 수행

c) record.value() system print

d) offset 200 커밋

e) poll() 호출

f) 반복

 

1.5.2 SIGKILL로 인한 중복처리 발생 예시

 SIGKILL: 강제로 애플리케이션을 종료시키는것을 의미

 

a) poll() 호출

- 마지막 커밋된 오프셋이 100

- records 100개 반환: 오프셋 101~200

b) records loop 구문 수행

 

c) record.value() system 150번 오프셋 print 중, SIGKILL 호출

- 101번~150번 오프셋 처리 완료, 151번 오프셋~200번 오프셋 미처리

 

d) offset 200 커밋 불가

- 브로커에는 100번 오프셋이 마지막 커밋

→ 컨슈머 재시작, 다시 오프셋 101부터 처리 시작, 101번~150버 중복처리

 

1.6  Consumer lag

a) 컨슈머 랙은 컨슈머의 상태를 나타내는 지표

b) 컨슈머 랙의 최대값은 컨슈머 인스턴스를 통해 직접 확인할 수 있음

→ consumer.metrics()를 통해 확인할 수 있는 지표

   - records-lag-max: 토픽의 파티션 중 최대 랙

   - fetch-zise-avg: 1번 polling하여 가져올 때 레코드 평균 byte

   - fetch-rate: 1초 동안 레코드를 가져오는 횟수

 

컨슈머 인스턴스를 이용할 경우의 한계점

- 컨슈머 인스턴스 장애가 발생하면 지표 수집 불가

- 구현하는 컨슈머마다 지표를 수집하는 로직개발 필요

- 컨슈머 랙 최대값(records-lag-max)만 알 수 있음

→ 토픽에 파티션은 n개가 있을 수 있음

→ 최대값을 제외한 나머지 파티션의 컨슈머 랙은 알 수 없음

 

c) 컨슈머 랙 모니터링

- 외부 모니터링 Application 사용: Confluent Platform, Datadog, Kafka Burrow(Open Source)

 

 

 

 


💡 Apache kafka 컨슈머 애플리케이션 개발, 실습

https://www.youtube.com/watch?v=5FEE5wVi8uY

 

반응형

공유

댓글