## train a pyspark model and export it as a mleap bundle
import os

# parse command line arguments
import argparse

parser = argparse.ArgumentParser(
    description='train pyspark model and export mleap bundle')
parser.add_argument('hdfs_path', nargs='?', default="/spark_ml", type=str)
parser.add_argument('model_name_export', nargs='?',
                    default="adult_census_pipeline.zip", type=str)
args = parser.parse_args()

hdfs_path = args.hdfs_path
model_name_export = args.model_name_export

# create spark session (needed only if this file is submitted as a spark job)
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName(os.path.basename(__file__))
         .getOrCreate())
| 21 | + |
###############################################################################
## prepare data

# read the data into a spark data frame.
cwd = os.getcwd()
filename = "AdultCensusIncome.csv"

## NOTE: reading text file from local file path seems flaky!
#import urllib.request
#url = "https://amldockerdatasets.azureedge.net/" + filename
#local_filename, headers = urllib.request.urlretrieve(url, filename)
#datafile = "file://" + os.path.join(cwd, filename)

data_all = spark.read.format('csv')\
    .options(
        header='true',
        inferSchema='true',
        ignoreLeadingWhiteSpace='true',
        ignoreTrailingWhiteSpace='true')\
    .load(filename)  # .load(datafile) for local file

# FIX: corrected "coulumns" typo in the log message.
print("Number of rows: {}, Number of columns : {}".format(
    data_all.count(), len(data_all.columns)))

# replace "-" with "_" in column names
columns_new = [col.replace("-", "_") for col in data_all.columns]
data_all = data_all.toDF(*columns_new)

data_all.printSchema()
data_all.show(5)

# choose feature columns and the label column for training.
label = "income"
#xvars = ["age", "hours_per_week"]  # all numeric
xvars = ["age", "hours_per_week", "education"]  # numeric + string

print("label: {}, features: {}".format(label, xvars))

# FIX: build a fresh list instead of appending to xvars in place --
# the original aliased select_cols to xvars and then mutated it, so the
# feature list silently grew to include the label column.
select_cols = xvars + [label]
data = data_all.select(select_cols)
| 62 | + |
###############################################################################
## split data into train and test.

train, test = data.randomSplit([0.75, 0.25], seed=123)

for split_name, split_df in (("train", train), ("test", test)):
    print("{} ({}, {})".format(split_name, split_df.count(), len(split_df.columns)))

train_data_path = os.path.join(hdfs_path, "AdultCensusIncomeTrain")
test_data_path = os.path.join(hdfs_path, "AdultCensusIncomeTest")

# write the train and test data sets to intermediate storage, then read them
# back and check that the round trip preserved both schema and row count.
train.write.mode('overwrite').orc(train_data_path)
test.write.mode('overwrite').orc(test_data_path)

print("train and test datasets saved to {} and {}".format(
    train_data_path, test_data_path))

train_read = spark.read.orc(train_data_path)
test_read = spark.read.orc(test_data_path)

for original_df, reloaded_df in ((train, train_read), (test, test_read)):
    assert reloaded_df.schema == original_df.schema
    assert reloaded_df.count() == original_df.count()
| 85 | + |
###############################################################################
## train model

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# create a new Logistic Regression model, which by default uses the
# "features" and "label" columns for training.
reg = 0.1
lr = LogisticRegression(regParam=reg)

# encode string columns: each string feature gets a StringIndexer followed by
# a one-hot encoding; numeric features pass through unchanged.
# NOTE(review): OneHotEncoderEstimator was renamed to OneHotEncoder in
# Spark 3.0 -- confirm the target Spark version before upgrading.
dtypes = dict(train.dtypes)
dtypes.pop(label)  # the label is indexed separately below

si_xvars = []       # StringIndexer stages for string features
ohe_xvars = []      # one-hot encoder stages for the indexed features
featureCols = []    # final column names fed to the VectorAssembler
for key in dtypes:  # FIX: dropped the unused enumerate() index
    if dtypes[key] == "string":
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)

        tmpCol = "-".join([key, "tmp"])
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip"))  # , handleInvalid="keep"
        ohe_xvars.append(OneHotEncoderEstimator(inputCols=[tmpCol], outputCols=[featureCol]))
    else:
        featureCols.append(key)

# string-index the label column into a column named "label"
si_label = StringIndexer(inputCol=label, outputCol='label')
#si_label._resetUid("si_label") # try to name the transformer, which seems not carried over to the fitted pipeline.

