Kubernetes Scheduler 源码全解析(附流程图)

最近的一个项目,需要对 kube-scheduler 进行改造,因此就系统性地阅读了下它的源码。几点阅读感受:

  1. Scheduler 的代码整体上代码可读性较好,不同的逻辑之间做了较好的解耦;
  2. 可能正是在设计之初,就想把调度模块做彻底解耦,因此该模块内外都存在一些问题:
    • 我阅读的 Kubernetes 版本(v1.6.6)里 Scheduler 模块位于 plugin 目录下,在最新的 Kubernetes 中,Scheduler 的主体已经被从 plugin 目录移出,和其他组件处于同一目录下;
    • Scheduler 模块代码中存在若干过度设计,当然这只是个人观点;
  3. GO 语言的设计增加了语言的灵活性,但一定程度上也降低了代码的可读性,最简单的例子就是接口;

Scheduler 是什么

Kubernetes 是 Google 团队发起并维护的基于 Docker 的开源容器集群管理系统,它不仅支持常见的云平台,而且支持内部数据中心。Scheduler(下称 kube-scheduler ) 在 Kubernetes 架构 中的位置如下图所示:

从上图可以知道,Kubernetes 中的核心组件有:API Server(下称 kube-apiserver)、Controller Manager(下称 kube-controller-manager)、Schedulerkube-scheduler)、etcd、Proxy(下称kube-proxy)、Kubelet 这几个,在一个典型的扩容请求的流程中,各个组件扮演的角色如下:

  1. 用户通过客户端(可以使 kubernetes-client 这类客户端、也可以是kubelet)通过 kube-apiserver 的 REST API 创建 Deployment/DaemonSet/Job等任务,Pod 状态由 kube-controller-manager 维护;
  2. kube-apiserver 收到用户请求,存储到相关数据到 etcd
  3. kube-scheduler 通过监听 kube-apiserver ,获取未调度的 Pod 列表,通过调度算法计算出分配给 Pod 的 Node ,并将 Node 信息和 Pod 进行绑定(Bind),结果存储到 etcd 中;
  4. Node 上的 kubelet 根据调度结果执行 Pod 创建操作。

可见, kube-scheduler 作为k8s中的核心模块,可以被视为一个黑盒,黑盒的输入为待调度的 Pod 和全部计算节点(Node)的信息,经过黑盒内部的调度算法和策略处理,输出为最优的节点,而后将 Pod 调度该节点上 。

Scheduler 调度流程

kube-scheduler 是作为单独的进程启动的,可以总结 kube-scheduler 的职责有以下这些:

  1. 集群高可用:如果 kube-scheduler 设置了 leader-elect 启动参数,那么会通过 etcd 进行节点选主( kube-schedulerkube-controller-manager 都使用了一主多从的高可用方案);
  2. 调度资源监听:通过 Watch 机制监听 kube-apiserver 上资源的变化,这里的资源主要指的是 Pod 和 Node ;
  3. 调度节点分配:通过预选(Predicates)与优选(Priorites),为待调度的 Pod 分配一个 Node ,同时将分配结果通过 kube-apiserver 写入 etcd

集群高可用这一部分不再详述,简单描述:kube-scheduler 启动时,会在 etcd 中创建 endpoint,endpoint 的信息中记录了当前的 leader 节点信息,以及记录的上次更新时间。leader 节点会定期更新 endpoint 的信息,维护自己的 leader 身份。每个从节点的服务都会定期检查 endpoint 的信息,如果 endpoint 的信息在时间范围内没有更新,它们会尝试更新自己为 leader 节点。

调度资源监听

kube-apiserver 提供了一套 Watch 机制给 kubeletkube-controller-managerkube-scheduler 等组件用来监控各种资源(Pod、Node、Service等)的变化,类似于消息中间件里的发布-订阅模式(Push), kube-apiserver 能够主动通知这些组件。

