java 发送数据到flume_flume接收http请求,并将数据写到kafka

2023-10-07 04:40

本文主要是介绍java 发送数据到flume_flume接收http请求,并将数据写到kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

直接上flume的配置:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf

# example.conf: A single-node Flume configuration

##########

# data example

# use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]

# just use the office request demo

#[{

# "headers" : {

# "timestamp" : "434324343",

# "host" : "random_host.example.com"

# "topic" : "venn" # if headers contain topic, will replace the default topic

# },

# "body" : "random_body" # random_body is the message send to channel

# }]

# Name the components on this agent1

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.s1.type = http

agent1.sources.s1.bind = spring # localhost 只能接收本地请求

agent1.sources.s1.port = 8084 # http的端口

agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler # 自带的接收http请求的handler

# Describe the sink

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafkasink

agent1.sinks.k1.kafka.topic = mytopic # topic

agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # kafka host and port

agent1.sinks.k1.kafka.flumeBatchSize = 20

agent1.sinks.k1.kafka.producer.acks = 1

agent1.sinks.k1.kafka.producer.linger.ms = 1

agent1.sinks.k1.kafka.producer.compression.type = snappy # 压缩

# Use a channel which buffers events in memory

agent1.channels.c1.type = file

#agent1.channels.c1.capacity = 1000 # 这两个参数要配置,需要配大一点,不然channel满了会报错,http返回503(通道已满)

#agent1.channels.c1.transactionCapacity = 100

agent1.channels.c1.checkpointDir = /opt/flume/checkpoint

agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

有了flume的配置,下面启动flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-to-kafka.properties -Dflume.root.logger=INFO,console

启动之后,就可以发http请求了。

http请求的格式如下:

[{

"headers" : {

"timestamp" : "434324343",

"host" : "random_host.example.com",

"topic" : "xxx"

},

"body" : "random_body"

},

{

"headers" : {

"namenode" : "namenode.example.com",

"datanode" : "random_datanode.example.com"

},

"body" : "really_random_body"

}]

注: http请求的headers中又topic 会替代配置文件中的topic

flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

然后就是发数的程序了(自己请求太麻烦了。)

package com.venn.http;

import com.venn.entity.User;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.UnsupportedEncodingException;

import java.net.HttpURLConnection;

import java.net.MalformedURLException;

import java.net.URL;

import java.util.*;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.event.JSONEvent;

import com.google.gson.Gson;

import org.apache.flume.source.http.HTTPBadRequestException;

import org.apache.flume.source.http.HTTPSourceHandler;

import javax.servlet.http.HttpServletRequest;

/**

* Created by venn on 19-1-17.

*/

