node读取卡夫卡

2024-05-06 18:18
文章标签 读取 node 卡夫卡

本文主要是介绍node读取卡夫卡,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://www.npmjs.com/package/no-kafka
测试可用
node 代码
//下面代码测试可用

let Kafka = require('no-kafka');
let settings=require('../settings');
let DBProvider = require("../models/db.js").DBProvider;
let DeviceItem = require("../models/deviceitem.js");
let async=require('async');
let db = new DBProvider(settings);
DeviceItem.setdb(db);
let consumer = new Kafka.SimpleConsumer({connectionString: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'}); //测试环境
//let consumer = new Kafka.SimpleConsumer({connectionString: '10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092'}); //线上// data handler function can return a Promise
let kafka_data=[];
let collection_name="kafka_zaful";
let referRegex=/"refer":"(.*?)","lkid/;let dataHandler = function (messageSet, topic, partition) {messageSet.forEach(function (message) {console.log(topic, partition, message.offset, message.message.value.toString('utf8'));let str=message.message.value.toString('utf8');//console.log('kafka_data.length=',kafka_data.length);let item;try {item=JSON.parse(str);} catch (e) {let obj=referRegex.exec(str);let repl_str='"refer":"'+obj[1].replace(/"/g,"'").replace(/\\/g,'')+'","lkid';try {item=JSON.parse(str.replace(referRegex,repl_str));} catch (e2) {console.log('!!!!!');console.log(str);}}if (item) {kafka_data.push(item);saveKafkaData();} else {console.log('!!! item null');}});
};return consumer.init().then(function () {// Subscribe partitons 0 and 1 in a topic:return consumer.subscribe('cdn-etl-zaful-com', [0,1,2,3,4,5], {offset: 0, groupId: 'cdn-zaful-local20'}, dataHandler);
});

//另一个模块 (测试不可用)

// let ServiceRouter = require('../service-router/serviceRouter.js');
function toKafka() {let kafka = require('kafka-node');let Consumer = kafka.Consumer;//let client = new kafka.Client('10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092');//线上let client = new kafka.Client('169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'); //测试let Offset = kafka.Offset;let offset = new Offset(client);console.log('连接kafka中');
//     let topics = [{
//         topic: 'cdn-etl-zaful-com', partition: 0, offset: 0
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 1, offset: 310
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 2, offset: 20103
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 3, offset: 42055
//     }];
//     let options = {
// // Auto commit config
//         autoCommit: true,
//         autoCommitMsgCount: 100,
//         autoCommitIntervalMs: 1000,
// // Fetch message config
//         fetchMaxWaitMs: 100,
//         fetchMinBytes: 1,
//         fetchMaxBytes: 1024 * 10,
//         fromOffset: true,
//         fromBeginning: true
//     };let argv = {topic: "cdn-etl-zaful-com"};let topic = argv.topic || 'cdn-etl-zaful-com';let topics = [{topic: topic,partition:0}],options = {groupId: 'cdn-test',autoCommit: true,autoCommitIntervalMs:1000,sessionTimeout:30000,// fetchMaxWaitMs: 1000,// fetchMaxBytes: 1024 * 1024,// fromOffset: true,// fromBeginning: true};console.log('create ');// console.log('topics=',topics);// console.log('options=',options);let consumer = new Consumer(client,topics,options);console.log('listen ');consumer.connect();// consumer.resume()consumer.on('connect', function () {console.log('connect');client.loadMetadataForTopics([], function (error, results) {if (error) {return console.error(error);}console.log('%j', _.get(results, '1.metadata'));});});consumer.on('message', function (message) {console.log(message);let key = message.key.toString();console.log(key);if (key !== -1) {console.log(message);try {let msg = JSON.parse(message.value);// ServiceRouter.dispatch(key, msg);} catch (e) {console.log(e)}} else {console.log(message)}});consumer.on('offsetOutOfRange', function (topic) {console.log('topic =',topic);console.log("------------- offsetOutOfRange ------------");topic.maxNum = 2;offset.fetch([topic], function (err, offsets) {console.log(offsets);let min = Math.min.apply(null, offsets[topic.topic][topic.partition]);consumer.setOffset(topic.topic, topic.partition, min);});});consumer.on('error', function (message) {console.log(message);console.log('kafka错误');});
}
module.exports = toKafka;

Java
需要构建一个maven项目

import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.alibaba.fastjson.JSON;public class KafkaConsumerTest
{private static final String BOOTSTRAPSERVERS = "169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092";//private static final String BOOTSTRAPSERVERS = "10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092";private static final String TOPIC = "cdn-etl-zaful-com";private static Properties getPropsOfConsumer(){Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAPSERVERS);props.put("group.id", "cdn-local-1");props.put("auto.offset.reset", "earliest");//earliestprops.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}public static void main(String[] args){System.out.println("开始读");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getPropsOfConsumer());consumer.subscribe(Arrays.asList(TOPIC));while (true){//System.out.println("reading...");ConsumerRecords<String, String> records = consumer.poll(100);//System.out.println("records=");
/*          if (records!=null) {System.out.println(records);}*/for (ConsumerRecord<String, String> record : records){System.out.println(record.value());}}}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>KafkaConsumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.6</version></dependency><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.39</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.6</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency></dependencies>
</project>

