I am a Java developer working on creating a bpmn engine using Activiti deployed on a GKE Cluster. Currently, I have the cloud application set up and a runtime bundle with a bpmn process as well. With my cloud connector, I want it to interact with a job processing service and relay back the status once the job is finished. But after setting up everything I am encountering this error related to RabbitMQ queues that I cannot debug or am able to solve anywhere. I am new to rabbitMQ so any help would be highly appreciated please. Here is the error log :
For the rabbitMq messagingScheduler, here are the following classes I can share for more context:
MessageScheduler.java:
import java.util.Map;
import org.activiti.cloud.api.process.model.IntegrationRequest;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service("messageScheduler")
public class MessageScheduler {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Exchange MessageExchange;
@Value("${spring.application.name}")
private String appName;
public String getRoutingKey(String name) {
return appName+".service."+name;
}
public String getQueueName(String Name) {
return getRoutingKey(Name)+"-queue";
}
public void schedule(String name, IntegrationRequest event, Map<String, Object> headers, Integer delay) {
String routingKey = getRoutingKey(name);
rabbitTemplate.convertAndSend(MessageExchange.getName(), routingKey, event, new StatusMessagePostProcessor(headers, delay));
}
class StatusMessagePostProcessor implements MessagePostProcessor {
private final Integer delay;
private final Map<String, Object> headers;
public StatusMessagePostProcessor(Map<String, Object> headers, Integer delay) {
this.headers = headers;
this.delay = delay;
}
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
message.getMessageProperties().setDelay(delay);
if(headers != null) {
headers.forEach((key, value) -> {
message.getMessageProperties().setHeader(key, value);
});
}
return message;
}
}
}
MessageSchedulerConfiguration.java:
package org.activiti.cloud.connector;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
@EnableScheduling
public class MessageSchedulerConfiguration {
@Value("${spring.application.name}")
private String appName;
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter producerJackson2MessageConverter) {
final var rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter);
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public Exchange jobsMessageExchange() {
return ExchangeBuilder.directExchange("service-message.exchange")
.delayed()
.durable(true)
.build();
}
}
CloudConnector.java (Relevant Bits):
package org.activiti.cloud.connector.impl;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.activiti.cloud.api.process.model.IntegrationRequest;
import org.activiti.cloud.api.process.model.IntegrationResult;
import org.activiti.cloud.connector.MessageScheduler;
import org.activiti.cloud.connectors.starter.channels.IntegrationResultSender;
import org.activiti.cloud.connectors.starter.configuration.ConnectorProperties;
import org.activiti.cloud.connectors.starter.model.IntegrationResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.integration.support.StringObjectMapBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
@EnableBinding(Channels.class)
public class Connector {
private final Logger logger = LoggerFactory.getLogger(Connector.class);
public static final String DELAY_BACKOFF = "x-delay";
public static final String NUM_RETRIES = "numRetries";
@Value("${spring.application.name}")
private String appName;
@Autowired
private ConnectorProperties connectorProperties;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private MessageScheduler messageScheduler;
private RestTemplate restTemplate = new RestTemplateBuilder()
.errorHandler(new RestResponseErrorHandler())
.setReadTimeout(Duration.ofSeconds(30))
.setConnectTimeout(Duration.ofSeconds(60))
.build();
private final IntegrationResultSender integrationResultSender;
public CloudConnector(IntegrationResultSender integrationResultSender) {
this.integrationResultSender = integrationResultSender;
}
@SuppressWarnings("unchecked")
@StreamListener(value = Channels.EXAMPLE_CONNECTOR_CONSUMER)
public void performTask(IntegrationRequest event) throws InterruptedException {
/**
Get relevant info from incoming message, create an api call to service to recieve an ID, and relay the id to another api call to check status of service.
**/
if(!StringUtils.isEmpty(XXXID)) {
// Let's schedule job status check message with delay
Map<String, Object> headers = new StringObjectMapBuilder()
.put(XXXID, I)
.get();
messageScheduler.schedule(type, event, headers, delay);
} else {
sendIntegrationResult(event, Collections.singletonMap("error", true));
}
}
public void sendIntegrationResult(IntegrationRequest event, Map<String, Object> results) {
Message<IntegrationResult> message = IntegrationResultBuilder.resultFor(event, connectorProperties)
.withOutboundVariables(results)
.buildMessage();
integrationResultSender.send(message);
}
@RabbitListener(
bindings=@QueueBinding(
value=@org.springframework.amqp.rabbit.annotation.Queue(name="#{@messageScheduler.getQueueName('Service')}", durable="true"),
exchange=@Exchange(name="#{@serviceMessageExchange.getName()}", delayed="true", durable="true"),
key="#{@messageScheduler.getRoutingKey('service')}"
)
)
public void handleService(@Payload IntegrationRequest event,
@Header(XXXId) String XXXID,
@Header(name=NUM_RETRIES, defaultValue="0") Integer numRetries,
@Header(name=DELAY_BACKOFF, defaultValue="1") Integer delayBackoff) {
logger.info(">> Check status for: " + XXXId);
try {
JsonNode response = restTemplate.getForObject(statusUrl, JsonNode.class, XXXId);
String status = response.get(status).asText();
if("Finished".equalsIgnoreCase(status)) {
JsonNode result = restTemplate.getForObject(resulturl, JsonNode.class, XXXId);
// Let's send result back to runtime bundle
sendIntegrationResult(event, toMap(result));
return;
}
else if (!"Running".equalsIgnoreCase(status) && !"Ready".equalsIgnoreCase(status)) {
// Let's send result back to runtime bundle
sendIntegrationResult(event, toMap(response));
return;
}
} catch(Exception e) {
logger.error(e.getMessage());
numRetries++;
delayBackoff *= 2;
}
if(numRetries > maxRetries) {
logger.error();
Map<String, Object> results = new StringObjectMapBuilder()
.put(appName, HttpStatus.SERVICE_UNAVAILABLE.toString())
.get();
// Let's send result back to runtime bundle
sendIntegrationResult(event, results);
return;
}
Map<String, Object> headers = new StringObjectMapBuilder()
.put(XXXID, XXXid)
.put(NUM_RETRIES, numRetries)
.put(DELAY_BACKOFF, delayBackoff)
.get();
messageScheduler.schedule(type, event, headers, delay * delayBackoff);
}
public class RestResponseErrorHandler implements ResponseErrorHandler {
@Override
public void handleError(ClientHttpResponse response) throws IOException {
logger.error("Response error: {} {}", response.getStatusCode(), response.getStatusText());
}
@Override
public boolean hasError(ClientHttpResponse response) throws IOException {
HttpStatus statusCode = HttpStatus.resolve(response.getRawStatusCode());
return statusCode.isError();
}
}
private Map<String, Object> toMap(JsonNode jsonNode) {
return objectMapper.convertValue(jsonNode, new TypeReference<Map<String,Object>>() {});
}
private JsonNode toJsonNode(Map<String, Object> map) {
return objectMapper.convertValue(map, JsonNode.class);
}
}
**ERROR LOG ON GKE POD: **
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: runtimeCmdResults.anonymous.pTfeJVxdS-S2iG0GI6Oe3g
o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[runtimeCmdResults.anonymous.pTfeJVxdS-S2iG0GI6Oe3g]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
java.base/java.lang.Thread.run(Thread.java:834) ~[na:na] Caused by: java.io.IOException: null at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.4.3.jar!/:5.4.3] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006) ~[amqp-client-5.4.3.jar!/:5.4.3] at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) ~[amqp-client-5.4.3.jar!/:5.4.3] at
jdk.internal.reflect.GeneratedMethodAccessor108.invoke(Unknown Source) ~[na:na] at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at
java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] at
com.sun.proxy.$Proxy137.queueDeclarePassive(Unknown Source) ~[na:na] at
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689) ~[spring-rabbit-2.1.3.RELEASE.jar!/:2.1.3.RELEASE] ... 5 common frames omitted Caused by:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue
'runtimeCmdResults.anonymous.pTfeJVxdS-S2iG0GI6Oe3g' in vhost '/', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.4.3.jar!/:5.4.3] at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~
[amqp-client-5.4.3.jar!/:5.4.3] ... 13 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:
#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'runtimeCmdResults.anonymous.pTfeJVxdS-S2iG0GI6Oe3g' in vhost '/', class-id=50, method-id=10) at
com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516) ~[amqp-client-5.4.3.jar!/:5.4.3] at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.4.3.jar!/:5.4.3] at
com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar!/:5.4.3]
this image shows an error which I believe is the root cause of everything: o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
Any help would be highly appreciated. Thanks.