K8S Proxy 源码分析


前言

在分析kube-proxy之前,我们需要先知道k8s中另外一个概念service。

service 是一组具有相同 label pod 集合的抽象,集群内外的各个服务可以通过 service 进行互相访问。

当创建一个 service 对象时也会对应创建一个 endpoint 对象,endpoint 存放的是pod服务的ip地址,service 只是将多个pod进行关联起来。

当用户通过service访问后端pod服务时,实际的路由转发都是由 kubernetes 中的 kube-proxy 组件来实现,因此,service 必须结合 kube-proxy 使用。

kube-proxy 组件可以运行在 kubernetes 集群中的每一个节点上也可以只运行在单独的几个节点上,其会根据 service 和 endpoints 的变动来刷新节点上 iptables 或者 ipvs 中保存的路由规则。

service 工作原理

service 工作原理

endpoint controller 负责监听 service 和对应 pod 的变化,更新对应 service 的 endpoints 对象。当用户创建 service 后 endpoints controller 会监听 pod 的状态,当 pod 处于 running 且准备就绪时,endpoints controller 会将 pod ip 记录到 endpoints 对象中。 kube-proxy 会监听 service 和 endpoints 的变化并调用其代理模块在主机上刷新路由规则。

总结就是,endpoint controller负责更新service ip和其对应后端服务endpoint ip的关系。而kube-proxy则负责更新各个节点的iptables或者ipvs使得流量能够被正常转发。

service 暴露服务方式

ClusterIp

ClusterIP 类型的 service 是 kubernetes 集群默认的服务暴露方式,它只能用于集群内部通信,可以被各 pod 访问,其访问方式为:

pod ---> ClusterIP:ServicePort --> (iptables)DNAT --> PodIP:containePort

NodePort

client ---> NodeIP:NodePort ---> ClusterIP:ServicePort ---> (iptables)DNAT ---> PodIP:containePort

LoadBalancer

Ingress

service 数据包流动分析

实验背景就是,我们启动三个nginx pod,pod3和pod2在node2中,而pod1在node1中。网络设备结构图如下: pod2和pod3在同一个node中,为了简洁暂未画出。

nginx部署yaml文件如下:

apiVersion: v1
kind: ReplicationController
metadata:
  name: nginx
spec:
  replicas: 3
  selector:
    app: nginx
  template:
    metadata:
      name: nginx
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: nginx-service
  labels:
    name: nginx-service
spec:
  type: NodePort
  selector:
    app: nginx
  ports:
    - protocol: TCP
      port: 80
      targetPort: 80
      nodePort: 30001

部署完成之后,我们访问service如下:

$ kubectl get svc -o wide
NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE   SELECTOR 
nginx-service   NodePort    10.96.193.229   <none>        80:30001/TCP   9d    app=nginx

nginx-service含有三个后端endpoint

在pod2中curl 10.96.193.229,通过service cluster-ip来访问nginx, 数据包从pod2 eth0到达Cali.c2,再从Cali.c2发往tunl0之前会被iptables拦截做一次DNAT, 将原来的目标地址10.96.193.229转化为pod1的地址10.100.15.150,此时也就是说经过了一次负载均衡,选中了pod1作为服务端server来相应这次请求。

然后,经过DNAT的数据包达到node2 tunl0设备,添加上新的ip头(172.31.127.252->172.31.112.2),通过隧道发往node1网卡eth0, 然后通过tunl0解包,拿到原始的目标地址,也就是pod1的地址10.100.15.150,然后通过pod1中的veth pair对到达pod1。

iptables分析:

从pod2中发出curl nginx-service(curl 10.96.193.229)请求,假设负载均衡到了pod3,也就是说收到了pod3的应答。同一个node中调用,不会调用tunl0进行IPIP封装,直接通过 Cali.c2发送到Cali.fa,当然中间会经过iptables做一次DNAT替换,将目标地址从10.96.193.229更换为pod3的地址10.100.9.207。

从pod2中发出curl nginx-service(curl 10.96.193.229)请求,负载均衡到了pod2本身 首先做一次DNAT转化,将源地址10.96.193.229替换为10.100.9.206,

我们通过访问nodeip:30001,来验证集群外调用,通过抓包,我们直接贴出ip包流向图:

当用户访问curl 172.31.112.2:30001时,也就是通过nodeport(hostip+port)的方式来访问nginx服务, 请求发出后kube-proxy首先会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2.57546,然后经过iptables负载均衡,选择了pod1 提供服务,如果选择的pod所在的node节点和curl 请求的hostip一致,就相当于本机pod访问,此时数据包源地址目标地址为:172.31.112.2:57546->10.100.15.150:80 直接通过pod1所在的veth pair对到达pod1,完成访问。

