Spark核心编程-分组取topN

2024-06-11 05:38
文章标签 分组 编程 核心 spark topn

本文主要是介绍Spark核心编程-分组取topN,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

案例需求
对每个班级内的学生成绩,取出前3名。(分组取topN)

输入测试数据(以“ ”以做分割符)

class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
class1 98
class2 96 


实现如下:

1 、scala的版本


package com.spark.core
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
 
/**
 * @author Ganymede
 */
object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Top3").setMaster("local[1]")
    val sc = new SparkContext(conf)
 
    val lines = sc.textFile("D:/scala-eclipse/workspace/spark-study-java/src/main/resources/score.txt", 1)
 
    val pairs = lines.map { x =>
      {
        val splited = x.split(" ")
        (splited(0), splited(1).toInt)
      }
    }
 
    val groupedPairs = pairs.groupByKey();
 
    val top3Score = groupedPairs.map(classScores => {
      val top3 = Array[Int](-1, -1, -1)
 
      val className = classScores._1
 
      val scores = classScores._2
 
      for (score <- scores) {
        breakable {
          for (i <- 0 until 3) {
            if (top3(i) == -1) {
              top3(i) = score;
              break;
            } else if (score > top3(i)) {
              var j = 2
              while (j > i) {
                top3(j) = top3(j - 1);
                j = j - 1
              }
              top3(i) = score;
              break;
            }
          }
        }
      }
      (className, top3);
    })
 
    top3Score.foreach(x => {
      println(x._1)
      val res = x._2
      for (i <- res) {
        println(i)
      }
      println("==========================")
    })
 
  }
}

输出:

class1
98
95
90
==========================
class2
96
88
87
========================== 

在实现group by 后的排序算法,用到了break函数.
scala没有提供类似于java的break语句。但是可以使用boolean类型变量、return或者Breaks的break函数来替代使用。 

2、用spark-sql来实现

创建一个表


create table scores(className string, score int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
加载数据
load data local inpath '/opt/software/tmp/scores.data'  overwrite into table scores;
查询按班级分组并返回倒序的top3

select className,score from (SELECT className,score, Row_Number() OVER (partition by className ORDER BY score desc ) rank FROM scores ) a where a.rank<=3;
实际就是用了 row_number() over (partition by ... order by ...)的函数。同样hive也是支持的


3、总结:实际生产中,大部分还是用SQL来分析与统计的,明显方便一条SQL搞定了;而代码实现更灵活,便于性能的优化。
 

这篇关于Spark核心编程-分组取topN的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1050334

相关文章

SpringQuartz定时任务核心组件JobDetail与Trigger配置

《SpringQuartz定时任务核心组件JobDetail与Trigger配置》Spring框架与Quartz调度器的集成提供了强大而灵活的定时任务解决方案,本文主要介绍了SpringQuartz定... 目录引言一、Spring Quartz基础架构1.1 核心组件概述1.2 Spring集成优势二、J

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

Python异步编程中asyncio.gather的并发控制详解

《Python异步编程中asyncio.gather的并发控制详解》在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具,本文将通过实际场景和代码示例,展示如何结合信号量... 目录一、asyncio.gather的原始行为解析二、信号量控制法:给并发装上"节流阀"三、进阶控制

Linux find 命令完全指南及核心用法

《Linuxfind命令完全指南及核心用法》find是Linux系统最强大的文件搜索工具,支持嵌套遍历、条件筛选、执行动作,下面给大家介绍Linuxfind命令完全指南,感兴趣的朋友一起看看吧... 目录一、基础搜索模式1. 按文件名搜索(精确/模糊匹配)2. 排除指定目录/文件二、根据文件类型筛选三、时间

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

PyCharm接入DeepSeek实现AI编程的操作流程

《PyCharm接入DeepSeek实现AI编程的操作流程》DeepSeek是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的AI模型,接下来,我们把DeepSeek接入到PyCharm中... 目录引言效果演示创建API key在PyCharm中下载Continue插件配置Continue引言

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont