我试图将数据读入使用与下面的配置卡夫卡连接器的RabbitMQ的队列我演讲的题目:

{
  "name" : "RabbitMQSourceConnector1",
  "config" : {
   "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
   "tasks.max" : "1",
   "kafka.topic" : "rabbitmqtest3",
   "rabbitmq.queue" : "taskqueue",
    "rabbitmq.host" : "localhost",
    "rabbitmq.username" : "guest",
    "rabbitmq.password" : "guest",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true"

  }
}

但I'm有麻烦源数据流转换为JSON格式I'm丢失原始消息时

原版的:

{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}

收稿日期:

{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}

有没有人有一个想法,为什么发生这种情况?

编辑:我尝试使用"value.converter"的消息转换为String:"org.apache.kafka.connect.storage.StringConverter",但结果是一样的:

11/27/19 4:07:37 PM CET , 0 , [B@1583a488

EDIT2:

I'm现在收到JSON文件,但内容是BASE64编码仍然

关于如何将其直接转换回UTF8任何想法?

{
 "name": "adls-gen2-sink",
 "config": {
   "connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
   "tasks.max":"1",
   "topics":"rabbitmqtest3",
   "flush.size":"3",
   "format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
   "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
   "internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
   "topics.dir":"sw66jsoningest",
   "confluent.topic.bootstrap.servers":"localhost:9092",
   "confluent.topic.replication.factor":"1",
   "partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"

          }
}

更新:

我得到了解决,考虑到这个流程:

消息(JSON) -->的RabbitMQ (ByteArray) -->卡夫卡(ByteArray) --> ADLS (JSON)

我用在RabbitMQ的卡夫卡连接器该转换器将消息从解码的Base64到UTF8。

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

后来我治疗的消息作为String并将其保存为一个JSON。

"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",

非常感谢!

分析解答

如果设置schemas.enable": "false",你不应该获取模式和有效载荷领域

如果你不想要的翻译在所有的发生,使用ByteArrayConverter

如果您的数据只是一个简单的string(包括JSON),字符串转换使用

目前尚不清楚你printing得到的消息,但看起来像你printing的字节数组,而不是如何解码到String