如果经过iptables负载均衡,选择pod2提供服务,首先随机选择一个端口将请求代理到172.31.112.2.57546,经过iptables做一次SNAT,将源ip地址替换为tunl0 ip地址。 此时源地址目标地址为10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址),接着tunl0进行ipip封包,源地址和目标地址变为: 127031.112.0–>172.31.127.252:10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址)。此时数据包到达node2的eth0网卡,经过tunl0设备解包, 拿到实际的包10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址),经过pod2 veth pair对到达pod2,完成访问。

总结来说就是:

  1. 访问同一个node中的pod,通过nodeport访问从外部访问nginx服务,发送curl 172.31.112.2:30001请求时,首先kube-proxy会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2:57546,由它来去请求pod服务。然后通过iptabels做一次DNAT修改目的地址为10.100.15.10:80,此时数据包变成172.31.112.2:57546–>10.100.15.10:80,不经过tunl0设备直接到达目标pod。

2.访问不同的node中的pod,通过nodeport访问从外部访问nginx服务,发送curl 172.31.112.2:30001请求时,首先kube-proxy会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2:57546,由它来去请求pod服务,然后通过iptables发现需要经过tunl0设备,首先做SNAT,修改源地址为tunl0地址,然后做DNAT修改目的地址为10.100.9.206:80,然后通过IPIP加上新的包头。 这样就发送到了node2所在的节点,经过tunl0,经过pod veth paird对到达目标pod。

kube-proxy 模式

kube-proxy 的路由转发规则是通过其后端的代理模块实现的,kube-proxy 的代理模块目前有四种实现方案,userspace、iptables、ipvs、kernelspace,其发展历程如下所示:

userspace 模式

iptables 模式

ipvs 模式

kernelspace 模式

windowns下使用

kube-proxy 源码分析入口

//cmd/kube-proxy/proxy.go:33
func main() {
	command := app.NewProxyCommand()
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

//cmd/kube-proxy/app/server.go:415
// NewProxyCommand creates a *cobra.Command object with default parameters
func NewProxyCommand() *cobra.Command {
	opts := NewOptions()
	cmd := &cobra.Command{
		Use: "kube-proxy",
		Run: func(cmd *cobra.Command, args []string) {
			...
			if err := opts.Run(); err != nil {
				klog.Exit(err)
			}
		},
	}

	var err error
	opts.config, err = opts.ApplyDefaults(opts.config)
	if err != nil {
		klog.Fatalf("unable to create flag defaults: %v", err)
	}
	return cmd
}

下面进入kube-proxy真正启动逻辑opts.Run()

//cmd/kube-proxy/app/server.go:290
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
	defer close(o.errCh)
	//如果指定了 --write-config-to 参数,则将参数写入到指定的配置文件中
	if len(o.WriteConfigTo) > 0 {
		return o.writeConfigFile()
	}

    //新建Proxyserver
	proxyServer, err := NewProxyServer(o)
	if err != nil {
		return err
	}
    
    //如果在启动参数中指定了--cleanup=true,则清空ipvs,iptables里面的规则
	if o.CleanupAndExit {
		return proxyServer.CleanupAndExit()
	}

	o.proxyServer = proxyServer
	return o.runLoop()
}

主要逻辑很简单,就是新建ProxyServer,然后启动runLoop进行无限循环,接下来我们看下NewProxyServer都创建了些啥

//cmd/kube-proxy/app/server_others.go:62
func newProxyServer(
	config *proxyconfigapi.KubeProxyConfiguration,
	cleanupAndExit bool,
	master string) (*ProxyServer, error) {

	//定义将要使用的工具包,iptables, ipvs, kernel, ipset, dbus
	var iptInterface utiliptables.Interface
	var ipvsInterface utilipvs.Interface
	var kernelHandler ipvs.KernelHandler
	var ipsetInterface utilipset.Interface
	var dbus utildbus.Interface

	// 初始化linux命令行工具
	execer := exec.New()
    
    //工具包初始化,dbus,iptables, ipvs, kernel, ipset
	dbus = utildbus.New()
	iptInterface = utiliptables.New(execer, dbus, protocol)
	kernelHandler = ipvs.NewLinuxKernelHandler()
	ipsetInterface = utilipset.New(execer)
    //检测节点是否支持ipvs模式
	canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
	if canUseIPVS {
		ipvsInterface = utilipvs.New(execer)
	}
	

	client, eventClient, err := createClients(config.ClientConnection, master)
	if err != nil {
		return nil, err
	}

	var proxier proxy.Provider

    //获取kube-proxy mode,
	proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
    ...
    //如果kube-proxy mode是iptables模式
	if proxyMode == proxyModeIPTables {
		klog.V(0).Info("Using iptables Proxier.")
		if config.IPTables.MasqueradeBit == nil {
			// MasqueradeBit must be specified or defaulted.
			return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
		}

		//新建iptables 模式的proxier
		proxier, err = iptables.NewProxier(
			...
		)
		//如果proxy mode是ipvs类型
	} else if proxyMode == proxyModeIPVS {
		klog.V(0).Info("Using ipvs Proxier.")
        //判断是否启用了ipv6 双栈模式,双栈技术所谓双栈(Dual IP Stack) ,就是在一复个系统(如一台主机或一台路由器) 中同时使制用百IPv6/ IPv4 两个可以并行工作的协议栈
        //并且ipv4和ipv6可以相互转换
		if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
			klog.V(0).Info("creating dualStackProxier for ipvs.")
			
			proxier, err = ipvs.NewDualStackProxier(
				...
			)
		} else {
			proxier, err = ipvs.NewProxier(
				...
			)
		}
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
	} else {

        //如果上面iptables和ipvs模式都不能满足条件,则使用userspace模式启动
		klog.V(0).Info("Using userspace Proxier.")

		// TODO this has side effects that should only happen when Run() is invoked.
		proxier, err = userspace.NewProxier(
			userspace.NewLoadBalancerRR(),
			net.ParseIP(config.BindAddress),
			iptInterface,
			execer,
			*utilnet.ParsePortRangeOrDie(config.PortRange),
			config.IPTables.SyncPeriod.Duration,
			config.IPTables.MinSyncPeriod.Duration,
			config.UDPIdleTimeout.Duration,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
	}

	iptInterface.AddReloadFunc(proxier.Sync)

	return &ProxyServer{
		Client:                 client,
		EventClient:            eventClient,
		IptInterface:           iptInterface,
		IpvsInterface:          ipvsInterface,
		IpsetInterface:         ipsetInterface,
		execer:                 execer,
		Proxier:                proxier,
	    ...
		ProxyMode:              proxyMode,
	    ...
	}, nil
}

