I am working on a machine learning project. I initially used the scikit-learn (sklearn) library. For model optimization I use the classic GridSearchCV class from sklearn. It currently parallelizes over all the resources of the host where the script is run (via the joblib library). Here is an example:
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from datetime import datetime
import numpy as np

def count_trials(param_grid):
    # The number of candidate models is the product of the number of values
    # per parameter (not the sum).
    total_trials = 1
    for k, v in param_grid.items():
        total_trials *= len(v)
    return total_trials

# Load data
digits = datasets.load_digits()
X, y = digits.data, digits.target

print("")
print("Digits data set:")
print("X: {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth": [3, None],
              "max_features": ["auto"],
              "min_samples_split": [2, 3, 4, 5, 10, 20],
              "min_samples_leaf": [1, 3, 4, 6, 10, 20],
              "bootstrap": [True],
              "criterion": ["entropy"],
              "n_estimators": [40, 80],
              }

cv = 5
n_models = count_trials(param_grid)
print("trying {} models, with CV = {}. A total of {} fits.".format(n_models, cv, n_models * cv))

start_time = datetime.now()
print("Starting at {}".format(start_time))

gs = GridSearchCV(estimator=RandomForestClassifier(),
                  param_grid=param_grid,
                  cv=cv,
                  refit=True,
                  scoring="accuracy",
                  n_jobs=-1)
gs.fit(X, y)

end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
I recently found that it has been extended to use the resources of a Spark cluster (via the pyspark and spark_sklearn libraries). I managed to set up a Spark cluster with one master and two workers. The code below runs the same task as before, but using the Spark cluster's resources.
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV as SKGridSearchCV
from spark_sklearn import GridSearchCV as SparkGridSearchCV
from pyspark import SparkConf, SparkContext
from datetime import datetime
import numpy as np

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('spark://<master-IP>:7077')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())
    return SparkContext(conf=sc_conf)

def count_trials(param_grid):
    # The number of candidate models is the product of the number of values
    # per parameter (not the sum).
    total_trials = 1
    for k, v in param_grid.items():
        total_trials *= len(v)
    return total_trials

# Load data
digits = datasets.load_digits()
X, y = digits.data, digits.target

print("")
print("Digits data set:")
print("X: {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth": [3, None],
              "max_features": ["auto"],
              "min_samples_split": [2, 3, 4, 5, 10, 20],
              "min_samples_leaf": [1, 3, 4, 6, 10, 20],
              "bootstrap": [True],
              "criterion": ["entropy"],
              "n_estimators": [40, 80],
              }

cv = 5
n_models = count_trials(param_grid)
print("trying {} models, with CV = {}. A total of {} fits.".format(n_models, cv, n_models * cv))

sc = get_context()

gs = SparkGridSearchCV(sc=sc,
                       estimator=RandomForestClassifier(),
                       param_grid=param_grid,
                       cv=cv,
                       refit=True,
                       scoring="accuracy",
                       n_jobs=-1)

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X, y)
end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
Here, <master-IP> is the IP of the master node. The code works perfectly, using all the resources available in the Spark cluster.
I then configured a Kubernetes cluster with one master node and one slave node, and ran the same code as before, but replaced the line

sc_conf.setMaster('spark://<master-IP>:7077')

with

sc_conf.setMaster('k8s://<master-IP>:<PORT>')

where <master-IP> and <PORT> are the values I obtain by running the following command on the master node:
kubectl cluster-info
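For reference, the modified get_context() only differs in the master URL (the IP and port below are placeholders taken from the kubectl cluster-info output):

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    # Kubernetes API server instead of the standalone Spark master
    sc_conf.setMaster('k8s://<master-IP>:<PORT>')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())
    return SparkContext(conf=sc_conf)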
The problem is that my code no longer works; it displays the following error messages:
19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$anonfun$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$anonfun$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$onNewSnapshots(ExecutorPodsAllocator.scala:126)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
It seems to be saying that I have to specify a Docker image, but I don't know how to do that.
Does anybody have any experience with this? I have been looking around on the web but found no answers.
Thank you in advance,
Ale
I would recommend reading the documentation first.
When you run Spark on Kubernetes, your processes are submitted to the Kubernetes cluster inside Docker containers, and the container images must be known at job submission time. Set them on your SparkConf:
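A minimal sketch, reusing the sc_conf object from your question (the image name and tag are placeholders, not values from your setup):

sc_conf.set('spark.kubernetes.container.image', '<your-registry>/spark-py:<tag>')  # placeholder image name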
or, if you want separate driver and executor images:
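Again a sketch with placeholder image names:

sc_conf.set('spark.kubernetes.driver.container.image', '<your-registry>/spark-py:<tag>')    # placeholder
sc_conf.set('spark.kubernetes.executor.container.image', '<your-registry>/spark-py:<tag>')  # placeholder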
Also make sure that your Kubernetes cluster can pull those images; the easiest way is to push them to Docker Hub. Follow the guide on how to build Spark Docker images.
It looks like you are running your job in client mode, so take the networking notes into account. Basically, you need to make sure that your driver process (which probably runs on your local machine) is reachable from within the Kubernetes network (specifically from the executor pods), which may not be obvious, and that your driver process in turn has network access to the executor pods. It is actually quite tricky to submit Spark jobs in client mode to a remote Kubernetes cluster from a local workstation, so I would recommend trying cluster mode first.
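If you do stay in client mode, a sketch of the extra driver-side settings usually needed (all values below are placeholders; the host must be routable from the executor pods):

sc_conf.set('spark.driver.bindAddress', '0.0.0.0')                     # listen on all local interfaces
sc_conf.set('spark.driver.host', '<IP-reachable-from-executor-pods>')  # placeholder, address advertised to executors
sc_conf.set('spark.driver.port', '29413')                              # placeholder fixed port, open it in your firewall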
If you would like to submit your jobs in cluster mode, you need to make sure that your job artifact (the Python script in your case) and its dependencies are accessible from the Spark driver and executor pods (the easiest way is to put your script with all its dependencies inside the Spark Docker image, on the Spark classpath).
Other than that, it should work for you in the same way it usually does.
You can also refer to the Helm chart for Spark on a Kubernetes cluster, which includes a Jupyter notebook integration that makes it easier to run interactive Spark sessions on Kubernetes.