Using spark_sklearn with a kubernetes cluster

11/7/2019

I am working on a machine learning project and initially used the scikit-learn (sklearn) library. For model optimization I use the classic GridSearchCV class from sklearn, which parallelizes (via the joblib library) over all the resources of the host it runs on. Here is an example:

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV
from datetime                 import datetime
import numpy as np

def count_trials(param_grid):
    # The grid search tries every combination of parameter values,
    # so the number of candidate models is the product of the list lengths.
    total_trials = 1
    for v in param_grid.values():
        total_trials *= len(v)

    return total_trials

# Load data
digits = datasets.load_digits()

X, y = digits.data, digits.target

print("")
print("Iris data set: ")
print("X:      {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth":         [3, None],
              "max_features":      ["auto"],
              "min_samples_split": [2, 3,4,5,10,20],
              "min_samples_leaf":  [1, 3,4,6,10,20],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40, 80],
              }
cv = 5

n_models = count_trials(param_grid)

print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs = GridSearchCV(estimator = RandomForestClassifier(),
                  param_grid=param_grid,
                  cv=cv,
                  refit=True,
                  scoring="accuracy",
                  n_jobs = -1)
gs.fit(X, y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

I recently found that GridSearchCV has been extended to use the resources of a Spark cluster (via the pyspark and spark_sklearn libraries). I managed to set up a Spark cluster with one master and two workers. The code below runs the same task as before, but using the Spark cluster's resources.

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV as SKGridSearchCV
from spark_sklearn            import GridSearchCV as SparkGridSearchCV
from pyspark                  import SparkConf, SparkContext
from datetime                 import datetime
import numpy as np

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('spark://<master-IP>:7077')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())

    return SparkContext(conf=sc_conf)

def count_trials(param_grid):
    # The grid search tries every combination of parameter values,
    # so the number of candidate models is the product of the list lengths.
    total_trials = 1
    for v in param_grid.values():
        total_trials *= len(v)

    return total_trials

# Load data
digits = datasets.load_digits()

X, y = digits.data, digits.target

print("")
print("Iris data set: ")
print("X:      {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth":         [3, None],
              "max_features":      ["auto"],
              "min_samples_split": [2, 3,4,5,10,20],
              "min_samples_leaf":  [1, 3,4,6,10,20],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40, 80],
              }
cv = 5

n_models = count_trials(param_grid)

print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

sc = get_context()
gs = SparkGridSearchCV(sc = sc,
                       estimator = RandomForestClassifier(),
                       param_grid=param_grid,
                       cv=cv,
                       refit=True,
                       scoring="accuracy",
                       n_jobs = -1)

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X, y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

where master-IP is the IP of the master node. The code works perfectly, using all the resources available in the Spark cluster.

I then configured a Kubernetes cluster with one master node and one worker node and ran the same code as before, but replaced the line

sc_conf.setMaster('spark://<master-IP>:7077')

with

sc_conf.setMaster('k8s://<master-IP>:<PORT>')

where master-IP and PORT are the values I obtain by running the following command on the master node:

kubectl  cluster-info

The problem is that my code no longer works; it displays the following error messages:

19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$anonfun$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$anonfun$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$onNewSnapshots(ExecutorPodsAllocator.scala:126)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

It seems to be saying that I have to specify a Docker image, but I don't know how I should do that.

Does anybody have any experience with this? I have been searching the web but found no answers.

Thank you in advance,

Ale

-- Alejandro Perez
apache-spark
kubernetes
pyspark
scikit-learn

1 Answer

11/7/2019

I would recommend reading the Spark on Kubernetes documentation first.

When you run Spark on Kubernetes, your processes are submitted to the Kubernetes cluster inside Docker containers, so the container image must be known at Job submission time. Set in your SparkConf either:

  • spark.kubernetes.container.image

or

  • spark.kubernetes.driver.container.image
  • spark.kubernetes.executor.container.image

Also be sure that your Kubernetes cluster can pull those images; the easiest way is to push them to Docker Hub. Follow the guide on how to build Spark Docker images.
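For your setup that could be a minimal change to get_context(), along these lines (a sketch only: the image tag, namespace and service account name are placeholders for whatever you actually build, push and configure):

from pyspark import SparkConf, SparkContext

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('k8s://<master-IP>:<PORT>')
    # Image used for both driver and executor pods; it must be pullable by the
    # cluster (e.g. pushed to Docker Hub). 'myrepo/spark-py:v2.4.4' is a placeholder.
    sc_conf.set('spark.kubernetes.container.image', 'myrepo/spark-py:v2.4.4')
    # Namespace and a service account allowed to create executor pods; adjust
    # to your cluster setup, 'spark' is an assumed service account name.
    sc_conf.set('spark.kubernetes.namespace', 'default')
    sc_conf.set('spark.kubernetes.authenticate.driver.serviceAccountName', 'spark')
    sc_conf.set('spark.executor.instances', '2')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())

    return SparkContext(conf=sc_conf)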

It seems you're running your Job in client mode, so take the client-mode networking notes into account. Basically, you need to make sure that your Driver process (which probably runs on your local machine) is reachable from within the Kubernetes network (from the Executor Pods specifically), which may not be straightforward, and that your Driver process in turn has network access to the Executor Pods. Submitting Spark Jobs in client mode to a remote Kubernetes cluster from a local workstation is genuinely tricky, so I would recommend giving cluster mode a try first.
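If you do try client mode from outside the cluster, here is a rough sketch of the extra networking settings, assuming your workstation has an address the Executor Pods can route to (the address and port numbers are placeholders):

# Added to the same SparkConf, before the SparkContext is created.
# The Driver must advertise a host/port that the Executor Pods can reach,
# and these ports must be open towards the pod network.
sc_conf.set('spark.driver.host', '<routable-driver-IP>')
sc_conf.set('spark.driver.port', '29413')
sc_conf.set('spark.blockManager.port', '29414')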

If you would like to submit your Jobs in cluster mode, you need to make sure that your Job artefact (the Python script, in your case) and its dependencies are accessible from the Spark Driver and Executor Pods (the easiest way is to put the script with all its dependencies inside the Spark Docker image, on the Spark classpath).

Other than that, it should work for you in the same manner as it usually does.

You can also refer to the Helm chart for Spark on Kubernetes, which includes Jupyter notebook integration and makes it easier to run interactive Spark sessions on Kubernetes.

-- Aliaksandr Sasnouskikh
Source: StackOverflow