Adding Traefik support to argo-rollouts traffic management


Background

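When doing canary deployments with argo-rollouts, precise traffic control relies on its traffic management feature, which has to work together with an ingress gateway. At the time of writing, the official project supports only Nginx, Istio, and ALB, whereas the gateway I use most is Traefik.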

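Following the principle of minimal change, I would rather adopt argo-rollouts without altering the existing infrastructure, so I had little choice but to add Traefik support myself.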

Traefik feasibility check

According to the Traefik documentation on traffic splitting, the split can be controlled through an annotation.

Combining the argo-rollouts and Traefik documentation, we need to prepare the following resource objects:

  • app-rollout (kind: Rollout)
  • app-stable-service (kind: Service)
  • app-canary-service (kind: Service)
  • app-stable-ingress (kind: Ingress)

Traefik Ingress configuration with weight control:

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: traefik
    traefik.ingress.kubernetes.io/service-weights: |
      app-stable-service: 100%
      app-canary-service: 0%      
  name: app-stable-ingress
spec:
  rules:
  - http:
      paths:
      - backend:
          serviceName: app-stable-service
          servicePort: 80
        path: /
      - backend:
          serviceName: app-canary-service
          servicePort: 80
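        path: /

Compared with nginx traffic control: nginx uses two ingresses for weighted load balancing, whereas Traefik uses a single ingress. So for the argo-rollouts traffic routing implementation, we only need to update the service-weights annotation on the target ingress shown above.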
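
As a small illustration of what setting that annotation involves, the sketch below renders a value in the same `service: N%` form used in the manifest above. `renderServiceWeights` is a hypothetical helper name chosen for this example; it is not part of argo-rollouts or Traefik.

```go
package main

import "fmt"

// renderServiceWeights is a hypothetical helper that builds the annotation
// value in the "service: N%" form, one entry per line. The stable service
// receives whatever share the canary does not.
func renderServiceWeights(stable, canary string, canaryWeight int32) (string, error) {
	if canaryWeight < 0 || canaryWeight > 100 {
		return "", fmt.Errorf("canary weight %d is outside 0..100", canaryWeight)
	}
	return fmt.Sprintf("%s: %d%%\n%s: %d%%\n", stable, 100-canaryWeight, canary, canaryWeight), nil
}

func main() {
	value, err := renderServiceWeights("app-stable-service", "app-canary-service", 10)
	if err != nil {
		fmt.Println("render failed:", err)
		return
	}
	fmt.Print(value)
}
```

Rejecting weights outside 0..100 up front keeps a bad step value from ever reaching the ingress.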

Code analysis

type TrafficRoutingReconciler interface {
	Reconcile(desiredWeight int32) error
	Type() string
}

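This is the traffic routing interface. We only need to implement the Reconcile and Type methods: Reconcile receives the desired weight, renders it into the corresponding ingress resource, and applies it.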
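
To make the contract concrete, here is a minimal in-memory sketch of a type that satisfies this interface. `fakeReconciler` and its field are hypothetical names invented for illustration; a real implementation would update an Ingress rather than a struct field.

```go
package main

import "fmt"

// TrafficRoutingReconciler mirrors the interface quoted above.
type TrafficRoutingReconciler interface {
	Reconcile(desiredWeight int32) error
	Type() string
}

// fakeReconciler is a hypothetical in-memory stand-in: it records the last
// accepted weight instead of patching an Ingress.
type fakeReconciler struct {
	canaryWeight int32
}

// Reconcile rejects weights outside 0..100 and stores valid ones.
func (f *fakeReconciler) Reconcile(desiredWeight int32) error {
	if desiredWeight < 0 || desiredWeight > 100 {
		return fmt.Errorf("desired weight %d is outside 0..100", desiredWeight)
	}
	f.canaryWeight = desiredWeight
	return nil
}

// Type names the traffic router this reconciler handles.
func (f *fakeReconciler) Type() string { return "Traefik" }

func main() {
	var r TrafficRoutingReconciler = &fakeReconciler{}
	if err := r.Reconcile(10); err != nil {
		fmt.Println("reconcile failed:", err)
		return
	}
	fmt.Println(r.Type(), "reconciler accepted the weight")
}
```

A stand-in like this can also be handy in unit tests of code that only depends on the interface.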

Key code

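import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"

	logutil "github.com/argoproj/argo-rollouts/utils/log"
)

// checkStableIngress verifies that, for every host and path in the stable
// ingress, the stable service and the canary service appear as a pair.
func (r *Reconciler) checkStableIngress(stableIngress *extensionsv1beta1.Ingress) error {
	stableServiceName := r.cfg.Rollout.Spec.Strategy.Canary.StableService
	canaryServiceName := r.cfg.Rollout.Spec.Strategy.Canary.CanaryService

	for _, rule := range stableIngress.Spec.Rules {
		// A rule without an HTTP section has no paths to check.
		if rule.HTTP == nil {
			continue
		}

		// Track the two services separately so that a duplicated entry for
		// one service is not mistaken for a complete pair.
		type pair struct{ stable, canary bool }
		seen := make(map[string]pair)

		for _, path := range rule.HTTP.Paths {
			p := seen[path.Path]
			switch path.Backend.ServiceName {
			case stableServiceName:
				p.stable = true
			case canaryServiceName:
				p.canary = true
			default:
				continue
			}
			seen[path.Path] = p
		}

		for p, s := range seen {
			if s.stable && s.canary {
				continue
			}
			err := fmt.Errorf("[host/path: %s%s] stableService %s and canaryService %s must both be present", rule.Host, p, stableServiceName, canaryServiceName)
			r.log.WithField(logutil.IngressKey, stableIngress.Name).
				WithField("host/path", rule.Host+p).WithError(err).Error("stable ingress check failed")
			return err
		}
	}

	return nil
}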

// Reconcile brings the Traefik ingress to the desired canary weight.
func (r *Reconciler) Reconcile(desiredWeight int32) error {
	const weightsAnnotation = "traefik.ingress.kubernetes.io/service-weights"

	// Fetch the stable ingress from the lister cache.
	stableIngressName := r.cfg.Rollout.Spec.Strategy.Canary.TrafficRouting.Traefik.StableIngress
	cached, err := r.cfg.IngressLister.Ingresses(r.cfg.Rollout.Namespace).Get(stableIngressName)
	if err != nil {
		r.log.WithField(logutil.IngressKey, stableIngressName).WithField("err", err.Error()).Error("error retrieving stableIngress")
		return fmt.Errorf("error retrieving stableIngress `%s` from cache: %v", stableIngressName, err)
	}

	// Validate the ingress before changing it.
	if err = r.checkStableIngress(cached); err != nil {
		r.cfg.Recorder.Event(r.cfg.Rollout, corev1.EventTypeWarning, "CheckIngress", fmt.Sprintf("check ingress `%s` failed, with error `%s`", stableIngressName, err.Error()))
		return err
	}

	// Objects returned by a lister are shared, so work on a copy, and make
	// sure the annotations map exists before writing to it.
	stableIngress := cached.DeepCopy()
	if stableIngress.Annotations == nil {
		stableIngress.Annotations = map[string]string{}
	}
	_, hadWeights := stableIngress.Annotations[weightsAnnotation]

	// Set the desired weights and send a single update.
	stableIngress.Annotations[weightsAnnotation] = r.getAnnotationWeightConfig(desiredWeight)
	_, err = r.cfg.Client.ExtensionsV1beta1().Ingresses(r.cfg.Rollout.Namespace).Update(stableIngress)
	if err != nil {
		r.log.WithField(logutil.IngressKey, stableIngressName).Errorf("fail to adjust ingress %s weight, err: %s", stableIngressName, err.Error())
		r.cfg.Recorder.Event(r.cfg.Rollout, corev1.EventTypeWarning, "AdjustIngressWeight", fmt.Sprintf("fail to adjust ingress %s weight, err: %s", stableIngressName, err.Error()))
		return err
	}

	// Report success only after the update has gone through.
	if !hadWeights {
		r.cfg.Recorder.Event(r.cfg.Rollout, corev1.EventTypeNormal, "InitIngressAnnotation", fmt.Sprintf("init ingress [%s] annotation `%s` success", stableIngressName, weightsAnnotation))
	}
	r.log.WithField(logutil.IngressKey, stableIngressName).Infof("adjust ingress %s weight, [canaryService: %d] success", stableIngressName, desiredWeight)

	return nil
}


// getAnnotationWeightConfig renders the service-weights annotation value
// for the given canary weight; the stable service gets the remainder.
func (r *Reconciler) getAnnotationWeightConfig(desiredWeight int32) string {
	stableServiceName := r.cfg.Rollout.Spec.Strategy.Canary.StableService
	canaryServiceName := r.cfg.Rollout.Spec.Strategy.Canary.CanaryService
	return fmt.Sprintf(`{"%s": %d,"%s": %d}`, stableServiceName, 100-desiredWeight, canaryServiceName, desiredWeight)
}
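
The pairing rule enforced by checkStableIngress can also be tried in isolation, without any Kubernetes types. The following is a sketch under that simplification: `pathBackend` and `checkPaired` are hypothetical names, and the slice of path entries stands in for the paths of a single ingress rule.

```go
package main

import "fmt"

// pathBackend is a hypothetical, simplified stand-in for one Ingress path entry.
type pathBackend struct {
	Path    string
	Service string
}

// checkPaired returns an error if any path routes to only one of the two
// services. Paths that reference neither service are ignored.
func checkPaired(paths []pathBackend, stable, canary string) error {
	type seen struct{ stable, canary bool }
	byPath := make(map[string]*seen)
	for _, p := range paths {
		if p.Service != stable && p.Service != canary {
			continue
		}
		s, ok := byPath[p.Path]
		if !ok {
			s = &seen{}
			byPath[p.Path] = s
		}
		if p.Service == stable {
			s.stable = true
		}
		if p.Service == canary {
			s.canary = true
		}
	}
	for path, s := range byPath {
		if !s.stable || !s.canary {
			return fmt.Errorf("path %q must route to both %s and %s", path, stable, canary)
		}
	}
	return nil
}

func main() {
	paths := []pathBackend{
		{Path: "/", Service: "app-stable-service"},
		{Path: "/", Service: "app-canary-service"},
		{Path: "/api", Service: "app-stable-service"}, // canary entry is missing
	}
	fmt.Println(checkPaired(paths, "app-stable-service", "app-canary-service"))
}
```

Tracking the two services with separate flags, rather than a single counter, means two entries for the same service on one path do not count as a pair.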

Verification

The test cases include:

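  • Canary release operations: release, pause, rollback, promote
  • Whether traffic is split according to the configured weight
  • Whether the switchover is smooth and safe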
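
For the second check, one way to judge whether the observed traffic matches the configured weight is to tally which backend answered each request and compare the canary share with the expected weight, allowing some tolerance. The sketch below covers only that comparison. How responses are attributed to a backend is left as an assumption, and `withinTolerance` is a hypothetical helper, not part of argo-rollouts.

```go
package main

import (
	"fmt"
	"math"
)

// withinTolerance returns the observed canary share in percent and whether it
// lies within tolerance percentage points of the expected weight.
func withinTolerance(canaryHits, totalHits int, expectedWeight int32, tolerance float64) (float64, bool) {
	if totalHits <= 0 {
		return 0, false
	}
	observed := 100 * float64(canaryHits) / float64(totalHits)
	return observed, math.Abs(observed-float64(expectedWeight)) <= tolerance
}

func main() {
	// Hypothetical tally: 1000 requests, 112 of them answered by the canary.
	observed, ok := withinTolerance(112, 1000, 10, 2.0)
	fmt.Printf("canary share %.1f%%, within tolerance: %v\n", observed, ok)
}
```

Because weighted balancing is only approximate over small samples, the tolerance and the request count should be chosen together.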

Rollout configuration used for verification:

apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: app-rollout
spec:
  replicas: 5
  selector:
    matchLabels:
      app: nginx
  strategy:
    canary:
      canaryService: app-canary-service
      stableService: app-stable-service
      steps:
      - setWeight: 10
      - pause: {}
      trafficRouting:
        traefik:
          stableIngress: app-stable-ingress
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:1.14.2
        name: nginx
        ports:
        - containerPort: 80
          name: http
        resources: {}
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: http
            scheme: HTTP
          initialDelaySeconds: 10
          periodSeconds: 5
          successThreshold: 1
          timeoutSeconds: 2
        livenessProbe:
          failureThreshold: 2
          httpGet:
            path: /
            port: http
            scheme: HTTP
          initialDelaySeconds: 5
          periodSeconds: 5
          successThreshold: 1
          timeoutSeconds: 2
        lifecycle:
          preStop:
            exec:
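              command: ["sleep", "5s"]

I will skip the verification walkthrough here. The key Rollout manifest is given above, and the commands are covered in the previous post.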

Awaiting release