Create Apache Pulsar Sink on Kubernetes cluster with Java API

1/11/2022

I am trying to create a ClickHouse sink connector with the Java API on a remote Pulsar cluster running on Kubernetes, but I am running into difficulties.

My cluster runs Pulsar 2.8.1.

pulsarAdmin.sinks().createSinkWithUrl(
     mySinkConfig,
     "https://archive.apache.org/dist/pulsar/pulsar-2.8.1/connectors/pulsar-io-jdbc-clickhouse-2.8.1.nar");
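As a side check on the URL passed above, here is a minimal sketch that sends a HEAD request to the archive location using only the JDK's `java.net.http` client (Java 11+). `NarUrlCheck`, `headRequest` and `isReachable` are hypothetical names for illustration, not part of the Pulsar API. It only shows whether the URL answers from the machine that runs it, which may differ from what the cluster can reach.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper (assumption): not part of the Pulsar admin API.
class NarUrlCheck {
    // Build a HEAD request so the archive itself is not downloaded.
    static HttpRequest headRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Returns true when the server answers 200 for the archive URL.
    static boolean isReachable(String url) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NORMAL)
                .build();
        HttpResponse<Void> response =
                client.send(headRequest(url), HttpResponse.BodyHandlers.discarding());
        return response.statusCode() == 200;
    }

    public static void main(String[] args) throws Exception {
        String url = "https://archive.apache.org/dist/pulsar/pulsar-2.8.1/"
                + "connectors/pulsar-io-jdbc-clickhouse-2.8.1.nar";
        System.out.println("reachable: " + isReachable(url));
    }
}
```

Running the same check from inside the cluster network would be more telling than running it locally, since the two environments may not have the same outbound access.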

The API call returns successfully and appears to create the sink: I can retrieve its status and its configuration. However, the status reports a failure, and when I check the pods on Kubernetes I see the pod for the new sink crashing:

NAME                                            READY   STATUS             RESTARTS   AGE
pf-my-tenant-test-sink6-ab222ce8-0        0/1     CrashLoopBackOff   15         57m

with this in the Kubernetes logs:

null

Reason: java.io.IOException: No such file or directory

Here is the command Pulsar uses when creating the pod:

sh -c
      /pulsar/bin/pulsar-admin 
            --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken
            --auth-params file:///etc/auth/token
            --admin-url https://XXXX:8443/ functions download 
            --tenant my-tenant
            --namespace test 
            --name sink6 
            --destination-file /pulsar/download/pulsar_functions/functions17103366778764930042.tmp && SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID} && exec java -cp /pulsar/instances/java-instance.jar:/pulsar/instances/deps/* 
            -Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps -Dpulsar.functions.instance.classpath=/pulsar/conf:::/pulsar/lib/*: 
            -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml -Dpulsar.function.log.dir=logs/functions/my-tenant/test/sink6 
            -Dpulsar.function.log.file=sink6-$SHARD_ID -Xmx1073741824 org.apache.pulsar.functions.instance.JavaInstanceMain 
            --jar /pulsar/download/pulsar_functions/functions17103366778764930042.tmp --instance_id $SHARD_ID --function_id 16d9bcab-abcd-2f4b-b536-d3fb5d1232ab 
            --function_version 8807b42e-b1fc-4495-862e-21fe27085eb7 
            --function_details '{"tenant":"my-tenant","namespace":"test","name":"sink6","className":"org.apache.pulsar.functions.api.utils.IdentityFunction","autoAck":true,"parallelism":1,"source":{"typeClassName":"org.apache.pulsar.client.api.schema.GenericRecord","inputSpecs":{"my-topic":{}},"cleanupSubscription":true},"sink":{"className":"org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink","configs":"{\"userName\":\"XXXXX\",\"password\":\"XXXX\",\"jdbcUrl\":\"jdbc:clickhouse://XXXXXX\",\"tableName\":\"XXXXXXX\"}","typeClassName":"org.apache.pulsar.client.api.schema.GenericRecord"},"resources":{"cpu":1.0,"ram":"1234","disk":"1234"},"componentType":"SINK"}' 
            --pulsar_serviceurl pulsar+ssl://XXXX:6651/ 
            --client_auth_plugin org.apache.pulsar.client.impl.auth.AuthenticationToken
            --client_auth_params file:///etc/auth/token 
            --use_tls false 
            --tls_allow_insecure false 
            --hostname_verification_enabled false 
            --max_buffered_tuples 1024 
            --port 9093 
            --metrics_port 39809 
            --pending_async_requests 1000 
            --expected_healthcheck_interval -1 
            --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider 
            --cluster_name pulsar-ofgebi 
            --nar_extraction_directory /tmp
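For readers less familiar with the shell expansion in the command above, `SHARD_ID=${POD_NAME##*-}` strips everything up to and including the last `-` in the pod name. A minimal Java sketch of the same logic follows; `ShardId` and `fromPodName` are hypothetical names used only for illustration.

```java
// Hypothetical helper (assumption): mirrors the shell expansion ${POD_NAME##*-}.
class ShardId {
    // Remove the longest prefix ending in '-' and keep what follows.
    // If the name contains no '-', the whole name is returned, as in the shell.
    static String fromPodName(String podName) {
        return podName.substring(podName.lastIndexOf('-') + 1);
    }

    public static void main(String[] args) {
        // Pod name taken from the kubectl output in the question.
        System.out.println("shardId=" + fromPodName("pf-my-tenant-test-sink6-ab222ce8-0"));
        // prints "shardId=0"
    }
}
```

With the pod shown earlier, the instance therefore starts with `--instance_id 0` and writes to the log file `sink6-0`.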

Does anyone have any idea why creating a sink with the Java API could result in such an error?

-- Guillaume Roffé
apache-pulsar
java
kubernetes

0 Answers