I am trying to access my Strimzi Kafka cluster, deployed in a k8s cluster on AWS (kops, not EKS), using kafka-python.
Here's the listener configuration:
    spec:
      kafka:
        version: 2.5.0
        replicas: 3
        listeners:
          external:
            port: 9094
            tls: false
            type: loadbalancer
            authentication:
              type: scram-sha-512
            configuration:
              bootstrap:
                host: kafka-bootstrap.mydomain.com
              brokers:
              - broker: 0
                host: kafka-broker-0.mydomain.com
              - broker: 1
                host: kafka-broker-1.mydomain.com
              - broker: 2
                host: kafka-broker-2.mydomain.com
            overrides:
              bootstrap:
                address: kafka-bootstrap.mydomain.com
                dnsAnnotations:
                  # external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com
                  service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
                  service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
                  service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
              brokers:
              - advertisedHost: kafka-broker-0.mydomain.com
                advertisedPort: 9094
                broker: 0
                dnsAnnotations:
                  # external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com
                  service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
                  service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
                  service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
              - advertisedHost: kafka-broker-1.mydomain.com
                advertisedPort: 9094
                broker: 1
                dnsAnnotations:
                  # external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com
                  service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
                  service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
                  service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
              - advertisedHost: kafka-broker-2.mydomain.com
                advertisedPort: 9094
                broker: 2
                dnsAnnotations:
                  external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com
                  service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
                  service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
                  service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
I am able to access the cluster when the listener has no authentication configured (the rest of the configuration is the same as above).
The client configuration that works in that case (I am using kafka-python):
    from kafka import KafkaConsumer

    # TLS only, no SASL: this connects successfully.
    consumer = KafkaConsumer(
        bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
        client_id="test-consumer",
        security_protocol="SSL",
        api_version=(2, 5, 0),
    )
But I am not able to access the cluster once SCRAM-SHA-512 authentication is enabled. I get the following error in the Python client:
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [IPv4 ('x.x.x.x', 9094)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <connecting> [IPv4 ('x.x.x.x', 9094)]>: connecting to kafka-bootstrap.mydomain.com [('x.x.x.x', 9094) IPv4]
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <connecting> [IPv4 ('x.x.x.x', 9094)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <connecting> [IPv4 ('x.x.x.x', 9094)]>: initiating SSL handshake
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: configuring default SSL Context
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<handshake> [IPv4 ('x.x.x.x', 9094)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: wrapping socket in ssl context
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: completed SSL handshake.
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]> Request 1: SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]> Response 1 (220.21484375 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['SCRAM-SHA-512'])
ERROR:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Error receiving reply from server
Traceback (most recent call last):
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 692, in _try_authenticate_scram
    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 616, in _recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Closing connection. KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Connection reset during recv
DEBUG:kafka.client:Initializing connection to node bootstrap-0 for metadata request
and this error repeats as the client retries.
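For anyone trying to reproduce this, the output above is kafka-python's debug logging. A minimal sketch of one way to turn it on, assuming the standard library `logging` defaults (whose format matches the `LEVEL:logger:message` lines shown); the variable name `kafka_logger` is only illustrative:

```python
import logging

# Attach a stderr handler to the root logger using the default
# "LEVEL:logger:message" format seen in the output above.
logging.basicConfig()

# kafka-python logs under the "kafka" logger hierarchy (kafka.conn,
# kafka.client, kafka.protocol.parser), so raising the level on this
# one parent logger covers the TLS handshake and SASL steps.
kafka_logger = logging.getLogger("kafka")
kafka_logger.setLevel(logging.DEBUG)
```

This has to run before the consumer is created so that the connection attempt is captured.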
This is the Python client configuration that produces the error:
    from kafka import KafkaConsumer

    # Same as the working client, plus SASL/SCRAM credentials: this fails.
    consumer = KafkaConsumer(
        bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
        client_id="test-consumer",
        security_protocol="SASL_SSL",
        api_version=(2, 5, 0),
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='someusername',
        sasl_plain_password='somerandom',
    )
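To make the difference between the working and failing clients explicit, here is a small sketch that builds both sets of keyword arguments from a shared base. The helper `consumer_kwargs` is hypothetical (it is not part of kafka-python); the keys and values are the ones used in the two snippets in this post.

```python
from typing import Optional


def consumer_kwargs(username: Optional[str] = None,
                    password: Optional[str] = None) -> dict:
    """Build KafkaConsumer keyword arguments (hypothetical helper).

    Without credentials this yields the TLS-only settings that work;
    with credentials it yields the SASL_SSL settings that fail.
    """
    kwargs = {
        "bootstrap_servers": ["kafka-bootstrap.mydomain.com:9094"],
        "client_id": "test-consumer",
        "security_protocol": "SSL",
        "api_version": (2, 5, 0),
    }
    if username is not None:
        kwargs.update(
            security_protocol="SASL_SSL",
            sasl_mechanism="SCRAM-SHA-512",
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    return kwargs


tls_only = consumer_kwargs()
with_scram = consumer_kwargs("someusername", "somerandom")

# Keys whose values differ between the two configurations.
changed = sorted(k for k in with_scram if tls_only.get(k) != with_scram[k])
print(changed)
# ['sasl_mechanism', 'sasl_plain_password', 'sasl_plain_username', 'security_protocol']
```

Either dict can be passed on as `KafkaConsumer(**tls_only)` or `KafkaConsumer(**with_scram)`, so the only client-side change between the two runs is the security protocol and the SASL settings.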
Is this caused by a configuration error on the Kafka server or client side? I have successfully created a KafkaUser resource in the k8s cluster.
Or is it because Kafka needs SSL on the Kafka server side (and not on the LoadBalancer side) for SASL SCRAM-SHA-512 to work? If so, is there any workaround?