[kafka] kafka cluster 코드

 

1. --alter 가 삭제되는 시간 8640000 초

 

topics 중요내용

kafka-topics.sh --create --topic topic3 --partitions 1
kafka-topics.sh --describe --topic topic3
kafka-topics.sh --alter --topic topic3 --partitions 3
kafka-configs.sh --alter --entity-type topics --entity-name topic3 --add-config
retention.ms=86400000 ( -alter 가삭제될때까지 시간초 )
kafka-topics.sh --list

 

2. producer 중요내용

producer 중요내용

kafka-console-producer.sh --topic topic3 --request-required-acks 1
kafka-console-producer.sh --topic topic3 --message-send-max-retries 50
kafka-verifiable-producer.sh --topic topic3 --max-messages 100

 

 

1. --request-required-acks ( 0,1,all )

   0  : 브로커를 응답메시지를 보내고 카프카에 응답메시지 듣지않고 성공을 처리  ( but 브로커가 장애되면 메시지 유실됨)

         빠른속도 보장

 

   1  : 브로커를 응답메시지를 보내고 리더파티션 성공 응답값으로 처리

         

  all : 메시지 유실없고 중요한 케이스에서 사용됨 ( 하나이상의 팔로우 파티션 응답값도 확인)

 

 

2. --message-send-max-retries 

 : 브로커 장애로인해서 producer 에서 몇번의 재전송을 할건지하는 중요한 내용

 

3. kafka-verifiable-producer.sh

 

 : 간단한 발행테스트에 사용

 

 

consumer

kafka-console-consumer.sh --topic topic3 --from-beginning
kafka-console-consumer.sh --topic topic3 --from-beginning --group group1
kafka-console-consumer.sh --topic topic3 --from-beginning --group group2 --property 
print.key=true --property key.separator="-"

 

consumer-groups

kafka-consumer-groups.sh --list
kafka-consumer-groups.sh --describe --group group1

 

 

단일모드 connect 설정파일 - config/connect-standalone.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false

File Sink connect 설정파일 - config/connect-file-sink.properties
topics=topic3

단일모드 connect 실행
bin/connect-standalone.sh config/connect-standalone.properties 
config/connect-file-sink.properties