kube-apiserver 初始化时,建立对etcd的连接,并对etcd进行watch。当 kube-scheduler 等客户端调用 Watch API 时,kube-apiserver 内部会建立一个 WatchServer,后者会从 etcd 里面获取资源的 Watch event,event经过加工过滤后,就会发送给客户端。Watch API 实质是一个 GET 请求,有两种实现模式:

  1. 通过 websocket 协议发送;
  2. 通过 Transfer-Encoding=chunked 的方式建立一个长连接;

调度节点分配

调度节点分配主要可以分为预选(Predicates)与优选(Priorites)这两个环节,上面讲到的调度资源监听的实质就是为这两个环节提供输入。

  1. 预选:根据配置的 Predicates Policies(默认为 DefaultProvider 中定义的default predicates policies 集合)过滤掉那些不满足这些 Policies 的 Node,预选的输出作为优选的输入;
  2. 优选:根据配置的 Priorities Policies(默认为 DefaultProvider 中定义的 default priorities policies 集合)给预选后的 Node 进行打分排名,得分最高的 Node 即作为最适合的 Node ,该 Pod 就绑定(Bind)到这个 Node 。

注:如果经过优选将 Node 打分排名后,有多个 Node 并列得分最高,那么 kube-scheduler 将随机从中选择一个 Node 作为目标 Node 。

Scheduler 源码结构

有了上面的基础知识后,我们来看一下上面所描述的 kube-scheduler 的功能在代码中是如何实现的。下面的提及的 Kubernetes 源码版本为 v1.6.6

kube-apiserver 的源码主要在 k8s.io/kubernetes/plugin/ 目录下,其中两个目录 cmd/schedulerpkg/scheduler 分别定义了 kube-scheduler 中使用到的结构体的参数封装和 scheduler 的具体内部实现。具体的目录结构如下所示。

从源码也可以得知,Kubernetes 的调度器以 plugin 化形式实现(也是唯一一个以 plugin 形式存在的模块),方便用户定制和二次开发。用户可以自定义调度器并以 plugin 形式与 Kubernetes 集成,或集成其他调度器,便于调度不同类型的任务。

Scheduler 源码走读

构造并运行Scheduler

入口函数

下图就是调度器的入口函数:

plugin/cmd/kube-scheduler/scheduler.go +30

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
s := options.NewSchedulerServer()
s.AddFlags(pflag.CommandLine)

flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()

verflag.PrintAndExitIfRequested()

if err := app.Run(s); err != nil {
glog.Fatalf("scheduler app failed to run: %v", err)
}
}

在 main 函数的第一行,通过options.NewSchedulerServer() 创建一个SchedulerServer(结构体参数配置),具体过程如下所示:

plugin/cmd/kube-scheduler/optinons/optinons.go +37

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type SchedulerServer struct {
componentconfig.KubeSchedulerConfiguration
// kube-apiserver 的地址,可被 kubeconfig 中的配置覆盖).
Master string
// kubeconfig 文件路径
Kubeconfig string
}

// 通过默认参数创建一个 SchedulerServer
func NewSchedulerServer() *SchedulerServer {
versioned := &v1alpha1.KubeSchedulerConfiguration{}
api.Scheme.Default(versioned)
cfg := componentconfig.KubeSchedulerConfiguration{}
api.Scheme.Convert(versioned, &cfg, nil)
// 默认会有选主过程
cfg.LeaderElection.LeaderElect = true
s := SchedulerServer{
KubeSchedulerConfiguration: cfg,
}
return &s
}

创建了一个 SchedulerServer 后,然后是解析命令行参数、初始化日志系统参数等,最后通过 app.Run(s) 启动了调度器,下面我们直接分析 Run 函数中的逻辑。

配置并启动 Scheduler

