node读取卡夫卡

2024-05-06 18:18
文章标签 读取 node 卡夫卡

本文主要是介绍node读取卡夫卡,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://www.npmjs.com/package/no-kafka
测试可用
node 代码
//下面代码测试可用

let Kafka = require('no-kafka');
let settings=require('../settings');
let DBProvider = require("../models/db.js").DBProvider;
let DeviceItem = require("../models/deviceitem.js");
let async=require('async');
let db = new DBProvider(settings);
DeviceItem.setdb(db);
let consumer = new Kafka.SimpleConsumer({connectionString: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'}); //测试环境
//let consumer = new Kafka.SimpleConsumer({connectionString: '10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092'}); //线上// data handler function can return a Promise
let kafka_data=[];
let collection_name="kafka_zaful";
let referRegex=/"refer":"(.*?)","lkid/;let dataHandler = function (messageSet, topic, partition) {messageSet.forEach(function (message) {console.log(topic, partition, message.offset, message.message.value.toString('utf8'));let str=message.message.value.toString('utf8');//console.log('kafka_data.length=',kafka_data.length);let item;try {item=JSON.parse(str);} catch (e) {let obj=referRegex.exec(str);let repl_str='"refer":"'+obj[1].replace(/"/g,"'").replace(/\\/g,'')+'","lkid';try {item=JSON.parse(str.replace(referRegex,repl_str));} catch (e2) {console.log('!!!!!');console.log(str);}}if (item) {kafka_data.push(item);saveKafkaData();} else {console.log('!!! item null');}});
};return consumer.init().then(function () {// Subscribe partitons 0 and 1 in a topic:return consumer.subscribe('cdn-etl-zaful-com', [0,1,2,3,4,5], {offset: 0, groupId: 'cdn-zaful-local20'}, dataHandler);
});

//另一个模块 (测试不可用)

// let ServiceRouter = require('../service-router/serviceRouter.js');
function toKafka() {let kafka = require('kafka-node');let Consumer = kafka.Consumer;//let client = new kafka.Client('10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092');//线上let client = new kafka.Client('169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'); //测试let Offset = kafka.Offset;let offset = new Offset(client);console.log('连接kafka中');
//     let topics = [{
//         topic: 'cdn-etl-zaful-com', partition: 0, offset: 0
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 1, offset: 310
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 2, offset: 20103
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 3, offset: 42055
//     }];
//     let options = {
// // Auto commit config
//         autoCommit: true,
//         autoCommitMsgCount: 100,
//         autoCommitIntervalMs: 1000,
// // Fetch message config
//         fetchMaxWaitMs: 100,
//         fetchMinBytes: 1,
//         fetchMaxBytes: 1024 * 10,
//         fromOffset: true,
//         fromBeginning: true
//     };let argv = {topic: "cdn-etl-zaful-com"};let topic = argv.topic || 'cdn-etl-zaful-com';let topics = [{topic: topic,partition:0}],options = {groupId: 'cdn-test',autoCommit: true,autoCommitIntervalMs:1000,sessionTimeout:30000,// fetchMaxWaitMs: 1000,// fetchMaxBytes: 1024 * 1024,// fromOffset: true,// fromBeginning: true};console.log('create ');// console.log('topics=',topics);// console.log('options=',options);let consumer = new Consumer(client,topics,options);console.log('listen ');consumer.connect();// consumer.resume()consumer.on('connect', function () {console.log('connect');client.loadMetadataForTopics([], function (error, results) {if (error) {return console.error(error);}console.log('%j', _.get(results, '1.metadata'));});});consumer.on('message', function (message) {console.log(message);let key = message.key.toString();console.log(key);if (key !== -1) {console.log(message);try {let msg = JSON.parse(message.value);// ServiceRouter.dispatch(key, msg);} catch (e) {console.log(e)}} else {console.log(message)}});consumer.on('offsetOutOfRange', function (topic) {console.log('topic =',topic);console.log("------------- offsetOutOfRange ------------");topic.maxNum = 2;offset.fetch([topic], function (err, offsets) {console.log(offsets);let min = Math.min.apply(null, offsets[topic.topic][topic.partition]);consumer.setOffset(topic.topic, topic.partition, min);});});consumer.on('error', function (message) {console.log(message);console.log('kafka错误');});
}
module.exports = toKafka;

Java
需要构建一个maven项目

import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.alibaba.fastjson.JSON;public class KafkaConsumerTest
{private static final String BOOTSTRAPSERVERS = "169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092";//private static final String BOOTSTRAPSERVERS = "10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092";private static final String TOPIC = "cdn-etl-zaful-com";private static Properties getPropsOfConsumer(){Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAPSERVERS);props.put("group.id", "cdn-local-1");props.put("auto.offset.reset", "earliest");//earliestprops.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}public static void main(String[] args){System.out.println("开始读");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getPropsOfConsumer());consumer.subscribe(Arrays.asList(TOPIC));while (true){//System.out.println("reading...");ConsumerRecords<String, String> records = consumer.poll(100);//System.out.println("records=");
/*          if (records!=null) {System.out.println(records);}*/for (ConsumerRecord<String, String> record : records){System.out.println(record.value());}}}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>KafkaConsumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.6</version></dependency><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.39</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.6</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency></dependencies>
</project>

