Access to Azure Data Lake Gen 2 Blob Storage from Azure Kubernetes Cluster

8/24/2021

When I do my spark job locally (setting master to local), it all succeeds and does it's job well writing parquet files into Blob Storage. However, when I deploy my spark job into AKS cluster it fails on the line which writes parquet files into the storage. I think I've done something configuring access to the storage, especially interested in Blob Storage, but the settings are transparent and tried to make as much publicly available as possible. This is application code

object Main extends OpenCageDataResponseJsonProtocol {

  private final val spark = SparkSession.builder()
    .config(new SparkConf())
    .appName("spark-basics")
    .getOrCreate()
  private final val BASE_OPEN_CAGE_DATA_URI = s"https://api.opencagedata.com/geocode/v1/json?" // base uri without query params
  private final val OPEN_CAGE_DATA_API_KEY = spark.conf.get("spark.driver.OPEN_CAGE_DATA_API_KEY") // API Key for https://opencagedata.com stored as env variable
  private final val AZURE_STORAGE_PATH = "abfss://<container-name>@<account-name>.dfs.core.windows.net/" // base storage path
  private final val HOTELS_URI = new URI(AZURE_STORAGE_PATH + SubDirectory.HOTELS) // Path to storage hotels resource
  private final val HOTEL_WEATHER_URI = new URI(AZURE_STORAGE_PATH + SubDirectory.WEATHER) // Path to storage hotel weathers resource
  private final va.l MY_OWN_AZURE_STORAGE_PATH_TO_WRITE = s"abfss://<container-name>@<account-name.dfs.core.windows.net/result-cluster/"

  def main(array: Array[String]): Unit = {

    // hotels data transformation
    val hotelsDf = spark.createDataFrame(
      spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", ",")
        .load(HOTELS_URI + "/*.csv.gz")
        .rdd
        .map(replaceNAValues)
        .map(toHotelWithGeoHashTransformed)
    )

    // hotel weather data transformation
    val hotelWeatherDF = spark.createDataFrame(
      spark.read
        .parquet(HOTEL_WEATHER_URI + "/*/*/*/*.parquet")
        .rdd
        .map(toHotelWeather)
    )

    // left join by geo hash
    val combinedDataFrame = hotelsDf.join(
      hotelWeatherDF,
      hotelsDf("hotel_geoHash") === hotelWeatherDF("hotel_weather_geoHash"),
      "left"
    )

    combinedDataFrame.write.parquet(MY_OWN_AZURE_STORAGE_PATH_TO_WRITE)
  }

This is not a full log derived from spark driver in AKS since its too big

2021-08-24 14:57:23,896 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1388
2021-08-24 14:57:23,898 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[31] at parquet at Main.scala:56) (first 15 tasks are for partitions Vector(0, 1))
2021-08-24 14:57:23,898 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 2 tasks resource profile 0
2021-08-24 14:57:23,900 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 278) (10.244.1.10, executor 1, partition 0, PROCESS_LOCAL, 5256 bytes) taskResourceAssignments Map()
2021-08-24 14:57:23,901 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 4 (MapPartitionsRDD[36] at parquet at Main.scala:56), which has no missing parents
2021-08-24 14:57:23,914 INFO memory.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 33.4 KiB, free 411.5 MiB)
2021-08-24 14:57:23,916 INFO memory.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 13.0 KiB, free 411.5 MiB)
2021-08-24 14:57:23,917 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on spark-basics-a96d077b78a9ebcf-driver-svc.default.svc:7079 (size: 13.0 KiB, free: 413.6 MiB)
2021-08-24 14:57:23,917 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1388
2021-08-24 14:57:23,918 INFO scheduler.DAGScheduler: Submitting 16 missing tasks from ShuffleMapStage 4 (MapPartitionsRDD[36] at parquet at Main.scala:56) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
2021-08-24 14:57:23,918 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 16 tasks resource profile 0
2021-08-24 14:57:23,920 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.244.1.10:33951 (size: 13.9 KiB, free: 413.8 MiB)
2021-08-24 14:57:24,180 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 279) (10.244.1.10, executor 1, partition 1, PROCESS_LOCAL, 5105 bytes) taskResourceAssignments Map()
2021-08-24 14:57:24,194 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 278) (10.244.1.10 executor 1): java.lang.ExceptionInInitializerError
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: SparkSession should only be created and accessed on the driver.
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$assertOnDriver(SparkSession.scala:1126)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:919)
	at Main$.<init>(Main.scala:18)
	at Main$.<clinit>(Main.scala)
	... 110 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 36 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class Main$
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
    enter code here

Main.scala:56 points to this line combinedDataFrame.write.parquet(MY_OWN_AZURE_STORAGE_PATH_TO_WRITE) And this is the script I'm using to make it work.

spark-submit --name "Main.scala" \
    --master k8s://https://<cluster-domain>:443 \
    --deploy-mode cluster \
    --name spark-basics \
    --jars https://repo1.maven.org/maven2/io/spray/spray-json_2.12/1.3.6/spray-json_2.12-1.3.6.jar \
    --conf spark.executor.memory=1g \
    --conf spark.executor.instances=1 \
    --conf spark.driver.OPEN_CAGE_DATA_API_KEY=$OPEN_CAGE_DATA_API_KEY \
    --conf spark.driver.HADOOP_OPTIONAL_TOOLS=hadoop-azure \
    --conf spark.hadoop.fs.azure.account.auth.type.<account-name>.dfs.core.windows.net=OAuth \
    --conf spark.hadoop.fs.azure.account.oauth.provider.type.<account-name>.dfs.core.windows.net=org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider \
    --conf spark.hadoop.fs.azure.account.oauth2.client.id.<account-name>.dfs.core.windows.net=f<client-id> \
    --conf spark.hadoop.fs.azure.account.oauth2.client.secret.<account-name>.dfs.core.windows.net=<secret> \
    --conf spark.hadoop.fs.azure.account.oauth2.client.endpoint.<account-name>.dfs.core.windows.net=https://login.microsoftonline.com/<tenant-id>/oauth2/token \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image=<image> \
    --conf spark.hadoop.fs.azure.account.key.<account-name>.dfs.core.windows.net=<key> \
    --class Main \
    local:///opt/sparkbasics-1.0.0.jar
-- Ахтем Вейс
apache-spark
azure-aks
azure-blob-storage
azure-data-lake-gen2
kubernetes

0 Answers