# assemble the encoded feature columns in to a column named "features"
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# put together the pipeline: [indexers..., encoders..., si_label, assembler, lr]
stages = []
stages.extend(si_xvars)
stages.extend(ohe_xvars)
stages.append(si_label)
stages.append(assembler)
stages.append(lr)

pipe = Pipeline(stages=stages)
print("Pipeline Created")

# train the model
model = pipe.fit(train)
print("Model Trained")
print("Model is ", model)
print("Model Stages", model.stages)

# name the string-index stage for the label so it can be identified easier later.
# FIX: compute the stage's position instead of hard-coding index 2, which
# breaks as soon as the number of string feature columns changes.
si_label_pos = len(si_xvars) + len(ohe_xvars)
model.stages[si_label_pos]._resetUid("si_label")
| 141 | + |
###############################################################################
## evaluate model

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# score the held-out test set
pred = model.transform(test)

# evaluate. note only 2 metrics are supported out of the box by Spark ML.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc = evaluator.setMetricName('areaUnderROC').evaluate(pred)
au_prc = evaluator.setMetricName('areaUnderPR').evaluate(pred)

print("Area under ROC: {}".format(au_roc))
print("Area Under PR: {}".format(au_prc))
| 157 | + |
###############################################################################
## save and load the model with ML persistence
# https://spark.apache.org/docs/latest/ml-pipeline.html#ml-persistence-saving-and-loading-pipelines

##NOTE: by default the model is saved to and loaded from hdfs
model_name = "AdultCensus.mml"
model_fs = os.path.join(hdfs_path, model_name)

model.write().overwrite().save(model_fs)
print("saved model to {}".format(model_fs))

# round-trip check: load the model file (from hdfs) and compare string
# representations with the in-memory model.
print("load pyspark model from hdfs")
model_loaded = PipelineModel.load(model_fs)
assert str(model_loaded) == str(model)

print("loaded model from {}".format(model_fs))
print("Model is " , model_loaded)
print("Model stages", model_loaded.stages)
| 177 | + |
###############################################################################
## export and import model with mleap

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# serialize the model to a local zip file in JSON format
#model_name_export = "adult_census_pipeline.zip"
model_name_path = cwd
model_file = os.path.join(model_name_path, model_name_export)

# remove an old model file, if needed.
if os.path.isfile(model_file):
    os.remove(model_file)

# mleap expects a jar:file URI pointing at the local zip bundle
model_file_path = "jar:file:{}".format(model_file)
model.serializeToBundle(model_file_path, model.transform(train))

## import mleap model and sanity-check it against the in-memory model
model_deserialized = PipelineModel.deserializeFromBundle(model_file_path)
assert str(model_deserialized) == str(model)

print("The deserialized model is ", model_deserialized)
print("The deserialized model stages are", model_deserialized.stages)
| 202 | + |
##############################################################################
## export the final model with mleap

## remove the stringIndexer for the label column so it won't be required for prediction
model_final = model.copy()

# position of the label StringIndexer, counted from the end of the stage
# list: [..., si_label, assembler, lr]
si_label_index = -3
model_final.stages.pop(si_label_index)  # si_label

## append an IndexToString transformer to the model pipeline to get the original labels
#labelReverse = IndexToString(inputCol = "label", outputCol = "predIncome") #no need to provide labels
labelReverse = IndexToString(
    inputCol="prediction",
    outputCol="predictedIncome",
    labels=model.stages[si_label_index].labels)  # must provide labels (from si_label) otherwise will fail
model_final.stages.append(labelReverse)

pred_final = model_final.transform(test)
pred_final.printSchema()
pred_final.show(5)

# remove an old model file, if needed.
if os.path.isfile(model_file):
    os.remove(model_file)
model_final.serializeToBundle(model_file_path, model_final.transform(train))

print("persist the mleap bundle from local to hdfs")
from subprocess import Popen, PIPE

put_cmd = ["hadoop", "fs", "-put", "-f", model_file, os.path.join(hdfs_path, model_name_export)]
put_proc = Popen(put_cmd, stdout=PIPE, stderr=PIPE)
s_output, s_err = put_proc.communicate()
if s_err:
    print("s_output: {s_output}\ns_err: {s_err}".format(s_output=s_output, s_err=s_err))
| 237 | +############################################################################### |
0 commit comments