node读取卡夫卡

2024-05-06 18:18
文章标签 读取 node 卡夫卡

本文主要是介绍node读取卡夫卡,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://www.npmjs.com/package/no-kafka
测试可用
node 代码
//下面代码测试可用

let Kafka = require('no-kafka');
let settings=require('../settings');
let DBProvider = require("../models/db.js").DBProvider;
let DeviceItem = require("../models/deviceitem.js");
let async=require('async');
let db = new DBProvider(settings);
DeviceItem.setdb(db);
let consumer = new Kafka.SimpleConsumer({connectionString: '169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'}); //测试环境
//let consumer = new Kafka.SimpleConsumer({connectionString: '10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092'}); //线上// data handler function can return a Promise
let kafka_data=[];
let collection_name="kafka_zaful";
let referRegex=/"refer":"(.*?)","lkid/;let dataHandler = function (messageSet, topic, partition) {messageSet.forEach(function (message) {console.log(topic, partition, message.offset, message.message.value.toString('utf8'));let str=message.message.value.toString('utf8');//console.log('kafka_data.length=',kafka_data.length);let item;try {item=JSON.parse(str);} catch (e) {let obj=referRegex.exec(str);let repl_str='"refer":"'+obj[1].replace(/"/g,"'").replace(/\\/g,'')+'","lkid';try {item=JSON.parse(str.replace(referRegex,repl_str));} catch (e2) {console.log('!!!!!');console.log(str);}}if (item) {kafka_data.push(item);saveKafkaData();} else {console.log('!!! item null');}});
};return consumer.init().then(function () {// Subscribe partitons 0 and 1 in a topic:return consumer.subscribe('cdn-etl-zaful-com', [0,1,2,3,4,5], {offset: 0, groupId: 'cdn-zaful-local20'}, dataHandler);
});

//另一个模块 (测试不可用)

// let ServiceRouter = require('../service-router/serviceRouter.js');
function toKafka() {let kafka = require('kafka-node');let Consumer = kafka.Consumer;//let client = new kafka.Client('10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092');//线上let client = new kafka.Client('169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092'); //测试let Offset = kafka.Offset;let offset = new Offset(client);console.log('连接kafka中');
//     let topics = [{
//         topic: 'cdn-etl-zaful-com', partition: 0, offset: 0
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 1, offset: 310
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 2, offset: 20103
//     }, {
//         topic: 'cdn-etl-zaful-com', partition: 3, offset: 42055
//     }];
//     let options = {
// // Auto commit config
//         autoCommit: true,
//         autoCommitMsgCount: 100,
//         autoCommitIntervalMs: 1000,
// // Fetch message config
//         fetchMaxWaitMs: 100,
//         fetchMinBytes: 1,
//         fetchMaxBytes: 1024 * 10,
//         fromOffset: true,
//         fromBeginning: true
//     };let argv = {topic: "cdn-etl-zaful-com"};let topic = argv.topic || 'cdn-etl-zaful-com';let topics = [{topic: topic,partition:0}],options = {groupId: 'cdn-test',autoCommit: true,autoCommitIntervalMs:1000,sessionTimeout:30000,// fetchMaxWaitMs: 1000,// fetchMaxBytes: 1024 * 1024,// fromOffset: true,// fromBeginning: true};console.log('create ');// console.log('topics=',topics);// console.log('options=',options);let consumer = new Consumer(client,topics,options);console.log('listen ');consumer.connect();// consumer.resume()consumer.on('connect', function () {console.log('connect');client.loadMetadataForTopics([], function (error, results) {if (error) {return console.error(error);}console.log('%j', _.get(results, '1.metadata'));});});consumer.on('message', function (message) {console.log(message);let key = message.key.toString();console.log(key);if (key !== -1) {console.log(message);try {let msg = JSON.parse(message.value);// ServiceRouter.dispatch(key, msg);} catch (e) {console.log(e)}} else {console.log(message)}});consumer.on('offsetOutOfRange', function (topic) {console.log('topic =',topic);console.log("------------- offsetOutOfRange ------------");topic.maxNum = 2;offset.fetch([topic], function (err, offsets) {console.log(offsets);let min = Math.min.apply(null, offsets[topic.topic][topic.partition]);consumer.setOffset(topic.topic, topic.partition, min);});});consumer.on('error', function (message) {console.log(message);console.log('kafka错误');});
}
module.exports = toKafka;

