I'm attempting to create a custom exception handler for my Spring Cloud Dataflow stream to route some errors to be requeued and others to be DLQ'd.
To do this I'm utilizing the global Spring Integration "errorChannel" and routing based on exception type.
This is the code for the Spring Integration error router:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
The error router is picked up by each of the stream apps through a package scan on the Spring Boot App for each:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
When this is deployed and run with the local Spring Cloud Dataflow server (version 1.5.0-RELEASE), and a DlqException is thrown, the message is successfully routed to the onError method in the errorRouter and then placed into the dlq topic.
However, when this is deployed as a docker container with SCDF Kubernetes server (also version 1.5.0-RELEASE), the onError method is never hit. (The log statement at the beginning of the router is never output)
In the startup logs for the stream apps, it looks like the bean is picked up correctly and registers as a listener for the errorChannel, but for some reason, when exceptions are thrown they do not get handled by the onError method in our router.
Startup Logs:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
We are using all default settings for the spring cloud stream and kafka binder configurations:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
Edit: Added pod args from kubectl describe <pod>
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
One other idea we attempted was trying to use the Spring Cloud Stream - spring integration error channel support to send to a broker topic on errors, but since messages don't seem to be landing in the global Spring Integration errorChannel at all, that didn't work either.
Is there anything special we need to do in SCDF Kubernetes to enable the global Spring Integration errorChannel?
What am I missing here?
Update with solution from the comments:
After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance the existence of spring.cloud.stream.binders.... is what's going to make framework treat it as multi-binder. Basically this a bug - github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks anyway) – Oleg Zhurakousky
This was indeed the problem with our setup. Instead of upgrading, we just eliminated our multi-binder usage for now and the issue was resolved.
Update with solution from the comments:
After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance the existence of spring.cloud.stream.binders.... is what's going to make framework treat it as multi-binder. Basically this a bug - github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks anyway) – Oleg Zhurakousky
This was indeed the problem with our setup. Instead of upgrading, we just eliminated our multi-binder usage for now and the issue was resolved.