Kafka consumer tool does not connect to the cluster in Docker
Command
docker-compose -f docker-compose.yaml up --force-recreate --remove-orphans
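The Docker Compose command starts three containers (zookeeper, kafka, and kafka-create-topics). The third container creates a new topic named topic, pushes messages into it, and then tries to consume them. The first two steps work and do not appear to produce errors. However, kafka-console-consumer seems to fail, no matter which configuration parameters I use.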
- cub kafka-ready: success
- kafka-topics: success
- kafka-console-producer: success
- kafka-console-consumer: fails every time
Error
kafka-create-topics_1 | [2019-10-07 21:03:00,556] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58118] Connection to node -1 (/172.21.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
kafka-create-topics_1 | [2019-10-07 20:58:24,047] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
kafka-create-topics_1 | org.apache.kafka.common.errors.TimeoutException
kafka-create-topics_1 | Processed a total of 0 messages
Two files
- kafka-sample-generator.sh
- docker-compose.yaml
Docker Compose file: docker-compose.yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: kafka
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  kafka-create-topics:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    depends_on:
      - zookeeper
      - kafka
    hostname: kafka-create-topics
    volumes:
      - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
    command: "/bin/kafka-sample-generator.sh"
    environment:
      KAFKA_TOPIC: topic
      KAFKA_BROKER: kafka
Sample generator script: kafka-sample-generator.sh
#!/bin/sh
HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`
## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
sleep 1
echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
kafka-topics --create \
  --topic $KAFKA_TOPIC \
  --if-not-exists \
  --zookeeper zookeeper:2181 \
  --partitions 1
  #--replication-factor 1
sleep 1
echo "Available Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1
## Emit sample data stream
while true
do
  echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
  for i in `seq 1 10`;
  do
    echo "$HOST"
    DATA="{\"data\":\"sample-data-$i\"}"
    echo "$DATA"
    kafka-console-producer \
      --broker-list $HOST:9092 \
      --topic $KAFKA_TOPIC \
      "$DATA"
  done
  sleep 1.0
  echo ''
  echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
  ## FAILS HERE
  kafka-console-consumer \
    --bootstrap-server $HOST:9092 \
    --topic $KAFKA_TOPIC #\
    #--partition 0 \
    #--from-beginning \
    #--max-messages 1 \
    #--timeout-ms 45000 \
    #--skip-message-on-error
done
Answer 1
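Here are the changes I made to get it working:
- Disabled the Confluent metrics reporter (it tries to create a topic with a replication factor of 3).
- Changed the kafka-topics create command to use a replication factor of 1.
- Passed the message data to kafka-console-producer through STDIN.
- Made kafka-console-consumer read at most 10 messages from the beginning, so it does not block waiting for new messages.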
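The first two changes both concern replication on a cluster with a single broker, where a topic that asks for three replicas cannot be created. The same applies to the internal __consumer_offsets topic, whose replication factor defaults to 3 unless it is overridden. As a rough sketch for checking this from inside a container, the helper below wraps kafka-topics --describe. The function name describe_topic and the variables KAFKA_TOPICS_CMD and ZOOKEEPER are hypothetical names introduced only for this example.

```shell
#!/bin/sh
# Hypothetical variables for this sketch; the defaults match the compose file.
KAFKA_TOPICS_CMD="${KAFKA_TOPICS_CMD:-kafka-topics}"
ZOOKEEPER="${ZOOKEEPER:-zookeeper:2181}"

# Print partition, leader, and replica details for the topic named in $1.
describe_topic() {
  "$KAFKA_TOPICS_CMD" --describe --topic "$1" --zookeeper "$ZOOKEEPER"
}
```

Calling describe_topic __consumer_offsets after a consumer has started should show whether the offsets topic exists and how many replicas it has. Calling describe_topic "$KAFKA_TOPIC" does the same for the demo topic.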
docker-compose.yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: kafka
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-create-topics:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    depends_on:
      - zookeeper
      - kafka
    hostname: kafka-create-topics
    volumes:
      - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
    command: "/bin/kafka-sample-generator.sh"
    environment:
      KAFKA_TOPIC: topic
      KAFKA_BROKER: kafka
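Because the broker advertises kafka:9092, clients inside the Compose network are directed to that host name once they have fetched cluster metadata, whichever address they bootstrapped from. The following is a minimal sketch, assuming the generator script could pass the service name directly instead of resolving it to an IP address first. The function bootstrap_address and the variable KAFKA_PORT are hypothetical and do not appear in the original script.

```shell
#!/bin/sh
# Hypothetical helper (not part of the original script): build the bootstrap
# address from the Compose service name instead of a resolved IP address.
# KAFKA_BROKER is set in docker-compose.yaml; KAFKA_PORT is an assumed variable.
bootstrap_address() {
  printf '%s:%s\n' "${KAFKA_BROKER:-kafka}" "${KAFKA_PORT:-9092}"
}
```

With this in place, a call such as cub kafka-ready -b "$(bootstrap_address)" 1 20 would target kafka:9092 by name.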
kafka-sample-generator.sh
#!/bin/sh
HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`
## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
  sleep 1
echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
## Replication factor 1, because the cluster has a single broker
kafka-topics --create --topic $KAFKA_TOPIC --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1
sleep 1
echo "Available Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1
## Emit sample data stream
while true
do
  echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
  for i in `seq 1 10`;
  do
    echo "$HOST"
    ## Build the message before printing it
    DATA="{\"data\":\"sample-data-$i\"}"
    echo "$DATA"
    ## The producer reads messages from STDIN, one per line
    echo "$DATA" | kafka-console-producer \
      --broker-list $HOST:9092 \
      --topic $KAFKA_TOPIC
  done
  sleep 1.0
  echo ''
  echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
  ## Read at most 10 messages from the start of the topic, then exit
  kafka-console-consumer \
    --bootstrap-server $HOST:9092 \
    --topic $KAFKA_TOPIC \
    --from-beginning \
    --max-messages 10
done
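With --max-messages alone, the consumer still waits indefinitely if fewer messages than requested ever arrive. One possible variation, shown here only as a sketch, also passes the --timeout-ms option that was commented out in the question, so the consumer exits when no message is available for the given interval. The function consume_batch and the variable CONSUMER_CMD are hypothetical names for this example. The defaults of 10 messages and 45000 ms are taken from the scripts above.

```shell
#!/bin/sh
# Hypothetical variable for this sketch; defaults to the real CLI tool.
CONSUMER_CMD="${CONSUMER_CMD:-kafka-console-consumer}"

# Read up to $3 messages (default 10) from the start of topic $2 on broker $1,
# giving up if no message arrives within $4 milliseconds (default 45000).
consume_batch() {
  "$CONSUMER_CMD" \
    --bootstrap-server "$1" \
    --topic "$2" \
    --from-beginning \
    --max-messages "${3:-10}" \
    --timeout-ms "${4:-45000}"
}
```

Inside the loop, the consumer call could then become consume_batch "$HOST:9092" "$KAFKA_TOPIC".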