Unable to Connect Python App to RabbitMQ in Kube pods using ClusterIP

1/21/2019

I am having trouble connecting my Python application pod to my RabbitMQ pod. I am new to the k8s world, so any and all help is much appreciated. The steps I have taken are as follows:

  • Building docker image tagged as adapteremulator-container:latest

  • applying config files

  • running docker build -t adapteremulator-container:latest .

I am getting this connection error.

Traceback (most recent call last):
  File "Emulator.py", line 17, in <module>
    RMQ = rabbit(config["rabbitMQ"])
  File "/app/RabbitClass.py", line 20, in __init__
    self.createChannel()
  File "/app/RabbitClass.py", line 30, in createChannel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtualHost, credentials=self.credentials))
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 377, in __init__
    self._process_io_for_connection_setup()
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 417, in _process_io_for_connection_setup
    self._open_error_result.is_ready)
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 469, in _flush_output
    raise maybe_exception
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/base_connection.py", line 176, in _adapter_connect
    socket.IPPROTO_TCP)
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/base_connection.py", line 304, in _getaddrinfo
    return socket.getaddrinfo(host, port, family, socktype, proto)
  File "/usr/local/lib/python3.7/socket.py", line 748, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -3] Try again

Dockerfile:

    # Image for the adapter emulator, based on the Alpine variant of Python 3.7.
FROM python:3.7-alpine
WORKDIR /app
# Copy requirements.txt and the contents of ./Emulator/ into /app.
COPY requirements.txt ./Emulator/ /app/
# build-base is installed as the virtual package ".install-deps" so that it can
# be removed again in the same layer once the Python requirements are installed.
RUN apk add --no-cache build-base --virtual .install-deps \
    && ln -s /usr/include/locale.h /usr/include/xlocale.h \
    && pip install --no-cache-dir -r requirements.txt \
    && apk del .install-deps
EXPOSE 5000
ENTRYPOINT [ "python" ]
# The script name must match the file's exact case (Linux file systems are
# case-sensitive); the reported traceback shows the entry script as "Emulator.py".
CMD [ "Emulator.py" ]

Here are my 3 config files:

 # Deployment that runs a single replica of the adapter emulator container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: emulator-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      component: adapteremulator
  template:
    metadata:
      labels:
        # Must equal spec.selector.matchLabels so the Deployment manages this pod.
        component: adapteremulator
    spec:
      containers:
        - name: adapteremulator-container
          image: adapteremulator-container:latest
          # Use the image already present on the node (e.g. a locally built one)
          # and pull only when it is missing.
          imagePullPolicy: "IfNotPresent"

# rabbitmq-management-deploy.yaml
# Deployment that runs a single RabbitMQ pod from the 3.7.8-management image tag.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      name: rabbitmq-pod
  template:
    metadata:
      labels:
        # Must equal spec.selector.matchLabels above; the Service named
        # rabbitmq-cluster-ip-service selects pods by this same label.
        name: rabbitmq-pod
    spec:
      restartPolicy: Always
      containers:
      - name: rabbitmq-container
        image: rabbitmq:3.7.8-management

# ClusterIP Service in front of the RabbitMQ pod.
apiVersion: v1
kind: Service
metadata:
  # The job config's rabbitMQ "host" value is set to this Service name.
  name: rabbitmq-cluster-ip-service
spec:
  type: ClusterIP
  selector:
    # Selects pods labelled name=rabbitmq-pod (the rabbitmq-deploy pod template).
    name: rabbitmq-pod
  ports:
    # Same port number as the "port" value (5672) in the job config.
    - port: 5672
      targetPort: 5672

Here are my RabbitMQ class and job config:

