我试图将数据读入使用与下面的配置卡夫卡连接器的RabbitMQ的队列我演讲的题目:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "rabbitmqtest3",
"rabbitmq.queue" : "taskqueue",
"rabbitmq.host" : "localhost",
"rabbitmq.username" : "guest",
"rabbitmq.password" : "guest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
但I'm有麻烦源数据流转换为JSON格式I'm丢失原始消息时
原版的:
{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
收稿日期:
{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}
有没有人有一个想法,为什么发生这种情况?
编辑:我尝试使用"value.converter"的消息转换为String:"org.apache.kafka.connect.storage.StringConverter",但结果是一样的:
11/27/19 4:07:37 PM CET , 0 , [B@1583a488
EDIT2:
I'm现在收到JSON文件,但内容是BASE64编码仍然
关于如何将其直接转换回UTF8任何想法?
{
"name": "adls-gen2-sink",
"config": {
"connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max":"1",
"topics":"rabbitmqtest3",
"flush.size":"3",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"topics.dir":"sw66jsoningest",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
}
更新:
我得到了解决,考虑到这个流程:
消息(JSON) -->的RabbitMQ (ByteArray) -->卡夫卡(ByteArray) --> ADLS (JSON)
我用在RabbitMQ的卡夫卡连接器该转换器将消息从解码的Base64到UTF8。
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
后来我治疗的消息作为String并将其保存为一个JSON。
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
非常感谢!
分析解答
如果设置schemas.enable": "false"
,你不应该获取模式和有效载荷领域
如果你不想要的翻译在所有的发生,使用ByteArrayConverter
如果您的数据只是一个简单的string(包括JSON),字符串转换使用
目前尚不清楚你printing得到的消息,但看起来像你printing的字节数组,而不是如何解码到String