我的docker-compose.yml代码是:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

而我的dockerfile是

FROM python
MAINTAINER Shubham Joshi 
ADD hello.py /
ADD transactions_producer.py /
COPY requirements.txt .
RUN pip3 install -r requirements.txt
CMD ["python","./transactions_producer.py"]

producer.py的python代码是:

from kafka import KafkaProducer
from time import sleep
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0, 10, 0),value_serializer=lambda v: json.dumps(v).encode('utf-8'))


for i in range(10):
    producer.send('shubham', create_random_transaction())
    sleep(5)
    print("Success",i)

producer.close()

shubham是我创建的主题,但是我无法在该主题中使用produce/publish消息并收到此超时错误。

第1步:创建图片:

docker build -t python_producer2 .

它成功建立,我跑了

step 2:docker run python_producer2

这是我收到此错误的地方

Traceback (most recent call last):
  File "./transactions_producer.py", line 52, in <module>
    producer.send('shubham', create_random_transaction())
  File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 564, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 690, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
分析解答

当您的代码在Docker中运行时,localhost表示Docker容器本身。这就是您的连接超时的原因,因为Kafka不在容器上运行。

首先要做的是将您的容器与代理放置在相同的Docker网络上。将其添加到Docker Compose中,或者在执行docker run时指定Docker Compose的网络。前者可能更容易:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  client:
    image: python_producer2
    container_name: python_producer2
    depends_on: 
      - kafka

但是,由于您的客户端正在连接到另一台服务器上的Kafka(由于容器实际上是在单独的服务器上),因此它不能使用localhost,也不能连接到将localhost作为advertised.listener返回的代理上的侦听器。因此,将代码更改为:

producer = KafkaProducer(bootstrap_servers=['kafka:29092'],api_version=(0, 10, 0),value_serializer=lambda v: json.dumps(v).encode('utf-8'))

这将告诉它在kafka:29092找到代理,并在29092上找到将kafka:29092作为advertised.listener公开的侦听器。要了解有关播发的侦听器的更多信息,请参见https://rmoff.net/2018/08/02/kafka-listeners-explained/