如何删除报价并像原始格式一样发送数据 原始JSON-format是:

{
  "@timestamp": "2020-06-02T09:38:03.183186Z"
}

此数据在另一个主题中

"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"

这是在服务器之间发送数据的代码

def parse(d):   
    if str(type(d)) == "<class 'dict'>":       
        return (r)
    return -1

producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))  # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
                                 auto_offset_reset=param["AUTO_OFFSET_RESET"],
                                 consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
                                 enable_auto_commit=False,
                                 auto_commit_interval_ms=60000,
                                 group_id=param["GROUP_ID"],
                                 client_id=param["CLIENT_ID"]
                                 )
consumer.subscribe([param["TOPIC_IN"]])
 while True:
      num_rows = 0
      for msg in consumer:
          num_rows = num_rows + 1
          m = json.loads(msg.value)
          j = parse(m)
          if j != -1:
             data = json.dumps(j)
             producer.send(param["TOPIC_OUT"], value=data)
分析解答

您当前正在将值序列化为字符串。如果要使用JSON而不是字符串,则需要正确序列化您的值。


以下应该可以解决问题:

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)