티스토리 뷰

Kafka

카프카 컨슈머 옵션 테스트

rangrangerang 2021. 8. 15. 19:42

오늘은 카프카 컨슈머의 주요옵션들에 대해서 알아보고, 테스트해보자

먼저 카프카 컨슈머, 컨슈머 그룹, 리밸런싱, 오프셋, 커밋에 대해 알아야한다. 관련 내용은 아래 블로그에서 설명을 잘 하고 있으니 참고하자

리밸런싱
https://soft.plusblog.co.kr/29

오프셋, 커밋
https://freedeveloper.tistory.com/398

 

준비

1. aws에 주키퍼 1대, 카프카 1대를 설치한다.

2. ec2를 t2-micro사용했더니 메모리가 1Gb로 부족해서 카프카 메모리 줄여줬다

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

3. 외부 사용 포트 열어주기

## 외부 사용 포트 지정

vi kafka/config/server.properties

advertised.listeners=PLAINTEXT://{aws-ip}:9092

4. 타임존 변경 : 로그를 로컬 시간과 맞추기 위해

sudo cat /etc/localtime
sudo rm /etc/localtime
sudo ln -s /usr/share/zoneinfo/Asia/Seoul /etc/localtime
date

 

리밸런싱 관련 설정

session.timeout.ms (defalut=10s)

  • 카프카 브로커가 컨슈머에게 장애가 생겼다고 판단하는데 걸리는 시간
  • 컨슈머는 주기적으로 그룹 코디네이터에게 하트비트를 전송해 자신이 살아있음을 알림
  • 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms가 지나면 리밸런싱
  • 값이 작은 경우 이상상황 빠르게 감지. 불필요한 리밸런싱이 자주 발생

heartbeat.interval.ms (default=3s)

  • 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼것인지 조정
  • session.timeout.ms보다 낮아야 하고 일반적으론 1/3 정도로 설정

max.poll.interval.ms (defalut=5m)

  • 컨슈머가 poll을 한 후 다음 poll 까지 얼마나 기다리는지에 대한 설정
  • 한번 poll 한 컨슈머가 max.poll.interval.ms 까지 다시 poll 하지 않으면 리밸런싱
  • 파티션 무한 점유를 막기위한 설정
  • 로직이 오래 걸려서 리밸런싱이 되는 경우 중복소비되는 문제 발생할 수 있음
하나의 레코드 로직이 5분이상이 걸린다고 가정하면, 하나의 레코드만 소비해도 max.poll.interval.ms를 넘기 때문에 poll을 max.poll.interval.ms이내에 하지 못하고 리밸런싱이 진행된다. 따라서 하나의 레코드를 처리하는데 걸리는 시간 x max.poll.records보다 max.poll.interval.ms를 설정해야 불필요한 리밸런싱을 막을 수 있다.

heartbeat 쓰레드와 poll

heartbeat와 poll은 서로 다른쓰레드에서 동작한다.  KIP-62 이전에는 poll이 호출되는경우 heartbeat가 호출됐다. 하지만 KIP-62이후부턴 위 그림과 같이 heartbeat 쓰레드가 분리되어 poll에 더이상 영향을 받지 않게 되었다. 따라서 session.timeout.ms와 heartbeat.interval.ms는 긴밀한 관련이 있어서 보통 함께 고려하고, max.poll.interval.ms 옵션의 경우 poll에 관해서만 설정해준다.

 

오프셋, 커밋 관련 설정

enable.auto.commit (default=true)

  • true일 경우 백그라운드로 주기적으로 오프셋을 커밋
  • true일 경우 편리하지만 데이터 누락이나 중복 처리가 발생할 수 있음
  • poll(), close() 메서드 호출시 자동 커밋 실행
  • auto.commit.interval.ms: 자동 커밋 주기
  • false 일 경우 : commitSync() - 동기, commitAsync() - 비동기로 커밋해줘야한다

테스트 : 5초에 한번 메세지 전송, 자동 커밋 주기(default=5s)

# 토픽 상세 보기
cd kafka/bin
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 토픽명

# 1초에 한번씩 토픽 상세 보기
while true; do ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 토픽명; sleep 1 ; done ;
토픽 상세 보기를 했을 경우 나오는 칼럼에 대한 설명
- TOPIC: 토픽 이름
- PARTITION: consumer group 내의 각 consumer가 할당된 파티션 번호
- CURRENT-OFFSET: 현재 consumer group의 consumer가 각 파티션에서 마지막으로 offset을 commit한 값
- LOG-END-OFFSET: producer쪽에서 마지막으로 생성한 레코드의 offset
- LAG: LOG-END-OFFSET에서 CURRENT-OFFSET를 뺀 값.

- true일 경우 : 바로바로 커밋되기 때문에 LAG 값이 0이다.

- false일 경우 : 커밋이 되지 않기 때문에 LAG값이 계속 쌓이는걸 확인할 수 있다.

 

auto.offset.reset (defalut=latest)

  • 컨슈머가 읽어야 할 오프셋이 없을 때의 동작
    • earliest : 최초 데이터부터
    • latest : 최신 데이터부터
    • none : 이전 오프셋이 없으면 오류 ( 잘 사용하지 않음)

테스트 : 토픽을 새로 생성(offset이 없음)한 후 테스트

CURRENT-OFFSET 이 없는 초기 상태

- earliest일 경우 : 컨슈머 로그를 확인해보면 offset이 0인값부터 가져오는 것을 확인할 수 있다

- latest일 경우 : 최근 오프셋인 30부터 가져오는것을 확인할 수 있다

- none일 경우 : 오류가 난다

 

기타 설정

max.poll.records (default=500)

  • 한번의 poll() 요청으로 가져올 records의 최대 개수

테스트 : 데이터를 쌓아놓고 가져오기 위해 자동커밋 false, auto.commit.reset : earliest로 설정

2로 설정

5로 설정

 

request.timeout.ms (defalut=30s)

  • 요청에 대해 응답을 기다리는 최대 시간

 

테스트를 안한것도 있는데 오늘은 귀찮아서 여기까지만 쓰고 언젠가 업데이트하겠다..ㅎ

 

References
https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/
https://d2.naver.com/helloworld/0974525
https://soft.plusblog.co.kr/29
https://saramin.github.io/2019-09-17-kafka/
https://dev-punxism.tistory.com/entry/Kafka-Consumer-maxpollintervalms-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B0

카프카 명령어
https://log-laboratory.tistory.com/169

'Kafka' 카테고리의 다른 글

kafka with raft(kraft)(kafka witout zookeeper) 설치  (0) 2023.03.11
댓글