Skip to main content

Command Palette

Search for a command to run...

在 Argo workflow 中使用插件减少并行 Pod 数量

Updated
2 min read

在之前写过的使用 Argo workflow 调用公有云客户端软件实现运维过程的文章中,可以看到,使用 Argo workflow 的容器模板,简单的将既有运维能力容器化,就能使用 Argo workflow 对这些能力进行编排了。

不过近期一个测试中,遇到个小麻烦——在一个 With 循环里,我输入了 500 个任务,结果是 6 节点 CCE 集群爆满,流程卡住——集群规模的事情很简单,我直接将 Argo workflow 部署到 CCE Autopilot 集群中,随着流程启动,Auto pilot 集群非常给力,不到一分钟就扩容到了上百节点。然而新的问题出现了,Argo workflow 容器模板使用的镜像托管在 quay.io 上,我被限流了——无法拉取镜像,工作流自然也就无法执行了。

如果说必须要限流的话,Argo workflow 提供了多种机制,在不同粒度上对工作流的并发进行控制:

  1. 在模板中,使用 parallelism 参数,限制流程实例内的并发数。
  2. 在 Workflow Controller 的 Configmap(workflow-controller-configmap)中,使用 parallelism 或者 namespaceParallelism,在集群范围内,限制总体并发的流程数量。
  3. 模板中使用 synchronization,使用同样的共享锁的流程实例将会被有效限流。

不难看出,在有限集群的规模下,通过对并发的控制,以及垃圾回收策略的定义,都能有效的限制集群规模——毕竟上百节点是要花不少银子的。在这种情况下,还有一条路就是,使用执行插件。例如如下工作流:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: python-example-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: value
        value: "1"  
  templates:
    - name: main
      steps:
      - - name: evaluate
          template: evaluate
          arguments:
            parameters:
              - name: value
                value: "{{workflow.parameters.value}}"
          withSequence:
            count: "50"         
    - name: evaluate
      inputs:
        parameters:
          - name: value    
      plugin:
        python:
          expression: |
            {"sum": int(parameters["value"]) + 1}

这里使用 plugin.python 的方式引用了一个插件,执行时,循环了 50 次,提交后,我们会发现,这里只执行了一个 Pod:python-example-hlc5t-1340600742-agent,也就是说,这一个 Pod 承载了所有的 50 个任务。如何实现的呢?这里就要看看 Argo workflow 的插件机制了。

Argo workflow 默认是不启用插件的,要启用插件,需要给控制器加入环境变量:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
spec:
  template:
    spec:
      containers:
        - name: workflow-controller
          env:
            - name: ARGO_EXECUTOR_PLUGINS
              value: "true"

重启后,就可以启用上面工作流引用的插件了,启用插件的方式很有意思,提交一个 Configmap 即可:

# This is an auto-generated file. DO NOT EDIT
apiVersion: v1
data:
  sidecar.container: |
    args:
....
kind: ConfigMap
metadata:
...
    workflows.argoproj.io/version: '>= v3.3'
  creationTimestamp: null
  labels:
    workflows.argoproj.io/configmap-type: ExecutorPlugin
  name: python-executor-plugin

这方式有点奇怪,Configmap 里面包含了一堆 Python 代码。以及似乎是 Sidecar 的容器定义。应用之后,就能够运行上述工作流了。

注意 Configmap 中的注释说明:这是一个自动生成的文件,哪里来的呢?

实际上,Argo workflow 插件是由 argo executor-plugin build 命令构建出来的,一个插件的原始文件主要包含三个部分:

  1. 插件清单(plugin.yaml):这里实际上是对一个容器的定义,其中包含了容器镜像、资源使用等。
  2. 启动文件:一个命名为 server.* 的文本文件,可以是 Shell 或者 Python 脚本,他会在插件启动时被执行。
  3. 插件镜像:上述文本文件可能无法描述一些业务逻辑,因此,可以将二进制文件封装到镜像里,给启动文件调用。

例如前边用到的 Python 插件的 plugin.yaml

kind: ExecutorPlugin
apiVersion: argoproj.io/v1alpha1
metadata:
  name: python
...
    workflows.argoproj.io/version: '>= v3.3'
spec:
  sidecar:
    container:
      command:
        - python
        - -c
      image: python:alpine
      name: python-executor-plugin
...

不难看出,这个定义和上边的 Configmap 是一致的。再看看 server.py

