本文主要介绍 Kafka Consumer high-level API 的白名单(Whitelist)用法,希望能为大家解决编程问题提供一定的参考价值,需要的开发者们可以跟随小编一起来学习!
Kafka提供了两套API给Consumer
- The high-level Consumer API
- The SimpleConsumer API
第一种是高度抽象的 Consumer API,使用起来简单、方便;但对于某些特殊的需求,我们可能需要用到第二种更底层的 API。下面介绍第一种 API:
使用白名单(Whitelist,即按 topic 名称进行匹配的过滤器)可以让同一个 Consumer 同时订阅多个 topic。
示例代码:
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;public class KafkaConsumer {private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);private ConsumerIterator<byte[], byte[]> iterator = null;private static ConsumerConfig consumerConfig;private ConsumerConnector connector = null;private List<KafkaStream<byte[], byte[]>> partitions = null;private Whitelist whitelist = null;private int threads = 0;private String[] topics;private String type;private String topic = null;private String message = null;private MessageAndMetadata<byte[], byte[]> next = null;public KafkaConsumer(Properties props) {String topicStr = props.getProperty("topics");if(topicStr==null||topicStr.trim().length()<=0){throw new NullPointerException("请正确填写TOPIC.");} threads = Integer.parseInt(props.getProperty("threads", "1").trim());consumerConfig = createConsumerConfig(props);// topic的过滤器whitelist = new Whitelist("(" + topicStr + ")");init();}/*** 初始化参数* * @param props* @return*/private static ConsumerConfig createConsumerConfig(Properties props) {logger.info("---init kafka config...");props.put("zookeeper.session.timeout.ms", "30000");props.put("zookeeper.sync.time.ms", "6000");props.put("auto.commit.enable", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");return new ConsumerConfig(props);}private void init() {connector = Consumer.createJavaConsumerConnector(consumerConfig);partitions = connector.createMessageStreamsByFilter(whitelist,threads);if (CollectionUtils.isEmpty(partitions)) {logger.info("empty!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}logger.info("---connect kafka success!");try{for (KafkaStream<byte[], byte[]> partition : partitions) {iterator = partition.iterator();while (iterator.hasNext()) {next = iterator.next();try {message = new String(next.message(), Constants.UTF8);} catch (UnsupportedEncodingException e) 
{e.printStackTrace();}logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);}}}catch (Exception e) {logger.error("run time error:{}",e);close();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}init();}}/*** 销毁资源 未使用* */private void close() {logger.info("close resource...");if (partitions != null)partitions.clear();partitions = null;if (iterator != null)iterator.clearCurrentChunk();iterator = null;if (connector != null)connector.shutdown();connector = null;}/*** 主方法入口* * @param args*/public static void main(String[] args) {FileInputStream fis = null;Properties props = new Properties();Properties kafkaProps = null;Properties syslogProps = null;try {String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();logger.info("encode:{}", encode);String path = System.getProperty(Constants.CONFIG_PATH);logger.info("path:{}", path);if(path==null||path.trim().length()<=0){throw new NullPointerException("请正确填写配置文件路径.");}fis = new FileInputStream(path);props.load(new InputStreamReader(fis, encode));kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);logger.info("kafkaProps:{}", kafkaProps);new KafkaConsumer(kafkaProps);} catch (Exception e) {logger.error("----Runtime error:", e);} finally {if (fis != null) {try {fis.close();} catch (IOException e) {e.printStackTrace();}}if (props != null)props.clear();if (kafkaProps != null)kafkaProps.clear();}}
}
使用到的配置:
zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2
这篇关于kafka Consumer high-level api 之白名单的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!