plugin/cmd/kube-scheduler/app/server.go +71

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 运行 SchedulerServer,不会退出.
func Run(s *options.SchedulerServer) error {

// 创建一个到 kube-apiserver master 的客户端连接
kubecli, err := createClient(s)

// 创建一个事件广播器,用于向集群中的 Node 发送调度的信息
recorder := createRecorder(kubecli, s)

// 通过工厂类创建 sharedInformerFactory 对象
informerFactory := informers.NewSharedInformerFactory(kubecli, 0)

// 创建一个 scheduler server
sched, err := createScheduler(
s,
kubecli,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
recorder,
)

// 创建 HTTP 服务,用于性能分析,性能指标度量,
// pprof 接口方便性能数据收集,metrics 接口供 prometheus 收集监控数据
go startHTTP(s,sched.GetConfig())

// 开始运行Informer,进行资源缓存
informerFactory.Start(stop)

// 运行调度程序
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}

// 如果不选主,则直接运行
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}

// 集群选主
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("lost master")
},
},
})

panic("unreachable")
}

总结一下 Run 这个方法中的逻辑,有下面这些(有顺序):

  1. 通过 createClients 创建到 kube-apiserver 的连接 kubecli
  2. 通过 createRecorder 创建eventBroadcaster对象 recorder,用于向集群中的 Node 发送调度的信息;
  3. 通过 startHTTPkube-scheduler 通过 HTTP 接口暴露,用于性能分析,性能指标度量等;
  4. 通过 informers.NewSharedInformerFactory 创建 sharedInformerFactory 对象,该对象封装了 kube-apiserver 中的一些事件通知;
  5. 通过 createScheduler 创建一个运行时 Scheduler 实例 sched(定义在 plugin/pkg/scheduler/scheduler.go 中);
  6. 根据 leaderElect 这个标志位决定是否需要进行选主,如果不需要则直接到下一步,否则进行 Leader 选举;
  7. 开始循环执行 sched.Runsched.Run 内部启动 goroutine 进行 Pod 的调度。

在上面的步骤中,步骤4、5、7比较关键:

  • 第4步中得到一个静态工厂类 sharedInformerFactory,该类可以得到 PodInformerNodeInformer 等,Informer 的机制在这里不具体展开了;
  • 第5步主要是通过一个静态工厂生成了 kube-apiserver 运行时的若干配置信息(Config 对象);
  • 第7步通过 Config 对象启动调度器,5和7是整个调度器代码的核心,这两步后面再详细分析。

通过 sched.Run,程序逻辑终于从 plugin/cmd/ 跳转到了 plugin/pkg/ 目录,下面是 Run 方法的定义:

plugin/pkg/scheduler/scheduler.go +148

1
2
3
func (s *Scheduler) Run() {
go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}

启动一个协程,循环反复执行 Scheduler.scheduleOne 方法,直到收到中止的信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (s *Scheduler) scheduleOne() {

// 1. 从队列中获取一个 Pod
Pod := s.config.NextPod()

// 2. 通过调度算法,得到待调度的节点
dest, err, extraEnvs := s.config.Algorithm.Schedule(Pod, s.config.NodeLister)
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))

// 3, 进行 Pod 和 Node 的绑定,理想情况绑定会成功
// 成功结果会发送给 apiserver,如果失败,会释放分配给 Pod 的资源
assumed := *Pod
assumed.Spec. NodeName = dest
if err := s.config.SchedulerCache.AssumePod (&assumed); err != nil {
glog.Errorf("scheduler cache AssumePod failed: %v", err)
return
}

go func() {
b := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: Pod.Namespace, Name: Pod.Name},
Target: v1.ObjectReference{
Kind: "Node",
Name: dest,
},
}
err := s.config.Binder.Bind(b)
}()
}

scheduleOne 中的逻辑主要分为三块:

  1. 通过 Scheduler.config.NextPod() 取得一个 Pod ;
  2. 通过 Scheduler.config.Algorithm.Schedule(Pod, Scheduler.config. NodeLister) 为 Pod 选择一个合适的 Node ;
  3. 通过 Scheduler.config.Binder.Bind(b) 将此 Node 同 Pod 进行绑定;

