Kafka 기본 프로그래밍
Client Tool 사용
Kafka CLI Tools 접근 방법
- docker ps를 통해 Broker의 Container ID 혹은 Container name 파악
- 해당 컨테이너로 로그인 : docker exec -it Brokcer_Container_ID sh
- 다양한 Kafka Client Tool 사용 가능
- kafka-topics
- kafka-configs
- kafka-console-consumer
- kakfa-console-producer
- ...
kafka-topics
- kafka-topics --bootstrap-server kafka1:9092 --list
- kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
kafka-console-producer
- Command line을 통해 Topic을 만들고 Message 생성 가능
- kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
kafka-console-consumer
- Command line을 통해 Topic에서 Message 읽기 가능
- kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
- --from-beginning : 가장 최근 것부터 읽음(earliest), 아니면 latest로 동작
Topic 파라미터 설정
Topic 생성 시 다수의 Partition 혹은 Replica
- 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가
- create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정
- num_partitions (default=1) : Partition의 수, replication_factor (default=1) : Replication의 수
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
KafkaProducer 파라미터
- bootstrap_servers : 메시지를 보낼 때 사용할 브로커 리스트 (default='localhost:9092)
- cliend_id : Kafka Producer의 이름
- key_serializer, value_serializer : 메시지의 key와 value의 serialize 방법 지정 (함수)
- enable_idempotence : 중복 메시지 전송 방지 (default=False - 막지 않음)
- acks (0, 1, 'all') : Consistency level (0 : 바로 리턴, 1 : Leader에 쓰일 때까지 대기, 'all' : 모든 partition 대기)
- retries : 메시지 실패 시 재시도 횟수 (default=214783647)
- delivery.timeout.ms : 메시지 전송 최대 시간 (default=120000ms)
- linger_ms, batch_size : 메시지 송신 전 대기 시간 (default=0), 메시지 송신 전 데이터 크기 (default=16384)
- max_in_flight_request_per_connection : Broker에게 응답을 안 기다리고 보내는 최대 메시지 수 (default=5)
- 파라미터 레퍼런스
Kafka Producer 동작
KafkaProducer로 Topic 만들기
- 랜덤 하게 사람 정보를 만들어서 저장하는 Kafka Producer를 구현해 보자
- 예제 코드
- Faker 모듈 사용
- pydantic의 BaseModel을 사용하여 메시지 클래스를 구현 (Person)
from pydantic import BaseModel
class Person(BaseModel):
id: str
name: str
title: str
Consumer 옵션 살펴보기
KafkaConsumer 파라미터
- bootstrap_servers : 메시지를 보낼 때 사용할 브로커 리스트 (default='localhost:9092)
- client_id : Kafka Consumer의 이름
- group_id : Kafka Consumer Group의 이름
- key_deserializer, value_deserializer : 메시지의 key와 value의 deserialize 방법 지정 (함수)
- auto_offset_reset : earliest, latest (default=latest)
- enable_auto_commit (default=True)
- True : 소비자의 오프셋이 백그라운드에서 주기적으로 커밋
- False : 명시적으로 커밋을 해줘야 함
- 파라미터 레퍼런스
Consumer가 다수의 Partitions으로부터 어떻게 읽나?
- Consumer가 하나이고 다수의 Partition으로 구성된 Topic에서 읽어야 한다면?
- Consumer는 각 Partition로부터 라운드 로빈 형태로 하나씩 읽게 됨
- 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
- 이를 해결하기 위한 것이 Consumer Group
- 한 프로세스에서 다수의 Topic을 읽는 것이 가능
- Topic 수만큼 KafkaConsumer 인스턴스를 생성하고 별도의 Group ID와 Client ID를 지정해야 함
Consumer Group
- Consumer가 Topic을 읽기 시작하면 해당 Topic 내 일부 Partition이 자동으로 할당
- Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 할당
- 데이터 소비 병렬성을 늘리고 Backpressure 경감
- Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
- Consumer Group Rebalancing를 알아서 수행해 줌
- 기존 Consumer가 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition 재지정
Consumer 예제 프로그램
Message Processing Guarantee 방식
- 실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식에는 3가지 존재
- Exactly Once (정확히 한 번)
- 각 Message가 Consumer에게 정확히 한 번만 전달된다는 것을 보장
- 네트워크 문제, 장애 혹은 재시도 가능성으로 아주 어려운 문제
- 1) Producer 단에서는 enable_idempotence를 True로 설정
- 2) Producer에서 메시지를 쓸 때와 Consumer에서 읽을 때 Transaction API 사용
- At Least Once (적어도 한 번 이상)
- 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장
- Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야 함
- 보통 Consumer가 직접 offset을 커밋할 때 발생
- At Most Once (최대 한 번만)
- 메시지 손실 가능성에 중점을 둠
- 메시지가 손실될 수는 있지만 중복은 없음
- 가장 흔한 메시지 전송 보장 방식 (기본 방식)
Consumer / Producer 패턴
- 많은 경우 Consumer는 한 Topic의 메시지를 소비해서 새로운 Topic을 만들기도 함
- 즉 Consumer이면서 Producer로 동작하는 것은 흔한 패턴
- 데이터 Transformation, Filtering, Enrichment
- 동일한 프로세스 내에서 Kafka Consumer를 사용하면서 한 Topic의 메시지를 읽고 Enrichment 수행
- Producer를 사용해 수정된 데이터를 다른 Topic으로 푸시 가능
ksqlDB 사용해 보기
ksqlDB
- REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작
- 여기서는 ksql을 사용하는 간단한 데모
- docker ps 후 confluentinc/cp-ksqldb-server의 container ID 복사
- docker exec -it ContainerID sh
- ksql
- ksql 실행 후 아래 두 개의 명령 실행
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;
timestamp (event time)
- ROWTIME : Producer가 데이터를 생성한 시간
- EMIT CHANGES : 이전에 읽은 것 이후의 데이터만 가져옴
SELECT *, ROWTIME FROM my_stream EMIT CHANGES;
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 71일 차] 머신러닝 기초 (0) | 2024.07.01 |
---|---|
[TIL - 70일 차] Kafka와 Spark Streaming 기반 스트리밍 처리 (5) (0) | 2024.06.28 |
[TIL - 68일 차] Kafka와 Spark Streaming 기반 스트리밍 처리 (3) (0) | 2024.06.26 |
[TIL - 67일 차] Kafka와 Spark Streaming 기반 스트리밍 처리 (2) (0) | 2024.06.25 |
[TIL - 66일 차] Kafka와 Spark Streaming 기반 스트리밍 처리 (1) (0) | 2024.06.24 |