I have a local Kubernetes cluster and I'm trying to create an SCDF (Spring Cloud Data Flow) stream as follows:
file --file.directory=/home | aggregator --aggregator.group-timeout=10000 --aggregator.correlation=T(Thread).currentThread().id --spring.cloud.stream.bindings.output.contentType=application/octet-stream aggregator.aggregation=#root.![payload].toString() | log
What I'm trying to achieve is the following: any file (any number, any type) placed under the /home folder in the file pod should be passed to the aggregator, which combines the contents (byte[]) of the files into one, and that combined content should be logged in the log pod.
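To make the intended "combine into one" step concrete, here is a minimal plain-Java sketch of the result I expect from the aggregation. It is only an illustration of the goal, not SCDF or aggregator code: the class name `PayloadConcat` and the method `concat` are hypothetical, and the sample strings stand in for real file contents.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of SCDF or the aggregator app.
public class PayloadConcat {

    // Appends every byte[] payload, in list order, into a single byte[].
    public static byte[] concat(List<byte[]> payloads) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] payload : payloads) {
            out.write(payload, 0, payload.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Stand-ins for the contents of two files dropped into /home.
        List<byte[]> files = Arrays.asList(
                "first file\n".getBytes(StandardCharsets.UTF_8),
                "second file\n".getBytes(StandardCharsets.UTF_8));

        byte[] combined = concat(files);
        System.out.print(new String(combined, StandardCharsets.UTF_8));
    }
}
```

In other words, the log pod should receive one message whose payload is a single byte[] holding the contents of all files in the group, back to back.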
The stream deploys just fine, but the pipeline gets stuck in the aggregator with the following error:
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload=[[B@7eb1bcc6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=aha.file, amqp_deliveryTag=1, file_name=dasads, deliveryAttempt=1, amqp_consumerQueue=aha.file.aha, amqp_redelivered=false, file_originalFile=/home/dasads, file_relativePath=dasads, amqp_receivedRoutingKey=aha.file, amqp_timestamp=Wed Mar 04 02:09:09 GMT 2020, amqp_messageId=32196ebe-c66c-a3c6-c647-47d297385963, id=2b72fb4d-0259-62cc-d2d4-c9b06b9b4afe, amqp_consumerTag=amq.ctag-tTOyFxlowoCV9AE_Gmj0BQ, contentType=application/octet-stream, timestamp=1583287759048}]' to outbound message.
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:785) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:759) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:680) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:940) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processForceRelease(AbstractCorrelatingMessageHandler.java:597) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.lambda$scheduleGroupToForceComplete$3(AbstractCorrelatingMessageHandler.java:567) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_192]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_192]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_192]
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload=[[B@7eb1bcc6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=aha.file, amqp_deliveryTag=1, file_name=dasads, deliveryAttempt=1, amqp_consumerQueue=aha.file.aha, amqp_redelivered=false, file_originalFile=/home/dasads, file_relativePath=dasads, amqp_receivedRoutingKey=aha.file, amqp_timestamp=Wed Mar 04 02:09:09 GMT 2020, amqp_messageId=32196ebe-c66c-a3c6-c647-47d297385963, id=2b72fb4d-0259-62cc-d2d4-c9b06b9b4afe, amqp_consumerTag=amq.ctag-tTOyFxlowoCV9AE_Gmj0BQ, contentType=application/octet-stream, timestamp=1583287759048}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:389) ~[spring-cloud-stream-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:423) ~[spring-cloud-stream-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
... 24 common frames omitted
I even tried explicitly setting the output content type to application/octet-stream in the aggregator, to match what the file source produces, but no luck. What am I missing? Thanks for any info!
Dependency versions: