This article describes how to use Spring Boot with Spark to process files stored on HDFS, covering the Maven dependencies, the Spark configuration, and the Controller and Service code.
Processing HDFS files with Spark in Spring Boot
1. Import the dependencies
<!-- Spark dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.12</artifactId>
    <version>3.2.2</version>
</dependency>
2. Configure Spark
Create a configuration class that sets up the Spark information:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hand this class over to Spring for management
@Configuration
public class SparkConfig {

    // Values read from the yml configuration
    @Value("${spark.master}")
    private String sparkMaster;

    @Value("${spark.appName}")
    private String sparkAppName;

    @Value("${hdfs.user}")
    private String hdfsUser;

    @Value("${hdfs.path}")
    private String hdfsPath;

    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf();
        conf.setMaster(sparkMaster);
        conf.setAppName(sparkAppName);
        // Add the HDFS settings
        conf.set("fs.defaultFS", hdfsPath);
        conf.set("spark.hadoop.hdfs.user", hdfsUser);
        return conf;
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .config(sparkConf())
                .getOrCreate();
    }
}
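The @Value placeholders above are resolved from the Spring Boot configuration file, which the article does not show. A minimal application.yml sketch, assuming a local Spark master and the HDFS address used elsewhere in this article (the concrete values are placeholders; adjust them for your environment):

spark:
  master: local[*]                     # assumption: run Spark locally; use a spark:// URL for a real cluster
  appName: springboot-spark-hdfs      # assumption: any application name works
hdfs:
  user: root                           # assumption: the HDFS user name
  path: hdfs://192.168.44.128:9000     # the HDFS address used in the rest of the article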
3. Controller and Service
The controller class:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.zzj.traffic_main_code.service.SparkService;

@RestController
@RequestMapping("/spark")
public class SparkController {

    @Autowired
    private SparkService sparkService;

    @GetMapping("/run")
    public String runSparkJob() {
        // Read the file from Hadoop HDFS
        String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";
        sparkService.executeHadoopSparkJob(filePath);
        return "Spark job executed successfully!";
    }
}
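The SparkService interface autowired into the controller is not shown in the article; a minimal sketch, assuming it declares only the method the controller calls:

package xyz.zzj.traffic_main_code.service;

// Hypothetical sketch: this interface is not listed in the article.
// It is assumed to declare only the method invoked by SparkController.
public interface SparkService {
    void executeHadoopSparkJob(String filePath);
}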
The service that processes the subway data:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;

import java.io.IOException;
import java.net.URI;

import static org.apache.spark.sql.functions.*;

@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {

    private final SparkSession spark;

    @Value("${hdfs.user}")
    private String hdfsUser;

    @Value("${hdfs.path}")
    private String hdfsPath;

    @Autowired
    public SparkReadHdfsImpl(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * Read the CSV file from HDFS, process it, and write the results back to HDFS.
     * @param filePath path of the source CSV file on HDFS
     */
    @Override
    public void sparkSubway(String filePath) {
        try {
            // Apply the Hadoop configuration
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            Configuration hadoopConf = jsc.hadoopConfiguration();
            hadoopConf.set("fs.defaultFS", hdfsPath);
            hadoopConf.set("hadoop.user.name", hdfsUser);
            // Read the file from HDFS
            Dataset<Row> df = spark.read()
                    .option("header", "true")       // the first row contains the column names
                    .option("inferSchema", "true")  // infer the column data types automatically
                    .csv(filePath);
            // Show all rows of the DataFrame
            // df.show(Integer.MAX_VALUE, false);
            // Clean and transform the DataFrame
            // Check for missing values
            df.select("number", "people", "dateTime").na().drop().show();
            // Cast the columns to the proper types
            Dataset<Row> df2 = df.select(
                    col("number").cast(DataTypes.IntegerType),
                    col("people").cast(DataTypes.IntegerType),
                    to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime")
            );
            // Remove duplicates
            Dataset<Row> df3 = df2.dropDuplicates();
            // Filter the data to ensure the people column has no negative values
            Dataset<Row> df4 = df3.filter(col("people").geq(0));
            // df4.show();
            // Aggregate: group by dateTime and compute the total passenger count per day
            Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
            // df6.show();
            sparkForSubway(df6, "/time_subwayData.csv");
            // Aggregate: the subway number with the largest passenger count per day
            Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));
            sparkForSubway(df7, "/everyday_max_subwayData.csv");
            // Aggregate: the daily passenger-flow intensity (daily total people divided by 632840)
            Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));
            sparkForSubway(df8, "/everyday_strength_subwayData.csv");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {
        // Save the processed data to HDFS
        df6.coalesce(1)
                .write().mode("overwrite")
                .option("header", "true")
                .csv("hdfs://192.168.44.128:9000/time_subwayData");
        // Create the Hadoop configuration
        Configuration conf = new Configuration();
        // Get a FileSystem instance
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);
        // Define the temporary directory and the target file path
        Path tempDir = new Path("/time_subwayData");
        FileStatus[] files = fs.listStatus(tempDir);
        // Delete the target file if it already exists
        Path targetFile1 = new Path(hdfsPath);
        if (fs.exists(targetFile1)) {
            fs.delete(targetFile1, true); // true = recursive delete
        }
        for (FileStatus file : files) {
            if (file.isFile() && file.getPath().getName().startsWith("part-")) {
                Path targetFile = new Path(hdfsPath);
                fs.rename(file.getPath(), targetFile);
            }
        }
        // Delete the temporary directory
        fs.delete(tempDir, true);
    }
}
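The SparkReadHdfs interface implemented above is likewise not listed in the article; a minimal sketch, assuming it declares only the sparkSubway method:

package xyz.zzj.traffic_main_code.service;

// Hypothetical sketch: this interface is not listed in the article.
// It is assumed to declare only the method implemented by SparkReadHdfsImpl.
public interface SparkReadHdfs {
    void sparkSubway(String filePath);
}

The sparkForSubway helper exists because Spark writes its output as a directory of part files: coalesce(1) forces a single part file, which is then renamed to the requested target path and the temporary directory is deleted, leaving one plain CSV file on HDFS.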
4. Run
- Once the application is running, open a browser
- Spark processing of the subway data:
- http://localhost:8686/spark/run (the endpoint exposed by the controller above)
- Observe Spark and HDFS through their web UIs:
- http://192.168.44.128:8099/
- http://192.168.44.128:9870/explorer.html#/
This concludes the article on processing HDFS files with Spark in Spring Boot; I hope it proves helpful.