这篇关于node读取卡夫卡的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965009

相关文章

Node.js学习记录(二)

目录 一、express 1、初识express 2、安装express 3、创建并启动web服务器 4、监听 GET&POST 请求、响应内容给客户端 5、获取URL中携带的查询参数 6、获取URL中动态参数 7、静态资源托管 二、工具nodemon 三、express路由 1、express中路由 2、路由的匹配 3、路由模块化 4、路由模块添加前缀 四、中间件

Node Linux相关安装

下载经编译好的文件cd /optwget https://nodejs.org/dist/v10.15.3/node-v10.15.3-linux-x64.tar.gztar -xvf node-v10.15.3-linux-x64.tar.gzln -s /opt/node-v10.15.3-linux-x64/bin/npm /usr/local/bin/ln -s /opt/nod

matlab读取NC文件(含group)

matlab读取NC文件(含group): NC文件数据结构: 代码: % 打开 NetCDF 文件filename = 'your_file.nc'; % 替换为你的文件名% 使用 netcdf.open 函数打开文件ncid = netcdf.open(filename, 'NC_NOWRITE');% 查看文件中的组% 假设我们想读取名为 "group1" 的组groupName

argodb自定义函数读取hdfs文件的注意点,避免FileSystem已关闭异常

一、问题描述 一位同学反馈,他写的argo存过中调用了一个自定义函数,函数会加载hdfs上的一个文件,但有些节点会报FileSystem closed异常,同时有时任务会成功,有时会失败。 二、问题分析 argodb的计算引擎是基于spark的定制化引擎,对于自定义函数的调用跟hive on spark的是一致的。udf要通过反射生成实例,然后迭代调用evaluate。通过代码分析,udf在

下载/保存/读取 文件,并转成流输出

最近对文件的操作又熟悉了下;现在记载下来:学习在于 坚持!!!不以细小而不为。 实现的是:文件的下载、文件的保存到SD卡、文件的读取输出String 类型、最后是文件转换成流输出;一整套够用了; 重点: 1:   操作网络要记得开线程; 2:更新网络获取的数据 切记用Handler机制; 3:注意代码的可读性(这里面只是保存到SD卡,在项目中切记要对SD卡的有无做判断,然后再获取路径!)

ROS1 + Realsense d455 固件安装+读取rostopic数据

目录 安装固件(一定要匹配)ROS1 wrapper 安装方法Realsense SDK 安装方法Realsense Firmware 安装方法 修改roslaunch配置文件,打开双目图像和IMU数据其他坑点参考链接 安装固件(一定要匹配) 如果你是使用ROS1获取realsense数据的话,一定要注意,SDK, Firmware的版本不是越新越好!!,这是因为intel已经不

Python批量读取身份证信息录入系统和重命名

前言 大家好, 如果你对自动化处理身份证图片感兴趣,可以尝试以下操作:从身份证图片中快速提取信息,填入表格并提交到网页系统。如果你无法完成这个任务,我们将在“Python自动化办公2.0”课程中详细讲解实现整个过程。 实现过程概述: 模块与功能: re 模块:用于从 OCR 识别出的文本中提取所需的信息。 日期模块:计算年龄。 pandas:处理和操作表格数据。 PaddleOCR:百度的

在Debian 8上安装Node.js的方法

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 简介 Node.js 是一个通用编程的 JavaScript 平台,允许用户快速构建网络应用程序。通过在前端和后端都使用 JavaScript,开发可以更加一致,并且可以在同一个系统中设计。 在本指南中,您将在 Debian 8 服务器上安装 Node.js。Debian 8 包含一个版本的

java读取resource/通过文件名获取文件类型

java读取resource java读取resource目录下文件的方法: 借助Guava库的Resource类 Resources.getResource("test.txt") 通过文件名获取文件类型 mongodb java

Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(4)

本文仅作笔记学习和分享,不用做任何商业用途 本文包括但不限于unity官方手册,unity唐老狮等教程知识,如有不足还请斧正​​ Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(3)-CSDN博客  这节就是真正的存储数据了   理清一下思路: 1.存储路径并检查 //2进制文件类存储private static string Data_Binary_Pa