java 发送数据到flume_flume接收http请求,并将数据写到kafka

2023-10-07 04:40

本文主要是介绍java 发送数据到flume_flume接收http请求,并将数据写到kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

直接上flume的配置:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf

# example.conf: A single-node Flume configuration

##########

# data example

# use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]

# just use the office request demo

#[{

# "headers" : {

# "timestamp" : "434324343",

# "host" : "random_host.example.com"

# "topic" : "venn" # if headers contain topic, will replace the default topic

# },

# "body" : "random_body" # random_body is the message send to channel

# }]

# Name the components on this agent1

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.s1.type = http

agent1.sources.s1.bind = spring # localhost 只能接收本地请求

agent1.sources.s1.port = 8084 # http的端口

agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler # 自带的接收http请求的handler

# Describe the sink

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafkasink

agent1.sinks.k1.kafka.topic = mytopic # topic

agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # kafka host and port

agent1.sinks.k1.kafka.flumeBatchSize = 20

agent1.sinks.k1.kafka.producer.acks = 1

agent1.sinks.k1.kafka.producer.linger.ms = 1

agent1.sinks.k1.kafka.producer.compression.type = snappy # 压缩

# Use a channel which buffers events in memory

agent1.channels.c1.type = file

#agent1.channels.c1.capacity = 1000 # 这两个参数要配置,需要配大一点,不然channel满了会报错,http返回503(通道已满)

#agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /opt/flume/checkpoint

agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

有了flume的配置,下面启动flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-to-kafka.properties -Dflume.root.logger=INFO,console

启动之后,就可以发http请求了。

http请求的格式如下:

[{

"headers" : {

"timestamp" : "434324343",

"host" : "random_host.example.com",

"topic" : "xxx"

},

"body" : "random_body"

},

{

"headers" : {

"namenode" : "namenode.example.com",

"datanode" : "random_datanode.example.com"

},

"body" : "really_random_body"

}]

注: http请求的headers中又topic 会替代配置文件中的topic

flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

然后就是发数的程序了(自己请求太麻烦了。)

package com.venn.http;

import com.venn.entity.User;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.UnsupportedEncodingException;

import java.net.HttpURLConnection;

import java.net.MalformedURLException;

import java.net.URL;

import java.util.*;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.event.JSONEvent;

import com.google.gson.Gson;

import org.apache.flume.source.http.HTTPBadRequestException;

import org.apache.flume.source.http.HTTPSourceHandler;

import javax.servlet.http.HttpServletRequest;

/**

* Created by venn on 19-1-17.

*/

