# 在 Argo workflow 中使用插件减少并行 Pod 数量

在之前写过的使用 Argo workflow 调用公有云客户端软件实现运维过程的文章中，可以看到，使用 Argo workflow 的容器模板，简单的将既有运维能力容器化，就能使用 Argo workflow 对这些能力进行编排了。

不过近期一个测试中，遇到个小麻烦——在一个 `With` 循环里，我输入了 500 个任务，结果是 6 节点 CCE 集群爆满，流程卡住——集群规模的事情很简单，我直接将 Argo workflow 部署到 CCE Autopilot 集群中，随着流程启动，Auto pilot 集群非常给力，不到一分钟就扩容到了上百节点。然而新的问题出现了，Argo workflow 容器模板使用的镜像托管在 `quay.io` 上，我被限流了——无法拉取镜像，工作流自然也就无法执行了。

如果说必须要限流的话，Argo workflow 提供了多种机制，在不同粒度上对工作流的并发进行控制：

1. 在模板中，使用 `parallelism` 参数，限制流程实例内的并发数。
1. 在 Workflow Controller 的 Configmap（`workflow-controller-configmap`）中，使用 `parallelism` 或者 `namespaceParallelism`，在集群范围内，限制总体并发的流程数量。
1. 模板中使用 `synchronization`，使用同样的共享锁的流程实例将会被有效限流。

不难看出，在有限集群的规模下，通过对并发的控制，以及垃圾回收策略的定义，都能有效的限制集群规模——毕竟上百节点是要花不少银子的。在这种情况下，还有一条路就是，使用执行插件。例如如下工作流：

~~~yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: python-example-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: value
        value: "1"  
  templates:
    - name: main
      steps:
      - - name: evaluate
          template: evaluate
          arguments:
            parameters:
              - name: value
                value: "{{workflow.parameters.value}}"
          withSequence:
            count: "50"         
    - name: evaluate
      inputs:
        parameters:
          - name: value    
      plugin:
        python:
          expression: |
            {"sum": int(parameters["value"]) + 1}
~~~

这里使用 `plugin.python` 的方式引用了一个插件，执行时，循环了 50 次，提交后，我们会发现，这里只执行了一个 Pod：`python-example-hlc5t-1340600742-agent`，也就是说，这一个 Pod 承载了所有的 50 个任务。如何实现的呢？这里就要看看 Argo workflow 的插件机制了。

Argo workflow **默认是不启用插件的**，要启用插件，需要给控制器加入环境变量：

~~~yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
spec:
  template:
    spec:
      containers:
        - name: workflow-controller
          env:
            - name: ARGO_EXECUTOR_PLUGINS
              value: "true"
~~~

重启后，就可以启用上面工作流引用的插件了，启用插件的方式很有意思，提交一个 Configmap 即可：

~~~yaml
# This is an auto-generated file. DO NOT EDIT
apiVersion: v1
data:
  sidecar.container: |
    args:
....
kind: ConfigMap
metadata:
...
    workflows.argoproj.io/version: '>= v3.3'
  creationTimestamp: null
  labels:
    workflows.argoproj.io/configmap-type: ExecutorPlugin
  name: python-executor-plugin
~~~

这方式有点奇怪，Configmap 里面包含了一堆 Python 代码。以及似乎是 Sidecar 的容器定义。应用之后，就能够运行上述工作流了。

注意 Configmap 中的注释说明：这是一个自动生成的文件，哪里来的呢？

实际上，Argo workflow 插件是由 `argo executor-plugin build` 命令构建出来的，一个插件的原始文件主要包含三个部分：

1. 插件清单（`plugin.yaml`）：这里实际上是对一个容器的定义，其中包含了容器镜像、资源使用等。
1. 启动文件：一个命名为 `server.*` 的文本文件，可以是 Shell 或者 Python 脚本，他会在插件启动时被执行。
1. 插件镜像：上述文本文件可能无法描述一些业务逻辑，因此，可以将二进制文件封装到镜像里，给启动文件调用。

例如前边用到的 Python 插件的 `plugin.yaml`：

~~~yaml
kind: ExecutorPlugin
apiVersion: argoproj.io/v1alpha1
metadata:
  name: python
...
    workflows.argoproj.io/version: '>= v3.3'
spec:
  sidecar:
    container:
      command:
        - python
        - -c
      image: python:alpine
      name: python-executor-plugin
...
~~~

不难看出，这个定义和上边的 Configmap 是一致的。再看看 `server.py`：

~~~python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class Plugin(BaseHTTPRequestHandler):

    def args(self):
        return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))

    def reply(self, reply):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(json.dumps(reply).encode("UTF-8"))

    def unsupported(self):
        self.send_response(404)
        self.end_headers()

    def do_POST(self):
        if self.path == '/api/v1/template.execute':
            args = self.args()

            template = args['template']
            plugin = template.get('plugin', {})

            if 'python' in plugin:
                spec = plugin['python']

                # convert parameters into easy to use dict
                # artifacts are not supported
                parameters = {}
                for parameter in template.get('inputs', {}).get('parameters', []):
                    parameters[parameter['name']] = parameter['value']

                try:
                    code = compile(spec['expression'], "<string>", "eval")
...


if __name__ == '__main__':
    httpd = HTTPServer(('', 7984), Plugin)
    httpd.serve_forever()
~~~

上边的代码，不难看出，这里只是启动了一个简单的 Python HTTP Server，监听 `/api/v1/template.execute` 的 Post 请求，并对其进行处理。

上述的 YAML 和启动代码都编写完成之后，就可以使用 `argo executor-plugin build` 命令来构建 Configmap 了。

> 当然也可以使用自己定义的基础镜像。
