[kubernetes/k8s source code analysis] kube-proxy IPVS source code analysis
kubernetes version: 1.12.1, source path: pkg/proxy/ipvs/proxier.go
This post only covers the IPVS-related parts; the startup flow was covered in an earlier post:
https://blog.csdn.net/zhonglinzhang/article/details/80185053
WHY IPVS
Although Kubernetes has supported 5000 nodes since v1.6, kube-proxy in iptables mode is in practice the bottleneck for scaling a cluster to 5000 nodes. With NodePort Services in a 5000-node cluster, 2000 services with 10 pods each produce at least 20000 iptables entries on every worker node, which can keep the kernel very busy.
WHAT?
kube-proxy introduced an IPVS mode. Both IPVS and iptables are built on Netfilter, but IPVS uses hash tables, so when the number of Services becomes very large the speed advantage of hash lookups shows, improving Service lookup performance.
HOW IPVS?
kube-proxy startup flags
/usr/bin/kube-proxy --bind-address=10.12.51.172 --hostname-override=10.12.51.172 --cluster-cidr=10.254.0.0/16 --kubeconfig=/etc/kubernetes/kube-proxy.kubeconfig --logtostderr=true --v=2
--ipvs-scheduler=wrr --ipvs-min-sync-period=5s --ipvs-sync-period=5s --proxy-mode=ipvs
Flag --masquerade-all=true
ipvs masquerades all traffic that accesses a Service Cluster IP; the behaviour is then the same as with iptables.
Flag --cluster-cidr=<cidr>
when set, only traffic arriving from outside this CIDR is masqueraded when it hits a Service VIP (see the KUBE-MARK-MASQ rules below).
Flag --cleanup-ipvs: if true, clean up the IPVS configuration and iptables rules created in IPVS mode.
Flag --ipvs-sync-period: maximum interval for re-syncing IPVS rules (e.g. '5s', '1m').
Flag --ipvs-min-sync-period: minimum interval between IPVS rule syncs (e.g. '5s', '1m').
Flag --ipvs-scheduler: defaults to rr
- rr: round-robin
- lc: least connection
- dh: destination hashing
- sh: source hashing
- sed: shortest expected delay
- nq: never queue
How IPVS works
Quoted from an article found online; it sums it up at a glance:
- ipvs: runs in kernel space; it is what actually enforces the user-defined policies;
- ipvsadm: runs in user space; it is the tool used to define and manage the virtual services;
IPVS has three proxy modes:
NAT (masq), IPIP and DR.
Only NAT mode supports port mapping, so kube-proxy uses NAT mode for port mapping.
How IPVS DR mode works
How ipset works
ipset is an extension of iptables that lets you create rules matching entire sets of addresses, whereas an ordinary iptables rule can only match a single IP. The sets are stored in indexed data structures, so lookups stay efficient even when a set grows large. Official site: http://ipset.netfilter.org/
ipvs still uses iptables for packet filtering, SNAT and masquerading. Specifically, ipvs uses ipset to store the source or destination addresses of traffic that needs to be dropped or masqueraded, which keeps the number of iptables rules constant.
Kernel modules
Make sure the kernel modules IPVS depends on are present: ip_vs, ip_vs_rr, ip_vs_wrr, ip_vs_sh, nf_conntrack_ipv4
var ipvsModules = []string{
"ip_vs",
"ip_vs_rr",
"ip_vs_wrr",
"ip_vs_sh",
"nf_conntrack_ipv4",
}
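As an illustration only, a rough equivalent of that module check, assuming modprobe is on PATH; the upstream check is done in CanUseIPVSProxier through its kernel handler, not like this:
import (
	"fmt"
	"io/ioutil"
	"os/exec"
	"strings"
)

// ensureIPVSModules loads any of the required modules that /proc/modules does not list yet.
func ensureIPVSModules(modules []string) error {
	data, err := ioutil.ReadFile("/proc/modules")
	if err != nil {
		return err
	}
	loaded := map[string]bool{}
	for _, line := range strings.Split(string(data), "\n") {
		if fields := strings.Fields(line); len(fields) > 0 {
			loaded[fields[0]] = true // first field is the module name
		}
	}
	for _, mod := range modules {
		if loaded[mod] {
			continue
		}
		// Not loaded; try to load it (built-in modules would need an extra
		// check against modules.builtin, which this sketch skips).
		if out, err := exec.Command("modprobe", mod).CombinedOutput(); err != nil {
			return fmt.Errorf("modprobe %s failed: %v: %s", mod, err, out)
		}
	}
	return nil
}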
1. The NewProxier function
1.1 Set kernel parameters
- net/ipv4/conf/all/route_localnet: whether external access to localhost is allowed
- net/bridge/bridge-nf-call-iptables: 1 means packets forwarded by an L2 bridge are also filtered by the iptables FORWARD rules, i.e. L3 iptables rules end up filtering L2 frames
- net/ipv4/vs/conntrack
- net/ipv4/ip_forward: whether IPv4 forwarding is enabled (0: disabled, 1: enabled)
// Set the route_localnet sysctl we need for
if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
}
// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
// are connected to a Linux bridge (but not SDN bridges). Until most
// plugins handle this, log when config is missing
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
glog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
}
// Set the conntrack sysctl we need for
if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
}
// Set the ip_forward sysctl we need for
if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
}
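sysctl.SetSysctl ultimately amounts to writing the value under /proc/sys. A minimal sketch of what the helper does (the real one lives in kube-proxy's sysctl util package):
import (
	"io/ioutil"
	"path/filepath"
	"strconv"
)

// setSysctl writes e.g. 1 to /proc/sys/net/ipv4/ip_forward for name "net/ipv4/ip_forward".
func setSysctl(name string, value int) error {
	return ioutil.WriteFile(filepath.Join("/proc/sys", name), []byte(strconv.Itoa(value)), 0640)
}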
1.2 Initialize the IPSet list
Load the defined IPSets into the ipsetList map
// initialize ipsetList with all sets we needed
proxier.ipsetList = make(map[string]*IPSet)
for _, is := range ipsetInfo {
if is.isIPv6 {
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
}
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
}
1.3 Initialize syncRunner; the core function is syncProxyRules
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
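The BoundedFrequencyRunner guarantees that syncProxyRules runs at least once every syncPeriod and at most once every minSyncPeriod, no matter how many Service/Endpoints events request a sync. A simplified sketch of that coalescing behaviour (not the upstream implementation):
import "time"

type boundedRunner struct {
	fn        func()        // the work, i.e. syncProxyRules
	minPeriod time.Duration // --ipvs-min-sync-period
	maxPeriod time.Duration // --ipvs-sync-period
	requests  chan struct{}
}

func newBoundedRunner(fn func(), min, max time.Duration) *boundedRunner {
	// capacity 1 so that a request arriving while fn runs is remembered, not lost
	return &boundedRunner{fn: fn, minPeriod: min, maxPeriod: max, requests: make(chan struct{}, 1)}
}

// Run is called by service/endpoint event handlers; pending requests are coalesced.
func (r *boundedRunner) Run() {
	select {
	case r.requests <- struct{}{}:
	default:
	}
}

func (r *boundedRunner) Loop(stop <-chan struct{}) {
	lastRun := time.Now()
	timer := time.NewTimer(r.maxPeriod)
	defer timer.Stop()
	for {
		select {
		case <-stop:
			return
		case <-timer.C:
			// upper bound reached: force a periodic re-sync
		case <-r.requests:
			// honour the minimum interval before reacting to the request
			if wait := r.minPeriod - time.Since(lastRun); wait > 0 {
				time.Sleep(wait)
			}
		}
		r.fn()
		lastRun = time.Now()
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(r.maxPeriod)
	}
}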
2. The syncProxyRules function
2.1 Reset the four buffers
Write *filter and *nat headers at the top to mark the start of each table
// Reset all buffers used later.
// This is to avoid memory reallocations and thus improve performance.
proxier.natChains.Reset()
proxier.natRules.Reset()
proxier.filterChains.Reset()
proxier.filterRules.Reset()
// Write table headers.
writeLine(proxier.filterChains, "*filter")
writeLine(proxier.natChains, "*nat")
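writeLine simply joins its arguments with spaces and appends a newline to the given bytes.Buffer; the chains buffers later collect the ":KUBE-..." chain declarations and the rules buffers the "-A ..." lines. The helper looks roughly like this (same shape as in the iptables proxier):
import "bytes"

// Join all words with spaces, terminate with a newline and write to buf.
func writeLine(buf *bytes.Buffer, words ...string) {
	for i := range words {
		buf.WriteString(words[i])
		if i < len(words)-1 {
			buf.WriteByte(' ')
		} else {
			buf.WriteByte('\n')
		}
	}
}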
2.2 Create the dummy device
# ip route show table local type local proto kernel
- 10.12.51.172 dev eth0 scope host src 10.12.51.172
- 10.254.0.1 dev kube-ipvs0 scope host src 10.254.0.1
- 10.254.0.2 dev kube-ipvs0 scope host src 10.254.0.2
- 10.254.69.27 dev kube-ipvs0 scope host src 10.254.69.27
- 10.254.86.39 dev kube-ipvs0 scope host src 10.254.86.39
- 127.0.0.0/8 dev lo scope host src 127.0.0.1
- 127.0.0.1 dev lo scope host src 127.0.0.1
- 172.30.46.1 dev docker0 scope host src 172.30.46.1
// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
return
}
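EnsureDummyDevice goes through the proxier's netlink handle; the effect is the same as creating the interface by hand. For illustration only (the real code does not shell out), assuming the ip tool is available:
import (
	"fmt"
	"os/exec"
)

// ensureDummyDevice creates the dummy interface if it does not exist yet.
func ensureDummyDevice(name string) error {
	// "ip link show kube-ipvs0" fails when the device is missing.
	if err := exec.Command("ip", "link", "show", name).Run(); err == nil {
		return nil // already present
	}
	out, err := exec.Command("ip", "link", "add", name, "type", "dummy").CombinedOutput()
	if err != nil {
		return fmt.Errorf("ip link add %s type dummy: %v: %s", name, err, out)
	}
	return nil
}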
2.3 Create the ipset sets
// make sure ip sets exists in the system.
for _, set := range proxier.ipsetList {
if err := ensureIPSet(set); err != nil {
return
}
set.resetEntries()
}
The resulting sets look like this:
| Name | Type | Revision | Header | Size in memory | References | Members |
| --- | --- | --- | --- | --- | --- | --- |
| KUBE-LOOP-BACK | hash:ip,port,ip | 2 | family inet hashsize 1024 maxelem 65536 | 16824 | 1 | 172.30.46.39,tcp:6379,172.30.46.39 172.30.3.15,udp:53,172.30.3.15 |
| KUBE-NODE-PORT-TCP | bitmap:port | 1 | range 0-65535 | 524432 | 1 | 31011 32371 |
| KUBE-CLUSTER-IP | hash:ip,port | 2 | family inet hashsize 1024 maxelem 65536 | 16688 | 2 | 10.254.0.2,tcp:53 10.254.0.2,udp:53 10.254.86.39,tcp:6379 10.254.0.1,tcp:443 10.254.69.27,tcp:443 |
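Each sync rebuilds the desired membership of every set from scratch: resetEntries() empties activeEntries, the per-service loops in the next section insert the entries that should exist, and the result is then reconciled against what the kernel ipset actually contains. A simplified sketch of that rebuild-and-reconcile pattern, with hypothetical types rather than the upstream IPSet:
// Hypothetical types, for illustration of the pattern only.
type ipSet struct {
	name          string
	activeEntries map[string]struct{} // entries this sync wants to keep
}

func (s *ipSet) resetEntries()       { s.activeEntries = map[string]struct{}{} }
func (s *ipSet) insert(entry string) { s.activeEntries[entry] = struct{}{} }

// reconcile returns which entries must be added to and deleted from the kernel
// set so that it ends up exactly equal to activeEntries.
func (s *ipSet) reconcile(kernelEntries []string) (toAdd, toDel []string) {
	existing := map[string]struct{}{}
	for _, e := range kernelEntries {
		existing[e] = struct{}{}
		if _, want := s.activeEntries[e]; !want {
			toDel = append(toDel, e)
		}
	}
	for e := range s.activeEntries {
		if _, have := existing[e]; !have {
			toAdd = append(toAdd, e)
		}
	}
	return toAdd, toDel
}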
3. Build IPVS rules for each Service
// Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
glog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue
}
3.1 Update the KUBE-LOOP-BACK entries
For example, its members look like this:
- 172.30.46.39,tcp:6379,172.30.46.39
- 172.30.3.15,udp:53,172.30.3.15
- 172.30.3.27,tcp:6379,172.30.3.27
- 172.30.3.15,tcp:53,172.30.3.15
- 10.12.51.171,tcp:6443,10.12.51.171
// Handle traffic that loops back to the originator with SNAT.
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
glog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
continue
}
epIP := ep.IP()
epPort, err := ep.Port()
// Error parsing this endpoint has been logged. Skip to next endpoint.
if epIP == "" || err != nil {
continue
}
entry := &utilipset.Entry{
IP: epIP,
Port: epPort,
Protocol: protocol,
IP2: epIP,
SetType: utilipset.HashIPPortIP,
}
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
continue
}
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
}
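For a hash:ip,port,ip set the member string has the form IP,protocol:port,IP2, which is exactly what the members above show; the endpoint IP appears twice because the set only needs to match traffic whose source and destination are the same endpoint (hairpin traffic). entry.String() in the ipset util produces that string; as a hedged sketch of the format:
import (
	"fmt"
	"strings"
)

// hashIPPortIPEntry builds an "IP,proto:port,IP2" member string, e.g.
// "172.30.46.39,tcp:6379,172.30.46.39" (illustrative; Entry.String() does the real work).
func hashIPPortIPEntry(ip, protocol string, port int, ip2 string) string {
	return fmt.Sprintf("%s,%s:%d,%s", ip, strings.ToLower(protocol), port, ip2)
}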
3.2 Update the KUBE-CLUSTER-IP entries
- 10.254.0.2,tcp:53
- 10.254.0.2,udp:53
- 10.254.86.39,tcp:6379
- 10.254.0.1,tcp:443
- 10.254.69.27,tcp:443
// Capture the clusterIP.
// ipset call
entry := &utilipset.Entry{
IP: svcInfo.ClusterIP.String(),
Port: svcInfo.Port,
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
continue
}
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
The remaining sets (externalIPs, load-balancer ingress, KUBE-NODE-PORT-LOCAL-TCP, KUBE-NODE-PORT-LOCAL-UDP) are handled in much the same way and are skipped here.
4. ipsetWithIptablesChain
When KUBE-POSTROUTING matches the KUBE-LOOP-BACK ipset, the traffic is masqueraded: -A KUBE-POSTROUTING -m comment --comment "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose" -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE
-A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
-A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j ACCEPT
// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
// ipsets with other match rules will be created Individually.
// Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
var ipsetWithIptablesChain = []struct {
name string
from string
to string
matchType string
protocolMatch string
}{
{kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
{kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""},
{kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""},
{kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
{kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
{kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""},
{kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", "tcp"},
{kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "tcp"},
{kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", "udp"},
{kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "udp"},
{kubeNodePortSetSCTP, string(kubeServicesChain), string(KubeNodePortChain), "dst", "sctp"},
{kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst", "sctp"},
}
5. writeIptablesRules
Write the rules into the nat rules buffer and the filter rules buffer; most of what follows is this same kind of operation.
for _, set := range ipsetWithIptablesChain {
if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
args = append(args[:0], "-A", set.from)
if set.protocolMatch != "" {
args = append(args, "-p", set.protocolMatch)
}
args = append(args,
"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
"-m", "set", "--match-set", set.name,
set.matchType,
)
writeLine(proxier.natRules, append(args, "-j", set.to)...)
}
}
-A KUBE-SERVICES ! -s 10.254.0.0/16 -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
"-m", "set", "--match-set", kubeClusterIPSet,
)
if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
} else if len(proxier.clusterCIDR) > 0 {
// This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
writeLine(proxier.natRules, append(args, "dst,dst", "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
} else {
// Masquerade all OUTPUT traffic coming from a service ip.
// The kube dummy interface has all service VIPs assigned which
// results in the service VIP being picked as the source IP to reach
// a VIP. This leads to a connection from VIP:<random port> to
// VIP:<service port>.
// Always masquerading OUTPUT (node-originating) traffic with a VIP
// source ip and service port destination fixes the outgoing connections.
writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
}
}
-A KUBE-LOAD-BALANCER -j KUBE-MARK-MASQ
// mark masq for KUBE-LOAD-BALANCER
writeLine(proxier.natRules, []string{
"-A", string(KubeLoadBalancerChain),
"-j", string(KubeMarkMasqChain),
}...)
// mark drop for KUBE-FIRE-WALL
writeLine(proxier.natRules, []string{
"-A", string(KubeFireWallChain),
"-j", string(KubeMarkDropChain),
}...)
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
// If the masqueradeMark has been added then we want to forward that same
// traffic, this allows NodePort traffic to be forwarded even if the default
// FORWARD policy is not accept.
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "ACCEPT",
)
This mainly creates:
-A KUBE-FORWARD -s 10.254.0.0/16 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.254.0.0/16 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
// The following rules can only be set if clusterCIDR has been defined.
if len(proxier.clusterCIDR) != 0 {
// The following two rules ensure the traffic after the initial packet
// accepted by the "kubernetes forwarding rules" rule above will be
// accepted, to be as specific as possible the traffic must be sourced
// or destined to the clusterCIDR (to/from a pod).
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-s", proxier.clusterCIDR,
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
"-d", proxier.clusterCIDR,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
}
6. Bulk-load the Linux firewall rules with iptables-restore
// Sync iptables rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
// Revert new local ports.
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
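An abbreviated, illustrative example of what the concatenated payload handed to RestoreAll looks like; the chains and rules here are taken from the listings in this post, while the real payload declares every KUBE-* chain and contains far more rules:
*nat
:KUBE-SERVICES - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-NODE-PORT - [0:0]
:KUBE-MARK-MASQ - [0:0]
-A KUBE-POSTROUTING -m comment --comment "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose" -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE
-A KUBE-SERVICES ! -s 10.254.0.0/16 -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
COMMIT
*filter
:KUBE-FORWARD - [0:0]
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
COMMIT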
7. Get the currently bound addresses
// Clean up legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to get bind address, err: %v", err)
}
In IPVS mode, every ClusterIP created for a Service is bound to the kube-ipvs0 dummy interface. Creating a ClusterIP performs the following three actions (a CLI sketch follows the listing below):
- make sure the dummy interface kube-ipvs0 exists on the node
- bind the Service IP address to the dummy interface
- create an IPVS virtual server for each Service IP address
TCP 10.254.23.85:5566 wrr
-> 172.30.3.29:5566 Masq 1 0 0
-> 172.30.46.41:5566 Masq 1 0 0
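Expressed with the ip and ipvsadm CLIs, those three actions plus the real-server setup amount to the following. This is illustrative only: the proxier uses netlink and the pkg/util/ipvs interface rather than exec'ing commands, and bindServiceVIP is a hypothetical helper.
import (
	"fmt"
	"os/exec"
)

// bindServiceVIP mirrors what the proxier effectively does: ensure kube-ipvs0,
// bind the VIP to it, create the IPVS virtual server and add the endpoints as
// NAT (masquerade) real servers with weight 1. Errors from objects that already
// exist (e.g. the dummy device) are not handled in this sketch.
func bindServiceVIP(vip, scheduler string, port int, endpoints []string) error {
	svc := fmt.Sprintf("%s:%d", vip, port)
	cmds := [][]string{
		{"ip", "link", "add", "kube-ipvs0", "type", "dummy"},
		{"ip", "addr", "add", vip + "/32", "dev", "kube-ipvs0"},
		{"ipvsadm", "-A", "-t", svc, "-s", scheduler},
	}
	for _, ep := range endpoints {
		cmds = append(cmds, []string{"ipvsadm", "-a", "-t", svc, "-r", ep, "-m", "-w", "1"})
	}
	for _, c := range cmds {
		if out, err := exec.Command(c[0], c[1:]...).CombinedOutput(); err != nil {
			return fmt.Errorf("%v failed: %v: %s", c, err, out)
		}
	}
	return nil
}
Calling bindServiceVIP("10.254.23.85", "wrr", 5566, []string{"172.30.3.29:5566", "172.30.46.41:5566"}) would reproduce the ipvsadm listing above.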