Java
需要构建一个maven项目

import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.alibaba.fastjson.JSON;public class KafkaConsumerTest
{private static final String BOOTSTRAPSERVERS = "169.60.8.115:9092,169.60.8.109:9092,169.48.170.237:9092";//private static final String BOOTSTRAPSERVERS = "10.177.210.87:9092,10.177.210.82:9092,10.177.184.101:9092";private static final String TOPIC = "cdn-etl-zaful-com";private static Properties getPropsOfConsumer(){Properties props = new Properties();props.put("bootstrap.servers", BOOTSTRAPSERVERS);props.put("group.id", "cdn-local-1");props.put("auto.offset.reset", "earliest");//earliestprops.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}public static void main(String[] args){System.out.println("开始读");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getPropsOfConsumer());consumer.subscribe(Arrays.asList(TOPIC));while (true){//System.out.println("reading...");ConsumerRecords<String, String> records = consumer.poll(100);//System.out.println("records=");
/*          if (records!=null) {System.out.println(records);}*/for (ConsumerRecord<String, String> record : records){System.out.println(record.value());}}}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>KafkaConsumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.6</version></dependency><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.39</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.6</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency></dependencies>
</project>

这篇关于node读取卡夫卡的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965009

相关文章

在C#中读取文件的六种主流方法详解

《在C#中读取文件的六种主流方法详解》在C#中读取文件有多种方法,不同方式适用于不同场景(小型文件、大型文件、文本文件或二进制文件),本文给大家介绍了6种主流方法以及其适用场景,需要的朋友可以参考下... 目录方法1:File.ReadAllText(读取整个文本文件)方法2:File.ReadAllLin

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

在Node.js中使用.env文件管理环境变量的全过程

《在Node.js中使用.env文件管理环境变量的全过程》Node.js应用程序通常依赖于环境变量来管理敏感信息或配置设置,.env文件已经成为一种流行的本地管理这些变量的方法,本文将探讨.env文件... 目录引言为什么使php用 .env 文件 ?如何在 Node.js 中使用 .env 文件最佳实践引

C#实现SHP文件读取与地图显示的完整教程

《C#实现SHP文件读取与地图显示的完整教程》在地理信息系统(GIS)开发中,SHP文件是一种常见的矢量数据格式,本文将详细介绍如何使用C#读取SHP文件并实现地图显示功能,包括坐标转换、图形渲染、平... 目录概述功能特点核心代码解析1. 文件读取与初始化2. 坐标转换3. 图形绘制4. 地图交互功能缩放

使用Node.js和PostgreSQL构建数据库应用

《使用Node.js和PostgreSQL构建数据库应用》PostgreSQL是一个功能强大的开源关系型数据库,而Node.js是构建高效网络应用的理想平台,结合这两个技术,我们可以创建出色的数据驱动... 目录初始化项目与安装依赖建立数据库连接执行CRUD操作查询数据插入数据更新数据删除数据完整示例与最佳

java读取excel文件为base64实现方式

《java读取excel文件为base64实现方式》文章介绍使用ApachePOI和EasyExcel处理Excel文件并转换为Base64的方法,强调EasyExcel适合大文件且内存占用低,需注意... 目录使用 Apache POI 读取 Excel 并转换为 Base64使用 EasyExcel 处

使用Java读取本地文件并转换为MultipartFile对象的方法

《使用Java读取本地文件并转换为MultipartFile对象的方法》在许多JavaWeb应用中,我们经常会遇到将本地文件上传至服务器或其他系统的需求,在这种场景下,MultipartFile对象非... 目录1. 基本需求2. 自定义 MultipartFile 类3. 实现代码4. 代码解析5. 自定

MySQL 数据库表操作完全指南:创建、读取、更新与删除实战

《MySQL数据库表操作完全指南:创建、读取、更新与删除实战》本文系统讲解MySQL表的增删查改(CURD)操作,涵盖创建、更新、查询、删除及插入查询结果,也是贯穿各类项目开发全流程的基础数据交互原... 目录mysql系列前言一、Create(创建)并插入数据1.1 单行数据 + 全列插入1.2 多行数据

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3