本文主要是介绍flume到kafka动态topic,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
json日志使用拦截器,字段取出放到header里
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractora1.sources.r1.interceptors.i1.regex = "自定义字段":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = topic
topic不要自定义,1.7 源码固定是 "topic",...如果自定义的话..淡淡的忧伤.....
...........
#设置Kafka的Topic
a1.sinks.k1.topic=%{topic}
............
这篇关于flume到kafka动态topic的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!