Trying to get spring cloud data flow stream work in Kubernetes and Getting below error when the Source try to send message to kafka. The same code worked fine when i used Rabbit MQ.Kafka Server version Kafka 2.1.0 . I read in other posts that this could be due to kafka version incompatibility with the client bundled with a lower kafka version .Wondering how can i get this working.
CODE For SENDING MESSAGE TO THE KAFKA CHANNEL
public void sendToChannelNew(List<String> list,String name) {
MessageBuilder builder=MessageBuilder.withPayload(list.toString());
builder=builder.setHeader("SOURCE",name);
source.output().send(builder.build());
}
POM
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
STACK TRACE
KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=CLIENT
entType=application/json, timestamp=1574199924374}]' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5ArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=ACCOUNT_CLIENT, id=2dcee73c-9da0-72fe-d5f7-07a2dba5b099, contentType=application/json, timestamp=1574199924374}]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.Il
rt record headers
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1077)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.jpmorgan.awm.wm.edpipoc.EdpiSource.sendToChannelNew(EdpiSource.java:73)
at com.jpmorgan.awm.wm.edpipoc.EdpiSource.lambda$null$2(EdpiSource.java:134)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225)
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47)
at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:3694)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:183)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:248)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:262)
at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:285)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:325)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:259)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:381)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1436)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:410)
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:506)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:529)
at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:107)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:223)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:864)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
The issue was with kakfka broker used was old (0.10.0) not supporting headers .switched to a higher version and got this fixed