ModelArts Notebook中连接DLI Spark集群并进行机器学习建模

网友投稿 751 2022-05-30

一、背景介绍

二、notebook实践

1.基于ma-spark魔法命令连接DLI Spark集群

2.数据准备

a.加载iris数据集,并将列名转换为DLI Spark支持的格式

b.本地数据集发送至远程集群,并将DataFrame命名为spark_df

3.读取集群中的数据

在ModelArts Notebook中连接DLI Spark集群并进行机器学习建模

a.查看数据

b.划分训练集和验证集

c.基于训练集进行简单的可视化分析

4.特征工程

a.特征交叉

5.构建模型

a.基于spark.ml构建随机森林

b.训练模型和预测

c.保存预测结果至notebook本地并计算AUC和f1-score

三、附件

一、背景介绍

本样例基于sklearn中内置的iris数据集(鸢尾花分类数据集)介绍如何将本地notebook中的数据发送到DLI spark集群,并进行简单的特征工程和分类预测建模。

ma-spark魔法命令

%reload_ext ma_magic : 加载ModelArts内置的自研魔法命令ma_magic

%ma_spark query cluster : 使用ma_spark魔法命令查询已注册的DLI队列,执行该命令之前需要先加载ma_magic命令

%ma_spark connect --cluster_name --ak --sk : 连接指定队列,执行该命令之前需要先加载ma_magic命令

sparkmagic魔法命令

sparkmagic魔法命令仅在Sparkmagic(DLI-PySpark)类型的Kernel中可用,详情见sparkmagic

%%local :

当前cell中的代码在notebook本地执行。

%%sql : 在当前cell中仅可执行sql命令,并将提交到DLI Spark集群中执行

sparkmagic其余魔法命令可通过 %%help 查看,若不加 %% 默认支持spark语法且会将代码提交到DLI远程Spark集群执行,执行完成后会将执行结果返回至notebook进行展示。

二、Notebook实践

本样例代码结构如下

1 基于ma-spark魔法命令连接DLI Spark分布式集群

2 数据准备

a. 加载iris数据集

b. 本地数据集发送至远程集群

3 读取集群中的数据

a. 查看数据

b. 划分训练集和验证集

c. 基于训练集进行简单的可视化分析

4 特征工程

a. 特征交叉

5 构建模型

a. 基于spark.ml构建随机森林

b. 训练模型和预测

c. 保存预测结果至notebook本地并计算AUC和f1-score

1. 基于ma-spark魔法命令连接DLI spark集群

在同一个实例中开启多个jupyter notebook执行远程Spark作业时,仅需连接DLI spark集群一次;

在Sparkmagic(DLI-PySpark)类型的Kernel中连接DLI/MRS集群,需要添加 %%local 命令;

在PySpark和python类型的kernel中连接DLI/MRS集群,请去除 %%local 命令;

查询已注册的队列

%%local %reload_ext ma_magic %ma_spark query cluster

连接队列

连接成功后,请先执行restart kernel,然后再执行下述spark相关操作

%%local %reload_ext ma_magic %ma_spark connect --cluster_name --ak --sk

2. 数据准备

''' e.g. columns: ['sepal length (cm)', ...] -> ['sepal_length', ...] ''' %%local from sklearn.datasets import load_iris import pandas as pd import numpy as np dataset = load_iris() data = dataset.data target = dataset.target columns = [col_name.split('(cm)')[0].strip().replace(' ', '_') for col_name in dataset.feature_names] data_pd = pd.DataFrame(np.hstack([data, target.reshape(-1, 1)]), columns = columns+['target']) data_pd.head()

%%send_to_spark -i data_pd -t df -n spark_df

3. 读取集群中的数据

查看数据前5行

%%pretty spark_df.show(5)

查看各列数据类型

spark_df.printSchema()

查看各列缺失值比率,若存在缺失值则需要以众数或者中位数等进行缺失值填充

def check_na_ratio(df, cols): if isinstance(cols, str): cols = [cols] if isinstance(cols, list): na_ratio = df.agg(*[(1-F.count(col)/F.count('*')).alias(f'{col}_na_ratio') for col in cols]) else: raise TypeError(f"cols should be [str, list], but got type {type(cols)}") return na_ratio from pyspark.sql import functions as F na_ratio = check_na_ratio(spark_df, spark_df.columns) na_ratio.show()

将全部数据按照8比2的比例划分为训练集和验证集,随机种子为2021

df_train, df_val = spark_df.randomSplit([0.8, 0.2], seed=2021) print(f'训练集样本数:{df_train.count()}, 验证集样本数:{df_val.count()}') df_val.show()

注意,在DLI远程Spark集群中仅安装了pyspark, pandas和numpy, 不支持pip install安装python库,因此无法基于matplotlib和seaborn等工具进行可视化,可以通过魔法命令将数据导入到notebook本地进行可视化操作。

此外,尽量避免将大量数据导入notebook,避免因内存或网络等原因导致失败或者其他错误。当需要将数据导入notebook本地进行可视化时,可以先在远程spark集群进行groupBy等操作,减少数据量。

### 构建各种鸢尾花类别下长度等指标的统计值:最大值,最小值和均值 df_train_agg_target = df_train.groupBy('target').agg( F.max('petal_length').alias('petal_length_max'), F.min('petal_length').alias('petal_length_min'), F.mean('petal_length').alias('petal_length_mean'), F.max('petal_width').alias('petal_width_max'), F.min('petal_width').alias('petal_width_min'), F.mean('petal_width').alias('petal_width_mean'), F.max('sepal_length').alias('sepal_length_max'), F.min('sepal_length').alias('sepal_length_min'), F.mean('sepal_length').alias('sepal_length_mean'), F.max('sepal_width').alias('sepal_width_max'), F.min('sepal_width').alias('sepal_width_min'), F.mean('sepal_width').alias('sepal_width_mean'), ) df_train_agg_target.show()

基于sparkmagic魔法命令 %%spark 将远程spark集群中的数据加载到notebook中, 具体可在当前ipynb中执行%%help查看

%%spark -o df_train_agg_target df_train_agg_target.select(df_train_agg_target.columns)

在notebook中进行可视化操作,画出不同鸢尾花类型下各个指标的统计值分布情况。从下图可以看出,不同鸢尾花类别的各指标差异较大,可知能够较为简单的区分三种鸢尾花类型

%%local import warnings import logging import matplotlib.pyplot as plt logging.getLogger('matplotlib.font_manager').disabled = True warnings.filterwarnings('ignore') x = range(1, df_train_agg_target.shape[-1]) bar_width = 0.2 color_map = dict(zip([0, 1, 2], ['red', 'orange', 'blue'])) plot_columns = [col for col in df_train_agg_target.columns if col != 'target'] for tar in df_train_agg_target.target.unique(): tmp_df = df_train_agg_target[df_train_agg_target.target==tar] plt.bar(x, tmp_df[plot_columns].squeeze(), width=bar_width, label = target_name_map.get(tar), fc=color_map.get(tar)) x = [i+bar_width for i in x] plt.xticks(x, plot_columns, rotation=-60, fontsize=10) plt.legend() plt.show()

4. 特征工程

原始数据有4列特征,分别是petal_length,petal_width,sepal_length和sepal_width,对任意两列特征进行减法和乘法的特征交叉操作,特征维度从4维扩展到16维

feat_cols = list(set(spark_df.columns) -set(['target'])) print(feat_cols) for i in range(len(feat_cols)): for j in range(i+1, len(feat_cols)): df_train = df_train.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i])) df_train = df_train.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i])) df_val = df_val.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i])) df_val = df_val.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i])) %%pretty df_val.show()

5. 构建模型

构建随机森林分类器,并指定输入特征的列名为features(向量化后的特征列),标签列的列名为target,预测输出列的列名为prediction,其余参数为训练参数,具体可查看PySpark文档 PySpark

from pyspark.ml.classification import RandomForestClassifier rf_cls = RandomForestClassifier( featuresCol='features', labelCol='target', predictionCol='prediction', subsamplingRate=0.8, seed=2021 )

在Spark中需要先基于VectorAssembler将指定的各列特征向量化,并在spark.DataFrame中新增指定名称的列如features,才能进一步输入到模型中进行训练和推理。

''' e.g. inputCols = ['x', 'y', 'z'] ----------------------------- ---------------------------------------- col_name 'x' 'y' 'z' 'target' col_name 'x' 'y' 'z' 'target' 'features' row1 1 2 3 0 -> row1 1 2 3 0 [1,2,3] row2 2 3 4 1 row2 2 3 4 1 [2,3,4] ----------------------------- ---------------------------------------- ''' from pyspark.ml.feature import VectorAssembler feat_cols = df_train.columns feat_cols.remove('target') print(feat_cols) assembler = VectorAssembler(inputCols=feat_cols, outputCol="features") df_train = assembler.transform(df_train) df_val = assembler.transform(df_val) model = rf_cls.fit(df_train.select('features', 'target')) pred = model.transform(df_val.select('features', 'target'))

创建pred的副本label_and_preds,并通过魔法命令 %%sql 将其保存为名为label_and_preds且类型为pandas.DataFrame的notebook本地变量

pred.createOrReplaceTempView('label_and_preds')

%%sql -o label_and_preds select target, prediction from label_and_preds

在notebook本地计算auc和f1值,并画出ROC曲线

%%local import pandas as pd from sklearn import metrics import matplotlib.pyplot as plt label_and_preds['target'] = label_and_preds['target'].map(int) probs = label_and_preds['pred_probability'].map(lambda x: x['values']).explode() real_label = pd.get_dummies(label_and_preds, columns=['target'])[['target_0','target_1','target_2']].values.reshape(-1,1) fpr, tpr, thresholds = metrics.roc_curve(real_label,probs) auc = metrics.auc(fpr, tpr) print(f"the auc of val dataset is {auc}, cause iris is a very simple dataset.") #绘图 #FPR就是横坐标,TPR就是纵坐标 plt.plot(fpr, tpr, c = 'r', lw = 2, alpha = 0.7, label = u'AUC=%.3f' % auc) plt.plot((0, 1), (0, 1), c = '#808080', lw = 1, ls = '--', alpha = 0.7) plt.xlim((-0.01, 1.02)) plt.ylim((-0.01, 1.02)) plt.xticks(np.arange(0, 1.1, 0.1)) plt.yticks(np.arange(0, 1.1, 0.1)) plt.xlabel('False Positive Rate', fontsize=13) plt.ylabel('True Positive Rate', fontsize=13) plt.legend(loc='lower right', fancybox=True, framealpha=0.8, fontsize=12) plt.title(u'Roc Curve for RandomForest-iris', fontsize=17) plt.show()

%%local from sklearn.metrics import f1_score f1 = f1_score( label_and_preds.target, label_and_preds.prediction, average='micro') print(f"the f1 score of val dataset is {f1}, cause iris is a very simple dataset.")

三、附件

可执行ipynb文件见附件

附件: 在notebook中基于spark进行鸢尾花分类.zip 68.03KB 下载次数:0次

AI开发平台ModelArts spark 数据湖探索 DLI

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python汇总篇,200+个Python标准库介绍(超全)
下一篇:ubuntu UFW简述
相关文章