Kubernetes - processing an unlimited number of work-items

3/30/2018

I need to get a work-item from a work-queue and then sequentially run a series of containers to process each work-item. This can be done using initContainers (https://stackoverflow.com/a/46880653/94078)

What would be the recommended way of restarting the process to get the next work-item?

  • Jobs seem ideal but don't seem to support an infinite/indefinite number of completions.
  • Using a single Pod doesn't work because initContainers aren't restarted (https://github.com/kubernetes/kubernetes/issues/52345).
  • I would prefer to avoid the maintenance/learning overhead of a system like argo or brigade.

Thanks!

-- eug
kubernetes

2 Answers

4/2/2018

Jobs should be used for working with work queues. When using a work queue you should not set .spec.completions (or you should set it to null). In that case Pods will keep getting created until one of them exits successfully. It is a little awkward to exit the (main) container with a failure state on purpose, but that is what the specification calls for. You may set .spec.parallelism to your liking irrespective of this setting; I've set it to 1 as it appears you do not want any parallelism.
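For reference, this is the relevant fragment of the Job spec as it appears in the full manifests below:

spec:
  parallelism: 1
  completions: null  # unset: the Job completes as soon as one Pod succeeds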

In your question you did not specify what you want to do when the work queue becomes empty, so I will give two solutions: one that waits for new items (infinite), and one that ends the Job when the work queue becomes empty (finite, but with an indefinite number of items).

Both examples use redis, but you can apply this pattern to your favorite queue. Note that the part that pops an item from the queue is not safe: if your Pod dies for some reason after having popped an item, that item will remain unprocessed or only partially processed. See the reliable-queue pattern for a proper solution.
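For illustration, here is a minimal sketch of the reliable-queue idea using Redis's atomic RPOPLPUSH; the job-processing backup list name is hypothetical:

item=$(redis-cli -h redis rpoplpush job job-processing)  # atomically move the item to a backup list
# ... run the processing steps on "$item" ...
redis-cli -h redis lrem job-processing 1 "$item"  # acknowledge: remove from the backup list only after success
# a separate recovery process can re-queue items that linger in job-processing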

To implement the sequential steps on each work-item I've used init containers. Note that this really is a primitive solution, but your options are limited if you don't want to use a framework to implement a proper pipeline.

There is an asciinema recording if anyone would like to see this in action without deploying redis, etc.

Redis

To test this you'll need to create, at a minimum, a redis Pod and a Service. I am using the example from Fine Parallel Processing Using a Work Queue. You can deploy it with:

kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-pod.yaml
kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-service.yaml

The rest of this solution expects that you have a Service named redis (which does not require authentication) in the same namespace as your Job, and a Pod called redis-master.

Inserting items

To insert some items into the work queue, use this command (you will need bash for the brace expansion to work):

echo -ne "rpush job "{1..10}"\n" | kubectl exec -it redis-master -- redis-cli
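You can check that the items arrived by querying the queue length:

kubectl exec redis-master -- redis-cli llen job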

Infinite version

This version waits if the queue is empty, so it will never complete.

apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-infinite
spec:
  parallelism: 1
  completions: null
  template:
    metadata:
      name: primitive-pipeline-infinite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis blpop job 0 >/shared/item.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never
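To try it out (assuming the manifest is saved as pipeline-infinite.yaml; the filename is arbitrary), apply it and watch Pods being created and replaced as items are consumed:

kubectl apply -f pipeline-infinite.yaml
kubectl get pods -w
kubectl logs <pod-name> -c step-1  # init container logs are fetched with -c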

Finite version

This version stops the Job when the queue is empty. Note the trick: the pop init container checks whether the queue is empty, and all subsequent init containers and the main container exit immediately (with success) if it is. This is the mechanism that signals Kubernetes that the Job has completed and that no new Pods need to be created for it.

apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-finite
spec:
  parallelism: 1
  completions: null
  template:
    metadata:
      name: primitive-pipeline-finite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis lpop job >/shared/item.txt; grep -q . /shared/item.txt || :>/shared/done.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never
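Once the queue drains, the done.txt flag makes every remaining container exit 0, the Pod succeeds, and the Job is marked complete. You can watch this happen with:

kubectl get job primitive-pipeline-finite -w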
-- Janos Lenart
Source: StackOverflow

3/30/2018

The easiest way in this case is to use a CronJob. A CronJob runs Jobs according to a schedule; see the documentation for more information.

Here is an example (I took it from here and modified it):

apiVersion: batch/v1beta1
kind: CronJob 
metadata:
  name: sequential-jobs
spec:
  schedule: "*/1 * * * *" # the schedule, in standard Linux cron format
  jobTemplate:
    spec:
      template:
        metadata:
          name: sequential-job
        spec:
          initContainers:
          - name: job-1
            image: busybox
            command: ['sh', '-c', 'for i in 1 2 3; do echo "job-1 `date`" && sleep 5s; done;']
          - name: job-2
            image: busybox
            command: ['sh', '-c', 'for i in 1 2 3; do echo "job-2 `date`" && sleep 5s; done;']
          containers:
          - name: job-done
            image: busybox
            command: ['sh', '-c', 'echo "job-1 and job-2 completed"']
          restartPolicy: Never
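After applying this manifest you can watch a new Job being spawned every minute:

kubectl get jobs --watch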

This solution, however, has some limitations:

  • It cannot run more often than once per minute
  • If you need to process your work-items strictly one at a time, you need to add an extra check for already-running Jobs in an init container (see the note after this list)
  • CronJobs are available only in Kubernetes 1.8 and higher
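
Regarding the second point: instead of a hand-rolled check, you may be able to use the CronJob's built-in concurrencyPolicy field, which tells Kubernetes to skip a scheduled run while the previous Job is still active:

spec:
  schedule: "*/1 * * * *"
  concurrencyPolicy: Forbid # do not start a new Job while the previous one is still running
  jobTemplate:
    ...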
-- Artem Golenyaev
Source: StackOverflow