Kubernetes Java Exec not giving any output for an input

11/16/2018

Below is my code. I am trying to feed input to the Process's OutputStream from another application via a Redis topic (using the Redisson library), but I do not see any output printed on the console.

Is it because the messages from the Redisson library are delivered on a different thread from the one running the exec process, or is it because the process cannot take input from, or write output to, anything other than System.in/System.out?

As you can see, I publish commands through a Redis topic because my other application is responsible for sending the commands to this application, which has started the exec process in the pod.

Both applications use Spring Boot 2.0. I have been stuck on this for a long time; any help is appreciated.

    package com.demo.kubeconnector.service;

import com.demo.kubeconnector.model.Exec;
import com.demo.kubeconnector.utils.SocResponse;
import com.demo.kubeconnector.utils.WebSocket;
import com.google.api.client.util.ByteStreams;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.util.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ExecStartThread extends Thread {

    private V1Pod mPod;
    private String mContainer;
    private RTopic<Exec> mRTopic;
    private SimpMessagingTemplate mSimpMessagingTemplate;
    private Process mProcess;
    private BufferedReader mBufferedReader;
    private ByteArrayInputStream mByteArrayInputStream;
    private int mListenerId;
    private String mToken;
    private RBucket<String> mSessionBucket;

    public ExecStartThread(RedissonClient redissonClient, RBucket<String> sessionBucket, String session, String topic, V1Pod pod, String container, SimpMessagingTemplate simpMessagingTemplate) {
        mPod = pod;
        mRTopic = redissonClient.getTopic(topic);
        mContainer = container;
        mSimpMessagingTemplate = simpMessagingTemplate;
        mToken = session;
        mSessionBucket = sessionBucket;
    }

    @Override
    public void run() {

        String destination = String.format(WebSocket.POD_EXEC, mToken);

        try {

            log.info(String.format("Starting a thread {%s} for process", Thread.currentThread().getName()));

            mProcess = getProcess();
            mBufferedReader = new BufferedReader(new InputStreamReader(mProcess.getInputStream()));

            mRTopic.addListener(new MessageListener<Exec>() {
                @Override
                public void onMessage(String channel, Exec msg) {
                    String command = msg.getCommand();

                    try {
                        if (!command.equals("exit")) {
                            log.info("Executing command in Thread " + Thread.currentThread().getName() + "-->:" + msg);
                            mByteArrayInputStream = new ByteArrayInputStream(command.getBytes());
                            ByteStreams.copy(mByteArrayInputStream, mProcess.getOutputStream());
                        } else {
                            log.info("Exiting thread.... " + Thread.currentThread().getId());

                            mSessionBucket.deleteAsync();
                            mRTopic.removeListener(mListenerId);

                            IOUtils.closeQuietly(mBufferedReader);
                            IOUtils.closeQuietly(mByteArrayInputStream);
                            mProcess.destroy();

                            interrupt();
                        }
                    } catch (IOException ex) {
                        log.info(String.format("IOException occurred in the SSH topic {%s}", mRTopic.getChannelNames()));
                    }
                }
            });

            /* Return the created session.
             * */
            mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(mToken));

            String line;
            while ((line = mBufferedReader.readLine()) != null) {
                System.out.println("Thread is " + Thread.currentThread().getName() + "-->:" + line);
                mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(line));
            }

            log.info("Buffer reader has ended");
        } catch (IOException e) {
            log.info(String.format("Exception occurred while getting process ---> \n %s", e.getMessage()), e);
        } catch (ApiException e) {
            log.error(String.format("APIException occurred while getting process ---> \n %s", e.getResponseBody()), e);
        }
    }

    private Process getProcess() throws IOException, ApiException {

        ApiClient client = Config.defaultClient();
        client.getHttpClient().setReadTimeout(0l, TimeUnit.MILLISECONDS);

//        Configuration.setDefaultApiClient(client);

        io.kubernetes.client.Exec exec = new io.kubernetes.client.Exec();
        exec.setApiClient(client);

        return exec.exec(
                mPod,
                new String[]{"sh"},
                mContainer,
                Boolean.TRUE,
                Boolean.TRUE);
    }
}
-- ghost
java
kubernetes
redisson
spring-boot

0 Answers