Using PySpark and Pandas UDFs to Train Scikit-Learn Models in a Distributed Fashion

Say you find yourself in the peculiar situation where you need to train a whole bunch of scikit-learn models, one for each group within a large amount of data. And say you want to leverage Spark to distribute that training and do it all in a scalable fashion.

Recently I ran into such a use case and found that by using pandas_udf – a type of PySpark user defined function (UDF) built on Apache Arrow (via PyArrow) – this can be done in a pretty straightforward fashion. A pandas UDF works much like a regular Spark UDF that operates over grouped or windowed data, except that it takes in its data as a pandas DataFrame and returns a pandas DataFrame. We just need to define the schema of the pandas DataFrame it returns.

Let’s define this return schema. Assume we have some group_id that we can use to split our data into the portions that will be used to train each model. We’ll return a model along with that group_id, and since it might be good info to have later, let’s also return the number of instances within that group that the model was trained with, call it num_instances_trained_with. To store each trained model we will use the Python pickle library to serialize it and keep the result as a hex-encoded string that we can later load back, call it model_str.

from pyspark.sql.types import *

# define the schema for what the pandas UDF will return
schema = StructType([
    StructField('group_id', IntegerType()),
    StructField('num_instances_trained_with', IntegerType()),
    StructField('model_str', StringType())
])

To define a pandas UDF that will train a scikit-learn model, we use the pandas_udf decorator, and since we take in a pandas DataFrame and return one as well, we need to declare the function as a PandasUDFType.GROUPED_MAP (as opposed to PandasUDFType.SCALAR, which would take in just a pandas Series). Within the UDF we can then train a scikit-learn model on the data coming in as a pandas DataFrame, just like we would in a regular Python application:

import pickle
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def train_model(df_pandas):
    '''
    Trains a RandomForestRegressor model on training instances
    in df_pandas.

    Assumes: df_pandas has the columns:
                 ['my_feature_1', 'my_feature_2', 'my_label']

    Returns: a single row pandas DataFrame with columns:
               ['group_id', 'num_instances_trained_with',
                'model_str']
    '''

    # get the value of this group id
    group_id = df_pandas['group_id'].iloc[0]

    # get the number of training instances for this group
    num_instances = df_pandas.shape[0]

    # get features and label for all training instances in this group
    feature_columns = ['my_feature_1', 'my_feature_2']
    label = 'my_label'
    X = df_pandas[feature_columns]
    Y = df_pandas[label]

    # train this model
    model = RandomForestRegressor()
    model.fit(X, Y)

    # pickle the trained model and hex-encode the bytes so it fits
    # the StringType column defined in the schema
    model_str = pickle.dumps(model).hex()

    # build the single-row DataFrame to return
    df_to_return = pd.DataFrame(
        [[group_id, num_instances, model_str]],
        columns=['group_id', 'num_instances_trained_with', 'model_str'])

    return df_to_return

Now, assuming we have a PySpark DataFrame (df) with our features, our label, and a group_id column, we can apply this pandas UDF to every group of our data and get back a PySpark DataFrame with a model (stored as a hex-encoded pickle string) trained on the data for each group:

df_trained_models = df.groupBy('group_id').apply(train_model)
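
Later, when we want to use one of these trained models, we can pull its row out of df_trained_models, decode the hex string back to bytes, and unpickle it. Here is a minimal sketch; the group_id value 42 and the df_new_features DataFrame are just placeholders for your own data:

# grab the row for one group (assumes the result is small enough to collect)
row = df_trained_models.filter(df_trained_models.group_id == 42).collect()[0]

# decode the hex string back to bytes and unpickle the trained model
model = pickle.loads(bytes.fromhex(row['model_str']))

# use the recovered model locally, e.g. on a pandas DataFrame of new features
predictions = model.predict(df_new_features[['my_feature_1', 'my_feature_2']])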

Note that each model is trained on a single Spark executor, so some caution may be necessary to avoid blowing up executor memory if the data within any one group is too large for a single executor to hold and train on in memory.
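
If you are not sure whether any single group is too large, a quick sanity check on group sizes before kicking off training can help. A minimal sketch:

# inspect the largest groups to make sure each fits comfortably on one executor
df.groupBy('group_id').count().orderBy('count', ascending=False).show(10)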