在进行 Bind 之前,需要从缓存的资源中进行资源的扣除:s.config.SchedulerCache.AssumePod(&assumed),最终调用的逻辑:

/plugin/pkg/scheduler/schedulercache/node_info.go +244

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (n *NodeInfo) addPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.NvidiaGPU += res.NvidiaGPU
if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.OpaqueIntResources {
n.requestedResource.OpaqueIntResources[rName] += rQuant
}

n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
if hasPodAffinityConstraints(pod) {
n.podsWithAffinity = append(n.podsWithAffinity, pod)
}
n.generation++
}

绑定操作的逻辑如下:

plugin/pkg/scheduler/factory/factory.go +583

1
2
3
4
5
6
7
func (b *binder) Bind(binding *v1.Binding) error {
glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), binding.Namespace)
return b.Client.Core().RESTClient().Post().Namespace(genericapirequest.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
// TODO: use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
}

绑定就是创建一个 binding 对象(为 Pod 指定了 Node name)后,通过 POST 请求 kube-apiserver。一旦绑定,kubelet 认为 Pod 找到了合适的 Node ,然后 Node 上的 kubelet 会拉起 Pod 。

Pod 的调度逻辑在这里就完成了,上面未展开的 createSchedulerConfig 对象生成部分,了解 Config 的创建和内容对后面了解调度器的工作原理非常重要。

生成 Config 对象

createScheduler 创建的 Scheduler 定义如下:

plugin/pkg/scheduler/scheduler.go +32

1
2
3
type Scheduler struct {
config *Config
}

其中 Config 对象被定义为下面所示的结构,这是整个 kube-scheduler 的核心:

1
2
3
4
5
6
7
8
9
10
type Config struct {
SchedulerCache schedulercache.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
NextPod func() *v1.Pod
Error func(*v1.Pod error)
Recorder record.EventRecorder
StopEverything chan struct{}
}
  • SchedulerCache 能够暂时保存调度中的 Pod 和 Node 信息;
  • NodeLister 方法缓存获取所有 Node 信息;
  • NextPod() 方法能返回下一个需要调度的 Pod,FIFO 队列;
  • Algorithm.Schedule() 方法能计算出某个 Pod 在节点中的结果;
  • Error() 方法能够在出错的时候重新把 Pod 放到调度队列中进行重试;
  • Binder.Bind 方法在调度成功之后把调度结果发送到 kube-apiserver 中保存起来;

回到 createScheduler,我们看 Config 对象是怎么创建的:

plugin/cmd/kube-scheduler/app/configurator.go +75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func createScheduler(
s *options.SchedulerServer,
kubecli *clientset.Clientset,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(
s.SchedulerName,
kubecli,
nodeInformer,
pvInformer,
pvcInformer,
replicationControllerInformer,
replicaSetInformer,
statefulSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
)

// Rebuild the configurator with a default Create(...) method.
configurator = &schedulerConfigurator{
configurator,
s.PolicyConfigFile,
s.AlgorithmProvider}

return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
cfg.Recorder = recorder
})
}

首先 factory.NewConfigFactory 生成了一个 ConfiguratorConfigurator 是一个为了构建最终 Config 对象的结构体,为了不增加代码阅读的难度,这里就不列出来了。其实到了这一步的时候(即构造完成 Configurator),Config 对象中 除了 Algorithm 这个域,其他的部分都已经设置好了,如关键的 SchedulerCache 已经通过 Node Informer 完成了配置。

下一步,是进行调度核心:调度算法配置的环节。

配置调度算法

Configurator 创建完成后,调用 scheduler.NewFromConfigurator 方法生成最终的 Config 对象。具体逻辑是:会根据用户是否配置了 Policy文件 ,决定调用 Configurator 中的 CreateFromProvider 还是 CreateFromConfig方法。

plugin/cmd/kube-scheduler/app/configurator.go +120

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
if _, err := os.Stat(sc.policyFile); err != nil {
if sc.Configurator != nil {
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
}
return nil, fmt.Errorf("Configurator was nil")
}

