K8S Proxy 源码分析
前言
在分析kube-proxy之前,我们需要先知道k8s中另外一个概念service。
service 是一组具有相同 label pod 集合的抽象,集群内外的各个服务可以通过 service 进行互相访问。
当创建一个 service 对象时也会对应创建一个 endpoint 对象,endpoint 存放的是pod服务的ip地址,service 只是将多个pod进行关联起来。
当用户通过service访问后端pod服务时,实际的路由转发都是由 kubernetes 中的 kube-proxy 组件来实现,因此,service 必须结合 kube-proxy 使用。
kube-proxy 组件可以运行在 kubernetes 集群中的每一个节点上也可以只运行在单独的几个节点上,其会根据 service 和 endpoints 的变动来刷新节点上 iptables 或者 ipvs 中保存的路由规则。
service 工作原理
endpoint controller 负责监听 service 和对应 pod 的变化,更新对应 service 的 endpoints 对象。当用户创建 service 后 endpoints controller 会监听 pod 的状态,当 pod 处于 running 且准备就绪时,endpoints controller 会将 pod ip 记录到 endpoints 对象中。 kube-proxy 会监听 service 和 endpoints 的变化并调用其代理模块在主机上刷新路由规则。
总结就是,endpoint controller负责更新service ip和其对应后端服务endpoint ip的关系。而kube-proxy则负责更新各个节点的iptables或者ipvs使得流量能够被正常转发。
service 暴露服务方式
ClusterIp
ClusterIP 类型的 service 是 kubernetes 集群默认的服务暴露方式,它只能用于集群内部通信,可以被各 pod 访问,其访问方式为:
pod ---> ClusterIP:ServicePort --> (iptables)DNAT --> PodIP:containePort
NodePort
client ---> NodeIP:NodePort ---> ClusterIP:ServicePort ---> (iptables)DNAT ---> PodIP:containePort
LoadBalancer
Ingress
service 数据包流动分析
实验背景就是,我们启动三个nginx pod,pod3和pod2在node2中,而pod1在node1中。网络设备结构图如下: pod2和pod3在同一个node中,为了简洁暂未画出。
nginx部署yaml文件如下:
apiVersion: v1
kind: ReplicationController
metadata:
name: nginx
spec:
replicas: 3
selector:
app: nginx
template:
metadata:
name: nginx
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
name: nginx-service
labels:
name: nginx-service
spec:
type: NodePort
selector:
app: nginx
ports:
- protocol: TCP
port: 80
targetPort: 80
nodePort: 30001
部署完成之后,我们访问service如下:
$ kubectl get svc -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
nginx-service NodePort 10.96.193.229 <none> 80:30001/TCP 9d app=nginx
nginx-service含有三个后端endpoint
- 情景一 从pod2中发送curl nginx-service请求
在pod2中curl 10.96.193.229,通过service cluster-ip来访问nginx, 数据包从pod2 eth0到达Cali.c2,再从Cali.c2发往tunl0之前会被iptables拦截做一次DNAT, 将原来的目标地址10.96.193.229转化为pod1的地址10.100.15.150,此时也就是说经过了一次负载均衡,选中了pod1作为服务端server来相应这次请求。
然后,经过DNAT的数据包达到node2 tunl0设备,添加上新的ip头(172.31.127.252->172.31.112.2),通过隧道发往node1网卡eth0, 然后通过tunl0解包,拿到原始的目标地址,也就是pod1的地址10.100.15.150,然后通过pod1中的veth pair对到达pod1。
iptables分析:
- 情景二 同一个node中pod的相互调用
从pod2中发出curl nginx-service(curl 10.96.193.229)请求,假设负载均衡到了pod3,也就是说收到了pod3的应答。同一个node中调用,不会调用tunl0进行IPIP封装,直接通过 Cali.c2发送到Cali.fa,当然中间会经过iptables做一次DNAT替换,将目标地址从10.96.193.229更换为pod3的地址10.100.9.207。
- 情景三 同一个pod发生调用
从pod2中发出curl nginx-service(curl 10.96.193.229)请求,负载均衡到了pod2本身 首先做一次DNAT转化,将源地址10.96.193.229替换为10.100.9.206,
- 情景四 集群外部调用nginx服务
我们通过访问nodeip:30001,来验证集群外调用,通过抓包,我们直接贴出ip包流向图:
当用户访问curl 172.31.112.2:30001时,也就是通过nodeport(hostip+port)的方式来访问nginx服务, 请求发出后kube-proxy首先会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2.57546,然后经过iptables负载均衡,选择了pod1 提供服务,如果选择的pod所在的node节点和curl 请求的hostip一致,就相当于本机pod访问,此时数据包源地址目标地址为:172.31.112.2:57546->10.100.15.150:80 直接通过pod1所在的veth pair对到达pod1,完成访问。
如果经过iptables负载均衡,选择pod2提供服务,首先随机选择一个端口将请求代理到172.31.112.2.57546,经过iptables做一次SNAT,将源ip地址替换为tunl0 ip地址。 此时源地址目标地址为10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址),接着tunl0进行ipip封包,源地址和目标地址变为: 127031.112.0–>172.31.127.252:10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址)。此时数据包到达node2的eth0网卡,经过tunl0设备解包, 拿到实际的包10.100.15.128:57546(node1中tunl0的地址)–>10.100.9.206:80(pod2的地址),经过pod2 veth pair对到达pod2,完成访问。
总结来说就是:
- 访问同一个node中的pod,通过nodeport访问从外部访问nginx服务,发送curl 172.31.112.2:30001请求时,首先kube-proxy会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2:57546,由它来去请求pod服务。然后通过iptabels做一次DNAT修改目的地址为10.100.15.10:80,此时数据包变成172.31.112.2:57546–>10.100.15.10:80,不经过tunl0设备直接到达目标pod。
2.访问不同的node中的pod,通过nodeport访问从外部访问nginx服务,发送curl 172.31.112.2:30001请求时,首先kube-proxy会做一次反向代理,随机选择一个端口将请求代理到172.31.112.2:57546,由它来去请求pod服务,然后通过iptables发现需要经过tunl0设备,首先做SNAT,修改源地址为tunl0地址,然后做DNAT修改目的地址为10.100.9.206:80,然后通过IPIP加上新的包头。 这样就发送到了node2所在的节点,经过tunl0,经过pod veth paird对到达目标pod。
kube-proxy 模式
kube-proxy 的路由转发规则是通过其后端的代理模块实现的,kube-proxy 的代理模块目前有四种实现方案,userspace、iptables、ipvs、kernelspace,其发展历程如下所示:
- kubernetes v1.0:services 仅是一个“4层”代理,代理模块只有 userspace
- kubernetes v1.1:Ingress API 出现,其代理“7层”服务,并且增加了 iptables 代理模块
- kubernetes v1.2:iptables 成为默认代理模式
- kubernetes v1.8:引入 ipvs 代理模块
- kubernetes v1.9:ipvs 代理模块成为 beta 版本
- kubernetes v1.11:ipvs 代理模式 GA
userspace 模式
iptables 模式
ipvs 模式
kernelspace 模式
windowns下使用
kube-proxy 源码分析入口
//cmd/kube-proxy/proxy.go:33
func main() {
command := app.NewProxyCommand()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
//cmd/kube-proxy/app/server.go:415
// NewProxyCommand creates a *cobra.Command object with default parameters
func NewProxyCommand() *cobra.Command {
opts := NewOptions()
cmd := &cobra.Command{
Use: "kube-proxy",
Run: func(cmd *cobra.Command, args []string) {
...
if err := opts.Run(); err != nil {
klog.Exit(err)
}
},
}
var err error
opts.config, err = opts.ApplyDefaults(opts.config)
if err != nil {
klog.Fatalf("unable to create flag defaults: %v", err)
}
return cmd
}
下面进入kube-proxy真正启动逻辑opts.Run()
//cmd/kube-proxy/app/server.go:290
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
defer close(o.errCh)
//如果指定了 --write-config-to 参数,则将参数写入到指定的配置文件中
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
//新建Proxyserver
proxyServer, err := NewProxyServer(o)
if err != nil {
return err
}
//如果在启动参数中指定了--cleanup=true,则清空ipvs,iptables里面的规则
if o.CleanupAndExit {
return proxyServer.CleanupAndExit()
}
o.proxyServer = proxyServer
return o.runLoop()
}
主要逻辑很简单,就是新建ProxyServer,然后启动runLoop进行无限循环,接下来我们看下NewProxyServer
都创建了些啥
//cmd/kube-proxy/app/server_others.go:62
func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
cleanupAndExit bool,
master string) (*ProxyServer, error) {
//定义将要使用的工具包,iptables, ipvs, kernel, ipset, dbus
var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
var dbus utildbus.Interface
// 初始化linux命令行工具
execer := exec.New()
//工具包初始化,dbus,iptables, ipvs, kernel, ipset
dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol)
kernelHandler = ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
//检测节点是否支持ipvs模式
canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
if canUseIPVS {
ipvsInterface = utilipvs.New(execer)
}
client, eventClient, err := createClients(config.ClientConnection, master)
if err != nil {
return nil, err
}
var proxier proxy.Provider
//获取kube-proxy mode,
proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
...
//如果kube-proxy mode是iptables模式
if proxyMode == proxyModeIPTables {
klog.V(0).Info("Using iptables Proxier.")
if config.IPTables.MasqueradeBit == nil {
// MasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
}
//新建iptables 模式的proxier
proxier, err = iptables.NewProxier(
...
)
//如果proxy mode是ipvs类型
} else if proxyMode == proxyModeIPVS {
klog.V(0).Info("Using ipvs Proxier.")
//判断是否启用了ipv6 双栈模式,双栈技术所谓双栈(Dual IP Stack) ,就是在一复个系统(如一台主机或一台路由器) 中同时使制用百IPv6/ IPv4 两个可以并行工作的协议栈
//并且ipv4和ipv6可以相互转换
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
klog.V(0).Info("creating dualStackProxier for ipvs.")
proxier, err = ipvs.NewDualStackProxier(
...
)
} else {
proxier, err = ipvs.NewProxier(
...
)
}
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
} else {
//如果上面iptables和ipvs模式都不能满足条件,则使用userspace模式启动
klog.V(0).Info("Using userspace Proxier.")
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = userspace.NewProxier(
userspace.NewLoadBalancerRR(),
net.ParseIP(config.BindAddress),
iptInterface,
execer,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
config.NodePortAddresses,
)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
}
iptInterface.AddReloadFunc(proxier.Sync)
return &ProxyServer{
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
execer: execer,
Proxier: proxier,
...
ProxyMode: proxyMode,
...
}, nil
}
看来挺简单的就是iptables,ipvs, userspace三种模式分别进行条件判断,iptables模式优先级最高,其次ipvs,最后是userspace模式,最后返回一个初始化好的proxy结构体。
接下来我们看一下Runloop()
中的逻辑
//cmd/kube-proxy/app/server.go:309
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
//watch Kube-proxy配置文件的变化
if o.watcher != nil {
o.watcher.Run()
}
// goroutine 启动proxyServer
go func() {
err := o.proxyServer.Run()
o.errCh <- err
}()
...
}
进入proxyServer.Run()
进行查看,
//cmd/kube-proxy/app/server.go:527
// 启动 ProxyServer,并且需要保持该server永远不会退出
func (s *ProxyServer) Run() error {
// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
klog.V(2).Info(err)
}
}
if s.Broadcaster != nil && s.EventClient != nil {
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
}
// Start up a healthz server if requested
if s.HealthzServer != nil {
s.HealthzServer.Run()
}
// Start up a metrics server if requested
if len(s.MetricsBindAddress) > 0 {
proxyMux := mux.NewPathRecorderMux("kube-proxy")
healthz.InstallHandler(proxyMux)
proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", s.ProxyMode)
})
proxyMux.Handle("/metrics", legacyregistry.Handler())
if s.EnableProfiling {
routes.Profiling{}.Install(proxyMux)
}
configz.InstallHandler(proxyMux)
go wait.Until(func() {
err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
if err != nil {
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
}
}, 5*time.Second, wait.NeverStop)
}
// Tune conntrack, if requested
// Conntracker is always nil for windows
if s.Conntracker != nil {
max, err := getConntrackMax(s.ConntrackConfiguration)
if err != nil {
return err
}
if max > 0 {
err := s.Conntracker.SetMax(max)
if err != nil {
if err != errReadOnlySysFS {
return err
}
// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
// the only remediation we know is to restart the docker daemon.
// Here we'll send an node event with specific reason and message, the
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker. Occurs in other container runtimes
// as well.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "CRI error: /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
}
}
if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}
if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}
}
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
if err != nil {
return err
}
noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return err
}
labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))
// Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
}
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)
// Birth Cry after the birth is successful
s.birthCry()
// Just loop forever for now...
s.Proxier.SyncLoop()
return nil
}
参考
https://kubernetes.io/docs/reference/command-line-tools-reference/kube-proxy/ https://blog.csdn.net/u010771890/article/details/103226784