Cannot create a producer in Pulsar

10/24/2021

I am currently running Pulsar on a local Minikube instance. I am attempting to connect to the instance and create a producer using Python. After I install/start Pulsar, I get the following connection information:

|-----------|-------------------|-------------|---------------------------|
| NAMESPACE |       NAME        | TARGET PORT |            URL            |
|-----------|-------------------|-------------|---------------------------|
| pulsar    | pulsar-mini-proxy | http/80     | http://192.168.49.2:31183 |
|           |                   | pulsar/6650 | http://192.168.49.2:30841 |
|-----------|-------------------|-------------|---------------------------|
🏃  Starting tunnel for service pulsar-mini-proxy.
|-----------|-------------------|-------------|------------------------|
| NAMESPACE |       NAME        | TARGET PORT |          URL           |
|-----------|-------------------|-------------|------------------------|
| pulsar    | pulsar-mini-proxy |             | http://127.0.0.1:50069 |
|           |                   |             | http://127.0.0.1:50070 |
|-----------|-------------------|-------------|------------------------|

I have tried connecting with each of the URLs above. No matter which one I use, I get either a connection refused error or a timeout when creating the producer. Note that constructing the client does not fail; it is the producer = client.create_producer(...) step that fails. My code and outputs are as follows:

Code (http://192.168.49.2:31183):

import pulsar

# create a producer
client = pulsar.Client('http://192.168.49.2:31183')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

Output (Timeout):

2021-10-24 00:19:12.908 INFO  [0x70000ca06000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut

Code (http://192.168.49.2:30841):

import pulsar

# create a producer
client = pulsar.Client('http://192.168.49.2:30841')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

Output (Timeout):

2021-10-24 00:21:58.313 INFO  [0x7000108b5000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut

Code (http://127.0.0.1:50069):

import pulsar

# create a producer
client = pulsar.Client('http://127.0.0.1:50069')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

Output (ConnectError):

2021-10-24 00:23:54.336 INFO  [0x7000103da000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:23:54.337 ERROR [0x7000103da000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:23:54.337 ERROR [0x7000103da000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError

Code (http://127.0.0.1:50070):

import pulsar

# create a producer
client = pulsar.Client('http://127.0.0.1:50070')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

Output (ConnectError):

2021-10-24 00:27:00.336 INFO  [0x700011650000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:27:00.337 ERROR [0x700011650000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:27:00.337 ERROR [0x700011650000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError
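The two failure modes in these logs are libcurl error codes: 7 is CURLE_COULDNT_CONNECT (nothing accepted the TCP connection, for example because it was refused) and 28 is CURLE_OPERATION_TIMEDOUT (nothing answered before the timeout). The sketch below is a plain TCP probe that tells these cases apart before any Pulsar client is involved. The helper name probe is hypothetical, and a result of "open" only means something accepted a TCP connection. It does not show that the listener speaks the Pulsar protocol.

```python
import socket


def probe(host: str, port: int, timeout: float = 3.0) -> str:
    """Classify a plain TCP connection attempt to host:port.

    "open"        -> the TCP handshake succeeded
    "refused"     -> the peer actively rejected it (roughly curl error 7)
    "timeout"     -> nothing answered within `timeout` seconds (curl error 28)
    "unreachable" -> any other socket error, e.g. no route to host
    """
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "refused"
    except socket.timeout:
        return "timeout"
    except OSError:
        return "unreachable"
```

For example, calling probe("192.168.49.2", 31183) or probe("127.0.0.1", 50069) checks each endpoint from the minikube tables independently of the Pulsar client.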
-- Snoop
apache-pulsar
kubernetes
minikube
python

2 Answers

10/24/2021

I don't have much experience with the Pulsar proxy, and I'm not exactly sure how the Helm chart sets it up, but usually you will connect to Pulsar on port 6650 (6651 for TLS), or to the admin service on port 8080 (8443 for TLS). Reference: https://pulsar.apache.org/docs/en/reference-configuration/
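Those two port pairs also use different URL schemes. The Python client's pulsar.Client accepts both a binary-protocol URL (pulsar:// or pulsar+ssl://) and an HTTP URL (http:// or https://). With an http:// URL, the client does its topic lookups through the admin REST API, which is why the logs in the question show HTTPLookupService requests to /admin/v2/.... The sketch below shows how the default ports map to schemes. The helper service_url and its parameters are hypothetical and exist only for illustration.

```python
from typing import Optional

# Default ports from the Pulsar reference configuration, keyed by (protocol, tls).
DEFAULT_PORTS = {
    ("binary", False): ("pulsar", 6650),
    ("binary", True): ("pulsar+ssl", 6651),
    ("http", False): ("http", 8080),
    ("http", True): ("https", 8443),
}


def service_url(host: str, protocol: str = "binary", tls: bool = False,
                port: Optional[int] = None) -> str:
    """Build a Pulsar service URL; `port` overrides the default (e.g. a NodePort)."""
    scheme, default_port = DEFAULT_PORTS[(protocol, tls)]
    return f"{scheme}://{host}:{port if port is not None else default_port}"
```

For example, service_url("192.168.49.2", port=30841) yields pulsar://192.168.49.2:30841. That is the NodePort minikube listed for pulsar/6650, written with the binary-protocol scheme rather than http://.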

Those port numbers you used look like ports internal to your Kubernetes deployment. Typically, to connect to services inside Kubernetes, you need to create a NodePort or LoadBalancer Service. (In production, you'd usually use an Ingress to set up the LoadBalancer.) With minikube there is a slight difference: you need to run minikube tunnel to expose a LoadBalancer. It's explained here: https://minikube.sigs.k8s.io/docs/handbook/accessing/
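One way to see what address a LoadBalancer Service actually exposes is to read its status.loadBalancer.ingress field together with its named spec.ports entries. Both appear in the output of kubectl get svc <name> -n <namespace> -o json. The sketch below is a minimal example under that assumption. The function name loadbalancer_url is hypothetical, and the port name "pulsar" comes from the pulsar/6650 entry in the question's minikube table.

```python
import json
from typing import Optional


def loadbalancer_url(service_json: str, port_name: str = "pulsar",
                     scheme: str = "pulsar") -> Optional[str]:
    """Return scheme://<ingress>:<port> for a LoadBalancer Service, or None.

    `service_json` is assumed to be `kubectl get svc <name> -o json` output.
    """
    svc = json.loads(service_json)
    spec = svc.get("spec", {})
    if spec.get("type") != "LoadBalancer":
        return None
    ingress = svc.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    if not ingress:
        # No external address assigned yet; on minikube, check that `minikube tunnel` is running
        return None
    host = ingress[0].get("ip") or ingress[0].get("hostname")
    for entry in spec.get("ports", []):
        if entry.get("name") == port_name:
            # `port` is the Service port the load balancer listens on (not the nodePort)
            return f"{scheme}://{host}:{entry['port']}"
    return None
```

With the namespace and service name from the question, the input would come from kubectl get svc pulsar-mini-proxy -n pulsar -o json.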

I recommend looking through the objects created by the helm chart and familiarizing yourself with how they work, especially the ones involved in networking, and make sure you understand what they're actually configuring for you.

-- devinbost
Source: StackOverflow

10/25/2021

As it turns out, the Pulsar Helm chart does create a LoadBalancer Service, named pulsar-mini-proxy. The correct address for reaching it (note the pulsar:// scheme, not http://) is:

pulsar://127.0.0.1:6650

The only other step I needed was to run:

minikube tunnel
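The producer code from the question then only needs this URL. The sketch below assumes pulsar-client is installed and minikube tunnel is running in another terminal. The helper send_messages is hypothetical. The create_producer arguments are the ones from the question.

```python
# Assumes `pip install pulsar-client` and `minikube tunnel` running in another terminal.
SERVICE_URL = "pulsar://127.0.0.1:6650"  # binary protocol via the LoadBalancer, not http://
TOPIC = "persistent://public/default/my-topic"


def send_messages(messages, service_url=SERVICE_URL, topic=TOPIC):
    """Publish each string in `messages` to `topic`; return how many were sent."""
    import pulsar  # imported here so the module still loads where pulsar-client is absent

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(
            topic,
            block_if_queue_full=True,
            batching_enabled=True,
            batching_max_publish_delay_ms=10,
        )
        count = 0
        for text in messages:
            producer.send(text.encode("utf-8"))  # synchronous send: waits for the broker's ack
            count += 1
        return count
    finally:
        client.close()  # closing the client also closes its producers
```

Calling send_messages(["hello"]) exercises the same create_producer step that failed in the question, this time against the pulsar:// address.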

-- Snoop