本文主要介绍 Flink CEP 的实践,希望能为大家解决编程问题提供一定的参考,需要的开发者们跟着小编一起来学习吧!
package com.techwolf.hubble;import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** Hello world!**/
/**
 * Flink CEP demo job.
 *
 * <p>Builds a small in-memory stream of {@link TestEvent}s, keys it by event id, and applies a
 * pattern "action A, followed by action B, within 10 seconds". Partial matches that time out are
 * routed to a side output, serialized to JSON and printed. Completed matches are emitted on the
 * main output of the CEP operator, which this job does not consume.
 */
public class App {

    /** Name of the pattern stage that accepts events whose action is "A". */
    private static final String BEGIN_STAGE = "begin";

    /** Name of the pattern stage that accepts events whose action is "B". */
    private static final String END_STAGE = "end";

    /**
     * Assembles and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Define the timestamp extractor that assigns timestamps and watermarks to the input stream.
        WatermarkStrategy<TestEvent> watermarkStrategy = WatermarkStrategy
                .<TestEvent>forMonotonousTimestamps()
                .withTimestampAssigner(new EventTimeAssignerSupplier());

        // Read the clock once so the offsets between the sample events are exact and do not
        // depend on how long it takes to construct each element.
        final long now = System.currentTimeMillis();
        DataStream<TestEvent> inputStream = env.fromElements(
                new TestEvent("1", "A", now - 100 * 1000, "1"),
                new TestEvent("1", "A", now - 85 * 1000, "2"),
                new TestEvent("1", "A", now - 80 * 1000, "3"),
                new TestEvent("1", "A", now - 75 * 1000, "4"),
                new TestEvent("1", "A", now - 60 * 1000, "5"),
                new TestEvent("1", "A", now - 55 * 1000, "6"),
                new TestEvent("1", "A", now - 40 * 1000, "7"),
                new TestEvent("1", "A", now - 35 * 1000, "8"),
                new TestEvent("1", "A", now - 20 * 1000, "9"),
                new TestEvent("1", "A", now - 10 * 1000, "10"),
                new TestEvent("1", "B", now - 5 * 1000, "11"))
                .assignTimestampsAndWatermarks(watermarkStrategy);

        Pattern<TestEvent, TestEvent> pattern = Pattern.<TestEvent>begin(BEGIN_STAGE)
                .where(new SimpleCondition<TestEvent>() {
                    @Override
                    public boolean filter(TestEvent testEvent) throws Exception {
                        // Constant-first comparison: a null action is a non-match, not an NPE.
                        return "A".equals(testEvent.getAction());
                    }
                })
                .followedBy(END_STAGE)
                .where(new SimpleCondition<TestEvent>() {
                    @Override
                    public boolean filter(TestEvent testEvent) throws Exception {
                        return "B".equals(testEvent.getAction());
                    }
                })
                .within(Time.seconds(10));

        PatternStream<TestEvent> patternStream =
                CEP.pattern(inputStream.keyBy(TestEvent::getId), pattern);

        // Anonymous subclass ({}) is required so the element type survives erasure.
        OutputTag<TestEvent> timeOutTag = new OutputTag<TestEvent>("timeOutTag") {};

        // Handle match results: timed-out partial matches go to the side output.
        // The operator uid is kept unchanged so existing savepoint state still maps to it.
        SingleOutputStreamOperator<TestEvent> matchedStream = patternStream
                .flatSelect(timeOutTag, new EventTimeOut(), new FlatSelect())
                .uid("match_twenty_minutes_pattern");

        DataStream<String> result = matchedStream
                .getSideOutput(timeOutTag)
                .map(new MapFunction<TestEvent, String>() {
                    @Override
                    public String map(TestEvent testEvent) throws Exception {
                        return JSONObject.toJSONString(testEvent);
                    }
                });

        result.print();
        env.execute(Config.JOB_NAME);
    }

    /**
     * Handles partial matches that timed out: forwards every event captured by the "begin" stage.
     */
    public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent, TestEvent> {
        private static final long serialVersionUID = -2471077777598713906L;

        /**
         * @param partialMatch events captured so far, keyed by pattern stage name
         * @param timeoutTimestamp timestamp passed by the CEP operator for the timeout (unused)
         * @param collector receives the events captured by the "begin" stage
         */
        @Override
        public void timeout(Map<String, List<TestEvent>> partialMatch, long timeoutTimestamp,
                            Collector<TestEvent> collector) throws Exception {
            List<TestEvent> beginEvents = partialMatch.get(BEGIN_STAGE);
            if (beginEvents == null) {
                return;
            }
            for (TestEvent event : beginEvents) {
                collector.collect(event);
            }
        }
    }

    /**
     * Handles completed matches: forwards every event captured by the "begin" stage.
     */
    public static class FlatSelect implements PatternFlatSelectFunction<TestEvent, TestEvent> {
        private static final long serialVersionUID = 1753544074226581611L;

        /**
         * @param match events of the completed match, keyed by pattern stage name
         * @param collector receives the events captured by the "begin" stage
         */
        @Override
        public void flatSelect(Map<String, List<TestEvent>> match,
                               Collector<TestEvent> collector) throws Exception {
            List<TestEvent> beginEvents = match.get(BEGIN_STAGE);
            if (beginEvents == null) {
                return;
            }
            for (TestEvent event : beginEvents) {
                collector.collect(event);
            }
        }
    }

    /** Supplies the {@link EventTimeAssigner} used by the watermark strategy. */
    public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {
        private static final long serialVersionUID = -9040340771307752904L;

        @Override
        public TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {
            return new EventTimeAssigner();
        }
    }

    /** Uses the value returned by {@code TestEvent#getEventTime()} as the record timestamp. */
    public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {

        /**
         * @param event the element whose timestamp is extracted
         * @param recordTimestamp timestamp previously attached to the record (unused)
         * @return the event's own event time
         */
        @Override
        public long extractTimestamp(TestEvent event, long recordTimestamp) {
            return event.getEventTime();
        }
    }
}
这篇关于 Flink CEP 实践的文章就介绍到这儿,希望我们推荐的文章对各位开发者有所帮助!