R︱sparkR的安装与使用、函数尝试笔记、一些案例

2023-12-21 03:38

本文主要是介绍R︱sparkR的安装与使用、函数尝试笔记、一些案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本节内容转载于博客: wa2003 

spark是一个我迟早要攻克的内容呀~

—————————————————————————————————————


一、SparkR 1.4.0 的安装及使用


1、./sparkR打开R shell之后,使用不了SparkR的函数

装在了   /usr/local/spark-1.4.0/ 下

[root@master sparkR]#./bin/sparkR

能进入R,和没装SparkR的一样,无报错

> library(SparkR)

报错: 
Error in library.dynam(lib, package, package.lib) :  
  shared object ?.parkR.so?.not found 
Error: package or namespace load failed for ?.parkR? 


解决办法:重新编译sparkR之后,运行以下命令:

[root@master sparkR]# ./install-dev.sh

然后运行    

[root@elcndc2bdwd01t spark-1.4.0]# ./bin/sparkR

R version 3.2.0 (2015-04-16) -- "Full of Ingredients"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-unknown-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

下面是启动SparkR那些,包括加载SparkR的库,自动生成 Sparkcontext和sqlContext。
Launching java with spark-submit command /usr/local/spark-1.4.0/bin/spark-submit  "sparkr-shell" /tmp/RtmpAN5LID/backend_port7d49547c6f51
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/25 13:33:13 INFO SparkContext: Running Spark version 1.4.0
.........................................
15/06/25 13:33:16 INFO BlockManagerMaster: Registered BlockManager

 Welcome to SparkR!
 Spark context is available as sc, SQL context is available as sqlContext

2、修改log4j的日志控制台打印级别

在Spark的conf目录下,把log4j.properties.template修改为log4j.properties

[appadmin@elcndc2bdwd01t bin]$ cd /usr/local/spark-1.4.0/

$ sudo mv log4j.properties.template  log4j.properties

把log4j.rootCategory=INFO, console改为log4j.rootCategory=ERROR, console即可抑制Spark把INFO级别的日志打到控制台上。

如果要显示全面的信息,则把INFO改为DEBUG。

3、在Rstudio 下使用sparkR的设置

(1)建立sparkR包的位置与rstudio包位置的软链接,用 ln -s 命令 

[root@elcndc2bdwd01t /]#  ln -s /usr/local/spark-1.4.0/R/lib/SparkR    /home/enn_james/R/x86_64-unknown-linux-gnu-library/3.2 

(2)在R的环境设置文件(.Rprofile)中增加一行

Sys.setenv(SPARK_HOME=”/usr/local/spark-1.4.0”)  


两个配置文件,.Renviron和.Rprofile。这两个文件名看起来有点奇怪,怪在哪儿?它们只有扩展名,没有主文件名
在操作系统中有一个默认的规则,凡是以点开头的文件都是隐藏文件,而且通常都是配置文件。前面那句list.files()代码你要是运行过,可能就会发现很多以点开头的文件和文件夹。
R启动的时候会在系统的若干位置寻找配置文件,如果文件存在就会使用这些配置。
其中.Renviron文件用来设置一些R要用的环境变量,而.Rprofile文件则是一个R代码文件,在R启动时,如果这个文件存在,它会被首先执行。因此,如果我们有一些任务要在R启动时运行,或有一些个人选项要配置,都可以写在这个文件里。

3、4040端口看Spark的任务执行情况

http://10.37.148.39:4040/jobs/


—————————————————————————————————————


二、SparkR跑通的函数(持续更新中...)


spark1.4.0的sparkR的思路:用spark从大数据集中抽取小数据(sparkR的DataFrame),然后到R里分析(DataFrame)。

这两个DataFrame是不同的,前者是分布式的,集群上的DF,R里的那些包都不能用;后者是单机版的DF,包里的函数都能用。

sparkR的开发计划,个人觉得是将目前包里的函数,迁移到sparkR的DataFrame里,这样就打开一片天地。


> a<- sql(hiveContext, "SELECT count(*) FROM anjuke_scores where restaurant>=10");

