Spring Cloud Data Flow > Stream > Source throws UNKNOWN_TOPIC_OR_PARTITION when deploying in K8s but working in local deployment

4/6/2017

I'm trying to run a "Hello, world" Spring Cloud Data Flow stream based on the very simple example explained at http://cloud.spring.io/spring-cloud-dataflow/. I'm able to create a simple source and sink and run it on my local SCDF server using Kafka, so until here everything is correct.

Now, I'm trying to deploy it in my private cloud based on the instructions listed at http://docs.spring.io/spring-cloud-dataflow-server-kubernetes/docs/current-SNAPSHOT/reference/htmlsingle/#_getting_started. Using this deployment I'm able to deploy a simple "time | log" out-of-the-box stream with no problems, but my example fails.

Specific versions are:

  • Docker version 1.13.1, build 092cba3
  • Hyperkube 1.5.5
  • SCDF 1.2.0.M2
  • zookeeper 3.4.9-1757313, built on 08/23/2016 06:50 GMT
  • Kafka 0.10.1.1

Source artifact logs are:

2017-04-06T11:05:07.429204866Z 2017-04-06 11:05:07,428 INFO main-SendThread(10.0.0.181:2181) o.a.z.ClientCnxn:876 - Socket connection established to 10.0.0.181/10.0.0.181:2181, initiating session 2017-04-06T11:05:07.440381666Z 2017-04-06 11:05:07,439 INFO main-SendThread(10.0.0.181:2181) o.a.z.ClientCnxn:1299 - Session establishment complete on server 10.0.0.181/10.0.0.181:2181, sessionid = 0x15b155ef61e014a, negotiated timeout = 10000 2017-04-06T11:05:07.740130495Z 2017-04-06 11:05:07,737 INFO main o.a.k.c.p.ProducerConfig:180 - ProducerConfig values: 2017-04-06T11:05:07.740160464Z acks = 1 2017-04-06T11:05:07.740163408Z batch.size = 16384 2017-04-06T11:05:07.740165226Z block.on.buffer.full = false 2017-04-06T11:05:07.740166942Z bootstrap.servers = [10.0.0.213:9092] 2017-04-06T11:05:07.740168741Z buffer.memory = 33554432 2017-04-06T11:05:07.740170545Z client.id = 2017-04-06T11:05:07.740172245Z compression.type = none 2017-04-06T11:05:07.740173971Z connections.max.idle.ms = 540000 2017-04-06T11:05:07.740175706Z interceptor.classes = null 2017-04-06T11:05:07.744179899Z reconnect.backoff.ms = 50 2017-04-06T11:05:07.744181600Z request.timeout.ms = 30000 2017-04-06T11:05:07.744183356Z retries = 0 2017-04-06T11:05:07.744185083Z retry.backoff.ms = 100 2017-04-06T11:05:07.744186754Z sasl.kerberos.kinit.cmd = /usr/bin/kinit 2017-04-06T11:05:07.744188494Z sasl.kerberos.min.time.before.relogin = 60000 2017-04-06T11:05:07.744190205Z sasl.kerberos.service.name = null 2017-04-06T11:05:07.744191916Z sasl.kerberos.ticket.renew.jitter = 0.05 2017-04-06T11:05:07.744193763Z sasl.kerberos.ticket.renew.window.factor = 0.8 2017-04-06T11:05:07.744195432Z sasl.mechanism = GSSAPI 2017-04-06T11:05:07.744197163Z security.protocol = PLAINTEXT 2017-04-06T11:05:07.744198789Z send.buffer.bytes = 131072 2017-04-06T11:05:07.744200522Z ssl.cipher.suites = null 2017-04-06T11:05:07.744202328Z ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 2017-04-06T11:05:07.744204161Z ssl.endpoint.identification.algorithm = null 2017-04-06T11:05:07.744205837Z ssl.key.password = null 2017-04-06T11:05:07.744207544Z ssl.keymanager.algorithm = SunX509 2017-04-06T11:05:07.744212464Z ssl.keystore.location = null 2017-04-06T11:05:07.744214272Z ssl.keystore.password = null 2017-04-06T11:05:07.744216025Z ssl.keystore.type = JKS 2017-04-06T11:05:07.744217647Z ssl.protocol = TLS 2017-04-06T11:05:07.744219234Z ssl.provider = null 2017-04-06T11:05:07.744220987Z ssl.secure.random.implementation = null 2017-04-06T11:05:07.744222666Z ssl.trustmanager.algorithm = PKIX 2017-04-06T11:05:07.744224359Z ssl.truststore.location = null 2017-04-06T11:05:07.744226022Z ssl.truststore.password = null 2017-04-06T11:05:07.744228171Z ssl.truststore.type = JKS 2017-04-06T11:05:07.744230006Z timeout.ms = 30000 2017-04-06T11:05:07.744231705Z value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 2017-04-06T11:05:07.744233544Z 2017-04-06T11:05:07.837193978Z 2017-04-06 11:05:07,834 WARN main o.a.k.c.p.ProducerConfig:188 - The configuration 'key.deserializer' was supplied but isn't a known config. 2017-04-06T11:05:07.837221870Z 2017-04-06 11:05:07,835 WARN main o.a.k.c.p.ProducerConfig:188 - The configuration 'value.deserializer' was supplied but isn't a known config. 2017-04-06T11:05:07.929207703Z 2017-04-06 11:05:07,926 INFO main o.a.k.c.u.AppInfoParser:83 - Kafka version : 0.10.1.1 2017-04-06T11:05:07.929239636Z 2017-04-06 11:05:07,927 INFO main o.a.k.c.u.AppInfoParser:84 - Kafka commitId : f10ef2720b03b247 2017-04-06T11:05:08.228817026Z 2017-04-06 11:05:08,228 WARN kafka-producer-network-thread | producer-1 o.a.k.c.NetworkClient:600 - Error while fetching metadata with correlation id 0 : {output=UNKNOWN_TOPIC_OR_PARTITION} 2017-04-06T11:05:08.436574800Z 2017-04-06 11:05:08,435 WARN kafka-producer-network-thread | producer-1 o.a.k.c.NetworkClient:600 - Error while fetching metadata with correlation id 1 : {output=UNKNOWN_TOPIC_OR_PARTITION}

