Kubeflow Pipeline Termination Notification

8/15/2019

I am trying to add logic that sends a Slack notification when a pipeline terminates because of an error. I tried to implement this with ExitHandler, but it seems that an ExitHandler can't depend on any op. Does anyone have a good idea?

-- Wenmin Wu
kubeflow
kubernetes

1 Answer

8/21/2019

I found a solution that uses ExitHandler. I am posting my code below in the hope that it helps someone else.

from kfp import dsl
from kubernetes.client import V1ConfigMapKeySelector, V1EnvVar, V1EnvVarSource

def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    Sends a Slack notification from a container step.
    """
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    # Read the Slack token from the 'workspace-config' ConfigMap.
    send_slack_op.add_env_variable(V1EnvVar(
        name='SLACK_CLI_TOKEN',
        value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(
            name='workspace-config', key='SLACK_CLI_TOKEN'))))
    return send_slack_op

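@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    param1,
    param2,
    param3,
):
    # slack_channel is not defined in this snippet; it is assumed to be set elsewhere (e.g. at module level).
    exit_task = slack_notification(
        slack_channel=slack_channel,
        name="supply-forecasting",
        status="Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler=True
    )

    # The exit task runs after everything inside this block, whether it succeeds or fails.
    with dsl.ExitHandler(exit_task):
        # put other tasks here
        pass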
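
The exit task above sends a message on every run, including successful ones. If the goal is to be notified only on errors, as in the question, one option is to put the check into the shell command itself. The following is a minimal sketch, not part of the original answer: `failure_only_command` is a hypothetical helper name, the channel `#ml-alerts` is made up, and it assumes Argo fills `{{workflow.status}}` with `Succeeded`, `Failed`, or `Error` before the exit-handler container starts. It also uses `shlex.quote` instead of hand-written single quotes so that a message containing a quote does not break the command.

```python
import shlex


# Hypothetical helper (not in the original answer): builds the string passed
# to `sh -c` so the Slack message is sent only when the run did not succeed.
def failure_only_command(slack_channel: str, message: str) -> str:
    # Quote both values so spaces or quotes in them stay inside one shell word.
    send = "/send-message.sh -d {} {}".format(
        shlex.quote(slack_channel), shlex.quote(message)
    )
    # Assumption: Argo substitutes {{workflow.status}} before the shell runs.
    return 'if [ "{{workflow.status}}" != "Succeeded" ]; then ' + send + "; fi"


cmd = failure_only_command(
    "#ml-alerts",  # assumed channel name, for illustration only
    "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
)
print(cmd)
```

To try it, the returned string would replace the formatted string in `arguments=[...]` of the `ContainerOp`, with `command=['sh', '-c']` left unchanged.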
-- Wenmin Wu
Source: StackOverflow