Big Data - Playing with Data - Spark - Structured Streaming Monitoring (Python Version)
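Starting a streaming query returns a StreamingQuery object, which can be used to monitor that query. The object exposes several attributes, including recentProgress (a list of the most recent progress updates), lastProgress (the most recent progress update, or None before the first one is available), and status (what the query is doing right now).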
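To give a feel for what status holds, here is a minimal sketch that condenses it into a single line. It assumes the dictionary carries the keys message, isDataAvailable, and isTriggerActive, as PySpark normally reports them. The helper name describe_status and the sample dictionary are hypothetical and only serve as illustration; the sample is not real output from a query.

```python
def describe_status(status):
    """Condense a dict shaped like StreamingQuery.status into one line."""
    # .get() is used so that a missing key does not raise KeyError.
    message = status.get("message", "")
    data = "data available" if status.get("isDataAvailable") else "no new data"
    trigger = "trigger active" if status.get("isTriggerActive") else "trigger idle"
    return f"{message} ({data}, {trigger})"


# Hypothetical sample shaped like query.status; not real output.
sample_status = {
    "message": "Waiting for data to arrive",
    "isDataAvailable": False,
    "isTriggerActive": False,
}

print(describe_status(sample_status))
```

In a running job one would pass query.status instead of the sample dictionary.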
Code example
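#!/usr/bin/env python3

from pprint import pprint
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    # Read lines of text from a socket (for example, one opened with `nc -lk 9999`).
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # Split each line into words and keep a running count per word.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    wordCounts = words.groupBy("word").count()

    # Start the query; start() returns the StreamingQuery object used for monitoring.
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .queryName('write_to_console') \
        .trigger(processingTime="8 seconds") \
        .start()

    # Poll the query every 5 seconds. The original loop was `while True`;
    # checking isActive lets the loop end once the query has stopped.
    while query.isActive:
        # lastProgress is None until the first progress update is available.
        if query.lastProgress:
            # Print progress only for batches that actually received rows.
            if query.lastProgress['numInputRows'] > 0:
                pprint(query.lastProgress)

        pprint(query.status)

        time.sleep(5)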
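The monitoring loop above prints the whole lastProgress dictionary, which is fairly verbose. As a rough sketch, the helpers below keep only a few fields and add up the rows seen across recentProgress. The names summarize_progress and total_input_rows are hypothetical, and the sample dictionaries are made-up illustration data, not output from a real run. The rate fields are read with .get() on the assumption that they may be missing from some updates.

```python
def summarize_progress(progress):
    """Keep a few headline fields from a dict shaped like lastProgress."""
    if not progress:
        # lastProgress is None before the first progress update.
        return None
    return {
        "batchId": progress.get("batchId"),
        "timestamp": progress.get("timestamp"),
        "numInputRows": progress.get("numInputRows", 0),
        "inputRowsPerSecond": progress.get("inputRowsPerSecond"),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond"),
    }


def total_input_rows(recent_progress):
    """Sum numInputRows over a list shaped like recentProgress."""
    return sum(p.get("numInputRows", 0) for p in recent_progress)


# Hypothetical samples shaped like progress updates; not real output.
sample_recent = [
    {"batchId": 0, "timestamp": "2024-01-01T00:00:00.000Z", "numInputRows": 0},
    {"batchId": 1, "timestamp": "2024-01-01T00:00:08.000Z", "numInputRows": 3,
     "inputRowsPerSecond": 0.375, "processedRowsPerSecond": 1.5},
]

print(summarize_progress(sample_recent[-1]))
print(total_input_rows(sample_recent))
```

Inside the loop, pprint(summarize_progress(query.lastProgress)) could stand in for pprint(query.lastProgress), and total_input_rows(query.recentProgress) would give a running total over the updates Spark still retains.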