kubernetes的Pod变更过程

Tags: kubernetes 

目录

Kubelet的syncLoop()方法中会监听pod信息,将任务发送到podWorkers:

pkg/kubelet/kubelet.go:

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	kl.syncLoop(updates, kl)
}

那么chan updates中的内容是如何写入的?

kubelet的启动过程回顾

cmd/kubelet/app/server.go:

func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
	if err := run(s, kubeDeps); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}

cmd/kubelet/app/server.go:

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
	// TODO: this should be replaced by a --standalone flag
	standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)
	...
	if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
		return err
	}
	...

cmd/kubelet/app/server.go:

func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
	hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
	// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
	
	...
	
	builder := kubeDeps.Builder
	if builder == nil {
		builder = CreateAndInitKubelet
	}
	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}
	k, err := builder(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}
	
	...
	
	podCfg := kubeDeps.PodConfig
	
	...
	
	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		glog.Infof("Started kubelet %s as runonce", version.Get().String())
	} else {
		startKubelet(k, podCfg, kubeCfg, kubeDeps)
		glog.Infof("Started kubelet %s", version.Get().String())
	}
	return nil

cmd/kubelet/app/server.go:

func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
	// start the kubelet
	go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
	...
}

k的类型是Kubelet,k8s.io/kubernetes/pkg/kubelet/kubelet.go:

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	...
	kl.syncLoop(updates, kl)
	...

k的创建

cmd/kubelet/app/server.go:

func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
	hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
	// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
	
	...
	
	builder := kubeDeps.Builder
	if builder == nil {
		builder = CreateAndInitKubelet
	}
	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}
	k, err := builder(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}
	
	...
	
	podCfg := kubeDeps.PodConfig
	
	...

从上下文代码中可以看到,builder是CreateAndInitKubelet,kkubeDeps.PodConfig都在这里面创建、设置。

cmd/kubelet/app/server.go:

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations
	
	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return nil, err
	}
	
	k.BirthCry()
	
	k.StartGarbageCollection()
	
	return k, nil
}

PodConfig的创建

pkg/kubelet/kubelet.go, NewMainKubelet():

if kubeDeps.PodConfig == nil {
	var err error
	kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
	if err != nil {
		return nil, err
	}
}

PodConfig的内容

PodConfig的Updates()方法,直接返回c.updates,看一下初始化时候做了哪些设置,谁负责向updates中写入。

// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}

pkg/kubelet/kubelet.go,makePodSourceConfig():

...

// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

// define file config source
if kubeCfg.PodManifestPath != "" {
	glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
	config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}

// define url config source
if kubeCfg.ManifestURL != "" {
	glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
	config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
	glog.Infof("Watching apiserver")
	config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}

...

注意每个Source分到了不同的channel, cfg.Channel([SourceName])

以最后一个“Watching apiserver”为例,k8s.io/kubernetes/pkg/kubelet/config/apiserver.go:

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

在这里创建了一个lw,lw实现了ListerWatcher接口,cache相关的内容见前面的文章”kubernetes-Client-Cache”。

pkg/kubelet/config/apiserver.go:

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}

最后一行,创建了一个reflector,并且使用的storage类型是UndeltaStore,这个store的特点是,每次遇到变更,都会调用传入的send()函数,从而将更新信息写入了channel。

这时候,只是把更新写入了每个source自己在PodConfig中分配的channel中。

PodConfig的更新传递

pkg/kubelet/config/config.go:

type PodConfig struct {
	pods *podStorage
	mux  *config.Mux
	
	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate
	
	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String
}

创建:

func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	updates := make(chan kubetypes.PodUpdate, 50)
	storage := newPodStorage(updates, mode, recorder)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}

注意mux,每个source的channel就在mux中,mux创建时传入参数storage,而storage中包含了最终的update channel。

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

而在mux中创建source对应的channel时,会同时启动一个协程监听channel:

func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

在listen函数中就会调用创建时传入的参数storage(类型podStorage)的Merge()方法:

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}

podStorage的Merge中,更具设置的规则,将更新写入最终的channel,s.updates,k8s.io/kubernetes/pkg/kubelet/config/config.go:

func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}

Kubelet的syncLoop中就从最终的channel中得到了更新信息。

简而言之

每个Source在PodConfig申请一个channel,将各自通过ListerWather获得的更新写入各自的channel。

PodConfig开启一个协程监听每个Source的channel,通过podStorage中的Merge()方法,将得到的更新汇总写入到最终channel。

Kubelet的从最终的channel中得到Pod的更新信息。