// policy file is valid, try to create a configuration from it.
var policy schedulerapi.Policy
configData, err := ioutil.ReadFile(sc.policyFile)
if err != nil {
return nil, fmt.Errorf("unable to read policy config: %v", err)
}
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
return nil, fmt.Errorf("invalid configuration: %v", err)
}
return sc.CreateFromConfig(policy)
}

用户自定义的 Policy 文件,可以在命令行参数中通过 policy-config-file 来指定,它包含了 PodFitsPortsPodFitsResourcesNoDiskConflict 等过滤函数,LeastRequestedPriorityBalancedResourceAllocation 等加权函数。下面是一个 Policy 文件样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsPorts"},
{"name" : "PodFitsResources"},
{"name" : "NoDiskConflict"},
{"name" : "NoVolumeZoneConflict"},
{"name" : "MatchNodeSelector"},
{"name" : "HostName"}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : 1},
{"name" : "BalancedResourceAllocation", "weight" : 1},
{"name" : "ServiceSpreadingPriority", "weight" : 1},
{"name" : "EqualPriority", "weight" : 1}
]
}

如果用户没有指定 Policy 文件,那么调度算法将会使用 DefaultProvider 这个过滤函数(PredicateFunc)和打分加权函数(PriorityFunc)的集合。DefaultProvider 的初始化位于
plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go 中的 init() 方法中,init() 方法在 main 运行前就会运行,它会提前将 DefaultProviderFitPredicate(过滤)、Priority(打分加权)进行注册。

DefaultProvider 中默认配置的 PredicateFunc有:

  • NoVolumeZoneConflict
  • MaxEBSVolumeCount
  • MaxGCEPDVolumeCount
  • MaxAzureDiskVolumeCount
  • MatchInterPodAffinity
  • NoDiskConflict
  • GeneralPredicates
  • PodToleratesNodeTaints
  • CheckNodeMemoryPressure
  • CheckNodeDiskPressure

DefaultProvider 中默认配置的 PriorityFunc 有:

  • SelectorSpreadPriority
  • InterPodAffinityPriority
  • LeastRequestedPriority
  • BalancedResourceAllocation
  • NodePreferAvoidPodsPriority
  • NodeAffinityPriority
  • TaintTolerationPriority

无论是使用 Policy 文件,还是使用 DefaultProvider,最终的目的都是为了得到调度算法的一种组合实现,最终完成 Config 对象的组装。因此,CreateFromProviderCreateFromConfig 两个方法殊途同归,最后都抵达了 func (f *ConfigFactory) CreateFromKeys 方法。

plugin/pkg/scheduler/factory/factory.go +346

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)

if f.GetHardPodAffinitySymmetricWeight() < 0 || f.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.GetHardPodAffinitySymmetricWeight())
}

predicateFuncs, err := f.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}

priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

priorityMetaProducer, err := f.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}

predicateMetaProducer, err := f.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}

// 开始监控待调度的 Pod ,将其加入 PodQueue
f.Run()

algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
PodBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable Nodes.
NodeLister: &NodePredicateLister{f.NodeLister},
Algorithm: algo,
Binder: &binder{f.client},
PodConditionUpdater: &PodConditionUpdater{f.client},
NextPod : func() *v1.Pod {
return f.getNextPod ()
},
Error: f.MakeDefaultErrorFunc(PodBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}

在使用 schedulerCachepredicatesprioritizersextender 等通过 NewGenericScheduler 方法构建了一个 genericScheduler, genericScheduler 类型为 scheduler.ScheduleAlgorithm,准确说应该是一个调度器算法,它实现了 Schedule() 接口。

plugin/pkg/scheduler/core/generic_scheduler.go +439

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewGenericScheduler(
cache schedulercache.Cache,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.MetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cached Node InfoMap: make(map[string]*schedulercache.NodeInfo),
}
}