public class HttpDemo {

private static String urlStr = "http://localhost:8084";

private static Random random = new Random();

public static void main(String[] args) throws InterruptedException {

while (true){

String message = new User().toString();

send(message);

// Thread.sleep(1);

}

}

public static void send(String message){

System.out.println("send message : " + message);

try{

//创建连接

URL url = new URL(urlStr);

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setDoOutput(true);

connection.setDoInput(true);

connection.setRequestMethod("POST");

connection.setUseCaches(false);

connection.setInstanceFollowRedirects(true);

connection.setRequestProperty("Content-Type",

"application/x-www-form-urlencoded");

connection.connect();

//POST请求

DataOutputStream out = new DataOutputStream(

connection.getOutputStream());

JSONEvent jsonEvent = new JSONEvent();

Map header = new HashMap();

header.put("timestamp", System.currentTimeMillis());

header.put("host", "venn");

header.put("topic","venn"+random.nextInt(4));

jsonEvent.setBody(message.getBytes());

jsonEvent.setHeaders(header);

Gson gson = new Gson();

List list = new ArrayList();

list.add(jsonEvent);

out.writeBytes(gson.toJson(list));

out.flush();

out.close();

//读取响应

BufferedReader reader = new BufferedReader(new InputStreamReader(

connection.getInputStream())); // 不会返回数据

int code = connection.getResponseCode();

String lines;

StringBuffer sb = new StringBuffer("");

while ((lines = reader.readLine()) != null) {

lines = new String(lines.getBytes(), "utf-8");

sb.append(lines);

}

System.out.println("code : " + code + ", message : " + sb);

reader.close();

// 断开连接

connection.disconnect();

} catch (MalformedURLException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

搞定。。

发数:

145113d51fba9115b1a3d501769313e5.png

kafka接收到的数据:

1e7605f3042aa2b8d28394f2f636cee6.png

注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

.net接收post请求并把数据转为字典格式

public SortedDictionary GetRequestPost() { int i = 0; SortedDictionary

MVC Control 接收post请求的json数据

[HttpPost] public string QueryInvoice() { string stream; using (var sr = new StreamReader(Request.In ...

servlet接收request请求的json数据

此次使用的是alibaba的fastjson:jar包为fastjson-1.2.7.jar 参考:https://www.qingtingip.com/h_229797.html 思路:由于此次接收 ...

FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解

详细配置文件flume-conf.properties如下: ############################################ # producer config ###### ...

将数据写到kafka的topic

package test05 import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, P ...

Struts2 Action接收POST请求JSON数据及其实现解析

一.认识JSON JSON是一种轻量级.基于文本.与语言无关的数据交换格式,可以用文本格式的形式来存储或表示结构化的数据. 二.POST请求与Content-Type: application/jso ...

javaweb Servlet接收Android请求,并返回json数据

1.实现功能 (1)接收http请求 (2)获取Android客户端发送的参数对应的内容 (3)hibernate查询数据库 (4)返回json数据 2.java代码 import EntityCla ...

学习笔记_springmvc返回值、数据写到页面、表单提交、ajax、重定向

数据写到页面 后台往前台传数据 TestController添加 /** * 方法的返回值采用ModelAndView, new ModelAndView("index", map ...

随机推荐

Js添加消息提示数量

接到个新需求,类似以下这种需求,得把它封装成一个插件 后端给返回一个这种数据 var data = [ { key:"020506", num:5 }, { key:"0 ...

SQL Server 统计信息

SELECT * FROM SYS.stats _WA_Sys_00000009_00000062:统计对象的名称.不同的机器名称不同,自动创建的统计信息都以_WA_Sys开头,00000009表示的 ...

Window I/O 完成端口 (Windows I/O Completion Port (IOCP))

相关对象 IO EndPoint, 所有支持重叠IO(overlapped IO)的设备,比如文件,Winsock,管道等. IOCP, IO完成端口内核对象,可以使用API CreateIoComp ...

原创:整理编辑jQuery全部思维导图【附下载地址】

主图 全部图已经打包:下载地址 2. 3. 4. 5. 6. 附上一点简单说明 Dom对象和jquer对象之间的转化 如何将一个jquery对象转换为DOM对象? test是一个span元素 var ...

jqcss选择器

$("p").css("background-color","red"); $(this) 当前 HTML 元素$("p&quot ...

UVA 11300 Spreading the Wealth (数学推导 中位数)

Spreading the Wealth Problem A Communist regime is trying to redistribute wealth in a village. They ...

CKEditor 集成CKFinder集成

lCKEditor原名FckEditor,著名的HTML编辑器,可以在线编辑HTML内容,演示一下.打开.自己人用CKEditor,网友用UBBEditor. l配置参考文档,主要将ckeditor中 ...

UNIX环境高级编程——线程和fork

当线程调用fork时,就为子进程创建了整个进程地址空间的副本.子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量.读写锁和条件变量的状态.如果父进程包含多个线程,子进程在fork返回以后 ...

Reactive Programming

Reactive的表现 Reactive 规范是 JVM Reactive 扩展规范 Reactive Streams JVM,而 Reactive 实现框架则是最典型的实现: Reactive St ...

rman实验——测试备份压缩

oracle rman自带的备份压缩机制,可以有效的压缩备份的大小,降低磁盘的占用率.但是也会因为压缩而消耗更多的系统性能,和增加备份时间.现在就通过实验来看压缩和不压缩的区别. 进行不压缩全备 RM ...

这篇关于java 发送数据到flume_flume接收http请求,并将数据写到kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/156076

相关文章

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码

SpringBoot利用@Validated注解优雅实现参数校验

《SpringBoot利用@Validated注解优雅实现参数校验》在开发Web应用时,用户输入的合法性校验是保障系统稳定性的基础,​SpringBoot的@Validated注解提供了一种更优雅的解... 目录​一、为什么需要参数校验二、Validated 的核心用法​1. 基础校验2. php分组校验3

Java Predicate接口定义详解

《JavaPredicate接口定义详解》Predicate是Java中的一个函数式接口,它代表一个判断逻辑,接收一个输入参数,返回一个布尔值,:本文主要介绍JavaPredicate接口的定义... 目录Java Predicate接口Java lamda表达式 Predicate<T>、BiFuncti

Nginx中配置HTTP/2协议的详细指南

《Nginx中配置HTTP/2协议的详细指南》HTTP/2是HTTP协议的下一代版本,旨在提高性能、减少延迟并优化现代网络环境中的通信效率,本文将为大家介绍Nginx配置HTTP/2协议想详细步骤,需... 目录一、HTTP/2 协议概述1.HTTP/22. HTTP/2 的核心特性3. HTTP/2 的优

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

AJAX请求上传下载进度监控实现方式

《AJAX请求上传下载进度监控实现方式》在日常Web开发中,AJAX(AsynchronousJavaScriptandXML)被广泛用于异步请求数据,而无需刷新整个页面,:本文主要介绍AJAX请... 目录1. 前言2. 基于XMLHttpRequest的进度监控2.1 基础版文件上传监控2.2 增强版多

使用Python自建轻量级的HTTP调试工具

《使用Python自建轻量级的HTTP调试工具》这篇文章主要为大家详细介绍了如何使用Python自建一个轻量级的HTTP调试工具,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录一、为什么需要自建工具二、核心功能设计三、技术选型四、分步实现五、进阶优化技巧六、使用示例七、性能对比八、扩展方向建

Spring Security方法级安全控制@PreAuthorize注解的灵活运用小结

《SpringSecurity方法级安全控制@PreAuthorize注解的灵活运用小结》本文将带着大家讲解@PreAuthorize注解的核心原理、SpEL表达式机制,并通过的示例代码演示如... 目录1. 前言2. @PreAuthorize 注解简介3. @PreAuthorize 核心原理解析拦截与