Parallelize your massive SHAP computations with MLlib and PySpark

2024-06-16 14:28


https://medium.com/towards-data-science/parallelize-your-massive-shap-computations-with-mllib-and-pyspark-b00accc8667c


A stepwise guide for efficiently explaining your models using SHAP.

Photo by Pietro Jeng on Unsplash

Introduction to MLlib

Apache Spark's Machine Learning Library (MLlib) is designed primarily for scalability and speed, leveraging the Spark runtime for common distributed use cases in supervised learning (such as classification and regression), unsupervised learning (such as clustering and collaborative filtering), and other tasks such as dimensionality reduction. In this article, I cover how we can use SHAP to explain a Gradient Boosted Trees (GBT) model that has been fit to our data at scale.

What are Gradient Boosted Trees?

Before we understand what Gradient Boosted Trees are, we need to understand boosting. Boosting is an ensemble technique that sequentially combines a number of weak learners to achieve an overall strong learner. In the case of Gradient Boosted Trees, each weak learner is a decision tree that minimizes the errors (MSE for regression, log loss for classification) left by the previous decision tree in the sequence. To read about GBTs in more detail, please refer to this blog post [1].
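
For context, below is a minimal sketch of how such a GBT model might be trained and saved with MLlib. It is not from the original article; the column names, hyperparameters, and path are placeholders, and it assumes a preprocessed Spark dataframe train_df with a 'features' vector column and a numeric 'label' column.

from pyspark.ml.classification import GBTClassifier

# Hypothetical training dataframe with 'features' (vector) and 'label' columns.
gbt_estimator = GBTClassifier(featuresCol="features", labelCol="label", maxIter=50, maxDepth=5)
gbt_model = gbt_estimator.fit(train_df)

# Persist the fitted model so it can be reloaded later with GBTClassificationModel.load().
gbt_model.save("your-model-path")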

Understanding our imports

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.classification import GBTClassificationModel
import shap
import pyspark.sql.functions as F
from pyspark.sql.types import *

The first two imports are for initializing a Spark session, which will be used to convert our pandas dataframe to a Spark one. The third import is used to load our GBT model into memory, which will then be passed to our SHAP explainer to generate explanations. The SHAP explainer itself is initialized via the SHAP package in the fourth import. The last two imports are for using Spark SQL functions and SQL types; these will be used in our User-Defined Function (UDF), which I shall describe later.
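
For completeness, here is a minimal sketch of how that Spark session might be initialized using the SparkSession and SparkConf imported above; the local[*] master setting mirrors the consolidated gist at the end of the article, and on a managed platform such as Databricks a session is usually already provided.

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Reuse an existing session if one is running, otherwise start a local one.
spark = SparkSession.builder.config(conf=SparkConf().set("spark.master", "local[*]")).getOrCreate()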

Converting our MLlib GBT feature vector to a Pandas dataframe

The SHAP explainer takes a dataframe as input. However, training an MLlib GBT model requires data preprocessing. More specifically, the categorical variables in our data need to be converted into numeric variables using either category indexing or one-hot encoding. To learn more about how to train a GBT model, refer to this article [2]. The resulting “features” column is a SparseVector (to read more about it, check the “Preprocess Data” section in this example). It looks something like this:

SparseVector features column description — 1. default index value, 2. vector length, 3. list of indexes of the feature columns, 4. list of data values at the corresponding index at 3. [Image by author]
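
As a reminder, here is a minimal sketch (not from the original article) of the kind of preprocessing pipeline that produces such a “features” column, assuming Spark 3.x and hypothetical column names; the article referenced above covers this step in full.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Hypothetical columns: 'category' is a string column, 'amount' is numeric.
indexer = StringIndexer(inputCol="category", outputCol="category_idx")
encoder = OneHotEncoder(inputCols=["category_idx"], outputCols=["category_vec"])
assembler = VectorAssembler(inputCols=["category_vec", "amount"], outputCol="features")

pipeline = Pipeline(stages=[indexer, encoder, assembler])
# 'features' comes out as a SparseVector column like the one described above.
preprocessed_df = pipeline.fit(raw_df).transform(raw_df)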

The “features” column shown in the image above is for a single training instance. We need to transform this SparseVector for all our training instances. One way to do this is to iteratively process each row and append it to our pandas dataframe, which we would then feed to our SHAP explainer (ouch!). There is a much faster way, which leverages the fact that we have all of our data loaded in memory (if not, we can load it in batches and perform the preprocessing for each in-memory batch). In Shikhar Dua's words:

1. Create a list of dictionaries in which each dictionary corresponds to an input data row.

2. Create a data frame from this list.

So, based on the above method, we get something like this:

import pandas as pd

# feature_names: the list of input feature column names, in the same order
# in which they were assembled into the 'features' vector (an assumption;
# define it to match your own preprocessing).
rows_list = []
for row in spark_df.rdd.collect():
    # Map each feature name to its value in this row's feature vector.
    dict1 = {k: v for k, v in zip(feature_names, row.features.toArray())}
    rows_list.append(dict1)
pandas_df = pd.DataFrame(rows_list)

If rdd.collect() looks scary, it's actually pretty simple to explain. Resilient Distributed Datasets (RDDs) are fundamental Spark data structures: immutable, distributed collections of objects. Each dataset in an RDD is further subdivided into logical partitions that can be computed on different worker nodes of our Spark cluster. So, all that PySpark's RDD collect() does is retrieve the data from all the worker nodes to the driver node. As you might guess, this is a memory bottleneck, and if we are handling data larger than our driver node's memory capacity, we need to increase the number of RDD partitions and filter them by partition index. Read how to do that here [5].
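
A hedged sketch of that batching idea is shown below, pulling one partition at a time to the driver; the partition count is a placeholder and the per-batch processing step is left to you.

num_partitions = 32  # placeholder: choose so that each partition fits in driver memory
repartitioned = spark_df.rdd.repartition(num_partitions)

for idx in range(num_partitions):
    # Keep only the rows of partition `idx` and collect just that slice.
    batch = repartitioned.mapPartitionsWithIndex(
        lambda i, rows, idx=idx: rows if i == idx else iter([])
    ).collect()
    # ...preprocess `batch` here (e.g. build the list of dictionaries shown above)...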

Don’t take my word on the execution performance. Check out the stats.

Performance profiling for inserting rows to a pandas dataframe. [Source (Thanks to Mikhail_Sam and Peter Mortensen): here]

Here are the metrics from one of my Databricks notebook scheduled job runs:

Input size: 11.9 GiB (~12.78GB), Total time Across All Tasks: 20 min, Number of records: 165.16K

Summary Metrics for 125 Completed Tasks executed by the stage that ran the above cell. [Image by author]

Working with the SHAP Library

We are now ready to pass our preprocessed dataset to the SHAP TreeExplainer. Remember that SHAP is a local feature attribution method that explains individual predictions as an algebraic sum of the Shapley values of our model's features.
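
In symbols, for a single instance x the prediction decomposes as f(x) = φ₀ + Σᵢ φᵢ(x), where φ₀ is the expected model output over the background data and φᵢ(x) is the Shapley value attributed to feature i.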

We use a TreeExplainer for the following reasons:

  1. Suitable: TreeExplainer is a class that computes SHAP values for tree-based models (Random Forest, XGBoost, LightGBM, GBT, etc).
  2. Exact: Instead of simulating missing features by random sampling, it makes use of the tree structure by simply ignoring decision paths that rely on the missing features. The TreeExplainer output is therefore deterministic and does not vary based on the background dataset.
  3. Efficient: Instead of iterating over each possible feature combination (or a subset thereof), all combinations are pushed through the tree simultaneously, using a more complex algorithm to keep track of each combination's result. This reduces the complexity from O(TL2ᵐ) for all possible coalitions to the polynomial O(TLD²), where m is the number of features, T is the number of trees, L is the maximum number of leaves, and D is the maximum tree depth.

The check_additivity flag controls a validation check that verifies whether the sum of the SHAP values equals the output of the model. However, running this check requires model predictions, which are not supported for Spark models, so the flag needs to be set to False (it is effectively ignored anyway). Once we get the SHAP values, we convert them from a NumPy array into a pandas dataframe so that they are easily interpretable.

One thing to note is that the dataset order is preserved when we convert a Spark dataframe to pandas, but the reverse is not true.

The points above lead us to the code snippet below:

gbt = GBTClassificationModel.load('your-model-path') 
explainer = shap.TreeExplainer(gbt)
shap_values = explainer(pandas_df, check_additivity = False)
shap_pandas_df = pd.DataFrame(shap_values.values, columns = pandas_df.columns)

An Introduction to PySpark UDFs and when to use them

How PySpark UDFs distribute individual tasks to worker (executor) nodes [Source: here]

User-Defined Functions (UDFs) are custom functions that operate on individual rows of our dataset. They are generally used when the native Spark functions are not sufficient to solve the problem. Built-in Spark functions are inherently faster than UDFs because they are native JVM structures whose methods are implemented by local calls to Java APIs. PySpark UDFs, however, are Python implementations that require data movement between the Python interpreter and the JVM (refer to arrow 4 in the picture above). This inevitably introduces some processing delay.
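
As a tiny illustration (not from the original article) of a row-wise PySpark UDF and the Python round trip it implies, with a hypothetical 'amount' column:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Each 'amount' value is serialized from the JVM to the Python interpreter,
# squared there, and serialized back into the resulting column.
square_udf = F.udf(lambda x: float(x) ** 2 if x is not None else None, DoubleType())
df_squared = spark_df.withColumn("amount_squared", square_udf(F.col("amount")))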

If no processing delay can be tolerated, the best thing to do is to create a Python wrapper that calls a Scala UDF from PySpark itself; a great example is shown in this blog [6]. However, a PySpark UDF was sufficient for my use case, since it is easy to understand and code.

The code below shows the Python function to be executed on each worker/executor node. We simply pick up the highest SHAP values (in absolute terms, as we want to find the most impactful negative features as well), append them to the respective pos_features and neg_features lists, and in turn append both of these lists to a features list that is returned to the caller.

def shap_udf(row):
    # Map each feature name to its SHAP value for this row.
    feature_dict = row.asDict()
    # Sort the features by the magnitude of their SHAP value, largest first.
    # (abs is the Python built-in; if you shadow it, e.g. via
    # 'from pyspark.sql.functions import *', use builtins.abs instead.)
    dict_importance = {key: value for key, value in
                       sorted(feature_dict.items(), key=lambda item: abs(item[1]), reverse=True)}
    pos_features = []
    neg_features = []
    for k, v in dict_importance.items():
        if abs(v) >= <your-threshold-shap-value>:
            if v > 0:
                pos_features.append((k, v))
            else:
                neg_features.append((k, v))
    # Keep at most the top 5 positive and top 5 negative (feature, value) pairs.
    features = []
    features.append(pos_features[:5])
    features.append(neg_features[:5])
    return features

We then register our PySpark UDF with our Python function name (in my case, shap_udf) and specify the return type (mandatory in Python and Java) of the function in the parameters to F.udf(). The outer ArrayType() holds two lists, one for positive features and one for negative ones. Since each of these lists comprises at most 5 (feature-name, shap-value) StructType() pairs, each corresponds to the inner ArrayType(). Below is the code:

udf_obj = F.udf(shap_udf, ArrayType(ArrayType(StructType([
    StructField('Feature', StringType()),
    StructField('Shap_Value', FloatType()),
]))))

Now we just create a new Spark dataframe with a column called 'Shap_Importance' that invokes our UDF on each row of the spark_shapdf dataframe. To split the positive and negative features, we create two columns in a new Spark dataframe called final_sparkdf. Our final code snippet looks like this:

new_sparkdf = spark_shapdf.withColumn('Shap_Importance', udf_obj(F.struct([spark_shapdf[x] for x in spark_shapdf.columns])))
final_sparkdf = new_sparkdf.withColumn('Positive_Shap', new_sparkdf.Shap_Importance[0]).withColumn('Negative_Shap', new_sparkdf.Shap_Importance[1])
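
To sanity-check the result, one might peek at a few rows; this is a minimal usage sketch, assuming the schema produced above.

# Inspect the top positive and negative (feature, SHAP value) pairs per row.
final_sparkdf.select('Positive_Shap', 'Negative_Shap').show(5, truncate=False)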

And finally, we have extracted all the important features of our GBT model per testing instance, without the use of any explicit for loops! The consolidated code can be found in the GitHub gist below.

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.classification import GBTClassificationModel
import shap
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

# Convert the sparse feature vector that is passed to the MLlib GBT model into a pandas dataframe.
# This 'pandas_df' will be passed to the SHAP TreeExplainer.
# feature_names: the input feature column names, in the order they were assembled into the 'features' vector.
rows_list = []
for row in spark_df.rdd.collect():
    dict1 = {k: v for k, v in zip(feature_names, row.features.toArray())}
    rows_list.append(dict1)
pandas_df = pd.DataFrame(rows_list)

# Load the GBT model from the path where you saved it.
# Make sure the application where your notebook runs has access to the storage path!
gbt = GBTClassificationModel.load("<your path where the GBT model is loaded>")

explainer = shap.TreeExplainer(gbt)
# check_additivity requires predictions to be run, which is not supported by Spark [yet],
# so it needs to be set to False as it is ignored anyway.
shap_values = explainer(pandas_df, check_additivity=False)
shap_pandas_df = pd.DataFrame(shap_values.values, columns=pandas_df.columns)

spark = SparkSession.builder.config(conf=SparkConf().set("spark.master", "local[*]")).getOrCreate()
spark_shapdf = spark.createDataFrame(shap_pandas_df)

def shap_udf(row):
    # Works on a single Spark dataframe row; this work is distributed among the worker nodes of your Apache Spark cluster.
    feature_dict = row.asDict()
    dict_importance = {key: value for key, value in
                       sorted(feature_dict.items(), key=lambda item: abs(item[1]), reverse=True)}
    pos_features = []
    neg_features = []
    for k, v in dict_importance.items():
        if abs(v) >= <your-threshold-shap-value>:
            if v > 0:
                pos_features.append((k, v))
            else:
                neg_features.append((k, v))
    # Taking the top 5 features from pos and neg features. We can increase this number.
    features = []
    features.append(pos_features[:5])
    features.append(neg_features[:5])
    return features

udf_obj = F.udf(shap_udf, ArrayType(ArrayType(StructType([
    StructField('Feature', StringType()),
    StructField('Shap_Value', FloatType()),
]))))

new_sparkdf = spark_shapdf.withColumn('Shap_Importance', udf_obj(F.struct([spark_shapdf[x] for x in spark_shapdf.columns])))
final_sparkdf = new_sparkdf.withColumn('Positive_Shap', new_sparkdf.Shap_Importance[0]).withColumn('Negative_Shap', new_sparkdf.Shap_Importance[1])

Get the most impactful Positive and Negative SHAP values from our fitted GBT Model

P.S. This is my first attempt at writing an article and if there are any factual or statistical inconsistencies, please reach out to me and I shall be more than happy to learn together with you! :)

References

[1] Soner Yıldırım, Gradient Boosted Decision Trees-Explained (2020), Towards Data Science

[2] Susan Li, Machine Learning with PySpark and MLlib — Solving a Binary Classification Problem (2018), Towards Data Science

[3] Stephen Offer, How to Train XGBoost With Spark (2020), Data Science and ML

[4] Use Apache Spark MLlib on Databricks (2021), Databricks

[5] Umberto Griffo, Don’t collect large RDDs (2020), Apache Spark — Best Practices and Tuning

[6] Nikhilesh Nukala, Yuhao Zhu, Guilherme Braccialli, Tom Goldenberg (2019), Spark UDF — Deep Insights in Performance, QuantumBlack