至此,整个 Scheduler 的启动流程就打通了:在 scheduleOne 中通过 NextPod() 获取待调度 Pod ,然后根据 genericScheduler 中的 Schedule() 方法完成 Node 节点的选择,最后通过 Bind 操作完成整个调度流程。

调度算法解析

plugin/pkg/scheduler/generic_scheduler.go +101

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (g *genericScheduler) Schedule(Pod *v1.PodNodeLister algorithm.NodeLister) (string, error, []v1.EnvVar) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", Pod.Namespace, Pod .Name))
defer trace.LogIfLong(100 * time.Millisecond)
Nodes, err := NodeLister.List()
if err != nil {
return "", err, nil
}
if len(Nodes) == 0 {
return "", ErrNoNodesAvailable, nil
}

// Used for all fit and priority funcs.
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err, nil
}

// 1. 根据给定的过滤函数(即g.predicates)筛选符合所有要求的 Node
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(Pod g.cachedNodeInfoMap, Nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
return "", err, nil
}

predicateEnvs := []v1.EnvVar{
{
Name: v1.SchedFailedReason,
Value: failedPredicateMap.String(),
},
}

// 没有符合要求的 Node 则返回
if len(filteredNodes) == 0 {
return "", &FitError{
Pod : Pod
FailedPredicates: failedPredicateMap,
}, predicateEnvs
}

// 2. 根据给定的打分函数(即g.prioritizers)给输入的各个 Node 进行打分
trace.Step("Prioritizing")
metaPrioritiesInterface := g.priorityMetaProducer(Pod g.cachedNodeInfoMap)
priorityList, err := PrioritizeNodes(Pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return "", err, nil
}

// 3. 从最终的 Node 列表中选出一个 Node
trace.Step("Selecting host")
dest, priorityErr, priorityEnvs := g.selectHost(priorityList)
return dest, priorityErr, v1.MergeEnvs(predicateEnvs,priorityEnvs)
}

从上面可以看到,Schedule() 方法为一个 Pod 选出一个 Node 需要经历下面三步:

  1. 通过 findNodesThatFit 过滤缓存的 Node 列表,过滤标准为 Config 对象中 的 Predicates(过滤);
  2. 通过 PrioritizeNodes 为过滤后的每个 Node 进行打分,具体某个 Node 的分数为各优先级算法得分的加权和;
  3. 如果第二步中的 Node 列表中的元素不止一个,会通过 selectHost(最终选)进行排序,选出得分最高的节点,如果仍然有多个节点得分一致,则走 round-robin 选择一个。

预选环节

下面是 findNodesThatFit 方法的具体实现:

plugin/pkg/scheduler/generic_scheduler.go +180

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func findNodesThatFit(
Pod *v1.Pod
Node NameToInfo map[string]*schedulercache.NodeInfo,
Nodes []*v1.Node ,
predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.MetadataProducer,
) ([]*v1.Node , FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}