class rabbit():
    """Small convenience wrapper around a pika blocking connection and channel.

    The instance opens a connection when it is constructed, keeps track of the
    queues it has declared, remembers which queue is "active", and records
    every message it sends or receives.
    """

    def __init__(self, config):
        """Read the connection settings from *config* and connect.

        Args:
            config: Mapping with the keys ``"host"``, ``"port"``, ``"user"``,
                ``"password"`` and ``"virtualHost"``.

        Raises:
            KeyError: If one of the required keys is missing from *config*.
        """
        # unpack configuration
        self.host = config["host"]
        self.port = config["port"]
        self.user = config["user"]
        self.password = config["password"]
        self.virtualHost = config["virtualHost"]
        self.credentials = pika.PlainCredentials(self.user, self.password)
        # set internal variables
        self.id = uuid.uuid4()
        self.connection = None
        self.channel = None
        self.queues = {}
        self.messageSendHistory = []
        self.messageReceiveHistory = []
        self.activeQueue = None
        self.activeQueueName = None
        self.callbackFunction = None
        # Connect last so every attribute already exists if connecting fails.
        self.createChannel()

    def createChannel(self):
        """Open a connection to the RabbitMQ server and create a channel on it.

        The connection is stored on ``self.connection`` (``close()`` needs it)
        and the channel on ``self.channel``.

        Returns:
            The newly created channel.
        """
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtualHost,
            credentials=self.credentials,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        print("\nRabbitMQ channel successfully created: channel == {0} >>> host == {1} >>> port == {2} >>> virtual host == {3}\n".format(self.channel, self.host, self.port, self.virtualHost))
        return self.channel

    def createQueue(self, name):
        """Declare queue *name* on the channel and make it the active queue.

        Returns:
            The value returned by the channel's ``queue_declare`` call.
        """
        newQueue = self.channel.queue_declare(name)
        self.queues[name] = newQueue
        self.activateQueue(name)
        print("\nRabbitMQ queue successfully created: name == {0} >>> queue == {1}\n".format(name, newQueue))
        return newQueue

    def activateQueue(self, name):
        """Make the previously created queue *name* the active queue.

        Raises:
            KeyError: If *name* was not created through ``createQueue``; the
                currently active queue is left unchanged in that case.
        """
        # Look the queue up first so a failed lookup cannot leave the active
        # name and the active queue out of sync.
        queue = self.queues[name]
        self.activeQueueName = name
        self.activeQueue = queue
        print("\nRabbitMQ queue successfully set as active: name == {0} >>> queue == {1}\n".format(self.activeQueueName, self.activeQueue))

    def produce(self, msg):
        """Publish *msg* to the active queue and record it in the send history."""
        # An empty exchange name with the queue name as routing key is used;
        # the body is the actual message.
        self.channel.basic_publish(exchange="", routing_key=self.activeQueueName, body=msg)
        self.messageSendHistory.append([self.activeQueueName, msg])
        print("\nmsg == {0} >>>  queue: {1}\n".format(msg, self.activeQueueName))
        print("Message successfully added to send history\n")

    def callback(self, ch, method, properties, msg):
        """Record a consumed message, then forward it to the user callback.

        The message is stored under the currently active queue name. If a
        callback function was registered through ``consume``, it is called
        with the same four arguments.
        """
        self.messageReceiveHistory.append([self.activeQueueName, msg])
        print("\nqueue: {0} >>>  msg: {1}".format(self.activeQueueName, msg))
        print("Message successfully consumed from queue and added to receive history\n")
        if self.callbackFunction is not None:
            self.callbackFunction(ch, method, properties, msg)
        print("\nWaiting to receive messages from queue: name == {0}".format(self.activeQueueName))

    def consume(self, callbackFunction):
        """Register *callbackFunction* and start consuming from the active queue.

        This hands control to the channel's ``start_consuming`` loop.
        """
        self.callbackFunction = callbackFunction
        # NOTE(review): the callback-first / no_ack form of basic_consume looks
        # like the pre-1.0 pika signature -- confirm against the installed version.
        self.channel.basic_consume(self.callback, queue=self.activeQueueName, no_ack=True)
        print("\nWaiting to receive messages from queue: name == {0}".format(self.activeQueueName))
        self.channel.start_consuming()

    def close(self):
        """Close the connection opened by ``createChannel``."""
        self.connection.close()
        print("\nRabbitMQ connection successfully closed: {0}\n".format(self.connection))

    def purgeQueue(self):
        """Purge all messages from the active queue."""
        self.channel.queue_purge(queue=self.activeQueueName)
        print("\nRabbitMQ queue successfully purged: name == {0}".format(self.activeQueueName))

      Job Config
    {
  "seconds": 1000,
  "rabbitMQ": {
    "host": "rabbitmq-cluster-ip-service",
    "port": 5672,
    "user": "guest",
    "password": "guest",
    "virtualHost": "/"
  },
-- Chris Marsh
docker
kubernetes
rabbitmq

0 Answers