Getting an error when trying to write a Spark DataFrame into Elasticsearch

8/10/2021

I am having some issues when trying to write a Spark DataFrame into Elasticsearch. The entire setup runs in Kubernetes, so there are no firewalls blocking any traffic. The Spark executors are also running on the same node as the driver.
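
For context, a minimal connectivity probe of the service from inside the cluster looks like the sketch below. This is only an illustration, assuming plain HTTP with no authentication; the service name elastic-svc and port 9200 come from the write configuration further down. The connector performs a similar GET on the root endpoint to detect the cluster version.

# Sketch: probe the Elasticsearch root endpoint from inside the cluster.
# Assumes plain HTTP and no authentication on the elastic-svc service.
import json
from urllib.request import urlopen

resp = urlopen("http://elastic-svc:9200/", timeout=5)
info = json.loads(resp.read())

# A reachable cluster answers with its name and version on the root endpoint
print(info["cluster_name"], info["version"]["number"])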

This is the configuration and the code being run:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.getOrCreate()

# Single-row test DataFrame
schema = StructType() \
    .add("id", StringType()) \
    .add("a", StringType()) \
    .add("b", StringType())

data = [("0f14d0ab-9601-4a62-a9e4-5ed26688389b", "2", "6")]
d = spark.createDataFrame(data=data, schema=schema)

# Write to Elasticsearch via the elasticsearch-hadoop connector
d.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.port", "9200") \
    .option("es.nodes", "elastic-svc") \
    .option("es.nodes.wan.only", "true") \
    .option("es.resource", "elastic-svc:9200") \
    .save()

I am getting this error:

Py4JJavaError: An error occurred while calling o413.save.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:348)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg$lzycompute(DefaultSource.scala:225)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg(DefaultSource.scala:223)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.isEmpty(DefaultSource.scala:624)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:110)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[elastic-svc:9200]] 
	at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:160)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:432)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
	at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
	at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:338)
	... 35 more

How can this be resolved?
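
One thing that looks suspicious to me is the es.resource option: as far as I can tell from the elasticsearch-hadoop configuration docs, it should name the target index (for example index/type on older versions), not a host:port pair, since the host and port already go in es.nodes and es.port. A sketch of what I believe the corrected write would look like, where my-index is only a placeholder:

# Hypothetical corrected write: es.resource names an index, not a host.
# "my-index" is a placeholder; the host and port stay in es.nodes/es.port.
d.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "elastic-svc") \
    .option("es.port", "9200") \
    .option("es.nodes.wan.only", "true") \
    .option("es.resource", "my-index") \
    .mode("append") \
    .save()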

-- toerq
apache-spark
elasticsearch
kubernetes

0 Answers