使用 Shell Operator + CRD 恢复被暂停的 Argo Workflow
上一篇讲到,使用 Kyverno 通过对特定标签的识别,让每个步骤进入自动暂停的状态,实现逐步骤运行。留了个尾巴,怎样才能快速的恢复被暂停步骤的运行?
TL;DR;
随便搞个 CRD,用 Shell Operator 监听,自动执行 kubectl exec
恢复目标步骤的运行。
Shell Operator 简介
简单来说,Shell Operator 是一个让用户能够使用脚本语言快速建立 Operator 的框架,能够非常方便的完成定时运行、启动运行、监听并响应 Kubernetes 对象和 CRD 等能力。
这篇文章会使用这一框架,从 CR 资源获取用户恢复运行指定步骤的意图,并完成恢复运行的操作。整个操作分为如下步骤:
- 创建 CRD
- 编写 Shell Operator
- 运行测试
创建 CRD
要恢复一个被暂停的工作流步骤,其输入只需要工作流 ID 和被暂停步骤(Template
)名称即可,制定如下 CRD:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: resumes.argocontroller.io
spec:
conversion:
strategy: None
group: argocontroller.io
names:
kind: Resume
listKind: ResumeList
plural: resumes
singular: resume
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
TemplateName:
type: string
Workflow-ID:
type: string
type: object
type: object
served: true
storage: true
命名有点草率。
这个 CRD 中使用了 Workflow-ID
和 TemplateName
两个字段,分别用来查找流程实例和被暂停的步骤。
编写 Shell Operator
一个 Shell Operator 通常分为几个部分:
- 配置文件,用来指定触发时机,例如定时执行、启动执行和被 Kubenetes 对象操作触发。
- Hook 脚本:主体,用来执行指定功能。
- Dockerfile:继承 Flant 的 Shell Operator,用来构建运行镜像。
- Kubernetes YML:用来在 Kubernetes 中运行 Shell Operator。
配置文件
我们这里设置,监听前面的 CR argocontroller.io/resume
的创建动作,因此配置文件这样编写:
configVersion: v1
kubernetes:
- apiVersion: argocontroller.io/v1alpha1
kind: Resume
executeHookOnEvent: ["Added"]
使用命令 kubectl create configmap hook-conf --from-file=config.yml
创建 Configmap 供后续脚本加载。
脚本
符合触发条件的 CR 一旦创建,就会被 Shell Operator 捕获,并保存到对应 Pod 的文件系统中,临时文件名保存在环境变量 BINDING_CONTEXT_PATH
里。
这里我们使用 Shell 脚本,处理环临时文件内容,查找 CR 包含的流程实例和模板名称。查找到流程实例之后,在其 status
节点查找 Pod 名称,最后执行恢复操作。脚本内容如下:
#!/bin/bash
if [[ $1 == "--config" ]] ; then
cat /conf/config.yml
else
jq -c '.[]' "$BINDING_CONTEXT_PATH" | while read -r item; do
type=$(echo "$item" | jq -r '.type')
# 跳过无用元素
if [ "$type" != "Event" ]; then
continue
fi
# 查找流程 ID 和模板名称
TEMPLATE_NAME=$(echo "$item" | jq -r '.object.spec.TemplateName')
WORKFLOW_ID=$(echo "$item" | jq -r '.object.spec["Workflow-ID"]')
echo "tmpl=${TEMPLATE_NAME} wfid=${WORKFLOW_ID}"
FILENAME=/tmp/wf.json
# 获取工作流实例的 YAML
kubectl get workflow ${WORKFLOW_ID} -o json > "${FILENAME}"
# 查找 Pod 名称
POD=$(jq -r ".status.nodes[] | select(.templateName == \"${TEMPLATE_NAME}\" and .type == "Pod") | .id" "${FILENAME}")
echo "pod=${POD}"
kubectl exec -it ${POD} -- touch /proc/1/root/var/run/argo/ctr/main/after
done
fi
Dockerfile
Dockerfile 很简单,只要把脚本设置为可执行,并加入到 /hooks
文件夹即可:
FROM ghcr.io/flant/shell-operator:latest
ADD wf-resume.sh /hooks
编写好之后,使用 Docker 构建镜像并推送:
docker buildx build --platform linux/amd64,linux/arm64 --push \
-t [image-name:image-tag] .
YAML
这里我们用一个 Pod YAML 来运行 Shell Operator:
apiVersion: v1
kind: Pod
metadata:
name: shell-operator-observe
spec:
serviceAccountName: shell-operator
containers:
- name: shell-operator-observe
image: [image-name:image-tag]
volumeMounts:
- name: config-volume
mountPath: /conf
volumes:
- name: config-volume
configMap:
name: hook-conf
这里有两个需要注意的点:
- 监听或者修改 Kubernetes 对象是需要授权的,要针对
shell-operator
这个 Service Account 进行 RBAC 授权。 - 使用 Configmap 加载到镜像的
/conf
目录。
使用 kubectl
提交运行。
运行测试
运行前一片文章中使用的工作流,暂停之后,使用 kubectl get workflow
,例如 pause-3141592654ft97
,就可以创建如下 CR:
{
"apiVersion": "argocontroller.io/v1alpha1",
"kind": "Resume",
"metadata": {
"name": "example-resume",
"namespace": "default"
},
"spec": {
"Workflow-ID": "pause-3141592654ft97",
"TemplateName": "whalesay"
}
}
提交集群后,可以看到,暂停状态取消,流程变为 Succeeded
状态。
调试
Shell Operator 在工作过程中难免会出现问题,我主要依赖的三板斧:
- 使用
kubectl logs
查看 Pod 日志。 - 进入 Operator Pod,修改脚本,重复触发
- 脚本中加入 echo 语句,或者保存
BINDING_CONTEXT_PATH
文件。