spring cloud dataflow kafka binder error on sending - Magic v1 does not support record headers

11/19/2019

I am trying to get a Spring Cloud Data Flow stream working in Kubernetes and I get the error below when the Source tries to send a message to Kafka. The same code worked fine when I used RabbitMQ. The Kafka server version is 2.1.0. I read in other posts that this could be caused by a version mismatch between the Kafka broker and the client libraries bundled with the application. Wondering how I can get this working.

CODE FOR SENDING A MESSAGE TO THE KAFKA CHANNEL

    public void sendToChannelNew(List<String> list, String name) {
        // Build the payload and attach a custom header identifying the source
        MessageBuilder<String> builder = MessageBuilder.withPayload(list.toString())
                .setHeader("SOURCE", name);
        // Send to the output channel of the Source binding
        source.output().send(builder.build());
    }
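For reference, here is a minimal, self-contained sketch of how this method is presumably wired up, using the standard `Source` binding from Spring Cloud Stream 2.x. The class name and constructor injection are my assumptions; only `sendToChannelNew()` comes from the original post:

    import java.util.List;

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;

    // Hypothetical class name; the real app uses its own Source wrapper.
    @EnableBinding(Source.class)
    public class EdpiSourceSketch {

        private final Source source;

        public EdpiSourceSketch(Source source) {
            this.source = source;
        }

        public void sendToChannelNew(List<String> list, String name) {
            // The "SOURCE" header becomes a Kafka record header, which is what
            // old brokers (message format v0/v1) reject with "Magic v1 ..." errors.
            source.output().send(MessageBuilder.withPayload(list.toString())
                    .setHeader("SOURCE", name)
                    .build());
        }
    }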

POM

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>

STACK TRACE

KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=ACCOUNT_CLIENT, id=2dcee73c-9da0-72fe-d5f7-07a2dba5b099, contentType=application/json, timestamp=1574199924374}]' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=ACCOUNT_CLIENT, id=2dcee73c-9da0-72fe-d5f7-07a2dba5b099, contentType=application/json, timestamp=1574199924374}]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1077)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.sendToChannelNew(EdpiSource.java:73)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.lambda$null$2(EdpiSource.java:134)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
        at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225)
        at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47)
        at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:3694)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
        at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:183)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:248)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144)
        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:262)
        at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)
        at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:285)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:325)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:259)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:381)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1436)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:410)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:506)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:529)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:107)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:223)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:864)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
-- Ajith Kannan
spring-cloud-dataflow
spring-cloud-kubernetes
spring-cloud-stream

1 Answer

11/21/2019

The issue was that the Kafka broker in use was old (0.10.0) and does not support record headers. Switched to a higher broker version and that fixed it.
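If upgrading the broker is not an option, an alternative (not what was done here, and untested against this exact setup) is to stop the binder from writing message headers into the Kafka record, since brokers older than 0.11 cannot store record headers at all. In Spring Cloud Stream 2.x this is the `headerMode` producer binding property; the binding name `output` below assumes the default `Source` channel:

    # application.properties - suppress header propagation for the "output" binding
    # (only relevant for pre-0.11 brokers that reject record headers)
    spring.cloud.stream.bindings.output.producer.headerMode=none

Note that with `headerMode=none` the custom `SOURCE` header is simply dropped on the wire, so downstream apps can no longer rely on it.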

-- Ajith Kannan
Source: StackOverflow