Spark项目实战:飞机延误预测项目

2023-11-22 02:48

本文主要是介绍Spark项目实战:飞机延误预测项目,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。
1、这几年,我整理了很多IT技术相关的教程给大家,爱生活、爱分享。
2、博客导航跳转(请收藏):邵奈一的技术博客导航
3、此篇为本博客的导航,遇到问题可以直接在本篇搜索,方便快捷。涉及的技术面比较广,也包括各种解决方案,报错解决等。
4、如果您觉得文章有用,请收藏,转发,评论,并关注我,谢谢!

| 公众号 | 微信 | 微博 | CSDN | 简书 |


教程目录

  • 0x00 教程内容
  • 0x01 数据准备
          • 1. 下载数据
          • 2. 上传数据到HDFS
          • 3. 数据字段详细说明
  • 0x02 工程实现
          • 1. 依赖准备
          • 2. 上传一份数据到本地
          • 3. 数据处理代码实现
          • 4. 执行效果展示
  • 0x03 项目讲解
          • 1. 项目整体介绍
          • 2. 使用Hadoop和Spark进行预处理
          • 3. 使用Spark和ML-Lib建模
  • 0x04 打包到服务器执行
  • 0x05 项目升级
  • 0xFF 总结

0x00 教程内容

  1. 数据准备
  2. 工程实现
  3. 项目讲解
  4. 项目升级

PS:后期还会补充:
1、进行Spark、Scala版本升级
2、继续优化数据,提高预测效果
3、代码优化,代码里有小部分测试代码,应该优化~

0x01 数据准备

1. 下载数据

a. wget参考命令:

wget https://dataverse.harvard.edu/api/access/datafile/:persistentId?persistentId=doi:10.7910/DVN/HG7NV7/2BHLWK -O /tmp/flights_2007.csv.bz2
wget https://dataverse.harvard.edu/api/access/datafile/:persistentId?persistentId=doi:10.7910/DVN/HG7NV7/EIR0RA -O /tmp/flights_2008.csv.bz2

PS:
若上述链接已失效,请联系博主私下获取。
或者关注公众号,回复:飞机延误预测。

b. 请自行修改名称,如果不是wget方式下载:

flights_2007.csv.bz2
flights_2008.csv.bz2

2. 上传数据到HDFS

a. 上传到HDFS的/tmp/airflightsdelays/路径下

3. 数据字段详细说明

a. 上传到HDFS的/tmp/airflightsdelays/路径下
在这里插入图片描述
在这里插入图片描述
说明:可以自己解压一下数据,查看一下前面几条数据(数据信息有待完善及校验!)。

0x02 工程实现

1. 依赖准备
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.shaonaiyi</groupId><artifactId>sparkMLlib</artifactId><version>1.0-SNAPSHOT</version><properties><spark.version>1.6.3</spark.version><scala.version>2.10.5</scala.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency></dependencies><build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.3.1</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><phase>compile</phase><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><!--<plugin>--><!--<groupId>org.scala-tools</groupId>--><!--<artifactId>maven-scala-plugin</artifactId>--><!--<executions>--><!--<execution>--><!--<goals>--><!--<goal>compile</goal>--><!--<goal>testCompile</goal>--><!--</goals>--><!--</execution>--><!--</executions>--><!--<configuration>--><!--<scalaVersion>${scala.version}</scalaVersion>--><!--<args>--><!--<arg>-target:jvm-1.7</arg>--><!--</args>--><!--</configuration>--><!--</plugin>--></plugins></build></project>
2. 上传一份数据到本地

a. 项目根路径的/tmp/airflightsdelays/
在这里插入图片描述

3. 数据处理代码实现
package com.shaonaiyiimport org.apache.spark.rdd._import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.{DecisionTree, RandomForest}
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTime
import org.joda.time.Days/*** @Auther: 邵奈一* @Date: 2019/05/06 下午 3:08* @Description: 飞机延误预测项目*/
object DelayRecProject {def main(args: Array[String]): Unit = {//打包到集群时,注释掉本地测试代码val conf = new SparkConf().setMaster("local[5]").setAppName("DelayRecProject")
//    val conf = new SparkConf()val sc = new SparkContext(conf)//设置log打印级别sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)import sqlContext.implicits._//阶段一:数据预处理//打包到集群时,注释掉本地测试代码val data_2007tmp = prepFlightDelays("tmp/airflightsdelays/flights_2007.csv.bz2",sc)
//    val data_2007tmp = prepFlightDelays("/tmp/airflightsdelays/flights_2007.csv.bz2",sc)val data_2007 = data_2007tmp.map(rec => rec.gen_features._2)//打包到集群时,注释掉本地测试代码val data_2008 = prepFlightDelays("tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)
//    val data_2008 = prepFlightDelays("/tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)data_2007tmp.toDF().registerTempTable("data_2007tmp")data_2007.take(5).map(x => x mkString ",").foreach(println)//阶段二:使用Spark和ML-Lib建模// Prepare training setval parsedTrainData = data_2007.map(parseData)parsedTrainData.cacheval scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))scaledTrainData.cache// Prepare test/validation setval parsedTestData = data_2008.map(parseData)parsedTestData.cacheval scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))scaledTestData.cachescaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)//阶段三:评估分类指标// Function to compute evaluation metricsdef eval_metrics(labelsAndPreds: RDD[(Double, Double)]) : Tuple2[Array[Double], Array[Double]] = {val tp = labelsAndPreds.filter(r => r._1==1 && r._2==1).count.toDoubleval tn = labelsAndPreds.filter(r => r._1==0 && r._2==0).count.toDoubleval fp = labelsAndPreds.filter(r => r._1==1 && r._2==0).count.toDoubleval fn = labelsAndPreds.filter(r => r._1==0 && r._2==1).count.toDoubleval precision = tp / (tp+fp)val recall = tp / (tp+fn)val F_measure = 2*precision*recall / (precision+recall)val accuracy = (tp+tn) / (tp+tn+fp+fn)new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))}class Metrics(labelsAndPreds: RDD[(Double, Double)]) extends java.io.Serializable {private def filterCount(lftBnd:Int,rtBnd:Int):Double = labelsAndPreds.map(x => (x._1.toInt, x._2.toInt)).filter(_ == (lftBnd,rtBnd)).count()lazy val tp = filterCount(1,1)  // true positiveslazy val tn = filterCount(0,0)  // true negativeslazy val fp = filterCount(0,1)  // false positiveslazy val fn = filterCount(1,0)  // false negativeslazy val precision = tp / (tp+fp)lazy val recall = tp / (tp+fn)lazy val F1 = 2*precision*recall / (precision+recall)lazy val accuracy = (tp+tn) / (tp+tn+fp+fn)}//阶段四:构建回归模型// Build the Logistic Regression modelval model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations=100)// Predictval labelsAndPreds_lr = scaledTestData.map { point =>val pred = model_lr.predict(point.features)(pred, point.label)}val m_lr = eval_metrics(labelsAndPreds_lr)._2println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))println(model_lr.weights)//阶段五:构建向量机算法模型// Build the SVM modelval svmAlg = new SVMWithSGD()svmAlg.optimizer.setNumIterations(100).setRegParam(1.0).setStepSize(1.0)val model_svm = svmAlg.run(scaledTrainData)// Predictval labelsAndPreds_svm = scaledTestData.map { point =>val pred = model_svm.predict(point.features)(pred, point.label)}val m_svm = eval_metrics(labelsAndPreds_svm)._2println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))//阶段六:构建决策树算法模型// Build the Decision Tree modelval numClasses = 2val categoricalFeaturesInfo = Map[Int, Int]()val impurity = "gini"val maxDepth = 10val maxBins = 100val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)// Predictval labelsAndPreds_dt = parsedTestData.map { point =>val pred = model_dt.predict(point.features)(pred, point.label)}val m_dt = eval_metrics(labelsAndPreds_dt)._2println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))//阶段七:构建随机森林算法模型val treeStrategy = Strategy.defaultStrategy("Classification")val numTrees = 100val featureSubsetStrategy = "auto" // Let the algorithm chooseval model_rf = RandomForest.trainClassifier(parsedTrainData, treeStrategy, numTrees, featureSubsetStrategy, seed = 123)// Predictval labelsAndPreds_rf = parsedTestData.map { point =>val pred = model_rf.predict(point.features)(point.label, pred)}val m_rf = new Metrics(labelsAndPreds_rf)println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_rf.precision, m_rf.recall, m_rf.F1, m_rf.accuracy))}case class DelayRec(year: String,month: String,dayOfMonth: String,dayOfWeek: String,crsDepTime: String,depDelay: String,origin: String,distance: String,cancelled: String) {val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007","09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007","01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008","09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")def gen_features: (String, Array[Double]) = {val values = Array(depDelay.toDouble,month.toDouble,dayOfMonth.toDouble,dayOfWeek.toDouble,get_hour(crsDepTime).toDouble,distance.toDouble,days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt))new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)}def get_hour(depTime: String) : String = "%04d".format(depTime.toInt).take(2)def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)def days_from_nearest_holiday(year:Int, month:Int, day:Int): Int = {val sampleDate = new DateTime(year, month, day, 0, 0)holidays.foldLeft(3000) { (r, c) =>val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)math.min(r, distance)}}}def prepFlightDelays(infile: String, sc: SparkContext): RDD[DelayRec] = {val data = sc.textFile(infile)data.map { line =>val reader = new CSVReader(new StringReader(line))reader.readAll().asScala.toList.map(rec => DelayRec(rec(0),rec(1),rec(2),rec(3),rec(5),rec(15),rec(16),rec(18),rec(21)))}.map(list => list(0)).filter(rec => rec.year != "Year").filter(rec => rec.cancelled == "0").filter(rec => rec.origin == "ORD")}def parseData(vals: Array[Double]): LabeledPoint = {LabeledPoint(if (vals(0)>=15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))}}
4. 执行效果展示

a. 执行:
在这里插入图片描述

0x03 项目讲解

1. 项目整体介绍

在本项目中,我们将演示如何使用Hadoop构建预测模型,这次我们将使用Apache Spark和ML-Lib。

教程通过其Scala API使用Apache Spark来生成我们的特征矩阵,并使用ML-Lib(Spark的机器学习库)来构建和评估我们的分类模型。

构建航班延误的预测模型,源数据集位于我们下载的数据,其中包括1987年至2008年间美国航班的详细信息。后期会加上天气信息丰富数据,包括每日温度(最小/最大),风速,降雪条件和降水量。

我们会建立一个监督学习模型,来预测离开奥黑尔国际机场(ORD)的航班延误情况。最后我们将使用2007年的数据来构建模型,并使用2008年的数据测试其有效性。

2. 使用Hadoop和Spark进行预处理

Apache Spark的基本数据抽象是RDD(弹性分布式数据集),它是一个容错的元素集合,可以在Hadoop集群中并行运行。

Spark的API(以Scala,Python或Java提供)支持各种转换,例如map()和flatMap(),filter(),join()等,以创建和操作RDD。有关API的完整说明,请查看Spark API编程指南:http://spark.apache.org/docs/1.6.3/programming-guide.html。

与Scikit-learn演示类似,在我们的第一次迭代中,我们为每个航班生成以下功能:

month(月份):冬季应该比夏季月份延迟更多
day of month(每月的哪一天):这可能不是一个非常具有预测性的变量,但无论如何都要使用它
day of week(星期几):周末与工作日
hour of the day(一天中第几个小时):晚些时候往往有更多的延误
Distance(距离):有趣的是看这个变量是否是延迟的良好预测因子
Days from nearest holiday(距离最近的假期天数):距离最近的美国假期的天数

我们将使用Spark RDD执行相同的预处理,将原始飞行延迟数据集转换为两个特征矩阵:data_2007(我们的训练集)和data_2008(我们的测试集)。
封装航班延误记录的Case类DelayRec表示特征向量,其方法执行大部分繁重工作:

to_date()是一种将年/月/日转换为字符串的辅助方法
gen_features(row)接受一行输入并生成一个键/值元组,其中键是日期字符串(to_date的输出),值是特征值。我们将在第二次迭代中使用它来与天气数据连接。
get_hour()方法提取出发时间的2位小时部分
days_from_nearest_holiday()方法计算列表假期中任何假日提供的年/月/日的最小距离(以天为单位)。

使用DelayRec,我们的处理将执行以下步骤(在函数prepFlightDelays中):

1、我们使用Spark的SparkContext.textFile方法读取原始输入文件,从而生成RDD。
2、每行使用CSVReader解析为字段,并填充到DelayRec对象中
3、然后,我们在输入RDD上执行一系列RDD转换,以确保我们只有与未被取消并且源自ORD的航班相对应的行。
4、最后,我们使用gen_features方法生成每行的最终特征向量,作为一组双精度。

3. 使用Spark和ML-Lib建模

使用data_2007数据集(用于训练)和data_2008数据集(用于验证)作为RDD,然后用Spark的ML-Lib机器学习库构建预测模型。

ML-Lib是Spark的可扩展机器学习库,包括各种学习算法和实用程序,包括分类,回归,聚类,协同过滤,降维等。

要使用ML-Lib的机器学习算法,首先我们将我们的特征矩阵解析为LabeledPoint对象的RDD(用于训练和测试数据集)。LabeledPoint是ML-Lib对带有标签的特征向量的抽象。

我们将15分钟或更长时间的航班延误视为“延迟”,并将其标记为1.0,并在15分钟内标记为“非延迟”,并将其标记为0.0

我们还使用ML-Lib的StandardScaler类来标准化训练和验证集的特征值。这很重要,因为ML-Lib使用随机梯度下降,如果特征向量被归一化,该随机梯度下降表现会最佳。

0x04 打包到服务器执行

注释掉相应的代码:
spark-submit --master yarn --class com.shainaiyi.DelayRecProject --name DelayRecProject /home/hadoop-sny/sparkMLlib-1.0-SNAPSHOT.jar
效果与本地是一样的!

0x05 项目升级

待补充!

0xFF 总结