看来挺简单的就是iptables,ipvs, userspace三种模式分别进行条件判断,iptables模式优先级最高,其次ipvs,最后是userspace模式,最后返回一个初始化好的proxy结构体。

接下来我们看一下Runloop()中的逻辑

//cmd/kube-proxy/app/server.go:309
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
    //watch Kube-proxy配置文件的变化
	if o.watcher != nil {
		o.watcher.Run()
	}

	// goroutine 启动proxyServer
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()
    ...
}

进入proxyServer.Run() 进行查看,

//cmd/kube-proxy/app/server.go:527
// 启动 ProxyServer,并且需要保持该server永远不会退出
func (s *ProxyServer) Run() error {

	// TODO(vmarmol): Use container config for this.
	var oomAdjuster *oom.OOMAdjuster
	if s.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
			klog.V(2).Info(err)
		}
	}

	if s.Broadcaster != nil && s.EventClient != nil {
		s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
	}

	// Start up a healthz server if requested
	if s.HealthzServer != nil {
		s.HealthzServer.Run()
	}

	// Start up a metrics server if requested
	if len(s.MetricsBindAddress) > 0 {
		proxyMux := mux.NewPathRecorderMux("kube-proxy")
		healthz.InstallHandler(proxyMux)
		proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "%s", s.ProxyMode)
		})
		proxyMux.Handle("/metrics", legacyregistry.Handler())
		if s.EnableProfiling {
			routes.Profiling{}.Install(proxyMux)
		}
		configz.InstallHandler(proxyMux)
		go wait.Until(func() {
			err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
			}
		}, 5*time.Second, wait.NeverStop)
	}

	// Tune conntrack, if requested
	// Conntracker is always nil for windows
	if s.Conntracker != nil {
		max, err := getConntrackMax(s.ConntrackConfiguration)
		if err != nil {
			return err
		}
		if max > 0 {
			err := s.Conntracker.SetMax(max)
			if err != nil {
				if err != errReadOnlySysFS {
					return err
				}
				// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
				// the only remediation we know is to restart the docker daemon.
				// Here we'll send an node event with specific reason and message, the
				// administrator should decide whether and how to handle this issue,
				// whether to drain the node and restart docker.  Occurs in other container runtimes
				// as well.
				// TODO(random-liu): Remove this when the docker bug is fixed.
				const message = "CRI error: /sys is read-only: " +
					"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
				s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
			}
		}

		if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
				return err
			}
		}

		if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
				return err
			}
		}
	}

	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
	if err != nil {
		return err
	}

	labelSelector := labels.NewSelector()
	labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = labelSelector.String()
		}))

	// Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
	serviceConfig.RegisterEventHandler(s.Proxier)
	go serviceConfig.Run(wait.NeverStop)

	if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
		endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
		endpointSliceConfig.RegisterEventHandler(s.Proxier)
		go endpointSliceConfig.Run(wait.NeverStop)
	} else {
		endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
		endpointsConfig.RegisterEventHandler(s.Proxier)
		go endpointsConfig.Run(wait.NeverStop)
	}

	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
	// functions must configure their shared informer event handlers first.
	informerFactory.Start(wait.NeverStop)

	// Birth Cry after the birth is successful
	s.birthCry()

	// Just loop forever for now...
	s.Proxier.SyncLoop()
	return nil
}

参考

https://kubernetes.io/docs/reference/command-line-tools-reference/kube-proxy/ https://blog.csdn.net/u010771890/article/details/103226784