最近的一个项目,需要对 kube-scheduler
进行改造,因此就系统性地阅读了下它的源码。几点阅读感受:
Scheduler 的代码整体上代码可读性较好,不同的逻辑之间做了较好的解耦;
可能正是在设计之初,就想把调度模块做彻底解耦,因此该模块内外都存在一些问题:
我阅读的 Kubernetes 版本(v1.6.6)里 Scheduler
模块位于 plugin 目录下,在最新的 Kubernetes 中,Scheduler
的主体已经被从 plugin 目录移出,和其他组件处于同一目录下;
Scheduler
模块代码中存在若干过度设计,当然这只是个人观点;
GO 语言的设计增加了语言的灵活性,但一定程度上也降低了代码的可读性,最简单的例子就是接口;
Scheduler 是什么 Kubernetes 是 Google 团队发起并维护的基于 Docker 的开源容器集群管理系统,它不仅支持常见的云平台,而且支持内部数据中心。Scheduler
(下称 kube-scheduler
) 在 Kubernetes 架构 中的位置如下图所示:
从上图可以知道,Kubernetes 中的核心组件有:API Server
(下称 kube-apiserver
)、Controller Manager(下称 kube-controller-manager
)、Scheduler
(kube-scheduler
)、etcd、Proxy(下称kube-proxy
)、Kubelet 这几个,在一个典型的扩容请求的流程中,各个组件扮演的角色如下:
用户通过客户端(可以使 kubernetes-client 这类客户端、也可以是kubelet
)通过 kube-apiserver
的 REST API 创建 Deployment/DaemonSet/Job等任务,Pod 状态由 kube-controller-manager
维护;
kube-apiserver
收到用户请求,存储到相关数据到 etcd
;
kube-scheduler
通过监听 kube-apiserver
,获取未调度的 Pod 列表,通过调度算法计算出分配给 Pod 的 Node ,并将 Node 信息和 Pod 进行绑定(Bind),结果存储到 etcd
中;
Node 上的 kubelet
根据调度结果执行 Pod 创建操作。
可见, kube-scheduler
作为k8s中的核心模块,可以被视为一个黑盒,黑盒的输入为待调度的 Pod 和全部计算节点(Node)的信息,经过黑盒内部的调度算法和策略处理,输出为最优的节点,而后将 Pod 调度该节点上 。
Scheduler 调度流程 kube-scheduler
是作为单独的进程启动的,可以总结 kube-scheduler
的职责有以下这些:
集群高可用 :如果 kube-scheduler
设置了 leader-elect
启动参数,那么会通过 etcd
进行节点选主( kube-scheduler
和 kube-controller-manager
都使用了一主多从的高可用方案);
调度资源监听 :通过 Watch 机制监听 kube-apiserver
上资源的变化,这里的资源主要指的是 Pod 和 Node ;
调度节点分配 :通过预选(Predicates
)与优选(Priorites
),为待调度的 Pod 分配一个 Node ,同时将分配结果通过 kube-apiserver
写入 etcd
;
集群高可用这一部分不再详述,简单描述:kube-scheduler
启动时,会在 etcd
中创建 endpoint,endpoint 的信息中记录了当前的 leader 节点信息,以及记录的上次更新时间。leader 节点会定期更新 endpoint 的信息,维护自己的 leader 身份。每个从节点的服务都会定期检查 endpoint 的信息,如果 endpoint 的信息在时间范围内没有更新,它们会尝试更新自己为 leader 节点。
调度资源监听 kube-apiserver
提供了一套 Watch
机制给 kubelet
、kube-controller-manager
、 kube-scheduler
等组件用来监控各种资源(Pod、Node、Service等)的变化,类似于消息中间件里的发布-订阅模式(Push), kube-apiserver
能够主动通知 这些组件。
kube-apiserver
初始化时,建立对etcd的连接,并对etcd进行watch。当 kube-scheduler
等客户端调用 Watch API
时,kube-apiserver
内部会建立一个 WatchServer,后者会从 etcd
里面获取资源的 Watch event
,event经过加工过滤后,就会发送给客户端。Watch API
实质是一个 GET 请求,有两种实现模式:
通过 websocket
协议发送;
通过 Transfer-Encoding=chunked
的方式建立一个长连接;
调度节点分配 调度节点分配主要可以分为预选(Predicates
)与优选(Priorites
)这两个环节,上面讲到的调度资源监听的实质就是为这两个环节提供输入。
预选 :根据配置的 Predicates Policies
(默认为 DefaultProvider
中定义的 default predicates policies
集合)过滤掉那些不满足这些 Policies 的 Node,预选的输出作为优选的输入;
优选 :根据配置的 Priorities Policies
(默认为 DefaultProvider
中定义的 default priorities policies
集合)给预选后的 Node 进行打分排名,得分最高的 Node 即作为最适合的 Node ,该 Pod 就绑定(Bind)到这个 Node 。
注:如果经过优选将 Node 打分排名后,有多个 Node 并列得分最高,那么 kube-scheduler
将随机从中选择一个 Node 作为目标 Node 。
Scheduler 源码结构 有了上面的基础知识后,我们来看一下上面所描述的 kube-scheduler
的功能在代码中是如何实现的。下面的提及的 Kubernetes 源码版本为 v1.6.6 。
kube-apiserver
的源码主要在 k8s.io/kubernetes/plugin/ 目录下,其中两个目录 cmd/scheduler 和 pkg/scheduler 分别定义了 kube-scheduler
中使用到的结构体的参数封装和 scheduler 的具体内部实现。具体的目录结构如下所示。
从源码也可以得知,Kubernetes 的调度器以 plugin 化形式实现(也是唯一一个以 plugin 形式存在的模块),方便用户定制和二次开发。用户可以自定义调度器并以 plugin 形式与 Kubernetes 集成,或集成其他调度器,便于调度不同类型的任务。
Scheduler 源码走读 构造并运行Scheduler 入口函数 下图就是调度器的入口函数:
plugin/cmd/kube-scheduler/scheduler.go +30
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func main() { s := options.NewSchedulerServer() s.AddFlags(pflag.CommandLine) flag.InitFlags() logs.InitLogs() defer logs.FlushLogs() verflag.PrintAndExitIfRequested() if err := app.Run(s); err != nil { glog.Fatalf("scheduler app failed to run: %v", err) } }
在 main 函数的第一行,通过options.NewSchedulerServer()
创建一个SchedulerServer
(结构体参数配置),具体过程如下所示:
plugin/cmd/kube-scheduler/optinons/optinons.go +37
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type SchedulerServer struct { componentconfig.KubeSchedulerConfiguration // kube-apiserver 的地址,可被 kubeconfig 中的配置覆盖). Master string // kubeconfig 文件路径 Kubeconfig string } // 通过默认参数创建一个 SchedulerServer func NewSchedulerServer() *SchedulerServer { versioned := &v1alpha1.KubeSchedulerConfiguration{} api.Scheme.Default(versioned) cfg := componentconfig.KubeSchedulerConfiguration{} api.Scheme.Convert(versioned, &cfg, nil) // 默认会有选主过程 cfg.LeaderElection.LeaderElect = true s := SchedulerServer{ KubeSchedulerConfiguration: cfg, } return &s }
创建了一个 SchedulerServer
后,然后是解析命令行参数、初始化日志系统参数等,最后通过 app.Run(s)
启动了调度器,下面我们直接分析 Run
函数中的逻辑。
配置并启动 Scheduler plugin/cmd/kube-scheduler/app/server.go +71
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 // 运行 SchedulerServer,不会退出. func Run(s *options.SchedulerServer) error { // 创建一个到 kube-apiserver master 的客户端连接 kubecli, err := createClient(s) // 创建一个事件广播器,用于向集群中的 Node 发送调度的信息 recorder := createRecorder(kubecli, s) // 通过工厂类创建 sharedInformerFactory 对象 informerFactory := informers.NewSharedInformerFactory(kubecli, 0) // 创建一个 scheduler server sched, err := createScheduler( s, kubecli, informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().PersistentVolumes(), informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().ReplicationControllers(), informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), recorder, ) // 创建 HTTP 服务,用于性能分析,性能指标度量, // pprof 接口方便性能数据收集,metrics 接口供 prometheus 收集监控数据 go startHTTP(s,sched.GetConfig()) // 开始运行Informer,进行资源缓存 informerFactory.Start(stop) // 运行调度程序 run := func(_ <-chan struct{}) { sched.Run() select {} } // 如果不选主,则直接运行 if !s.LeaderElection.LeaderElect { run(nil) panic("unreachable") } // 集群选主 leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { glog.Fatalf("lost master") }, }, }) panic("unreachable") }
总结一下 Run
这个方法中的逻辑,有下面这些(有顺序):
通过 createClients
创建到 kube-apiserver
的连接 kubecli
;
通过 createRecorder
创建eventBroadcaster对象 recorder
,用于向集群中的 Node 发送调度的信息;
通过 startHTTP
将 kube-scheduler
通过 HTTP 接口暴露,用于性能分析,性能指标度量等;
通过 informers.NewSharedInformerFactory
创建 sharedInformerFactory
对象,该对象封装了 kube-apiserver
中的一些事件通知;
通过 createScheduler
创建一个运行时 Scheduler 实例 sched
(定义在 plugin/pkg/scheduler/scheduler.go 中);
根据 leaderElect
这个标志位决定是否需要进行选主,如果不需要则直接到下一步,否则进行 Leader 选举;
开始循环执行 sched.Run
,sched.Run
内部启动 goroutine
进行 Pod 的调度。
在上面的步骤中,步骤4、5、7比较关键:
第4步中得到一个静态工厂类 sharedInformerFactory
,该类可以得到 PodInformer
、 NodeInformer
等,Informer
的机制在这里不具体展开了;
第5步主要是通过一个静态工厂生成了 kube-apiserver
运行时的若干配置信息(Config
对象);
第7步通过 Config
对象启动调度器,5和7是整个调度器代码的核心,这两步后面再详细分析。
通过 sched.Run
,程序逻辑终于从 plugin/cmd/ 跳转到了 plugin/pkg/ 目录,下面是 Run
方法的定义:
plugin/pkg/scheduler/scheduler.go +148
1 2 3 func (s *Scheduler) Run() { go wait.Until(s.scheduleOne, 0, s.config.StopEverything) }
启动一个协程,循环反复执行 Scheduler.scheduleOne
方法,直到收到中止的信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (s *Scheduler) scheduleOne() { // 1. 从队列中获取一个 Pod Pod := s.config.NextPod() // 2. 通过调度算法,得到待调度的节点 dest, err, extraEnvs := s.config.Algorithm.Schedule(Pod, s.config.NodeLister) metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) // 3, 进行 Pod 和 Node 的绑定,理想情况绑定会成功 // 成功结果会发送给 apiserver,如果失败,会释放分配给 Pod 的资源 assumed := *Pod assumed.Spec. NodeName = dest if err := s.config.SchedulerCache.AssumePod (&assumed); err != nil { glog.Errorf("scheduler cache AssumePod failed: %v", err) return } go func() { b := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: Pod.Namespace, Name: Pod.Name}, Target: v1.ObjectReference{ Kind: "Node", Name: dest, }, } err := s.config.Binder.Bind(b) }() }
scheduleOne
中的逻辑主要分为三块:
通过 Scheduler.config.NextPod()
取得一个 Pod ;
通过 Scheduler.config.Algorithm.Schedule(Pod, Scheduler.config. NodeLister)
为 Pod 选择一个合适的 Node ;
通过 Scheduler.config.Binder.Bind(b)
将此 Node 同 Pod 进行绑定;
在进行 Bind 之前,需要从缓存的资源中进行资源的扣除:s.config.SchedulerCache.AssumePod(&assumed)
,最终调用的逻辑:
/plugin/pkg/scheduler/schedulercache/node_info.go +244
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (n *NodeInfo) addPod(pod *v1.Pod) { res, non0_cpu, non0_mem := calculateResource(pod) n.requestedResource.MilliCPU += res.MilliCPU n.requestedResource.Memory += res.Memory n.requestedResource.NvidiaGPU += res.NvidiaGPU if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 { n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{} } for rName, rQuant := range res.OpaqueIntResources { n.requestedResource.OpaqueIntResources[rName] += rQuant } n.nonzeroRequest.MilliCPU += non0_cpu n.nonzeroRequest.Memory += non0_mem n.pods = append(n.pods, pod) if hasPodAffinityConstraints(pod) { n.podsWithAffinity = append(n.podsWithAffinity, pod) } n.generation++ }
绑定操作的逻辑如下:
plugin/pkg/scheduler/factory/factory.go +583
1 2 3 4 5 6 7 func (b *binder) Bind(binding *v1.Binding) error { glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), binding.Namespace) return b.Client.Core().RESTClient().Post().Namespace(genericapirequest.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error() // TODO: use Pods interface for binding once clusters are upgraded // return b.Pods(binding.Namespace).Bind(binding) }
绑定就是创建一个 binding 对象(为 Pod 指定了 Node name)后,通过 POST 请求 kube-apiserver
。一旦绑定,kubelet
认为 Pod 找到了合适的 Node ,然后 Node 上的 kubelet
会拉起 Pod 。
Pod 的调度逻辑在这里就完成了,上面未展开的 createScheduler
和 Config
对象生成部分,了解 Config
的创建和内容对后面了解调度器的工作原理非常重要。
生成 Config 对象 createScheduler
创建的 Scheduler 定义如下:
plugin/pkg/scheduler/scheduler.go +32
1 2 3 type Scheduler struct { config *Config }
其中 Config
对象被定义为下面所示的结构,这是整个 kube-scheduler
的核心:
1 2 3 4 5 6 7 8 9 10 type Config struct { SchedulerCache schedulercache.Cache NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm Binder Binder NextPod func() *v1.Pod Error func(*v1.Pod error) Recorder record.EventRecorder StopEverything chan struct{} }
SchedulerCache
能够暂时保存调度中的 Pod 和 Node 信息;
NodeLister
方法缓存获取所有 Node 信息;
NextPod()
方法能返回下一个需要调度的 Pod,FIFO 队列;
Algorithm.Schedule()
方法能计算出某个 Pod 在节点中的结果;
Error()
方法能够在出错的时候重新把 Pod 放到调度队列中进行重试;
Binder.Bind
方法在调度成功之后把调度结果发送到 kube-apiserver
中保存起来;
回到 createScheduler
,我们看 Config
对象是怎么创建的:
plugin/cmd/kube-scheduler/app/configurator.go +75
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func createScheduler( s *options.SchedulerServer, kubecli *clientset.Clientset, nodeInformer coreinformers.NodeInformer, pvInformer coreinformers.PersistentVolumeInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, replicationControllerInformer coreinformers.ReplicationControllerInformer, replicaSetInformer extensionsinformers.ReplicaSetInformer, statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, recorder record.EventRecorder, ) (*scheduler.Scheduler, error) { configurator := factory.NewConfigFactory( s.SchedulerName, kubecli, nodeInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, s.HardPodAffinitySymmetricWeight, ) // Rebuild the configurator with a default Create(...) method. configurator = &schedulerConfigurator{ configurator, s.PolicyConfigFile, s.AlgorithmProvider} return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) { cfg.Recorder = recorder }) }
首先 factory.NewConfigFactory
生成了一个 Configurator
,Configurator
是一个为了构建最终 Config
对象的结构体,为了不增加代码阅读的难度,这里就不列出来了。其实到了这一步的时候(即构造完成 Configurator
),Config
对象中 除了 Algorithm
这个域,其他的部分都已经设置好了,如关键的 SchedulerCache
已经通过 Node Informer
完成了配置。
下一步,是进行调度核心:调度算法配置的环节。
配置调度算法 Configurator
创建完成后,调用 scheduler.NewFromConfigurator
方法生成最终的 Config
对象 。具体逻辑是:会根据用户是否配置了 Policy文件
,决定调用 Configurator
中的 CreateFromProvider
还是 CreateFromConfig
方法。
plugin/cmd/kube-scheduler/app/configurator.go +120
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (sc schedulerConfigurator) Create() (*scheduler.Config, error) { if _, err := os.Stat(sc.policyFile); err != nil { if sc.Configurator != nil { return sc.Configurator.CreateFromProvider(sc.algorithmProvider) } return nil, fmt.Errorf("Configurator was nil") } // policy file is valid, try to create a configuration from it. var policy schedulerapi.Policy configData, err := ioutil.ReadFile(sc.policyFile) if err != nil { return nil, fmt.Errorf("unable to read policy config: %v", err) } if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { return nil, fmt.Errorf("invalid configuration: %v", err) } return sc.CreateFromConfig(policy) }
用户自定义的 Policy
文件,可以在命令行参数中通过 policy-config-file
来指定,它包含了 PodFitsPorts
、 PodFitsResources
、NoDiskConflict
等过滤函数,LeastRequestedPriority
、BalancedResourceAllocation
等加权函数。下面是一个 Policy
文件样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 { "kind" : "Policy", "apiVersion" : "v1", "predicates" : [ {"name" : "PodFitsPorts"}, {"name" : "PodFitsResources"}, {"name" : "NoDiskConflict"}, {"name" : "NoVolumeZoneConflict"}, {"name" : "MatchNodeSelector"}, {"name" : "HostName"} ], "priorities" : [ {"name" : "LeastRequestedPriority", "weight" : 1}, {"name" : "BalancedResourceAllocation", "weight" : 1}, {"name" : "ServiceSpreadingPriority", "weight" : 1}, {"name" : "EqualPriority", "weight" : 1} ] }
如果用户没有指定 Policy
文件,那么调度算法将会使用 DefaultProvider
这个过滤函数(PredicateFunc )和打分加权函数(PriorityFunc )的集合。DefaultProvider
的初始化位于plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go 中的 init()
方法中,init()
方法在 main 运行前就会运行,它会提前将 DefaultProvider
、FitPredicate
(过滤)、Priority
(打分加权)进行注册。
DefaultProvider
中默认配置的 PredicateFunc
有:
NoVolumeZoneConflict
MaxEBSVolumeCount
MaxGCEPDVolumeCount
MaxAzureDiskVolumeCount
MatchInterPodAffinity
NoDiskConflict
GeneralPredicates
PodToleratesNodeTaints
CheckNodeMemoryPressure
CheckNodeDiskPressure
DefaultProvider
中默认配置的 PriorityFunc
有:
SelectorSpreadPriority
InterPodAffinityPriority
LeastRequestedPriority
BalancedResourceAllocation
NodePreferAvoidPodsPriority
NodeAffinityPriority
TaintTolerationPriority
无论是使用 Policy
文件,还是使用 DefaultProvider
,最终的目的都是为了得到调度算法的一种组合实现,最终完成 Config 对象的组装。因此,CreateFromProvider
和 CreateFromConfig
两个方法殊途同归,最后都抵达了 func (f *ConfigFactory) CreateFromKeys
方法。
plugin/pkg/scheduler/factory/factory.go +346
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) if f.GetHardPodAffinitySymmetricWeight() < 0 || f.GetHardPodAffinitySymmetricWeight() > 100 { return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.GetHardPodAffinitySymmetricWeight()) } predicateFuncs, err := f.GetPredicates(predicateKeys) if err != nil { return nil, err } priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys) if err != nil { return nil, err } priorityMetaProducer, err := f.GetPriorityMetadataProducer() if err != nil { return nil, err } predicateMetaProducer, err := f.GetPredicateMetadataProducer() if err != nil { return nil, err } // 开始监控待调度的 Pod ,将其加入 PodQueue f.Run() algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) PodBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ SchedulerCache: f.schedulerCache, // The scheduler only needs to consider schedulable Nodes. NodeLister: &NodePredicateLister{f.NodeLister}, Algorithm: algo, Binder: &binder{f.client}, PodConditionUpdater: &PodConditionUpdater{f.client}, NextPod : func() *v1.Pod { return f.getNextPod () }, Error: f.MakeDefaultErrorFunc(PodBackoff, f.PodQueue), StopEverything: f.StopEverything, }, nil }
在使用 schedulerCache
、predicates
、prioritizers
、extender
等通过 NewGenericScheduler
方法构建了一个 genericScheduler
, genericScheduler
类型为 scheduler.ScheduleAlgorithm
,准确说应该是一个调度器算法,它实现了 Schedule()
接口。
plugin/pkg/scheduler/core/generic_scheduler.go +439
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func NewGenericScheduler( cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.MetadataProducer, prioritizers []algorithm.PriorityConfig, priorityMetaProducer algorithm.MetadataProducer, extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { return &genericScheduler{ cache: cache, predicates: predicates, predicateMetaProducer: predicateMetaProducer, prioritizers: prioritizers, priorityMetaProducer: priorityMetaProducer, extenders: extenders, cached Node InfoMap: make(map[string]*schedulercache.NodeInfo), } }
至此,整个 Scheduler 的启动流程就打通了:在 scheduleOne
中通过 NextPod()
获取待调度 Pod ,然后根据 genericScheduler
中的 Schedule()
方法完成 Node 节点的选择,最后通过 Bind 操作完成整个调度流程。
调度算法解析 plugin/pkg/scheduler/generic_scheduler.go +101
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 func (g *genericScheduler) Schedule(Pod *v1.PodNodeLister algorithm.NodeLister) (string, error, []v1.EnvVar) { trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", Pod.Namespace, Pod .Name)) defer trace.LogIfLong(100 * time.Millisecond) Nodes, err := NodeLister.List() if err != nil { return "", err, nil } if len(Nodes) == 0 { return "", ErrNoNodesAvailable, nil } // Used for all fit and priority funcs. err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) if err != nil { return "", err, nil } // 1. 根据给定的过滤函数(即g.predicates)筛选符合所有要求的 Node trace.Step("Computing predicates") filteredNodes, failedPredicateMap, err := findNodesThatFit(Pod g.cachedNodeInfoMap, Nodes, g.predicates, g.extenders, g.predicateMetaProducer) if err != nil { return "", err, nil } predicateEnvs := []v1.EnvVar{ { Name: v1.SchedFailedReason, Value: failedPredicateMap.String(), }, } // 没有符合要求的 Node 则返回 if len(filteredNodes) == 0 { return "", &FitError{ Pod : Pod FailedPredicates: failedPredicateMap, }, predicateEnvs } // 2. 根据给定的打分函数(即g.prioritizers)给输入的各个 Node 进行打分 trace.Step("Prioritizing") metaPrioritiesInterface := g.priorityMetaProducer(Pod g.cachedNodeInfoMap) priorityList, err := PrioritizeNodes(Pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) if err != nil { return "", err, nil } // 3. 从最终的 Node 列表中选出一个 Node trace.Step("Selecting host") dest, priorityErr, priorityEnvs := g.selectHost(priorityList) return dest, priorityErr, v1.MergeEnvs(predicateEnvs,priorityEnvs) }
从上面可以看到,Schedule()
方法为一个 Pod 选出一个 Node 需要经历下面三步:
通过 findNodesThatFit
过滤缓存的 Node 列表,过滤标准为 Config
对象中 的 Predicates(过滤);
通过 PrioritizeNodes
为过滤后的每个 Node 进行打分,具体某个 Node 的分数为各优先级算法得分的加权和;
如果第二步中的 Node 列表中的元素不止一个,会通过 selectHost
(最终选)进行排序,选出得分最高的节点,如果仍然有多个节点得分一致,则走 round-robin
选择一个。
预选环节 下面是 findNodesThatFit
方法的具体实现:
plugin/pkg/scheduler/generic_scheduler.go +180
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 func findNodesThatFit( Pod *v1.Pod Node NameToInfo map[string]*schedulercache.NodeInfo, Nodes []*v1.Node , predicateFuncs map[string]algorithm.FitPredicate, extenders []algorithm.SchedulerExtender, metadataProducer algorithm.MetadataProducer, ) ([]*v1.Node , FailedPredicateMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} if len(predicateFuncs) == 0 { filtered = Nodes } else { // ... checkNode := func(i int) { NodeName := Nodes[i].Name fits, failedPredicates, err := PodFitsOnNode(Pod, meta, NodeNameToInfo[NodeName], predicateFuncs) if err != nil { predicateResultLock.Lock() errs = append(errs, err) predicateResultLock.Unlock() return } if fits { filtered[atomic.AddInt32(&filteredLen, 1)-1] = Nodes[i] } else { predicateResultLock.Lock() failedPredicateMap[NodeName] = failedPredicates predicateResultLock.Unlock() } } workqueue.Parallelize(16, len(Nodes), checkNode) filtered = filtered[:filteredLen] // ... } // 如果配置了Extender,则再执行Extender的优选打分方法Extender.Prioritize if len(filtered) > 0 && len(extenders) != 0 { for _, extender := range extenders { filteredList, failedMap, err := extender.Filter(Podfiltered, NodeNameToInfo) if err != nil { return []*v1. Node {}, FailedPredicateMap{}, err } for failedNodeName, failedMsg := range failedMap { if _, found := failedPredicateMap[failedNodeName]; !found { failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{} } failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg)) } filtered = filteredList if len(filtered) == 0 { break } } } return filtered, failedPredicateMap, nil }
上面的过滤过程,唯一需要注意的就是,过滤是并发的,并发度为 16 个 goroutine,如果 Node 数不到16则并发数为 Node 数。具体的 Predicate Policy
对应的 PredicateFunc
都定义在 plugin/pkg/scheduler/algorithm/predicates/predicates.go ,常用有:
NoDiskConflict : 评估是否存在volume冲突。如果该 volume 已经 mount 过了,k8s可能会不允许重复mount(取决于volume类型);
NoVolumeZoneConflict : 评估该节点上是否存在 Pod 请求的 volume;
PodFitsResources : 检查节点剩余资源(CPU、内存)是否能满足 Pod 的需求。剩余资源=总容量-所有 Pod 请求的资源;
MatchNodeSelector : 判断是否满足 Pod 设置的 NodeSelector;
CheckNodeMemoryPressure : 检查 Pod 是否可以调度到存在内存压力的节点;
CheckNodeDiskPressure : 检查 Pod 是否可以调度到存在硬盘压力的节点;
…
优选环节 同样的,在 PrioritizeNodes
环节也是并发执行,每个 goroutine 依次计算该 Pod 运行在该 Node 上的得分。具体的 Priorities Policy
对应的 PriorityFunc
都定义在 plugin/pkg/scheduler/algorithm/priorities/ .go* 中,常见有:
LeastRequestedPriority :最低请求优先级,即 Node 使用率越低,得分越高;
BalancedResourceAllocation :资源平衡分配,即CPU/内存配比合适的 Node 得分更高;
SelectorSpreadPriority : 尽量将同一 RC/Replica 的多个 Pod 分配到不同的 Node 上;
CalculateAntiAffinityPriority : 尽量将同一 Service 下的多个相同 Label 的 Pod 分配到不同的 Node;
ImageLocalityPriority : Image本地优先,Node 上如果已经存在 Pod 需要的镜像,并且镜像越大,得分越高,从而减少 Pod 拉取镜像的开销(时间);
NodeAffinityPriority : 根据亲和性标签进行选择;
…
源码走读总结 通过上面的源码学习,可以总结 kube-scheduler
的整体流程如下:
kube-scheduler
初始化:如配置 DefaultProvider
、解析命令行参数等;,
kube-scheduler
创建:通过工厂方法构造一个 Config
对象,然后运行这个 Config
对象,这中间包括:
构造资源缓存(scheduleCache
):通过 kube-apiserver
提供的 RestFul API 实现对资源的监控(Watch)和缓存;
构造调度算法的数据结构(Algorithm
):根据用户提供的 Policy File
或者内置的 DefaultProvider
生成;
其他逻辑:提供 kube-scheduler
的 HTTP 服务、 Leader 选举等;
kube-scheduler
运行时:
kube-scheduler
中维护了一个 FIFO 类型的 PodQueue
cache(scheduleCache
的子集),待调度的 Pod 会被及时添加到该 PodQueue 中;
通过 Config.Next Pod
获取一个即将被调度的 Pod ;
通过 Config.Schedule
计算出目标 Node ,分为 Predicates 和 Priorities 两步;;
通过 Config.Bind
进行更新 Pod 和 Node 的绑定;
Scheduler 源码流程图 最后附上一张手绘流程图(点击放大查看):