  1. 通过本实验,我们综合了前面所学习到的知识,比如,构建Maven项目,IDEA编写Scala代码,打包到服务器,本地测试代码,机器学习建模操作,结果校验等等操作,认真学习一定会收获满满的,请自行查看更多知识,举一反三,学以致用。
  2. 执行的过程中,如果发现跑起来非常慢,可以考虑减少数据量,相应的准确率也一样会降低,另外,跑作业的时候,尽量注释掉不需要执行的代码,可以加速执行作业的效率。

邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


这篇关于Spark项目实战:飞机延误预测项目的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/406926

相关文章

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

一文教你如何将maven项目转成web项目

《一文教你如何将maven项目转成web项目》在软件开发过程中,有时我们需要将一个普通的Maven项目转换为Web项目,以便能够部署到Web容器中运行,本文将详细介绍如何通过简单的步骤完成这一转换过程... 目录准备工作步骤一:修改​​pom.XML​​1.1 添加​​packaging​​标签1.2 添加

tomcat多实例部署的项目实践

《tomcat多实例部署的项目实践》Tomcat多实例是指在一台设备上运行多个Tomcat服务,这些Tomcat相互独立,本文主要介绍了tomcat多实例部署的项目实践,具有一定的参考价值,感兴趣的可... 目录1.创建项目目录,测试文China编程件2js.创建实例的安装目录3.准备实例的配置文件4.编辑实例的

springboot集成Deepseek4j的项目实践

《springboot集成Deepseek4j的项目实践》本文主要介绍了springboot集成Deepseek4j的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录Deepseek4j快速开始Maven 依js赖基础配置基础使用示例1. 流式返回示例2. 进阶

SpringBoot项目启动报错"找不到或无法加载主类"的解决方法

《SpringBoot项目启动报错找不到或无法加载主类的解决方法》在使用IntelliJIDEA开发基于SpringBoot框架的Java程序时,可能会出现找不到或无法加载主类com.example.... 目录一、问题描述二、排查过程三、解决方案一、问题描述在使用 IntelliJ IDEA 开发基于

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

SpringBoot项目使用MDC给日志增加唯一标识的实现步骤

《SpringBoot项目使用MDC给日志增加唯一标识的实现步骤》本文介绍了如何在SpringBoot项目中使用MDC(MappedDiagnosticContext)为日志增加唯一标识,以便于日... 目录【Java】SpringBoot项目使用MDC给日志增加唯一标识,方便日志追踪1.日志效果2.实现步