This article shows how to read from Kafka in Node.js (with a Java consumer for comparison); hopefully it offers a useful reference for developers tackling the same problem.
https://www.npmjs.com/package/no-kafka
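To run the example below, install the package first with the standard npm command (nothing project-specific assumed):

npm install no-kafka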
This approach tested fine.
Node code
// The code below has been tested and works
let Kafka = require('no-kafka');
let settings = require('../settings');
let DBProvider = require("../models/db.js").DBProvider;
let DeviceItem = require("../models/deviceitem.js");
let async = require('async');
let db = new DBProvider(settings);
DeviceItem.setdb(db);

// Test environment brokers
let consumer = new Kafka.SimpleConsumer({connectionString: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'});
// Production brokers
//let consumer = new Kafka.SimpleConsumer({connectionString: '10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092'});

let kafka_data = [];
let collection_name = "kafka_zaful";
// Some messages carry a malformed "refer" field; this regex isolates it for repair
let referRegex = /"refer":"(.*?)","lkid/;

// The data handler function can return a Promise
let dataHandler = function (messageSet, topic, partition) {
    messageSet.forEach(function (message) {
        console.log(topic, partition, message.offset, message.message.value.toString('utf8'));
        let str = message.message.value.toString('utf8');
        let item;
        try {
            item = JSON.parse(str);
        } catch (e) {
            // JSON.parse failed, usually because the "refer" value contains
            // unescaped quotes or backslashes; sanitize that field and retry
            let obj = referRegex.exec(str);
            let repl_str = '"refer":"' + obj[1].replace(/"/g, "'").replace(/\\/g, '') + '","lkid';
            try {
                item = JSON.parse(str.replace(referRegex, repl_str));
            } catch (e2) {
                console.log('!!!!!');
                console.log(str);
            }
        }
        if (item) {
            kafka_data.push(item);
            saveKafkaData(); // defined elsewhere in the project; see the sketch below
        } else {
            console.log('!!! item null');
        }
    });
};

return consumer.init().then(function () {
    // Subscribe to partitions 0-5 of the topic
    return consumer.subscribe('cdn-etl-zaful-com', [0, 1, 2, 3, 4, 5], {offset: 0, groupId: 'cdn-zaful-local20'}, dataHandler);
});
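The handler above buffers each parsed message in kafka_data and then calls saveKafkaData(), which is defined elsewhere in the project and not shown here. Purely as a hypothetical illustration (the real DBProvider API is not shown, so this assumes a connected `mongoDb` handle from the official mongodb driver), a batched flush might look like:

// Hypothetical sketch only: the real saveKafkaData() lives elsewhere in the
// project. `mongoDb` is assumed to be a connected Db handle from the official
// `mongodb` driver, which the original code does not show.
function saveKafkaData() {
    if (kafka_data.length < 100) return;                  // flush in batches of 100
    let batch = kafka_data.splice(0, kafka_data.length);  // drain the buffer
    mongoDb.collection(collection_name).insertMany(batch, function (err) {
        if (err) console.log('kafka batch insert failed:', err);
    });
}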
// Another module, using kafka-node (did NOT work in testing; see the note after this module)
// let ServiceRouter = require('../service-router/serviceRouter.js');
function toKafka() {
    let kafka = require('kafka-node');
    let _ = require('lodash'); // required for the _.get() call below
    let Consumer = kafka.Consumer;
    // Production brokers:
    //let client = new kafka.Client('10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092');
    // Test brokers:
    let client = new kafka.Client('169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092');
    let Offset = kafka.Offset;
    let offset = new Offset(client);
    console.log('Connecting to Kafka...');

    // Earlier per-partition offsets and fetch options, kept for reference:
    // let topics = [
    //     {topic: 'cdn-etl-zaful-com', partition: 0, offset: 0},
    //     {topic: 'cdn-etl-zaful-com', partition: 1, offset: 310},
    //     {topic: 'cdn-etl-zaful-com', partition: 2, offset: 20103},
    //     {topic: 'cdn-etl-zaful-com', partition: 3, offset: 42055}
    // ];
    // let options = {
    //     // Auto commit config
    //     autoCommit: true,
    //     autoCommitMsgCount: 100,
    //     autoCommitIntervalMs: 1000,
    //     // Fetch message config
    //     fetchMaxWaitMs: 100,
    //     fetchMinBytes: 1,
    //     fetchMaxBytes: 1024 * 10,
    //     fromOffset: true,
    //     fromBeginning: true
    // };

    let argv = {topic: "cdn-etl-zaful-com"};
    let topic = argv.topic || 'cdn-etl-zaful-com';
    let topics = [{topic: topic, partition: 0}];
    let options = {
        groupId: 'cdn-test',
        autoCommit: true,
        autoCommitIntervalMs: 1000,
        sessionTimeout: 30000
        // fetchMaxWaitMs: 1000,
        // fetchMaxBytes: 1024 * 1024,
        // fromOffset: true,
        // fromBeginning: true
    };
    console.log('create');
    let consumer = new Consumer(client, topics, options);
    console.log('listen');
    consumer.connect();
    // consumer.resume()

    consumer.on('connect', function () {
        console.log('connect');
        client.loadMetadataForTopics([], function (error, results) {
            if (error) {
                return console.error(error);
            }
            console.log('%j', _.get(results, '1.metadata'));
        });
    });

    consumer.on('message', function (message) {
        console.log(message);
        let key = message.key.toString();
        console.log(key);
        if (key !== -1) { // note: key is a string here, so this check is always true
            console.log(message);
            try {
                let msg = JSON.parse(message.value);
                // ServiceRouter.dispatch(key, msg);
            } catch (e) {
                console.log(e);
            }
        } else {
            console.log(message);
        }
    });

    // If the stored offset is out of range, reset to the smallest available offset
    consumer.on('offsetOutOfRange', function (topic) {
        console.log('topic =', topic);
        console.log("------------- offsetOutOfRange ------------");
        topic.maxNum = 2;
        offset.fetch([topic], function (err, offsets) {
            console.log(offsets);
            let min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
            consumer.setOffset(topic.topic, topic.partition, min);
        });
    });

    consumer.on('error', function (message) {
        console.log(message);
        console.log('kafka error');
    });
}
module.exports = toKafka;
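A likely reason the kafka-node module above failed: kafka.Client connects through ZooKeeper, so it expects a ZooKeeper connection string, while the code passes broker addresses. Newer kafka-node versions expose KafkaClient and ConsumerGroup, which talk to the brokers directly. A minimal sketch of the same consumer on that API, assuming kafka-node >= 1.0 and the same test brokers:

// Minimal sketch, assuming kafka-node >= 1.0: KafkaClient/ConsumerGroup
// connect to the brokers directly instead of going through ZooKeeper.
let kafka = require('kafka-node');

let consumerGroup = new kafka.ConsumerGroup({
    kafkaHost: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092', // test brokers
    groupId: 'cdn-test',
    fromOffset: 'earliest', // read the topic from the beginning
    autoCommit: true
}, 'cdn-etl-zaful-com');

consumerGroup.on('message', function (message) {
    console.log(message.value); // each message value is the raw log line
});

consumerGroup.on('error', function (err) {
    console.log('kafka error', err);
});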
Java
This requires setting up a Maven project.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    // Test brokers
    private static final String BOOTSTRAPSERVERS = "169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092";
    // Production brokers
    //private static final String BOOTSTRAPSERVERS = "10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092";
    private static final String TOPIC = "cdn-etl-zaful-com";

    private static Properties getPropsOfConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAPSERVERS);
        props.put("group.id", "cdn-local-1");
        props.put("auto.offset.reset", "earliest"); // start from the beginning for a new group
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Start reading");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getPropsOfConsumer());
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>KafkaConsumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.6</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <!-- json-lib 2.4 is published with a JDK classifier on Maven Central -->
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.6</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
    </dependencies>
</project>
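With this pom in place, one way to compile and run the consumer from the project root (assuming the exec-maven-plugin, which Maven can resolve by prefix even though it is not declared in the pom):

mvn compile exec:java -Dexec.mainClass=KafkaConsumerTest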
That concludes this article on reading Kafka from Node; hopefully the examples here are helpful to fellow developers.