I tried to add a logic that will send slack notification when the pipeline terminated due to some error. I tried to implement this with ExitHandler
. But, seems the ExitHandler
can’t dependent on any op. Do you have any good idea?
I found a solution to which uses ExitHandler
. I post my code below, hope it can help someone else.
def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
"""
performs slack notifications
"""
send_slack_op = dsl.ContainerOp(
name=name,
image='wenmin.wu/slack-cli:latest',
is_exit_handler=is_exit_handler,
command=['sh', '-c'],
arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
)
send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
return send_slack_op
@dsl.pipeline(
name='forecasting-supply',
description='forecasting supply ...'
)
def ml_pipeline(
param1,
param2,
param3,
):
exit_task = slack_notification(
slack_channel = slack_channel,
name = "supply-forecasting",
status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
is_exit_handler = True
)
with dsl.ExitHandler(exit_task):
# put other tasks here