public class HttpDemo {

private static String urlStr = "http://localhost:8084";

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {

while (true){

String message = new User().toString();

send(message);

// Thread.sleep(1);

}

}

public static void send(String message){

System.out.println("send message : " + message);

try{

//创建连接

URL url = new URL(urlStr);

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setDoOutput(true);

connection.setDoInput(true);

connection.setRequestMethod("POST");

connection.setUseCaches(false);

connection.setInstanceFollowRedirects(true);

connection.setRequestProperty("Content-Type",

"application/x-www-form-urlencoded");

connection.connect();

//POST请求

DataOutputStream out = new DataOutputStream(

connection.getOutputStream());

JSONEvent jsonEvent = new JSONEvent();

Map header = new HashMap();

header.put("timestamp", System.currentTimeMillis());

header.put("host", "venn");

header.put("topic","venn"+random.nextInt(4));

jsonEvent.setBody(message.getBytes());

jsonEvent.setHeaders(header);

Gson gson = new Gson();

List list = new ArrayList();

list.add(jsonEvent);

out.writeBytes(gson.toJson(list));

out.flush();

out.close();

//读取响应

BufferedReader reader = new BufferedReader(new InputStreamReader(

connection.getInputStream())); // 不会返回数据

int code = connection.getResponseCode();

String lines;

StringBuffer sb = new StringBuffer("");

while ((lines = reader.readLine()) != null) {

lines = new String(lines.getBytes(), "utf-8");

sb.append(lines);

}

System.out.println("code : " + code + ", message : " + sb);

reader.close();

// 断开连接

connection.disconnect();

} catch (MalformedURLException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

搞定。。

发数:

145113d51fba9115b1a3d501769313e5.png

kafka接收到的数据:

1e7605f3042aa2b8d28394f2f636cee6.png

注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

.net接收post请求并把数据转为字典格式

public SortedDictionary GetRequestPost() { int i = 0; SortedDictionary

MVC Control 接收post请求的json数据

[HttpPost] public string QueryInvoice() { string stream; using (var sr = new StreamReader(Request.In ...

servlet接收request请求的json数据

此次使用的是alibaba的fastjson:jar包为fastjson-1.2.7.jar 参考:https://www.qingtingip.com/h_229797.html 思路:由于此次接收 ...

FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解

详细配置文件flume-conf.properties如下: ############################################ # producer config ###### ...

将数据写到kafka的topic

package test05 import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, P ...

Struts2 Action接收POST请求JSON数据及其实现解析

一.认识JSON JSON是一种轻量级.基于文本.与语言无关的数据交换格式,可以用文本格式的形式来存储或表示结构化的数据. 二.POST请求与Content-Type: application/jso ...

javaweb Servlet接收Android请求,并返回json数据

1.实现功能 (1)接收http请求 (2)获取Android客户端发送的参数对应的内容 (3)hibernate查询数据库 (4)返回json数据 2.java代码 import EntityCla ...

学习笔记_springmvc返回值、数据写到页面、表单提交、ajax、重定向

数据写到页面 后台往前台传数据 TestController添加 /** * 方法的返回值采用ModelAndView, new ModelAndView("index", map ...

随机推荐

Js添加消息提示数量

接到个新需求,类似以下这种需求,得把它封装成一个插件 后端给返回一个这种数据 var data = [ { key:"020506", num:5 }, { key:"0 ...

SQL Server 统计信息

SELECT * FROM SYS.stats _WA_Sys_00000009_00000062:统计对象的名称.不同的机器名称不同,自动创建的统计信息都以_WA_Sys开头,00000009表示的 ...

Window I/O 完成端口 (Windows I/O Completion Port (IOCP))

相关对象 IO EndPoint, 所有支持重叠IO(overlapped IO)的设备,比如文件,Winsock,管道等. IOCP, IO完成端口内核对象,可以使用API CreateIoComp ...

原创:整理编辑jQuery全部思维导图【附下载地址】

主图 全部图已经打包:下载地址 2. 3. 4. 5. 6. 附上一点简单说明 Dom对象和jquer对象之间的转化 如何将一个jquery对象转换为DOM对象? test是一个span元素 var ...

jqcss选择器

$("p").css("background-color","red"); $(this) 当前 HTML 元素$("p&quot ...

UVA 11300 Spreading the Wealth (数学推导 中位数)

Spreading the Wealth Problem A Communist regime is trying to redistribute wealth in a village. They ...

CKEditor 集成CKFinder集成

lCKEditor原名FckEditor,著名的HTML编辑器,可以在线编辑HTML内容,演示一下.打开.自己人用CKEditor,网友用UBBEditor. l配置参考文档,主要将ckeditor中 ...

UNIX环境高级编程——线程和fork

当线程调用fork时,就为子进程创建了整个进程地址空间的副本.子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量.读写锁和条件变量的状态.如果父进程包含多个线程,子进程在fork返回以后 ...

Reactive Programming

Reactive的表现 Reactive 规范是 JVM Reactive 扩展规范 Reactive Streams JVM,而 Reactive 实现框架则是最典型的实现: Reactive St ...

rman实验——测试备份压缩

oracle rman自带的备份压缩机制,可以有效的压缩备份的大小,降低磁盘的占用率.但是也会因为压缩而消耗更多的系统性能,和增加备份时间.现在就通过实验来看压缩和不压缩的区别. 进行不压缩全备 RM ...

这篇关于java 发送数据到flume_flume接收http请求,并将数据写到kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/156076

相关文章

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

Java中的密码加密方式

《Java中的密码加密方式》文章介绍了Java中使用MD5算法对密码进行加密的方法,以及如何通过加盐和多重加密来提高密码的安全性,MD5是一种不可逆的哈希算法,适合用于存储密码,因为其输出的摘要长度固... 目录Java的密码加密方式密码加密一般的应用方式是总结Java的密码加密方式密码加密【这里采用的