kubernetes

  1. kubernetes 使用:多可用区、Pod 部署拓扑与 Topology Aware Routing
  2. kubernetes 扩展:Cloud Controller Manager
  3. kubernetes 准入:操作合法性检查(Admission Control)
  4. kubernetes 鉴权:用户操作权限鉴定(Authorization)
  5. kubernetes 认证:用户管理与身份认证(Authenticating)
  6. kubernetes 开发:代码生成工具
  7. kubernetes 扩展:operator 开发
  8. kubernetes 扩展:CRD 的使用方法
  9. kubernetes configmap 热加载,inotifywatch 监测文件触发热更新
  10. kubernetes 扩展:扩展点和方法(api/cr/plugin...)
  11. kubernetes 调度组件 kube-scheduler 1.16.3 源代码阅读指引
  12. kubernetes 代码中的 k8s.io 是怎么回事?
  13. 阅读笔记《不一样的 双11 技术,阿里巴巴经济体云原生实践》
  14. kubernetes ingress-nginx 启用 upstream 长连接,需要注意,否则容易 502
  15. ingress-nginx 的限速功能在 nginx.conf 中的对应配置
  16. kubernetes 中的容器设置透明代理,自动在 HTTP 请求头中注入 Pod 信息
  17. kubernetes ingress-nginx 的测试代码(单元测试+e2e测试)
  18. kubernetes ingress-nginx http 请求复制功能与 nginx mirror 的行为差异
  19. kubernetes 基于 openresty 的 ingress-nginx 的状态和配置查询
  20. kubernetes ingress-nginx 0.25 源代码走读笔记
  21. kubernetes ingress-nginx 的金丝雀(canary)/灰度发布功能的使用方法
  22. kubernetes 操作命令 kubectl 在 shell 中的自动补全配置
  23. kubernetes 组件 kube-proxy 的 IPVS 功能的使用
  24. kubernetes initializer 功能的使用方法: 在 Pod 等 Resource 落地前进行修改
  25. kubernetes 版本特性: 新特性支持版本和组件兼容版本
  26. kubernetes API 与 Operator: 不为人知的开发者战争(完整篇)
  27. kubernetes 1.12 从零开始(七): kubernetes开发资源
  28. kubernetes 1.12 从零开始(六): 从代码编译到自动部署
  29. kubernetes 网络方案 Flannel 的学习笔记
  30. kubernetes 1.12 从零开始(五): 自己动手部署 kubernetes
  31. kubernetes 1.12 从零开始(四): 必须先讲一下基本概念
  32. kubernetes 1.12 从零开始(三): 用 kubeadm 部署多节点集群
  33. kubernetes 1.12 从零开始(二): 用 minikube 部署开发测试环境
  34. kubernetes 1.12 从零开始(一): 部署环境准备
  35. kubernetes 1.12 从零开始(零): 遇到的问题与解决方法
  36. kubernetes 1.12 从零开始(初): 课程介绍与官方文档汇总
  37. kubernetes 集群状态监控:通过 grafana 和 prometheus
  38. 一些比较有意思的Kubernetes周边产品
  39. Borg论文阅读笔记
  40. kubelet下载pod镜像时,docker口令文件的查找顺序
  41. kubernetes 的 Client Libraries 的使用
  42. kubernetes的网络隔离networkpolicy
  43. kube-router的源码走读
  44. kubernetes 跨网段通信: 通过 calico 的 ipip 模式
  45. kubernetes的调试方法
  46. kubernetes 与 calico 的衔接过程
  47. 怎样理解 kubernetes 以及微服务?
  48. kubernetes中部署有状态的复杂分布式系统
  49. kubernetes的apiserver的启动过程
  50. kubernetes的api定义与装载
  51. kubernetes的federation部署,跨区Service
  52. kubernetes的编译、打包、发布
  53. kubernetes的第三方包的使用
  54. kubernetes的Storage的实现
  55. kubernetes 的 Apiserver 的 storage 使用
  56. kubernetes的Controller-manager的工作过程
  57. kubernetes的Client端Cache
  58. kubernetes 的 Apiserver 的工作过程
  59. kubernetes的CNI插件初始化与Pod网络设置
  60. kubernetes的Pod变更过程
  61. kubernetes的kubelet的工作过程
  62. kuberntes 的 Cmdline 实现
  63. kubernetes的Pod内挂载的Service Account的使用方法
  64. kubernetes的社区资源与项目参与方式
  65. kubernetes的Kube-proxy的转发规则分析
  66. kubernetes的基本操作
  67. kubernetes在CentOS上的集群部署
  68. kubernetes在CentOS上的All In One部署
  69. 怎样选择集群管理系统?

推荐阅读

Copyright @2011-2019 All rights reserved. 转载请添加原文连接,合作请加微信lijiaocn或者发送邮件: [email protected],备注网站合作

友情链接:  系统软件  程序语言  运营经验  水库文集  网络课程  微信网文  发现知识星球