Environment: Spark 4.4 + k8s
I use mapPartitions a lot, and I have always assumed that the code inside mapPartitions (or foreachPartition) executes in the executor JVM. But the following case contradicts that. Before calling sliding, the log lines show up on the executor pod; after sliding, they show up on the driver.
I don't understand why. Can anybody explain it? (In fact I do want everything executed on the executor.) Thank you.
val cleanedDF = df
  .groupBy(_.id)
  .mapPartitions { par =>
    val rows = par.map { case (id, it) =>
      // read from Redis; if the key does not exist, create a new object and save it to Redis at the same time
      val status: Option[Status] = CacheDB.readXorWriteNx[Status](id)
      val series = it.toList
      val goldp2p = series
        .map { r =>
          // the following log shows up on the executor pod
          logger.debug("before_sliding||key={}||time={}||host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
          r
        }
        .sliding(2, 1)
        .map { r =>
          // the following log shows up on the driver pod
          logger.debug("after_sliding||key={}||time={}||host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
          r
        }
        .filter(_.size >= 2)
        .map { p2p =>
          // update status according to some conditions; all of these updates happen on the driver
          if (???) status.num = 1
          else if (???) status.num = 2
          // ....
        }
      // save to Redis; however, the following log shows up on the executor again,
      // yet the value in Redis is still the initial status
      logger.debug("key={}||value={}||host={}", id, status, java.net.InetAddress.getLocalHost().getHostName())
      CacheDB.write[Status](id, status)
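For context on what I observed: `.sliding(2, 1)` on a `List` returns an `Iterator`, and `.map` on an `Iterator` is lazy, so the side effects in that second `.map` only run when the iterator is finally consumed. Here is a minimal, Spark-free sketch of that laziness (the names `log`/`series` are just illustration, not from my job):

```scala
object SlidingLaziness {
  def main(args: Array[String]): Unit = {
    val log = scala.collection.mutable.Buffer.empty[String]
    val series = List(1, 2, 3)

    // .map on a List is strict: these side effects run immediately
    val strict = series.map { x => log += s"before_sliding $x"; x }
    assert(log.size == 3)

    // .sliding returns an Iterator, and .map on an Iterator is lazy:
    // nothing is logged yet when this line executes
    val lazyPairs = strict.sliding(2, 1).map { w => log += s"after_sliding $w"; w }
    assert(log.size == 3)

    // only consuming the iterator triggers the deferred side effects
    // (two windows: List(1, 2) and List(2, 3))
    val materialized = lazyPairs.toList
    assert(log.size == 5)
    assert(materialized == List(List(1, 2), List(2, 3)))
  }
}
```

So the "after_sliding" logging is deferred until wherever the resulting iterator happens to be consumed, which is not necessarily where `.map` was called.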
Finally I changed my code as follows, and now it works: it executes on the executor and produces the result I want.
(0 until series.size - 1).map { i =>
  List(series(i), series(i + 1))
}...