if len(predicateFuncs) == 0 {
filtered = Nodes
} else {
// ...
checkNode := func(i int) {
NodeName := Nodes[i].Name
fits, failedPredicates, err := PodFitsOnNode(Pod, meta, NodeNameToInfo[NodeName], predicateFuncs)
if err != nil {
predicateResultLock.Lock()
errs = append(errs, err)
predicateResultLock.Unlock()
return
}
if fits {
filtered[atomic.AddInt32(&filteredLen, 1)-1] = Nodes[i]
} else {
predicateResultLock.Lock()
failedPredicateMap[NodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
workqueue.Parallelize(16, len(Nodes), checkNode)
filtered = filtered[:filteredLen]

// ...
}

// 如果配置了Extender,则再执行Extender的优选打分方法Extender.Prioritize
if len(filtered) > 0 && len(extenders) != 0 {
for _, extender := range extenders {
filteredList, failedMap, err := extender.Filter(Podfiltered, NodeNameToInfo)
if err != nil {
return []*v1. Node {}, FailedPredicateMap{}, err
}

for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
break
}
}
}

return filtered, failedPredicateMap, nil
}

上面的过滤过程,唯一需要注意的就是,过滤是并发的,并发度为 16 个 goroutine,如果 Node 数不到16则并发数为 Node 数。具体的 Predicate Policy 对应的 PredicateFunc 都定义在 plugin/pkg/scheduler/algorithm/predicates/predicates.go,常用有:

  • NoDiskConflict: 评估是否存在volume冲突。如果该 volume 已经 mount 过了,k8s可能会不允许重复mount(取决于volume类型);
  • NoVolumeZoneConflict: 评估该节点上是否存在 Pod 请求的 volume;
  • PodFitsResources: 检查节点剩余资源(CPU、内存)是否能满足 Pod 的需求。剩余资源=总容量-所有 Pod 请求的资源;
  • MatchNodeSelector: 判断是否满足 Pod 设置的 NodeSelector;
  • CheckNodeMemoryPressure: 检查 Pod 是否可以调度到存在内存压力的节点;
  • CheckNodeDiskPressure: 检查 Pod 是否可以调度到存在硬盘压力的节点;

优选环节

同样的,在 PrioritizeNodes 环节也是并发执行,每个 goroutine 依次计算该 Pod 运行在该 Node 上的得分。具体的 Priorities Policy 对应的 PriorityFunc 都定义在 plugin/pkg/scheduler/algorithm/priorities/.go* 中,常见有:

  • LeastRequestedPriority:最低请求优先级,即 Node 使用率越低,得分越高;
  • BalancedResourceAllocation:资源平衡分配,即CPU/内存配比合适的 Node 得分更高;
  • SelectorSpreadPriority: 尽量将同一 RC/Replica 的多个 Pod 分配到不同的 Node 上;
  • CalculateAntiAffinityPriority: 尽量将同一 Service 下的多个相同 Label 的 Pod 分配到不同的 Node;
  • ImageLocalityPriority: Image本地优先,Node 上如果已经存在 Pod 需要的镜像,并且镜像越大,得分越高,从而减少 Pod 拉取镜像的开销(时间);
  • NodeAffinityPriority: 根据亲和性标签进行选择;

源码走读总结

通过上面的源码学习,可以总结 kube-scheduler 的整体流程如下:

  • kube-scheduler 初始化:如配置 DefaultProvider、解析命令行参数等;,
  • kube-scheduler 创建:通过工厂方法构造一个 Config 对象,然后运行这个 Config 对象,这中间包括:
    1. 构造资源缓存(scheduleCache):通过 kube-apiserver 提供的 RestFul API 实现对资源的监控(Watch)和缓存;
    2. 构造调度算法的数据结构(Algorithm):根据用户提供的 Policy File 或者内置的 DefaultProvider 生成;
    3. 其他逻辑:提供 kube-scheduler 的 HTTP 服务、 Leader 选举等;
  • kube-scheduler 运行时:
    1. kube-scheduler 中维护了一个 FIFO 类型的 PodQueue cache(scheduleCache的子集),待调度的 Pod 会被及时添加到该 PodQueue 中;
    2. 通过 Config.Next Pod 获取一个即将被调度的 Pod ;
    3. 通过 Config.Schedule 计算出目标 Node ,分为 Predicates 和 Priorities 两步;;
    4. 通过 Config.Bind 进行更新 Pod 和 Node 的绑定;

Scheduler 源码流程图

最后附上一张手绘流程图(点击放大查看):

---(完)---
Yves wechat
扫一扫互相关注吧~
  • 本文作者: Yves
  • 本文标题: Kubernetes Scheduler 源码全解析(附流程图)
  • 发布时间: 2018年07月24日 - 11:07
  • 更新时间: 2020年07月22日 - 00:07
  • 本文链接: /2018/07/24/learning-kubernetes-source-code/
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

扫一扫关注公众号