本文主要是介绍SparkStreaming 删选含有error的行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
筛选流数据中所有含error的行
package com.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContextobject PrintError {def main(args: Array[String]) {val conf = new SparkConf()// conf.setMaster("local")conf.setAppName("print Error")//从SparkConf创建StreamingContext并指定1秒钟的批处理大小val ssc = new StreamingContext(conf, Seconds(10))//链接到本机机器7777端口上后,使用收到的数据创建DStream// val lines = ssc.socketTextStream("172.171.51.131", 7777)val lines = ssc.socketTextStream("172.171.51.131", 7777)//从DStream中筛选出包含字符串“error”的行val errorLines = lines.filter(_.contains("error"))//打印所有含“error”的行errorLines.print()//启动流计算环境StreamingContext并等待它“完成”ssc.start()//等待作业完成ssc.awaitTermination()}}
输入:
输出:
这篇关于SparkStreaming 删选含有error的行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!