본문

Kafka 건드려보기

반응형

1. config/server.proerties

- broker.id: 정수로 된 브로커 번호. 클러스터 내 고유번호 지정

- listeners: kafka 통신에 사용되는 host:port

- advertised.listeners: kafka client가 접속할 host:port

- log.dirs: 메시지를 저장할 디스크 디렉토리. 세그먼트가 저장됨

- log.segment.bytes: 메시지가 저장되는 파일의 크기 단위

- log.retention.ms: 메시지를 얼마나 보존할지 지정. 닫힌 세그먼트를 처리

- zookeeper.connect: 브로커의 메타데이터를 저장하는 주키퍼의 위치

- auto.create.topics.enable: 자동으로 토픽 생성이 가능하도록 설정의 여부

- num.partitoins: 자동생성된 토픽의 default partition 개수

- message.max.bytes: kafka broker에 쓰려는 메시지 최대 크기

 

2. zookeeper, kafka 실행

$ bin/zookeeper-server-start.sh -daemon ../config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon ../config/server.properties

- zookeeper는 카프카의 메타데이터(브로커id, 컨트롤러id 등) 저장

- 카프카(~2.5.0까지)는반드시 주키퍼가 필요함: 주키퍼 의존성을 제거하는 작업이 KIP-500을 통해 진행중

- JPS 명령어를 통해 카프카, 주키퍼가 정상적으로 Running중인지 확인 가능

netstat -anp | grep 2181 | grep LIST
netstat -anp | grep 9092 | grep LIST

 

3. kafka shell scripts

a) kafka-topics.sh

토픽 생성, 조회, 수정 등의 역할

## show topic list
$ bin/kafka-topics.sh --list --zookeeper 주키퍼IP:2181
$ bin/kafka-topics.sh --list --zookeeper 201.7.1.101:2181

## show topic list (cluster)
$ bin/kafka-topics.sh --list --zookeeper 주키퍼1_IP:2181,주키퍼2_IP:2181,주키퍼3_IP:2181
$ bin/kafka-topics.sh --list --zookeeper 201.7.1.101:2181,201.7.1.106:2181,201.7.1.108:2181

 

b) kafka-console-consumer.sh

토픽의 레코드 즉시 조회

## show the topic
$ bin/kafka-console-consumer.sh --bootstrap-server 카프카IP:9092 --from-beginning --topic 토픽명
$ bin/kafka-console-consumer.sh --bootstrap-server 201.7.1.101:9092 --from-beginning --topic ServiceData-MDData

 

c) kafka-console-producer.sh

토픽의 레코드를 전달(String)

 

d) kafka-consumer-groups.sh

컨슈머그룹 조회, 컨슈머 오프셋 확인, 수정

 

* 위의 4개를 포함하여 33개의 kafka shell scripts가 제공됨

 

4. 카프카 토픽 생성

## show the topic
$ bin/kafka-topics.sh --create --bootstrap-server 카프카IP:9092 --replication-factor 복제옵션 --partitions 파티션개수 --topic 토픽명
$ bin/kafka-topics.sh --create --bootstrap-server 201.7.1.101:9092 --replication-factor 1 --partitions 3 --topic ServiceData-MDData

--bootstrap-server: 토픽관련 명령어를 수행할 대상 카프카 클러스터

--replication-factor: replica 개수 지정(브로커 개수 이하로 설정 가능)

--partitions: 파티션 개수 설정

--config: 각종 토픽 설정 가능 (retention.ms, segment.byte 등)

--create: 토픽 생성

--delete: 토픽 제거

--describe: 토픽 상세 확인

--list: 카프카 클러스터의 토픽 리스트 확인

--version: 대상 카프카 클러스터 버전 확인

 

* 카프카 세그먼트 확인

 

5. 카프카 콘솔 프로듀서, 콘솔 컨슈머

5.1 kafka-console-producer

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_name

- 토픽에 String 데이터를 넣는 용도
- Key 지정은 불가능: 테스트용

 

5.2 kafka-console-consumer

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning

- 토픽의 데이터를 String으로 읽는 용도
- 카프카 운영시 가장 많이 사용

 

6. 카프카 컨슈머 그룹

- TOPIC: 토픽 이름
- PARTITION: consumer group 내의 각 consumer가 할당된 파티션 번호
- CURRENT-OFFSET: 현재 consumer group의 consumer가 각 파티션에서 마지막으로 offset을 commit한 값
- LOG-END-OFFSET: producer쪽에서 마지막으로 생성한 레코드의 offset
- LAG: LOG-END-OFFSET에서 CURRENT-OFFSET를 뺀 값

 

~/kafka/bin/kafka-consumer-groups.sh --bootstrap-server server1_IP, server2_IP, server3_IP --group groupName --describe
~/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 201.7.1.101:9092, 201.7.1.106:9092, 201.7.1.108:9092 --group ServiceData-MDData --describe

 

7. 카프카 오프셋 리셋

~/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --topic topicName --reset-offsets --to-earliest --execute

 

카프카 Consumer를 사용하다 보면 offset을 reset해야하는 경우가 종종 있다.
- 개발 테스트를 진행하다가 필요에의해 offset을 리셋
- 예상치 못한 에러 등으로 데이터 누락이 발생하여 일정기간 전으로 다시 offset을 rewind해서 사용하고자하는 경우

P.S. 카프카(Kafka) Consumer offset reset 방법: https://www.letmecompile.com/kafka-consumer-offset-reset

 

* 상세 옵션

--shift-by <Long>: 컨슈머 오프셋에서 + 또는 - 이동
--to-offset <Long>: 컨슈머 오프셋을 지정
--to-latest: 가장 최신(높은숫자) 오프셋으로 지정
--to-earliest: 가장 이른(낮은 숫자) 오프셋으로 지정

 

* 특정 파티션만 오프셋 이동

 

 


파티션과 컨슈머 개수 설정
1. 컨슈머 lag는 계속 늘어나고 있지만 처리는 정상으로 병렬처리되고있다면 무조건 괜찮은 걸까요?
- 시스템/서비스 확인
사전에 TEST용으로 파티션과 컨슈머를 생성후 lag 추이를 파악한 후, 파티션과 컨슈머 충분하게 늘려놓는다.

- 이미 데이터처리를 하고있는데, 컨슈머 lag이 너무많이 발생하는 경우
컨슈머를 안전하게 종료후(컨슈머를 종료해도 데이터 유실이 되거나하지않음) 파티션 개수를 늘려서 병렬처리 속도를 향상시킨다.

2. 무조건 파티션 개수를 늘리는것이 좋은것인가?
No! 파티션이 컨슈머랑 할당되는 리밸런스 과정에 있어서 파티션이 늘어나면 늘어날수록 리밸런스에 시간이 더 소요된다.
즉 서비스에 적절한 파티션과 컨슈머를 설정하하여 운영하는 것이 중요하다.

 


💡 Apache kafka 기본개념 및 생태계

https://www.youtube.com/watch?v=catN_YhV6To

반응형

공유

댓글