And Zookepeer logs are:

2017-04-06T11:04:38.000953447Z 2017-04-06 11:04:38,000 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x15b155ef61e0148 2017-04-06T11:05:04.939356606Z 2017-04-06 11:05:04,938 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /10.1.98.5:48180 2017-04-06T11:05:04.940666418Z 2017-04-06 11:05:04,939 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /10.1.98.5:48180 2017-04-06T11:05:04.943859474Z 2017-04-06 11:05:04,943 [myid:] - INFO [SyncThread:0:ZooKeeperServer@673] - Established session 0x15b155ef61e0149 with negotiated timeout 10000 for client /10.1.98.5:48180 2017-04-06T11:05:07.325929074Z 2017-04-06 11:05:07,325 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x15b155ef61e0149 2017-04-06T11:05:07.342876962Z 2017-04-06 11:05:07,341 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /10.1.98.5:48180 which had sessionid 0x15b155ef61e0149 2017-04-06T11:05:07.429909440Z 2017-04-06 11:05:07,429 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /10.1.98.5:48182 2017-04-06T11:05:07.429933377Z 2017-04-06 11:05:07,429 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /10.1.98.5:48182 2017-04-06T11:05:07.441158222Z 2017-04-06 11:05:07,439 [myid:] - INFO [SyncThread:0:ZooKeeperServer@673] - Established session 0x15b155ef61e014a with negotiated timeout 10000 for client /10.1.98.5:48182 2017-04-06T11:05:29.695276997Z 2017-04-06 11:05:29,694 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception 2017-04-06T11:05:29.695325790Z EndOfStreamException: Unable to read additional data from client sessionid 0x15b155ef61e014a, likely client has closed socket 2017-04-06T11:05:29.695328912Z at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) 2017-04-06T11:05:29.695331119Z at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203) 2017-04-06T11:05:29.695333009Z at java.lang.Thread.run(Thread.java:745) 2017-04-06T11:05:29.696333706Z 2017-04-06 11:05:29,696 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client

I dont find any log in Kafka at the moment of the exception.

Code snippet for the source class is

package xxxx;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableBinding(Source.class)
public class HelloNitesApplication
{
    public static void main(String[] args)
    {
        SpringApplication.run(HelloNitesApplication.class, args);
    }

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT)
    public MessageSource<String> timerMessageSource()
    {
        return () -> new GenericMessage<>("Hello " + new SimpleDateFormat().format(new Date()));
    }

So, the pod containing the stream source keeps crashing in a loop.

-- Daniel Gutierrez
apache-kafka
kubernetes
spring-cloud-stream

1 Answer

4/7/2017

The problem seems to be the fact that the property "spring.cloud.stream.bindings.output.destination=XXX" is ignored by my implementation and I deleted the topic "output" before the execution as I expected it to write in the topic specified by the property.

After I redeployed everything the source works as the topic is created properly, although inserting the messages in "output" topic instead the one specified by he property I defined.

-- Daniel Gutierrez
Source: StackOverflow