SpringBoot操作spark处理hdfs文件的操作方法

2025-01-10 04:50

本文主要是介绍SpringBoot操作spark处理hdfs文件的操作方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser...

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件的操作方法

1、导入xgOnM依赖

<!--        spark依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.2.2</version>
        </dependency>

2、配置spark信息

建立一个配置文件,配置spark信息

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//将文件交于spring管理
@Configuration
public class SparkConfig {
    //使用yml中的配置
    @Value("${spark.master}")
    private String sparkMaster;
    @Value("${spark.appName}")
    private String sparkAppName;
    @Value("${hdfs.user}")
    private String hdfsUser;
    @Value("${hdfs.path}")
    private String hdfsPath;
    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf();
        conf.setMaster(sparkMaster);
        conf.setAppName(sparkAppName);
        // 添加HDFS配置
        conf.set("fs.defaultFS", hdfsPath);
        conf.set("spark.hadoop.hdfs.user",hdfsUser);
        return conf;
    }
    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .config(sparkConf())
          php      .getOrCreate();
    }
}

3、controller和service

controller类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import orgjavascript.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.zzj.traffic_main_code.service.SparkService;
@RestControllehttp://www.chinasem.cnr
@RequestMapping("/spark")
public class SparkController {
    @Autowired
    private SparkService sparkService;
    @GetMapping("/run")
    public String runSparkJob() {
        //读取Hadoop HDFS文件
        String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";
        sparkService.executeHadoopSparkJob(filePath);
        return "Spark job executed successfully!";
    }
}

处理地铁数据的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.jsapache.hadoop.fs.Path;
import org.apache.spark.api.Java.JavASParkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;
import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;
@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {
    private final SparkSession spark;
    @Value("${hdfs.user}")
    private String hdfsUser;
    @Value("${hdfs.path}")
    private String hdfsPath;
    @Autowired
    public SparkReadHdfsImpl(SparkSession spark) {
        this.spark = spark;
    }
    /**
     * 读取HDFS上的CSV文件并上传到HDFS
     * @param filePath
     */
    @Override
    public void sparkSubway(String filePath) {
        try {
            // 设置Hadoop配置
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            Configuration hadoopConf = jsc.hadoopConfiguration();
            hadoopConf.set("fs.defaultFS", hdfsPath);
            hadoopConf.set("hadoop.user.name", hdfsUser);
            // 读取HDFS上的文件
            Dataset<Row> df = spark.read()
                    .option("header", "true") // 指定第一行是列名
                    .option("inferSchema", "true") // 自动推断列的数据类型
                    .csv(filePath);
            // 显示DataFrame的所有数据
//            df.show(Integer.MAX_VALUE, false);
            // 对DataFrame进行清洗和转换操作
            // 检查缺失值
            df.select("number", "people", "dateTime").na().drop().show();
            // 对数据进行类型转换
            Dataset<Row> df2 = df.select(
                    col("number").cast(DataTypes.IntegerType),
                    col("people").cast(DataTypes.IntegerType),
                    to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime")
            );
            // 去重
            Dataset<Row> df3 = df2.dropDuplicates();
            // 数据过滤,确保people列没有负数
            Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();
            // 数据聚合,按dateTime分组,统计每天的总客流量
            Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();
            sparkForSubway(df6,"/time_subwayData.csv");
            //数据聚合,获取每天人数最多的地铁number
            Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));
            sparkForSubway(df7,"/everyday_max_subwayData.csv");
            //数据聚合,计算每天的客流强度:每天总people除以632840
            Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));
            sparkForSubway(df8,"/everyday_strength_subwayData.csv");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {
        // 保存处理后的数据到HDFS
        df6.coalesce(1)
                .write().mode("overwrite")
                .option("header", "true")
                .csv("hdfs://192.168.44.128:9000/time_subwayData");
        // 创建Hadoop配置
        Configuration conf = new Configuration();
        // 获取FileSystem实例
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);
        // 定义临时目录和目标文件路径
        Path tempDir = new Path("/time_subwayData");
        FileStatus[] files = fs.listStatus(tempDir);
        // 检查目标文件是否存在,如果存在则删除
        Path targetFile1 = new Path(hdfsPath);
        if (fs.exists(targetFile1)) {
            fs.delete(targetFile1, true); // true 表示递归删除
        }
        for (FileStatus file : files) {
            if (file.isFile() && file.getPath().getName().startsWith("part-")) {
                Path targetFile = new Path(hdfsPath);
                fs.rename(file.getPath(), targetFile);
            }
        }
        // 删除临时目录
        fs.delete(tempDir, true);
    }
}

4、运行

  • 项目运行完后,打开浏览器
    • spark处理地铁数据
  • http://localhost:8686/spark/dispose
  • 观察spark和hdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/

SpringBoot操作spark处理hdfs文件的操作方法

到此这篇关于SpringBoot操作spark处理hdfs文件的文章就介绍到这了,更多相关SpringBoot spark处理hdfs文件内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于SpringBoot操作spark处理hdfs文件的操作方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:http://www.cppcns.com/ruanjian/java/696564.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1153015

相关文章

SpringBoot应用中出现的Full GC问题的场景与解决

《SpringBoot应用中出现的FullGC问题的场景与解决》这篇文章主要为大家详细介绍了SpringBoot应用中出现的FullGC问题的场景与解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录Full GC的原理与触发条件原理触发条件对Spring Boot应用的影响示例代码优化建议结论F

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

Python 中的 with open文件操作的最佳实践

《Python中的withopen文件操作的最佳实践》在Python中,withopen()提供了一个简洁而安全的方式来处理文件操作,它不仅能确保文件在操作完成后自动关闭,还能处理文件操作中的异... 目录什么是 with open()?为什么使用 with open()?使用 with open() 进行

SpringBoot条件注解核心作用与使用场景详解

《SpringBoot条件注解核心作用与使用场景详解》SpringBoot的条件注解为开发者提供了强大的动态配置能力,理解其原理和适用场景是构建灵活、可扩展应用的关键,本文将系统梳理所有常用的条件注... 目录引言一、条件注解的核心机制二、SpringBoot内置条件注解详解1、@ConditionalOn

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Spring LDAP目录服务的使用示例

《SpringLDAP目录服务的使用示例》本文主要介绍了SpringLDAP目录服务的使用示例... 目录引言一、Spring LDAP基础二、LdapTemplate详解三、LDAP对象映射四、基本LDAP操作4.1 查询操作4.2 添加操作4.3 修改操作4.4 删除操作五、认证与授权六、高级特性与最佳

Spring Shell 命令行实现交互式Shell应用开发

《SpringShell命令行实现交互式Shell应用开发》本文主要介绍了SpringShell命令行实现交互式Shell应用开发,能够帮助开发者快速构建功能丰富的命令行应用程序,具有一定的参考价... 目录引言一、Spring Shell概述二、创建命令类三、命令参数处理四、命令分组与帮助系统五、自定义S

SpringSecurity JWT基于令牌的无状态认证实现

《SpringSecurityJWT基于令牌的无状态认证实现》SpringSecurity中实现基于JWT的无状态认证是一种常见的做法,本文就来介绍一下SpringSecurityJWT基于令牌的无... 目录引言一、JWT基本原理与结构二、Spring Security JWT依赖配置三、JWT令牌生成与

Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码

《Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码》:本文主要介绍Java中日期时间转换的多种方法,包括将Date转换为LocalD... 目录一、Date转LocalDateTime二、Date转LocalDate三、LocalDateTim