这篇关于node读取卡夫卡的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965009

相关文章

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil

Node.js 数据库 CRUD 项目示例详解(完美解决方案)

《Node.js数据库CRUD项目示例详解(完美解决方案)》:本文主要介绍Node.js数据库CRUD项目示例详解(完美解决方案),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考... 目录项目结构1. 初始化项目2. 配置数据库连接 (config/db.js)3. 创建模型 (models/

使用Node.js制作图片上传服务的详细教程

《使用Node.js制作图片上传服务的详细教程》在现代Web应用开发中,图片上传是一项常见且重要的功能,借助Node.js强大的生态系统,我们可以轻松搭建高效的图片上传服务,本文将深入探讨如何使用No... 目录准备工作搭建 Express 服务器配置 multer 进行图片上传处理图片上传请求完整代码示例

解决Java中基于GeoTools的Shapefile读取乱码的问题

《解决Java中基于GeoTools的Shapefile读取乱码的问题》本文主要讨论了在使用Java编程语言进行地理信息数据解析时遇到的Shapefile属性信息乱码问题,以及根据不同的编码设置进行属... 目录前言1、Shapefile属性字段编码的情况:一、Shp文件常见的字符集编码1、System编码

nvm如何切换与管理node版本

《nvm如何切换与管理node版本》:本文主要介绍nvm如何切换与管理node版本问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录nvm切换与管理node版本nvm安装nvm常用命令总结nvm切换与管理node版本nvm适用于多项目同时开发,然后项目适配no

利用Python实现添加或读取Excel公式

《利用Python实现添加或读取Excel公式》Excel公式是数据处理的核心工具,从简单的加减运算到复杂的逻辑判断,掌握基础语法是高效工作的起点,下面我们就来看看如何使用Python进行Excel公... 目录python Excel 库安装Python 在 Excel 中添加公式/函数Python 读取

Python如何实现读取csv文件时忽略文件的编码格式

《Python如何实现读取csv文件时忽略文件的编码格式》我们再日常读取csv文件的时候经常会发现csv文件的格式有多种,所以这篇文章为大家介绍了Python如何实现读取csv文件时忽略文件的编码格式... 目录1、背景介绍2、库的安装3、核心代码4、完整代码1、背景介绍我们再日常读取csv文件的时候经常

Node.js net模块的使用示例

《Node.jsnet模块的使用示例》本文主要介绍了Node.jsnet模块的使用示例,net模块支持TCP通信,处理TCP连接和数据传输,具有一定的参考价值,感兴趣的可以了解一下... 目录简介引入 net 模块核心概念TCP (传输控制协议)Socket服务器TCP 服务器创建基本服务器服务器配置选项服

mac安装nvm(node.js)多版本管理实践步骤

《mac安装nvm(node.js)多版本管理实践步骤》:本文主要介绍mac安装nvm(node.js)多版本管理的相关资料,NVM是一个用于管理多个Node.js版本的命令行工具,它允许开发者在... 目录NVM功能简介MAC安装实践一、下载nvm二、安装nvm三、安装node.js总结NVM功能简介N

C#中读取XML文件的四种常用方法

《C#中读取XML文件的四种常用方法》Xml是Internet环境中跨平台的,依赖于内容的技术,是当前处理结构化文档信息的有力工具,下面我们就来看看C#中读取XML文件的方法都有哪些吧... 目录XML简介格式C#读取XML文件方法使用XmlDocument使用XmlTextReader/XmlTextWr