Using sliding in mapPartitions, but the result is executed on the driver

8/4/2020

Environment: Spark 4.4 + k8s

I use mapPartitions a lot, and I have always assumed that code inside mapPartitions or foreachPartition executes in the executor JVM. But the following case contradicts that: before the call to sliding, the logging happens on the executor pod, but after sliding, I find the log on the driver.

I don't understand why.

Can anybody explain it? (In fact, I really want it to execute on the executor.) Thank you.
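
For reference, this is the kind of minimal check I mean (a standalone sketch with made-up names and data, not my actual job): log the hostname inside mapPartitions and force an action, so the closure is evaluated on the executors.

    import java.net.InetAddress
    import org.apache.spark.sql.SparkSession

    object WhereDoesItRun {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("where-does-it-run").getOrCreate()
        val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 2)
        rdd.mapPartitions { it =>
          // resolved wherever the closure actually runs
          val host = InetAddress.getLocalHost.getHostName
          it.map { x => println(s"host=$host||x=$x"); x }
        }.count() // the action consumes the iterator, so the println runs on the executors
        spark.stop()
      }
    }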


val cleanedDF = df
  .groupBy(_.id)
  .mapPartitions { par =>
    val rows = par.map {
      case (id, it) =>
        // read from Redis; if the key does not exist, create a new object
        // and save it to Redis at the same time
        val status: Option[Status] = CacheDB.readXorWriteNx[Status](id)
        val series = it.toList
        val goldp2p = series
          .map { r =>
            // the following log appears on the executor pod
            logger.debug("before_sliding||key={}||time={}||host={}", id, r.time, java.net.InetAddress.getLocalHost().getHostName())
            r
          }
          .sliding(2, 1)
          .map { r =>
            // the following log appears on the driver pod
            logger.debug("after_sliding||key={}||time={}||host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
            r
          }
          .filter(_.size >= 2)
          .map { p2p =>
            // update status according to some conditions
            // (all of these updates happen on the driver)
            if (???) status.foreach(_.num = 1)
            else if (???) status.foreach(_.num = 2)
            // ...
          }
        // save to Redis; however, the following log appears on the executor
        // again, yet the value stored in Redis is still the initial status
        logger.debug("key={}||value={}||host={}", id, status, java.net.InetAddress.getLocalHost().getHostName())
        CacheDB.write[Status](id, status)
    }
    rows
  }
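
One detail that may matter (a standalone illustration in plain Scala, no Spark, with made-up data): sliding on a List returns a lazy Iterator, so the side effects in the .map after it only run once the iterator is actually consumed.

    object SlidingLaziness {
      def main(args: Array[String]): Unit = {
        val series = List(1, 2, 3)
        val windows = series
          .sliding(2, 1)                         // Iterator[List[Int]] -- lazy
          .map { w => println(s"window=$w"); w } // nothing is printed yet
        println("before consumption")
        windows.toList                           // only now do the window printlns fire
      }
    }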

Finally, I changed my code like this. Now it works: it executes on the executor and produces the result I want.

    (0 until series.size - 1).map { i =>
      List(series(i), series(i + 1))
    }...
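
For comparison, this is how I understand the difference (a sketch with made-up data): Range.map builds a strict collection, so the pairing is evaluated immediately inside the closure, while forcing the sliding iterator with toList yields the same windows eagerly.

    object EagerWindows {
      def main(args: Array[String]): Unit = {
        val series = List("a", "b", "c", "d")
        // index-based pairing: Range.map is strict, so it is evaluated right away
        val pairs = (0 until series.size - 1).map { i =>
          List(series(i), series(i + 1))
        }
        // equivalent strict result: force the sliding iterator with toList
        val pairs2 = series.sliding(2, 1).toList
        println(pairs)  // Vector(List(a, b), List(b, c), List(c, d))
        println(pairs2) // List(List(a, b), List(b, c), List(c, d))
      }
    }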
-- Daniel
apache-spark
kubernetes

0 Answers