Kafka Producer causes org.apache.kafka.common.network.InvalidReceiveException: Invalid receive

4/14/2020

I am running a 3-node ZooKeeper cluster and a 3-node Kafka cluster on Kubernetes.
Kafka seems to be running.
However, if I produce a message to a topic and then check the topic, there is no message at all.

Here's my broker's log. It reports an invalid receive; the odd thing is that creating topics works fine, but producing does not.
I can also see the topics and schemas I created earlier in Topics-ui, a GUI tool for the broker.
The logs for Schema Registry, Connect, and the REST Proxy are clean, so the broker itself seems to be running well.

org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:748)

And here are my broker configurations, from the Terraform StatefulSet:

          port {
            container_port = 9092
          }   

          env {
            name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_DEFAULT_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" 
            value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
          }   

          env {
            name = "KAFKA_ZOOKEEPER_CONNECT"
            value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
          }   

          env {
            name = "POD_IP"

            value_from {
              field_ref {
                field_path = "status.podIP"
              }   
            }   
          }   

          env {
            name = "HOST_IP"
            value_from {
              field_ref {
                field_path = "status.hostIP"
              }   
            }   
          }   

          env {
            name = "POD_NAME"

            value_from {
              field_ref {
                field_path = "metadata.name"
              }   
            }   
          }   

          env {
            name = "POD_NAMESPACE"

            value_from {
              field_ref {
                field_path = "metadata.namespace"
              }   
            }   
          }   

          command = [ 
            "sh",
            "-exec",
            "export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_NAME}.kafka-service.${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=${HOSTNAME##*-} && exec /etc/confluent/docker/run"
          ]   
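The `${HOSTNAME##*-}` expansion in the command above strips the longest prefix ending in a hyphen, which turns a StatefulSet pod name into its numeric ordinal so it can be reused as the broker ID. A minimal illustration (the pod name is just an example value):

```shell
# A StatefulSet pod hostname ends in its ordinal, e.g. "lucent-kafka-2".
HOSTNAME="lucent-kafka-2"

# ${HOSTNAME##*-} removes the longest prefix matching "*-",
# leaving only the trailing ordinal.
echo "${HOSTNAME##*-}"   # prints: 2
```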

The Service:

resource "kubernetes_service" "kafka-service" {
  metadata {
    name = "kafka-service"

    labels = {
      app = "broker" 
    }
  }

  spec {
    selector = {
      app = "broker"
    }

    port {
      port = 9092
    }

    cluster_ip = "None"
  }
}

The command used to produce:

kafka-console-producer --broker-list kafka-service:9092 --topic test
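To check whether anything actually landed on the topic, I can also run the matching console consumer against the same service (assuming the standard Confluent CLI tools are available):

kafka-console-consumer --bootstrap-server kafka-service:9092 --topic test --from-beginning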
-- Anton 재호프
apache-kafka
kafka-producer-api
kafka-topic
kubernetes
terraform

1 Answer

4/14/2020

My initial guess would be that the broker is trying to receive a request that is too large. The reported maximum size is the default value of socket.request.max.bytes, which is 100 MB (104857600 bytes). So if you have a message bigger than 100 MB, try increasing this setting in server.properties.
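Since the broker in this question is configured through environment variables rather than a server.properties file, the same change could be made in the StatefulSet: the Confluent Kafka images translate variables prefixed with KAFKA_ into the corresponding broker properties. A sketch, with 200 MB as an illustrative value only:

```terraform
# Hypothetical example: raise socket.request.max.bytes to 200 MB.
# The Confluent images map KAFKA_SOCKET_REQUEST_MAX_BYTES to the
# socket.request.max.bytes broker property.
env {
  name  = "KAFKA_SOCKET_REQUEST_MAX_BYTES"
  value = "209715200"
}
```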

-- Giorgos Myrianthous
Source: StackOverflow