I am having trouble connecting my Python application pod to my RabbitMQ pod. I am new to the k8s world, so any and all help is much appreciated. The steps I have taken are as follows:
Building docker image tagged as adapteremulator-container:latest
applying config files
running docker build adapteremulator-container:latest .
I am getting this connection error.
Traceback (most recent call last):
File "Emulator.py", line 17, in <module>
RMQ = rabbit(config["rabbitMQ"])
File "/app/RabbitClass.py", line 20, in __init__
self.createChannel()
File "/app/RabbitClass.py", line 30, in createChannel
connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtualHost, credentials=self.credentials))
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 377, in __init__
self._process_io_for_connection_setup()
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 417, in _process_io_for_connection_setup
self._open_error_result.is_ready)
File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 469, in _flush_output
raise maybe_exception
File "/usr/local/lib/python3.7/site-packages/pika/adapters/base_connection.py", line 176, in _adapter_connect
socket.IPPROTO_TCP)
File "/usr/local/lib/python3.7/site-packages/pika/adapters/base_connection.py", line 304, in _getaddrinfo
return socket.getaddrinfo(host, port, family, socktype, proto)
File "/usr/local/lib/python3.7/socket.py", line 748, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -3] Try again
Dockerfile:
# Image for the adapter emulator, based on the Alpine build of Python 3.7.
FROM python:3.7-alpine
WORKDIR /app
# Copies requirements.txt and the contents of ./Emulator/ into /app.
COPY requirements.txt ./Emulator/ /app/
# Install the compiler toolchain as the virtual package ".install-deps",
# install the Python requirements, then delete the toolchain again in the
# same layer. The xlocale.h symlink is created before pip runs.
RUN apk add --no-cache build-base --virtual .install-deps \
&& ln -s /usr/include/locale.h /usr/include/xlocale.h \
&& pip install --no-cache-dir -r requirements.txt \
&& apk del .install-deps
EXPOSE 5000
ENTRYPOINT [ "python" ]
# NOTE(review): the traceback in this post names the script "Emulator.py",
# while CMD uses "emulator.py"; file names are case-sensitive on Linux, so
# confirm which spelling the file copied into /app actually has.
CMD [ "emulator.py" ]
Here are my 3 config files:
# Deployment running a single replica of the adapter emulator container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: emulator-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      component: adapteremulator
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above.
        component: adapteremulator
    spec:
      containers:
        - name: adapteremulator-container
          image: adapteremulator-container:latest
          imagePullPolicy: "IfNotPresent"
# rabbitmq-management-deploy.yaml
# Deployment running a single RabbitMQ broker pod (management image).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      name: rabbitmq-pod
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above.
        name: rabbitmq-pod
    spec:
      restartPolicy: Always
      containers:
        - name: rabbitmq-container
          image: rabbitmq:3.7.8-management
# ClusterIP Service selecting pods labelled "name: rabbitmq-pod" and
# forwarding port 5672 to the same port on those pods.
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-cluster-ip-service
spec:
  type: ClusterIP
  selector:
    name: rabbitmq-pod
  ports:
    - port: 5672
      targetPort: 5672