> a<- sql(hiveContext, "SELECT * FROM anjuke_scores limit 5")
> a
DataFrame[city:string, housingname:string, ori_traffic_score:int, ori_traffic_score_normal:double, metro_station:double, metro_station_normal:double,...
> first(a)  #显示Formal Data Frame的第一行

> head(a) ;  #列出a的前6行 
> columns(a)      # 列出全部的列 
[1] "city"                      "housingname"               "ori_traffic_score"         "ori_traffic_score_normal" 

[5] "metro_station"             "metro_station_normal"      "bus_station"               "bus_station_normal"  ...

> showDF(a)
> b<-filter(a, a$ori_comfort>8); # 行筛选, ori_comfort_normal:double 

> print(a);    #打印列名及类型  
DataFrame[city:string, housingname:string, ori_traffic_score:int, ......

> printSchema(a); # 打印列名的树形框架概要 root |-- city: string (nullable = true) |-- housingname: string (nullable = true) |-- ori_traffic_score: integer (nullable = true) |-- ori_traffic_score_normal: double (nullable = true) |-- metro_station: double (nullable = true)
> take(a,10)   ;  # 提取Formal class DataFrame的前面num行,成为R中普通的 data frame , take(x, num) 

     city                  housingname ori_traffic_score ori_traffic_score_normal metro_station metro_station_normal 
1  \t\x9a                   \xddrw\xb8                NA                        0            NA                    0 
2  \t\x9a         \xe4\xf04\u03a2\021~                NA                        0            NA                    0 
3  \t\x9a                \xf6\xe3w\xb8                NA                        0            NA                    0 
4  \t\x9a               \x8e=\xb0w\xb8                NA                        0            NA                    0 
5  \t\x9a \t\x9a\xe4\xf04\xce\xe4\xf0~                NA                        0            NA                    0 
6  \t\x9a                      q4\xfdE                NA                        0            NA                    0 
7  \t\x9a                \xe4\xf04\xce                NA                        0            NA                    0 
8  \t\x9a                      )\xfdVT                NA                        0            NA                    0 
9  \t\x9a                       q\177V                NA                        0            NA                    0 
10 \t\x9a           \xe4\xf04\xceW\xb8                NA                        0            NA                    0 

> b<-take(a,10) 
> dim(b)
[1] 10 41

> aa <- withColumn(a, "ori_comfort_aa", a$ori_comfort * 5) #用现有的列生成新的列, 新增一列,ori_comfort_aa,结果还是Formal data frame结构
> printSchema(aa)
root|-- city: string (nullable = true)
.........|-- comfort_normal: double (nullable = true)|-- ori_comfort_aa: double (nullable = true)
> aa <- mutate(a, newCol1 = a$commerce_normal * 5, newCol2 = a$bank_normal * 2) ;   #与withColumn类似  
> printSchema(aa) 
root 
 |-- city: string (nullable = true) 
 。。。。。。。。。。。。。。。。。。 
 |-- comfort_normal: double (nullable = true) 
 |-- newCol1: double (nullable = true) 
 |-- newCol2: double (nullable = true) 


a1<-arrange(a,asc(a$level_tow));  # 按列排序, asc升序,desc降序

a1<-orderBy(a,asc(a$level_tow));  # 按列排序

count(a) ;  # 统计 Formal Data Frame有多少行数据

> dtypes(a); #以list的形式列出Formal Data Frame的全部列名及类型
[[1]]
[1] "city"   "string"[[2]]
[1] "housingname" "string"  
> a<-withColumnRenamed(a,"comfort_normal","AA");  # 更改列名  
> printSchema(a)
root|-- city: string (nullable = true)|-- housingname: string (nullable = true)
..........|-- AA: double (nullable = true)


创建sparkR的数据框的函数createDataFrame

> df<-createDataFrame(sqlContext,a.df);  # a.df是R中的数据框, df是sparkR的数据框,注意:使用sparkR的数据库,需要sqlContext
> str(a.df) 
'data.frame':    5 obs. of  41 variables: 

> str(df) 
Formal class 'DataFrame' [package "SparkR"] with 2 slots 
  ..@ env:<environment: 0x4fce350> 
  ..@ sdf:Class 'jobj' <environment: 0x4fc70b0> 

> destDF <- select(SFO_DF, "dest", "cancelled");  #选择列

> showDF(destDF);   #显示sparkR的DF
+----+---------+
|dest|cancelled|
+----+---------+
| SFO|        0|
................

> registerTempTable(SFO_DF, "flightsTable");  #要对sparkDF使用SQL语句,首先需要将DF注册成一个table
 
> wa <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable"); #在sqlContext下使用SQL语句

> showDF(wa);   #查询的结果还是sparkDF
+----+---------+
|dest|cancelled|
+----+---------+
| SFO|        0|
................
> local_df <- collect(wa);   #将sparkDF转换成R中的DF
> str(local_df)
'data.frame':    2818 obs. of  2 variables:
 $ dest     : chr  "SFO" "SFO" "SFO" "SFO" ...
 $ cancelled: int  0 0 0 0 0 0 0 0 0 0 ...

> wa<-flights_df[1:1000,];   #wa是R中的DF
> flightsDF<-createDataFrame(sqlContext,wa) ;   #flightsDF是sparkR的DF
> library(magrittr); #管道函数的包对sparkRDF适用
> groupBy(flightsDF, flightsDF$date) %>%
+     summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF;  #注意,语法和dplyr中的有所不同,结果还是sparkRDF

> str(dailyDelayDF)
Formal class 'DataFrame' [package "SparkR"] with 2 slots
  ..@ env:<environment: 0x4cd3118> 
  ..@ sdf:Class 'jobj' <environment: 0x4cd6968> 
> showDF(dailyDelayDF)
+----------+--------------------+--------------------+
|      date|      AVG(dep_delay)|      AVG(arr_delay)|
+----------+--------------------+--------------------+
|2011-01-01|                 5.2|                 5.8|
|2011-01-02|  1.8333333333333333|                -2.0|
................

在39机器上跑的

collect将sparkDF转化成DF
Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
collect(x, stringsAsFactors = FALSE),x:A SparkSQL DataFrame

> dist_df<- sql(hiveContext, "SELECT * FROM anjuke_scores where restaurant<=1");
> local_df <- dist_df %>% 
      groupBy(dist_df$city) %>% 
      summarize(count = n(dist_df$housingname)) %>% 
      collect
> local_df
           city count
1        \t\x9a     5
2         8\xde     7
3      \xf0\xde     2
..........
..........

take也可将sparkDF转化成DF
Take the first NUM rows of a DataFrame and return a the results as a data.frame
take(x, num)


> local_df <- dist_df %>% 
      groupBy(dist_df$city) %>% 
      summarize(count = n(dist_df$housingname))
> a<-take(local_df,100)
[Stage 16:=========================================>            (154 + 1) / 199]                                                                                > View(a)
> a
           city count
1        \t\x9a     5
2         8\xde     7
3      \xf0\xde     2
..........
..........



不通的函数:

> describe(a)
Error in x[present, drop = FALSE] : object of type 'S4' is not subsettable
> jfkDF <- filter(flightsDF, flightsDF$dest == "DFW")
Error in filter(flightsDF, flightsDF$dest == "DFW") : no method for coercing this S4 class to a vector


——————————————————————————————————————————————————————

三、用Spark分析Amazon的8000万商品评价


这篇文章里面提到了spark通过R的调取轻松胜任了复杂的数据查询功能,同时用ggplot2进行可视化操作。该案例是一个很好的sparkR的使用案例,国内翻译过来不够全面,想深入研究的请看原文:http://minimaxir.com/2017/01/amazon-spark/


使用面对R语言的新的升级包,我可以使用一个spark_connect()命令轻松启动本地Spark集群,并使用单个spark_read_csv()命令很快将整个CSV加载到集群中。


1、用sparkR进行大规模数据整理



在数据集中总共有8074万条记录,即8.074e + 07条。如果使用传统工具(如dplyr或甚至Python pandas)高级查询,这样的数据集将需要相当长的时间来执行。


使用sparklyr,操作实际很大的数据就像对只有少数记录的数据集执行分析一样简单(并且比上面提到的eDX类中教授的Python方法简单一个数量级)。



2、用Rnotebook+ggplot2.0进行可视化


作者写了一些ggplot2实现可视化的函数,在他的github:https://github.com/minimaxir/amazon-spark?spm=5176.100239.blogcont69165.13.Eo3vpV

列举几个:

library(readr)
library(dplyr)
library(ggplot2)
library(extrafont)
library(scales)
library(grid)
library(RColorBrewer)
library(digest)
library(readr)
library(stringr)fontFamily <- "Source Sans Pro"
fontTitle <- "Source Sans Pro Semibold"color_palette = c("#16a085","#27ae60","#2980b9","#8e44ad","#f39c12","#c0392b","#1abc9c", "#2ecc71", "#3498db", "#9b59b6", "#f1c40f","#e74c3c")neutral_colors = function(number) {return (brewer.pal(11, "RdYlBu")[-c(5:7)][(number %% 8) + 1])
}set1_colors = function(number) {return (brewer.pal(9, "Set1")[c(-6,-8)][(number %% 7) + 1])
}theme_custom <- function() {theme_bw(base_size = 8) + theme(panel.background = element_rect(fill="#eaeaea"),plot.background = element_rect(fill="white"),panel.grid.minor = element_blank(),panel.grid.major = element_line(color="#dddddd"),axis.ticks.x = element_blank(),axis.ticks.y = element_blank(),axis.title.x = element_text(family=fontTitle, size=8, vjust=-.3),axis.title.y = element_text(family=fontTitle, size=8, vjust=1.5),panel.border = element_rect(color="#cccccc"),text = element_text(color = "#1a1a1a", family=fontFamily),plot.margin = unit(c(0.25,0.1,0.1,0.35), "cm"),plot.title = element_text(family=fontTitle, size=9, vjust=1))                          
}create_watermark <- function(source = '', filename = '', dark=F) {bg_white = "#FFFFFF"
bg_text = '#969696'if (dark) {bg_white = "#000000"bg_text = '#666666'
}












这篇关于R︱sparkR的安装与使用、函数尝试笔记、一些案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/518523

相关文章

如何解决mmcv无法安装或安装之后报错问题

《如何解决mmcv无法安装或安装之后报错问题》:本文主要介绍如何解决mmcv无法安装或安装之后报错问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mmcv无法安装或安装之后报错问题1.当我们运行YOwww.chinasem.cnLO时遇到2.找到下图所示这里3.

Pydantic中Optional 和Union类型的使用

《Pydantic中Optional和Union类型的使用》本文主要介绍了Pydantic中Optional和Union类型的使用,这两者在处理可选字段和多类型字段时尤为重要,文中通过示例代码介绍的... 目录简介Optional 类型Union 类型Optional 和 Union 的组合总结简介Pyd

Vue3使用router,params传参为空问题

《Vue3使用router,params传参为空问题》:本文主要介绍Vue3使用router,params传参为空问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录vue3使用China编程router,params传参为空1.使用query方式传参2.使用 Histo

Python 安装和配置flask, flask_cors的图文教程

《Python安装和配置flask,flask_cors的图文教程》:本文主要介绍Python安装和配置flask,flask_cors的图文教程,本文通过图文并茂的形式给大家介绍的非常详细,... 目录一.python安装:二,配置环境变量,三:检查Python安装和环境变量,四:安装flask和flas

使用Python自建轻量级的HTTP调试工具

《使用Python自建轻量级的HTTP调试工具》这篇文章主要为大家详细介绍了如何使用Python自建一个轻量级的HTTP调试工具,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录一、为什么需要自建工具二、核心功能设计三、技术选型四、分步实现五、进阶优化技巧六、使用示例七、性能对比八、扩展方向建

使用Python实现一键隐藏屏幕并锁定输入

《使用Python实现一键隐藏屏幕并锁定输入》本文主要介绍了使用Python编写一个一键隐藏屏幕并锁定输入的黑科技程序,能够在指定热键触发后立即遮挡屏幕,并禁止一切键盘鼠标输入,这样就再也不用担心自己... 目录1. 概述2. 功能亮点3.代码实现4.使用方法5. 展示效果6. 代码优化与拓展7. 总结1.

使用Python开发一个简单的本地图片服务器

《使用Python开发一个简单的本地图片服务器》本文介绍了如何结合wxPython构建的图形用户界面GUI和Python内建的Web服务器功能,在本地网络中搭建一个私人的,即开即用的网页相册,文中的示... 目录项目目标核心技术栈代码深度解析完整代码工作流程主要功能与优势潜在改进与思考运行结果总结你是否曾经

C/C++错误信息处理的常见方法及函数

《C/C++错误信息处理的常见方法及函数》C/C++是两种广泛使用的编程语言,特别是在系统编程、嵌入式开发以及高性能计算领域,:本文主要介绍C/C++错误信息处理的常见方法及函数,文中通过代码介绍... 目录前言1. errno 和 perror()示例:2. strerror()示例:3. perror(

Linux中的计划任务(crontab)使用方式

《Linux中的计划任务(crontab)使用方式》:本文主要介绍Linux中的计划任务(crontab)使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言1、linux的起源与发展2、什么是计划任务(crontab)二、crontab基础1、cro

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java