Flink nc -l -p 监听端口测试

2024-06-18 15:44
文章标签 测试 监听 端口 flink nc

本文主要是介绍Flink nc -l -p 监听端口测试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、9999端口未占用

netstat -apn|grep 9999

2、消息发送端

nc -l -k -p 9999

3、运行

周期性水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** @description: forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\ * forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPeriodicWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        ArrayList<Event> list = new ArrayList<>();
//        list.add(new Event("ming","www.baidu1.com",1200L));
//        list.add(new Event("xiaohu","www.baidu5.com",1267L));
//        list.add(new Event("ming","www.baidu7.com",4200L));
//        list.add(new Event("xiaohu","www.baidu8.com",5500L));
//
//        DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L;@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

断点式水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** @description: forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPunctuatedWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (event.getUser().equals("Biu")) {output.emitWatermark(new Watermark(event.getTimestamp() - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

4、打印

3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}

参考:

【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客

Flink watermark_nc -lp 9999-CSDN博客

NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub

这篇关于Flink nc -l -p 监听端口测试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1072418

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

【测试】输入正确用户名和密码,点击登录没有响应的可能性原因

目录 一、前端问题 1. 界面交互问题 2. 输入数据校验问题 二、网络问题 1. 网络连接中断 2. 代理设置问题 三、后端问题 1. 服务器故障 2. 数据库问题 3. 权限问题: 四、其他问题 1. 缓存问题 2. 第三方服务问题 3. 配置问题 一、前端问题 1. 界面交互问题 登录按钮的点击事件未正确绑定,导致点击后无法触发登录操作。 页面可能存在

业务中14个需要进行A/B测试的时刻[信息图]

在本指南中,我们将全面了解有关 A/B测试 的所有内容。 我们将介绍不同类型的A/B测试,如何有效地规划和启动测试,如何评估测试是否成功,您应该关注哪些指标,多年来我们发现的常见错误等等。 什么是A/B测试? A/B测试(有时称为“分割测试”)是一种实验类型,其中您创建两种或多种内容变体——如登录页面、电子邮件或广告——并将它们显示给不同的受众群体,以查看哪一种效果最好。 本质上,A/B测

matlab读取NC文件(含group)

matlab读取NC文件(含group): NC文件数据结构: 代码: % 打开 NetCDF 文件filename = 'your_file.nc'; % 替换为你的文件名% 使用 netcdf.open 函数打开文件ncid = netcdf.open(filename, 'NC_NOWRITE');% 查看文件中的组% 假设我们想读取名为 "group1" 的组groupName

Verybot之OpenCV应用一:安装与图像采集测试

在Verybot上安装OpenCV是很简单的,只需要执行:         sudo apt-get update         sudo apt-get install libopencv-dev         sudo apt-get install python-opencv         下面就对安装好的OpenCV进行一下测试,编写一个通过USB摄像头采

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

BIRT 报表的自动化测试

来源:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-birttest/如何为 BIRT 报表编写自动化测试用例 BIRT 是一项很受欢迎的报表制作工具,但目前对其的测试还是以人工测试为主。本文介绍了如何对 BIRT 报表进行自动化测试,以及在实际项目中的一些测试实践,从而提高了测试的效率和准确性 -------

可测试,可维护,可移植:上位机软件分层设计的重要性

互联网中,软件工程师岗位会分前端工程师,后端工程师。这是由于互联网软件规模庞大,从业人员众多。前后端分别根据各自需求发展不一样的技术栈。那么上位机软件呢?它规模小,通常一个人就能开发一个项目。它还有必要分前后端吗? 有必要。本文从三个方面论述。分别是可测试,可维护,可移植。 可测试 软件黑盒测试更普遍,但很难覆盖所有应用场景。于是有了接口测试、模块化测试以及单元测试。都是通过降低测试对象

NC 把数字翻译成字符串

系列文章目录 文章目录 系列文章目录前言 前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 描述 有一种将字母编码成数字的方式:‘a’->1, ‘b->2’, … , ‘z->26’。 现在给一串数字,返回有多少种可能的译码结果 import java.u