import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class Plugin(BaseHTTPRequestHandler):

    def args(self):
        return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))

    def reply(self, reply):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(json.dumps(reply).encode("UTF-8"))

    def unsupported(self):
        self.send_response(404)
        self.end_headers()

    def do_POST(self):
        if self.path == '/api/v1/template.execute':
            args = self.args()

            template = args['template']
            plugin = template.get('plugin', {})

            if 'python' in plugin:
                spec = plugin['python']

                # convert parameters into easy to use dict
                # artifacts are not supported
                parameters = {}
                for parameter in template.get('inputs', {}).get('parameters', []):
                    parameters[parameter['name']] = parameter['value']

                try:
                    code = compile(spec['expression'], "<string>", "eval")
...


if __name__ == '__main__':
    httpd = HTTPServer(('', 7984), Plugin)
    httpd.serve_forever()

上边的代码,不难看出,这里只是启动了一个简单的 Python HTTP Server,监听 /api/v1/template.execute 的 Post 请求,并对其进行处理。

上述的 YAML 和启动代码都编写完成之后,就可以使用 argo executor-plugin build 命令来构建 Configmap 了。

当然也可以使用自己定义的基础镜像。

More from this blog

龙虾恐慌:AIOps 又要改名了?

ChatGPT 开始,把 AI 拉近到普罗大众的面前,让无数人感受到 AI 的亲民魅力。而龙虾,则把大模型驱动的自动化能力,突然间变得水灵灵、活泼泼地走进千家万户。它不只是“风口上的猪”,而是风口本身。热度高到让 Mac mini 一度断货,不知道这在不在库克的预料之内。 每代人都有每代人的鸡蛋,春节期间,我就领了我的鸡蛋。翻出古老的 MacBook Air M1,充值各种大模型。当然了,这个工具

Mar 9, 20261 min read

再见 2025

我猜不少人以为这个号废了吧?并没有,只是今年变化有点大,一直有种抄起键盘,无从说起的感觉,所以一直偷懒到今天,2025 的最后一天。 今年是我的第四个本命年,去年末一期播客里,大内说本命年不是灾年,是变化年,有危也有机。可是讲真啊,只看到危,没看到机。 各种因缘际会,从鹅厂跳槽到前东家,已经接近四年,第一个合同期已经进入尾声。除了前两年还在云原生领域嗷嗷叫,后两年基本都是些鸡零狗碎的东西了,用老东家的术语说是——偏离主航道,可谓是前景暗淡了。 一旦确定要滚蛋,反倒心思轻松起来,每天骑着我的小红车...

Jan 5, 20261 min read

辅助编程?dora 说:我知道你很急可是请你别急

从 OpenGPT 把大模型的火烧旺了之后,这三年来,相信很多组织或摩拳擦掌、或躬身入局,希望借助聪明能干的大模型,或想偿还技术宅,或想降本增效,或想弯道超车。一时间,沉寂许久的 AIxx 又活过来了,LLM Ops、Vibe Coding、中医大模型、GPT 算命等等,全都老树发新芽,焕发了勃勃生机。那么视角拉回从业者最关注的饭碗相关的领域之一——AI 辅助开发,产生了什么触动,应该如何拥抱呢? DORA 的年度报告中给出了很有意思的结论——强者恒强。 执行摘要部分总结了几个有趣的点: 问题...

Oct 6, 20251 min read

[译]dora:ai 辅助软件开发状态报告

执行摘要 在 2025 年,科技领导者面临的核心问题已不再是“是否要采用 AI”,而是“如何实现其价值”。 DORA 的研究基于超过 100 小时的定性访谈和来自全球近 5,000 名技术专业人士的问卷调查。研究揭示了一个关键事实:AI 在软件开发中的主要角色是“放大器”。它会放大高效能组织的优势,也会凸显组织的缺陷。 关键结论:AI 是放大器 AI 投资的最大回报并非来自工具本身,而是来自组织底层系统的战略性建设: 高质量的内部平台 清晰的工作流 团队的协同能力 缺少这些基础,AI ...

Oct 2, 202514 min read

僭越了,有人在用 Rust 写 Kubernetes

一个新语言问世,最爱做的事情之一,就是重写存量软件了。 云原生喝酒 SIG 重点扶持项目——rk8s(https://github.com/rk8s-dev/rk8s) 也可以归在这个范畴里,只不过这个项目重写的东西比较大,是 Kubernetes。 从 2025 年 1 月第一个 Commit 开始,到现在有了 200 多次 Commit,十几万行代码。当然距离 Kubernetes 的几百万行代码还差得远——老马就是喜欢整这种大无畏项目。 另外该项目也是国内第一个脱离 Cargo 转向使用 ...

Sep 27, 20253 min read

【伪】架构师

342 posts