Here are my RabbitMQ class and my job config:
class rabbit():
    """Thin convenience wrapper around a blocking pika connection.

    The instance opens one connection and one channel on construction,
    tracks the queues it has declared, and keeps a history of every
    message it has published or consumed.
    """

    def __init__(self, config):
        """Unpack the connection settings and open the channel.

        Args:
            config: mapping with the keys "host", "port", "user",
                "password" and "virtualHost".

        Raises:
            KeyError: if one of the required keys is missing.
        """
        # unpack configuration
        self.host = config["host"]
        self.port = config["port"]
        self.user = config["user"]
        self.password = config["password"]
        self.virtualHost = config["virtualHost"]
        self.credentials = pika.PlainCredentials(self.user, self.password)

        # set internal variables (before connecting, so that the object is
        # fully initialised by the time createChannel() runs)
        self.id = uuid.uuid4()
        self.connection = None
        self.channel = None
        self.queues = {}
        self.messageSendHistory = []
        self.messageReceiveHistory = []
        self.activeQueue = None
        self.activeQueueName = None
        self.callbackFunction = None

        self.createChannel()

    def createChannel(self):
        """Connect to the RabbitMQ server and open a channel.

        The connection is stored on ``self.connection`` so that ``close()``
        can shut it down later; the channel is stored on ``self.channel``.

        Returns:
            The newly opened channel.
        """
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtualHost,
            credentials=self.credentials,
        )
        # Keep a reference to the connection: close() relies on it.
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        print("\nRabbitMQ channel successfully created: channel == {0} >>> host == {1} >>> port == {2} >>> virtual host == {3}\n".format(self.channel, self.host, self.port, self.virtualHost))
        return self.channel

    def createQueue(self, name):
        """Declare queue ``name`` on the channel and make it the active queue.

        Returns:
            The result of ``channel.queue_declare(name)``.
        """
        newQueue = self.channel.queue_declare(name)
        self.queues[name] = newQueue
        self.activateQueue(name)
        print("\nRabbitMQ queue successfully created: name == {0} >>> queue == {1}\n".format(name, newQueue))
        return newQueue

    def activateQueue(self, name):
        """Make the previously created queue ``name`` the active queue.

        Raises:
            KeyError: if ``name`` was never created through createQueue().
        """
        # Look the queue up first so that an unknown name leaves the
        # currently active queue untouched.
        queue = self.queues[name]
        self.activeQueueName = name
        self.activeQueue = queue
        print("\nRabbitMQ queue successfully set as active: name == {0} >>> queue == {1}\n".format(self.activeQueueName, self.activeQueue))

    def produce(self, msg):
        """Publish ``msg`` to the active queue and record it in the send history."""
        # An empty exchange name is passed, with the queue name as routing
        # key and the message itself as body.
        self.channel.basic_publish(exchange="", routing_key=self.activeQueueName, body=msg)
        self.messageSendHistory.append([self.activeQueueName, msg])
        print("\nmsg == {0} >>> queue: {1}\n".format(msg, self.activeQueueName))
        print("Message successfully added to send history\n")

    def callback(self, ch, method, properties, msg):
        """Record a consumed message and forward it to the user callback.

        The arguments are passed through unchanged to the function given
        to consume(), if one has been set.
        """
        self.messageReceiveHistory.append([self.activeQueueName, msg])
        print("\nqueue: {0} >>> msg: {1}".format(self.activeQueueName, msg))
        print("Message successfully consumed from queue and added to receive history\n")
        if self.callbackFunction is not None:
            self.callbackFunction(ch, method, properties, msg)
        print("\nWaiting to receive messages from queue: name == {0}".format(self.activeQueueName))
        return

    def consume(self, callbackFunction):
        """Start consuming the active queue, dispatching to ``callbackFunction``.

        Calls ``channel.start_consuming()`` after registering the consumer.
        """
        self.callbackFunction = callbackFunction
        # NOTE(review): positional callback + no_ack looks like the pre-1.0
        # pika signature; confirm against the installed pika version.
        self.channel.basic_consume(self.callback, queue=self.activeQueueName, no_ack=True)
        print("\nWaiting to receive messages from queue: name == {0}".format(self.activeQueueName))
        self.channel.start_consuming()

    def close(self):
        """Close the connection opened by createChannel()."""
        self.connection.close()
        print("\nRabbitMQ connection successfully closed: {0}\n".format(self.connection))

    def purgeQueue(self):
        """Purge all messages from the active queue."""
        self.channel.queue_purge(queue=self.activeQueueName)
        print("\nRabbitMQ queue successfully purged: name == {0}".format(self.activeQueueName))
Job Config
{
"seconds": 1000,
"rabbitMQ": {
"host": "rabbitmq-cluster-ip-service",
"port": 5672,
"user": "guest",
"password": "guest",
"virtualHost": "/"
},