Parallelize your massive SHAP computations with MLlib and PySpark

2024-06-16 14:28


https://medium.com/towards-data-science/parallelize-your-massive-shap-computations-with-mllib-and-pyspark-b00accc8667c


A stepwise guide for efficiently explaining your models using SHAP.

Photo by Pietro Jeng on Unsplash

Introduction to MLlib

Apache Spark’s Machine Learning Library (MLlib) is designed primarily for scalability and speed, leveraging the Spark runtime for common distributed use cases in supervised learning (such as classification and regression), unsupervised learning (such as clustering and collaborative filtering), and other cases like dimensionality reduction. In this article, I cover how we can use SHAP to explain a Gradient Boosted Trees (GBT) model that has been fit to our data at scale.

What are Gradient Boosted Trees?

Before we understand what Gradient Boosted Trees are, we need to understand boosting. Boosting is an ensemble technique that sequentially combines a number of weak learners to achieve an overall strong learner. In the case of Gradient Boosted Trees, each weak learner is a decision tree that sequentially minimizes the errors (MSE for regression and log loss for classification) generated by the previous decision tree in the sequence. To read about GBTs in more detail, please refer to this blog post.

Understanding our imports

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.classification import GBTClassificationModel
import shap
import pyspark.sql.functions as F
from pyspark.sql.types import *

The first two imports are for initializing a Spark session, which will be used to convert our pandas dataframe to a Spark one. The third import is used to load our GBT model into memory, which will then be passed to our SHAP explainer to generate explanations. The SHAP explainer itself is initialized via the fourth import. The last two imports are for using Spark SQL functions and SQL types; these will be used in our User-Defined Function (UDF), which I shall describe later.
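For reference, here is a minimal sketch of the session setup these first two imports enable (the "local[*]" master is taken from the consolidated gist at the end of the article; on Databricks a session is already available as spark):

conf = SparkConf().set("spark.master", "local[*]")  # adjust the master for your cluster
spark = SparkSession.builder.config(conf=conf).getOrCreate()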

Converting our MLlib GBT feature vector to a Pandas dataframe

The SHAP Explainer takes a dataframe as input. However, training an MLlib GBT model requires data preprocessing. More specifically, the categorical variables in our data need to be converted into numeric variables using either Category Indexing or One-Hot Encoding. To learn more about how to train a GBT model, refer to this article. The resulting “features” column is a SparseVector (to read more on it, check the “Preprocess Data” section in this example). It looks something like this:

SparseVector features column description — 1. default index value, 2. vector length, 3. list of indexes of the feature columns, 4. list of data values at the corresponding indexes in 3. [Image by author]
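For context, a minimal, hypothetical sketch of the kind of preprocessing and training that produces such a "features" column (and the saved GBT model loaded later) might look like the following; the starting raw_df, the column names, the hyperparameters and the save path are illustrative assumptions, not the article's actual pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import GBTClassifier

# index and one-hot encode a categorical column, then assemble all inputs into "features"
indexer = StringIndexer(inputCol="job", outputCol="job_idx")
encoder = OneHotEncoder(inputCols=["job_idx"], outputCols=["job_vec"])  # Spark 3.x API
assembler = VectorAssembler(inputCols=["job_vec", "age", "balance"], outputCol="features")

pipeline = Pipeline(stages=[indexer, encoder, assembler])
spark_df = pipeline.fit(raw_df).transform(raw_df)  # each row now carries a SparseVector "features"

# fit a GBT classifier on the assembled features and persist it for the explainer step
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=50)
gbt.fit(spark_df).write().overwrite().save("your-model-path")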

The “features” column shown above is for a single training instance. We need to transform this SparseVector for all our training instances. One way to do it is to iteratively process each row and append to our pandas dataframe that we will feed to our SHAP explainer (ouch!). There is a much faster way, which leverages the fact that we have all of our data loaded in memory (if not, we can load it in batches and perform the preprocessing for each in-memory batch). In Shikhar Dua’s words:

1. Create a list of dictionaries in which each dictionary corresponds to an input data row.

2. Create a data frame from this list.

So, based on the above method, we get something like this:

import pandas as pd

rows_list = []
for row in spark_df.rdd.collect():
    # feature_cols: list of feature names, in the same order as the entries of the "features" vector
    dict1 = {k: v for k, v in zip(feature_cols, row.features)}
    rows_list.append(dict1)
pandas_df = pd.DataFrame(rows_list)

If rdd.collect() looks scary, it’s actually pretty simple to explain. Resilient Distributed Datasets (RDD) are fundamental Spark data structures: immutable, distributed collections of objects. Each dataset in an RDD is further subdivided into logical partitions that can be computed on different worker nodes of our Spark cluster. So, all PySpark RDD collect() does is retrieve the data from all the worker nodes to the driver node. As you might guess, this is a memory bottleneck, and if we are handling data larger than our driver node’s memory capacity, we need to increase the number of our RDD partitions and filter them by partition index, collecting one batch at a time. Read how to do that here.
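Here is a minimal sketch of that batched collection (the partition count of 200 is an arbitrary example, and feature_cols is the same assumed list of feature names used above):

num_parts = 200
rdd = spark_df.rdd.repartition(num_parts).cache()  # cache so each pass is not recomputed from scratch

rows_list = []
for pid in range(num_parts):
    # keep only the rows that live in partition `pid`, then collect just that slice
    batch = rdd.mapPartitionsWithIndex(
        lambda idx, it, pid=pid: it if idx == pid else iter([])
    ).collect()
    rows_list.extend({k: v for k, v in zip(feature_cols, row.features)} for row in batch)

pandas_df = pd.DataFrame(rows_list)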

Don’t take my word on the execution performance. Check out the stats.

Performance profiling for inserting rows to a pandas dataframe. [Source (Thanks to Mikhail_Sam and Peter Mortensen): here]

Here are the metrics from one of my Databricks notebook scheduled job runs:

Input size: 11.9 GiB (~12.78GB), Total time Across All Tasks: 20 min, Number of records: 165.16K

Summary Metrics for 125 Completed Tasks executed by the stage that ran the above cell. [Image by author]

Working with the SHAP Library

We are now ready to pass our preprocessed dataset to the SHAP TreeExplainer. Remember that SHAP is a local feature attribution method that explains individual predictions as an algebraic sum of the Shapley values of our model’s features.

We use a TreeExplainer for the following reasons:

  1. Suitable: TreeExplainer is a class that computes SHAP values for tree-based models (Random Forest, XGBoost, LightGBM, GBT, etc).
  2. Exact: Instead of simulating missing features by random sampling, it makes use of the tree structure by simply ignoring decision paths that rely on the missing features. The TreeExplainer output is therefore deterministic and does not vary based on the background dataset.
  3. Efficient: Instead of iterating over each possible feature combination (or a subset thereof), all combinations are pushed through the tree simultaneously, using a more complex algorithm to keep track of each combination’s result — reducing complexity from O(TL2ᴹ) for all possible coalitions to the polynomial O(TLD²) (where M is the number of features, T is the number of trees, L is the maximum number of leaves and D is the maximum tree depth). The quick comparison below shows how large that gap can be.
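To make the gap concrete, here is a purely illustrative back-of-the-envelope calculation (the sizes below are arbitrary assumptions, not the article's model):

# illustrative sizes: 100 trees, at most 32 leaves, depth 5, 50 features
T, L, D, M = 100, 32, 5, 50

brute_force = T * L * 2 ** M   # enumerate every feature coalition
tree_shap = T * L * D ** 2     # TreeExplainer's polynomial-time bookkeeping

print(f"{brute_force:.2e} vs {tree_shap:.2e}")   # ~3.60e+18 vs 8.00e+04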

The check_additivity flag controls a validation check that verifies whether the sum of the SHAP values equals the output of the model. Running this check requires model predictions, which are not supported for Spark models, so the flag needs to be set to False (it would be ignored anyway). Once we get the SHAP values, we convert them from a NumPy array into a pandas dataframe so that they are easily interpretable.

One thing to note is that the dataset order is preserved when we convert a Spark dataframe to pandas, but the reverse is not true.

The points above lead us to the code snippet below:

gbt = GBTClassificationModel.load('your-model-path')
explainer = shap.TreeExplainer(gbt)
shap_values = explainer(pandas_df, check_additivity=False)
shap_pandas_df = pd.DataFrame(shap_values.values, columns=pandas_df.columns)
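Given the ordering caveat above, one option (a sketch, assuming no column named "row_id" already exists) is to carry an explicit key through the pandas round trip so the SHAP rows can later be joined back to the original Spark rows instead of relying on row order:

# keep the pandas row index as a real column before converting back to Spark
shap_pandas_df = shap_pandas_df.reset_index().rename(columns={"index": "row_id"})
spark_shapdf = spark.createDataFrame(shap_pandas_df)
# downstream, join on "row_id" rather than assuming the row order survived the conversion
# (and exclude "row_id" from the columns fed to the SHAP UDF later)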

An Introduction to PySpark UDFs and when to use them

How PySpark UDFs distribute individual tasks to worker (executor) nodes [Source: here]

User-Defined Functions are custom functions that operate on individual rows of our dataset. They are generally used when the native Spark functions are not sufficient to solve the problem. Native Spark functions are inherently faster than UDFs because they are JVM constructs whose methods are implemented by local calls to Java APIs. PySpark UDFs, on the other hand, are Python implementations that require data movement between the Python interpreter and the JVM (refer to Arrow 4 in the picture above). This inevitably introduces some processing delay.
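As a tiny illustration of that difference (df and its "name" column are hypothetical; StringType comes from the imports above), the same transformation can either stay entirely inside the JVM or round-trip every row through a Python worker:

# native column expression: executed inside the JVM, no Python involved
jvm_df = df.withColumn("name_upper", F.upper(F.col("name")))

# equivalent Python UDF: every row is serialized to a Python worker and back
upper_udf = F.udf(lambda s: s.upper() if s is not None else None, StringType())
py_df = df.withColumn("name_upper", upper_udf(F.col("name")))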

If no processing delays can be tolerated, the best thing to do is create a Python wrapper to call the Scala UDF from PySpark itself. A great example is shown in this blog. However, using a PySpark UDF was sufficient for my use case, since it is easy to understand and code.

The code below defines the Python function to be executed on each worker/executor node. We simply pick up the highest SHAP values (in absolute terms, as we want to find the most impactful negative features as well), append them to the respective pos_features and neg_features lists, and in turn append both of these lists to a features list that is returned to the caller.

def shap_udf(row):
    # collect the (feature, SHAP value) pairs for this row
    feature_values = row.asDict()
    pos_features = []
    neg_features = []

    # sort features by absolute SHAP value, largest first
    # (plain abs() is the Python built-in here, since pyspark functions were imported as F)
    dict_importance = {key: value for key, value in sorted(feature_values.items(),
                                                           key=lambda item: abs(item[1]),
                                                           reverse=True)}
    for k, v in dict_importance.items():
        if abs(v) >= <your-threshold-shap-value>:
            if v > 0:
                pos_features.append((k, v))
            else:
                neg_features.append((k, v))

    features = []
    # taking the top 5 features from pos and neg features; we can increase this number
    features.append(pos_features[:5])
    features.append(neg_features[:5])
    return features

We then register our PySpark UDF with our Python function name (in my case, shap_udf) and specify the return type (mandatory in Python and Java) of the function in the parameters to F.udf(). The outer ArrayType() holds two lists, one for positive features and the other for negative ones. Since each individual list comprises at most 5 (feature-name, shap-value) StructType() pairs, those pairs make up the inner ArrayType(). Below is the code:

udf_obj = F.udf(shap_udf, ArrayType(ArrayType(StructType([
    StructField('Feature', StringType()),
    StructField('Shap_Value', FloatType()),
]))))

Now we just create a new Spark dataframe with a column called 'Shap_Importance' that invokes our UDF for each row of the spark_shapdf dataframe. To split the positive and negative features, we create two columns in a new Spark dataframe called final_sparkdf. Our final code snippet looks like this:

# apply the UDF to every row of the SHAP dataframe
new_sparkdf = spark_shapdf.withColumn('Shap_Importance', udf_obj(F.struct([spark_shapdf[x] for x in spark_shapdf.columns])))

# split the UDF output into positive and negative feature columns
final_sparkdf = new_sparkdf.withColumn('Positive_Shap', new_sparkdf.Shap_Importance[0]) \
                           .withColumn('Negative_Shap', new_sparkdf.Shap_Importance[1])

And finally, we have extracted all the important features of our GBT model per testing instance without the use of any explicit for loops! The consolidated code can be found in the below GitHub gist.

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.classification import GBTClassificationModel
import shap
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

# Convert the sparse feature vector that is passed to the MLlib GBT model into a pandas dataframe.
# This 'pandas_df' will be passed to the SHAP TreeExplainer.
rows_list = []
for row in spark_df.rdd.collect():
    # feature_cols: list of feature names, in the same order as the entries of the "features" vector
    dict1 = {k: v for k, v in zip(feature_cols, row.features)}
    rows_list.append(dict1)
pandas_df = pd.DataFrame(rows_list)

# Load the GBT model from the path where you have saved it.
# Make sure the application where your notebook runs has access to the storage path!
gbt = GBTClassificationModel.load("<your path where the GBT model is loaded>")

explainer = shap.TreeExplainer(gbt)
# check_additivity requires predictions to be run, which is not supported by Spark [yet],
# so it needs to be set to False as it is ignored anyway.
shap_values = explainer(pandas_df, check_additivity=False)
shap_pandas_df = pd.DataFrame(shap_values.values, columns=pandas_df.columns)

spark = SparkSession.builder.config(conf=SparkConf().set("spark.master", "local[*]")).getOrCreate()
spark_shapdf = spark.createDataFrame(shap_pandas_df)

def shap_udf(row):
    # Works on a single Spark dataframe row, for all rows. This work is distributed
    # among all the worker nodes of your Apache Spark cluster.
    feature_values = row.asDict()
    pos_features = []
    neg_features = []
    dict_importance = {key: value for key, value in sorted(feature_values.items(),
                                                           key=lambda item: abs(item[1]),
                                                           reverse=True)}
    for k, v in dict_importance.items():
        if abs(v) >= <your-threshold-shap-value>:
            if v > 0:
                pos_features.append((k, v))
            else:
                neg_features.append((k, v))
    features = []
    # taking the top 5 features from pos and neg features; we can increase this number
    features.append(pos_features[:5])
    features.append(neg_features[:5])
    return features

udf_obj = F.udf(shap_udf, ArrayType(ArrayType(StructType([
    StructField('Feature', StringType()),
    StructField('Shap_Value', FloatType()),
]))))

new_sparkdf = spark_shapdf.withColumn('Shap_Importance', udf_obj(F.struct([spark_shapdf[x] for x in spark_shapdf.columns])))
final_sparkdf = new_sparkdf.withColumn('Positive_Shap', new_sparkdf.Shap_Importance[0]) \
                           .withColumn('Negative_Shap', new_sparkdf.Shap_Importance[1])

Get the most impactful Positive and Negative SHAP values from our fitted GBT Model
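A quick way to eyeball the output (just a usage sketch; what shows up depends on your SHAP threshold):

final_sparkdf.select('Positive_Shap', 'Negative_Shap').show(5, truncate=False)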

P.S. This is my first attempt at writing an article and if there are any factual or statistical inconsistencies, please reach out to me and I shall be more than happy to learn together with you! :)

References

[1] Soner Yıldırım, Gradient Boosted Decision Trees-Explained (2020), Towards Data Science

[2] Susan Li, Machine Learning with PySpark and MLlib — Solving a Binary Classification Problem (2018), Towards Data Science

[3] Stephen Offer, How to Train XGBoost With Spark (2020), Data Science and ML

[4] Use Apache Spark MLlib on Databricks (2021), Databricks

[5] Umberto Griffo, Don’t collect large RDDs (2020), Apache Spark — Best Practices and Tuning

[6] Nikhilesh Nukala, Yuhao Zhu, Guilherme Braccialli, Tom Goldenberg (2019), Spark UDF — Deep Insights in Performance, QuantumBlack
