Kubernetes Scheduler 源码全解析(附流程图)

文章目录
  1. 1. Scheduler 是什么
  2. 2. Scheduler 调度流程
    1. 2.1. 调度资源监听
    2. 2.2. 调度节点分配
  3. 3. Scheduler 源码结构
  4. 4. Scheduler 源码走读
    1. 4.1. 构造并运行Scheduler
      1. 4.1.1. 入口函数
      2. 4.1.2. 配置并启动 Scheduler
      3. 4.1.3. 生成 Config 对象
      4. 4.1.4. 配置调度算法
    2. 4.2. 调度算法解析
      1. 4.2.1. 预选环节
      2. 4.2.2. 优选环节
    3. 4.3. 源码走读总结
  5. 5. Scheduler 源码流程图

最近的一个项目,需要对 kube-scheduler 进行改造,因此就系统性地阅读了下它的源码。几点阅读感受:

  1. Scheduler 的代码整体上代码可读性较好,不同的逻辑之间做了较好的解耦;
  2. 可能正是在设计之初,就想把调度模块做彻底解耦,因此该模块内外都存在一些问题:
    • 我阅读的 Kubernetes 版本(v1.6.6)里 Scheduler 模块位于 plugin 目录下,在最新的 Kubernetes 中,Scheduler 的主体已经被从 plugin 目录移出,和其他组件处于同一目录下;
    • Scheduler 模块代码中存在若干过度设计,当然这只是个人观点;
  3. GO 语言的设计增加了语言的灵活性,但一定程度上也降低了代码的可读性,最简单的例子就是接口;

Scheduler 是什么

Kubernetes 是 Google 团队发起并维护的基于 Docker 的开源容器集群管理系统,它不仅支持常见的云平台,而且支持内部数据中心。Scheduler(下称 kube-scheduler ) 在 Kubernetes 架构 中的位置如下图所示:

从上图可以知道,Kubernetes 中的核心组件有:API Server(下称 kube-apiserver)、Controller Manager(下称 kube-controller-manager)、Schedulerkube-scheduler)、etcd、Proxy(下称kube-proxy)、Kubelet 这几个,在一个典型的扩容请求的流程中,各个组件扮演的角色如下:

  1. 用户通过客户端(可以使 kubernetes-client 这类客户端、也可以是kubelet)通过 kube-apiserver 的 REST API 创建 Deployment/DaemonSet/Job等任务,Pod 状态由 kube-controller-manager 维护;
  2. kube-apiserver 收到用户请求,存储到相关数据到 etcd
  3. kube-scheduler 通过监听 kube-apiserver ,获取未调度的 Pod 列表,通过调度算法计算出分配给 Pod 的 Node ,并将 Node 信息和 Pod 进行绑定(Bind),结果存储到 etcd 中;
  4. Node 上的 kubelet 根据调度结果执行 Pod 创建操作。

可见, kube-scheduler 作为k8s中的核心模块,可以被视为一个黑盒,黑盒的输入为待调度的 Pod 和全部计算节点(Node)的信息,经过黑盒内部的调度算法和策略处理,输出为最优的节点,而后将 Pod 调度该节点上 。

Scheduler 调度流程

kube-scheduler 是作为单独的进程启动的,可以总结 kube-scheduler 的职责有以下这些:

  1. 集群高可用:如果 kube-scheduler 设置了 leader-elect 启动参数,那么会通过 etcd 进行节点选主( kube-schedulerkube-controller-manager 都使用了一主多从的高可用方案);
  2. 调度资源监听:通过 Watch 机制监听 kube-apiserver 上资源的变化,这里的资源主要指的是 Pod 和 Node ;
  3. 调度节点分配:通过预选(Predicates)与优选(Priorites),为待调度的 Pod 分配一个 Node ,同时将分配结果通过 kube-apiserver 写入 etcd

集群高可用这一部分不再详述,简单描述:kube-scheduler 启动时,会在 etcd 中创建 endpoint,endpoint 的信息中记录了当前的 leader 节点信息,以及记录的上次更新时间。leader 节点会定期更新 endpoint 的信息,维护自己的 leader 身份。每个从节点的服务都会定期检查 endpoint 的信息,如果 endpoint 的信息在时间范围内没有更新,它们会尝试更新自己为 leader 节点。

调度资源监听

kube-apiserver 提供了一套 Watch 机制给 kubeletkube-controller-managerkube-scheduler 等组件用来监控各种资源(Pod、Node、Service等)的变化,类似于消息中间件里的发布-订阅模式(Push), kube-apiserver 能够主动通知这些组件。

kube-apiserver 初始化时,建立对etcd的连接,并对etcd进行watch。当 kube-scheduler 等客户端调用 Watch API 时,kube-apiserver 内部会建立一个 WatchServer,后者会从 etcd 里面获取资源的 Watch event,event经过加工过滤后,就会发送给客户端。Watch API 实质是一个 GET 请求,有两种实现模式:

  1. 通过 websocket 协议发送;
  2. 通过 Transfer-Encoding=chunked 的方式建立一个长连接;

调度节点分配

调度节点分配主要可以分为预选(Predicates)与优选(Priorites)这两个环节,上面讲到的调度资源监听的实质就是为这两个环节提供输入。

  1. 预选:根据配置的 Predicates Policies(默认为 DefaultProvider 中定义的 default predicates policies 集合)过滤掉那些不满足这些 Policies 的 Node,预选的输出作为优选的输入;
  2. 优选:根据配置的 Priorities Policies(默认为 DefaultProvider 中定义的 default priorities policies 集合)给预选后的 Node 进行打分排名,得分最高的 Node 即作为最适合的 Node ,该 Pod 就绑定(Bind)到这个 Node 。

注:如果经过优选将 Node 打分排名后,有多个 Node 并列得分最高,那么 kube-scheduler 将随机从中选择一个 Node 作为目标 Node 。

Scheduler 源码结构

有了上面的基础知识后,我们来看一下上面所描述的 kube-scheduler 的功能在代码中是如何实现的。下面的提及的 Kubernetes 源码版本为 v1.6.6

kube-apiserver 的源码主要在 k8s.io/kubernetes/plugin/ 目录下,其中两个目录 cmd/schedulerpkg/scheduler 分别定义了 kube-scheduler 中使用到的结构体的参数封装和 scheduler 的具体内部实现。具体的目录结构如下所示。

从源码也可以得知,Kubernetes 的调度器以 plugin 化形式实现(也是唯一一个以 plugin 形式存在的模块),方便用户定制和二次开发。用户可以自定义调度器并以 plugin 形式与 Kubernetes 集成,或集成其他调度器,便于调度不同类型的任务。

Scheduler 源码走读

构造并运行Scheduler

入口函数

下图就是调度器的入口函数:

plugin/cmd/kube-scheduler/scheduler.go +30

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
s := options.NewSchedulerServer()
s.AddFlags(pflag.CommandLine)

flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()

verflag.PrintAndExitIfRequested()

if err := app.Run(s); err != nil {
glog.Fatalf("scheduler app failed to run: %v", err)
}
}

在 main 函数的第一行,通过options.NewSchedulerServer() 创建一个SchedulerServer(结构体参数配置),具体过程如下所示:

plugin/cmd/kube-scheduler/optinons/optinons.go +37

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type SchedulerServer struct {
componentconfig.KubeSchedulerConfiguration
// kube-apiserver 的地址,可被 kubeconfig 中的配置覆盖).
Master string
// kubeconfig 文件路径
Kubeconfig string
}

// 通过默认参数创建一个 SchedulerServer
func NewSchedulerServer() *SchedulerServer {
versioned := &v1alpha1.KubeSchedulerConfiguration{}
api.Scheme.Default(versioned)
cfg := componentconfig.KubeSchedulerConfiguration{}
api.Scheme.Convert(versioned, &cfg, nil)
// 默认会有选主过程
cfg.LeaderElection.LeaderElect = true
s := SchedulerServer{
KubeSchedulerConfiguration: cfg,
}
return &s
}

创建了一个 SchedulerServer 后,然后是解析命令行参数、初始化日志系统参数等,最后通过 app.Run(s) 启动了调度器,下面我们直接分析 Run 函数中的逻辑。

配置并启动 Scheduler

plugin/cmd/kube-scheduler/app/server.go +71

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 运行 SchedulerServer,不会退出.
func Run(s *options.SchedulerServer) error {

// 创建一个到 kube-apiserver master 的客户端连接
kubecli, err := createClient(s)

// 创建一个事件广播器,用于向集群中的 Node 发送调度的信息
recorder := createRecorder(kubecli, s)

// 通过工厂类创建 sharedInformerFactory 对象
informerFactory := informers.NewSharedInformerFactory(kubecli, 0)

// 创建一个 scheduler server
sched, err := createScheduler(
s,
kubecli,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
recorder,
)

// 创建 HTTP 服务,用于性能分析,性能指标度量,
// pprof 接口方便性能数据收集,metrics 接口供 prometheus 收集监控数据
go startHTTP(s,sched.GetConfig())

// 开始运行Informer,进行资源缓存
informerFactory.Start(stop)

// 运行调度程序
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}

// 如果不选主,则直接运行
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}

// 集群选主
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("lost master")
},
},
})

panic("unreachable")
}

总结一下 Run 这个方法中的逻辑,有下面这些(有顺序):

  1. 通过 createClients 创建到 kube-apiserver 的连接 kubecli
  2. 通过 createRecorder 创建eventBroadcaster对象 recorder,用于向集群中的 Node 发送调度的信息;
  3. 通过 startHTTPkube-scheduler 通过 HTTP 接口暴露,用于性能分析,性能指标度量等;
  4. 通过 informers.NewSharedInformerFactory 创建 sharedInformerFactory 对象,该对象封装了 kube-apiserver 中的一些事件通知;
  5. 通过 createScheduler 创建一个运行时 Scheduler 实例 sched(定义在 plugin/pkg/scheduler/scheduler.go 中);
  6. 根据 leaderElect 这个标志位决定是否需要进行选主,如果不需要则直接到下一步,否则进行 Leader 选举;
  7. 开始循环执行 sched.Runsched.Run 内部启动 goroutine 进行 Pod 的调度。

在上面的步骤中,步骤4、5、7比较关键:

  • 第4步中得到一个静态工厂类 sharedInformerFactory,该类可以得到 PodInformer NodeInformer 等,Informer 的机制在这里不具体展开了;
  • 第5步主要是通过一个静态工厂生成了 kube-apiserver 运行时的若干配置信息(Config 对象);
  • 第7步通过 Config 对象启动调度器,5和7是整个调度器代码的核心,这两步后面再详细分析。

通过 sched.Run,程序逻辑终于从 plugin/cmd/ 跳转到了 plugin/pkg/ 目录,下面是 Run 方法的定义:

plugin/pkg/scheduler/scheduler.go +148

1
2
3
func (s *Scheduler) Run() {
go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}

启动一个协程,循环反复执行 Scheduler.scheduleOne 方法,直到收到中止的信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (s *Scheduler) scheduleOne() {

// 1. 从队列中获取一个 Pod
Pod := s.config.NextPod()

// 2. 通过调度算法,得到待调度的节点
dest, err, extraEnvs := s.config.Algorithm.Schedule(Pod, s.config.NodeLister)
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))

// 3, 进行 Pod 和 Node 的绑定,理想情况绑定会成功
// 成功结果会发送给 apiserver,如果失败,会释放分配给 Pod 的资源
assumed := *Pod
assumed.Spec. NodeName = dest
if err := s.config.SchedulerCache.AssumePod (&assumed); err != nil {
glog.Errorf("scheduler cache AssumePod failed: %v", err)
return
}

go func() {
b := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: Pod.Namespace, Name: Pod.Name},
Target: v1.ObjectReference{
Kind: "Node",
Name: dest,
},
}
err := s.config.Binder.Bind(b)
}()
}

scheduleOne 中的逻辑主要分为三块:

  1. 通过 Scheduler.config.NextPod() 取得一个 Pod ;
  2. 通过 Scheduler.config.Algorithm.Schedule(Pod, Scheduler.config. NodeLister) 为 Pod 选择一个合适的 Node ;
  3. 通过 Scheduler.config.Binder.Bind(b) 将此 Node 同 Pod 进行绑定;

在进行 Bind 之前,需要从缓存的资源中进行资源的扣除:s.config.SchedulerCache.AssumePod(&assumed),最终调用的逻辑:

/plugin/pkg/scheduler/schedulercache/node_info.go +244

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (n *NodeInfo) addPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.NvidiaGPU += res.NvidiaGPU
if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.OpaqueIntResources {
n.requestedResource.OpaqueIntResources[rName] += rQuant
}

n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
if hasPodAffinityConstraints(pod) {
n.podsWithAffinity = append(n.podsWithAffinity, pod)
}
n.generation++
}

绑定操作的逻辑如下:

plugin/pkg/scheduler/factory/factory.go +583

1
2
3
4
5
6
7
func (b *binder) Bind(binding *v1.Binding) error {
glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), binding.Namespace)
return b.Client.Core().RESTClient().Post().Namespace(genericapirequest.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
// TODO: use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
}

绑定就是创建一个 binding 对象(为 Pod 指定了 Node name)后,通过 POST 请求 kube-apiserver。一旦绑定,kubelet 认为 Pod 找到了合适的 Node ,然后 Node 上的 kubelet 会拉起 Pod 。

Pod 的调度逻辑在这里就完成了,上面未展开的 createSchedulerConfig 对象生成部分,了解 Config 的创建和内容对后面了解调度器的工作原理非常重要。

生成 Config 对象

createScheduler 创建的 Scheduler 定义如下:

plugin/pkg/scheduler/scheduler.go +32

1
2
3
type Scheduler struct {
config *Config
}

其中 Config 对象被定义为下面所示的结构,这是整个 kube-scheduler 的核心:

1
2
3
4
5
6
7
8
9
10
type Config struct {
SchedulerCache schedulercache.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
NextPod func() *v1.Pod
Error func(*v1.Pod error)
Recorder record.EventRecorder
StopEverything chan struct{}
}
  • SchedulerCache 能够暂时保存调度中的 Pod 和 Node 信息;
  • NodeLister 方法缓存获取所有 Node 信息;
  • NextPod() 方法能返回下一个需要调度的 Pod,FIFO 队列;
  • Algorithm.Schedule() 方法能计算出某个 Pod 在节点中的结果;
  • Error() 方法能够在出错的时候重新把 Pod 放到调度队列中进行重试;
  • Binder.Bind 方法在调度成功之后把调度结果发送到 kube-apiserver 中保存起来;

回到 createScheduler,我们看 Config 对象是怎么创建的:

plugin/cmd/kube-scheduler/app/configurator.go +75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func createScheduler(
s *options.SchedulerServer,
kubecli *clientset.Clientset,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(
s.SchedulerName,
kubecli,
nodeInformer,
pvInformer,
pvcInformer,
replicationControllerInformer,
replicaSetInformer,
statefulSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
)

// Rebuild the configurator with a default Create(...) method.
configurator = &schedulerConfigurator{
configurator,
s.PolicyConfigFile,
s.AlgorithmProvider}

return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
cfg.Recorder = recorder
})
}

首先 factory.NewConfigFactory 生成了一个 ConfiguratorConfigurator 是一个为了构建最终 Config 对象的结构体,为了不增加代码阅读的难度,这里就不列出来了。其实到了这一步的时候(即构造完成 Configurator),Config 对象中 除了 Algorithm 这个域,其他的部分都已经设置好了,如关键的 SchedulerCache 已经通过 Node Informer 完成了配置。

下一步,是进行调度核心:调度算法配置的环节。

配置调度算法

Configurator 创建完成后,调用 scheduler.NewFromConfigurator 方法生成最终的 Config 对象。具体逻辑是:会根据用户是否配置了 Policy文件 ,决定调用 Configurator 中的 CreateFromProvider 还是 CreateFromConfig方法。

plugin/cmd/kube-scheduler/app/configurator.go +120

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
if _, err := os.Stat(sc.policyFile); err != nil {
if sc.Configurator != nil {
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
}
return nil, fmt.Errorf("Configurator was nil")
}

// policy file is valid, try to create a configuration from it.
var policy schedulerapi.Policy
configData, err := ioutil.ReadFile(sc.policyFile)
if err != nil {
return nil, fmt.Errorf("unable to read policy config: %v", err)
}
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
return nil, fmt.Errorf("invalid configuration: %v", err)
}
return sc.CreateFromConfig(policy)
}

用户自定义的 Policy 文件,可以在命令行参数中通过 policy-config-file 来指定,它包含了 PodFitsPorts PodFitsResourcesNoDiskConflict 等过滤函数,LeastRequestedPriorityBalancedResourceAllocation 等加权函数。下面是一个 Policy 文件样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsPorts"},
{"name" : "PodFitsResources"},
{"name" : "NoDiskConflict"},
{"name" : "NoVolumeZoneConflict"},
{"name" : "MatchNodeSelector"},
{"name" : "HostName"}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : 1},
{"name" : "BalancedResourceAllocation", "weight" : 1},
{"name" : "ServiceSpreadingPriority", "weight" : 1},
{"name" : "EqualPriority", "weight" : 1}
]
}

如果用户没有指定 Policy 文件,那么调度算法将会使用 DefaultProvider 这个过滤函数(PredicateFunc)和打分加权函数(PriorityFunc)的集合。DefaultProvider 的初始化位于
plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go 中的 init() 方法中,init() 方法在 main 运行前就会运行,它会提前将 DefaultProviderFitPredicate(过滤)、Priority(打分加权)进行注册。

DefaultProvider 中默认配置的 PredicateFunc有:

  • NoVolumeZoneConflict
  • MaxEBSVolumeCount
  • MaxGCEPDVolumeCount
  • MaxAzureDiskVolumeCount
  • MatchInterPodAffinity
  • NoDiskConflict
  • GeneralPredicates
  • PodToleratesNodeTaints
  • CheckNodeMemoryPressure
  • CheckNodeDiskPressure

DefaultProvider 中默认配置的 PriorityFunc 有:

  • SelectorSpreadPriority
  • InterPodAffinityPriority
  • LeastRequestedPriority
  • BalancedResourceAllocation
  • NodePreferAvoidPodsPriority
  • NodeAffinityPriority
  • TaintTolerationPriority

无论是使用 Policy 文件,还是使用 DefaultProvider,最终的目的都是为了得到调度算法的一种组合实现,最终完成 Config 对象的组装。因此,CreateFromProviderCreateFromConfig 两个方法殊途同归,最后都抵达了 func (f *ConfigFactory) CreateFromKeys 方法。

plugin/pkg/scheduler/factory/factory.go +346

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)

if f.GetHardPodAffinitySymmetricWeight() < 0 || f.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.GetHardPodAffinitySymmetricWeight())
}

predicateFuncs, err := f.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}

priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

priorityMetaProducer, err := f.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}

predicateMetaProducer, err := f.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}

// 开始监控待调度的 Pod ,将其加入 PodQueue
f.Run()

algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
PodBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable Nodes.
NodeLister: &NodePredicateLister{f.NodeLister},
Algorithm: algo,
Binder: &binder{f.client},
PodConditionUpdater: &PodConditionUpdater{f.client},
NextPod : func() *v1.Pod {
return f.getNextPod ()
},
Error: f.MakeDefaultErrorFunc(PodBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}

在使用 schedulerCachepredicatesprioritizersextender 等通过 NewGenericScheduler 方法构建了一个 genericScheduler, genericScheduler 类型为 scheduler.ScheduleAlgorithm,准确说应该是一个调度器算法,它实现了 Schedule() 接口。

plugin/pkg/scheduler/core/generic_scheduler.go +439

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewGenericScheduler(
cache schedulercache.Cache,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.MetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cached Node InfoMap: make(map[string]*schedulercache.NodeInfo),
}
}

至此,整个 Scheduler 的启动流程就打通了:在 scheduleOne 中通过 NextPod() 获取待调度 Pod ,然后根据 genericScheduler 中的 Schedule() 方法完成 Node 节点的选择,最后通过 Bind 操作完成整个调度流程。

调度算法解析

plugin/pkg/scheduler/generic_scheduler.go +101

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (g *genericScheduler) Schedule(Pod *v1.PodNodeLister algorithm.NodeLister) (string, error, []v1.EnvVar) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", Pod.Namespace, Pod .Name))
defer trace.LogIfLong(100 * time.Millisecond)
Nodes, err := NodeLister.List()
if err != nil {
return "", err, nil
}
if len(Nodes) == 0 {
return "", ErrNoNodesAvailable, nil
}

// Used for all fit and priority funcs.
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err, nil
}

// 1. 根据给定的过滤函数(即g.predicates)筛选符合所有要求的 Node
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(Pod g.cachedNodeInfoMap, Nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
return "", err, nil
}

predicateEnvs := []v1.EnvVar{
{
Name: v1.SchedFailedReason,
Value: failedPredicateMap.String(),
},
}

// 没有符合要求的 Node 则返回
if len(filteredNodes) == 0 {
return "", &FitError{
Pod : Pod
FailedPredicates: failedPredicateMap,
}, predicateEnvs
}

// 2. 根据给定的打分函数(即g.prioritizers)给输入的各个 Node 进行打分
trace.Step("Prioritizing")
metaPrioritiesInterface := g.priorityMetaProducer(Pod g.cachedNodeInfoMap)
priorityList, err := PrioritizeNodes(Pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return "", err, nil
}

// 3. 从最终的 Node 列表中选出一个 Node
trace.Step("Selecting host")
dest, priorityErr, priorityEnvs := g.selectHost(priorityList)
return dest, priorityErr, v1.MergeEnvs(predicateEnvs,priorityEnvs)
}

从上面可以看到,Schedule() 方法为一个 Pod 选出一个 Node 需要经历下面三步:

  1. 通过 findNodesThatFit 过滤缓存的 Node 列表,过滤标准为 Config 对象中 的 Predicates(过滤);
  2. 通过 PrioritizeNodes 为过滤后的每个 Node 进行打分,具体某个 Node 的分数为各优先级算法得分的加权和;
  3. 如果第二步中的 Node 列表中的元素不止一个,会通过 selectHost(最终选)进行排序,选出得分最高的节点,如果仍然有多个节点得分一致,则走 round-robin 选择一个。

预选环节

下面是 findNodesThatFit 方法的具体实现:

plugin/pkg/scheduler/generic_scheduler.go +180

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func findNodesThatFit(
Pod *v1.Pod
Node NameToInfo map[string]*schedulercache.NodeInfo,
Nodes []*v1.Node ,
predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.MetadataProducer,
) ([]*v1.Node , FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}

if len(predicateFuncs) == 0 {
filtered = Nodes
} else {
// ...
checkNode := func(i int) {
NodeName := Nodes[i].Name
fits, failedPredicates, err := PodFitsOnNode(Pod, meta, NodeNameToInfo[NodeName], predicateFuncs)
if err != nil {
predicateResultLock.Lock()
errs = append(errs, err)
predicateResultLock.Unlock()
return
}
if fits {
filtered[atomic.AddInt32(&filteredLen, 1)-1] = Nodes[i]
} else {
predicateResultLock.Lock()
failedPredicateMap[NodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
workqueue.Parallelize(16, len(Nodes), checkNode)
filtered = filtered[:filteredLen]

// ...
}

// 如果配置了Extender,则再执行Extender的优选打分方法Extender.Prioritize
if len(filtered) > 0 && len(extenders) != 0 {
for _, extender := range extenders {
filteredList, failedMap, err := extender.Filter(Podfiltered, NodeNameToInfo)
if err != nil {
return []*v1. Node {}, FailedPredicateMap{}, err
}

for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
break
}
}
}

return filtered, failedPredicateMap, nil
}

上面的过滤过程,唯一需要注意的就是,过滤是并发的,并发度为 16 个 goroutine,如果 Node 数不到16则并发数为 Node 数。具体的 Predicate Policy 对应的 PredicateFunc 都定义在 plugin/pkg/scheduler/algorithm/predicates/predicates.go,常用有:

  • NoDiskConflict: 评估是否存在volume冲突。如果该 volume 已经 mount 过了,k8s可能会不允许重复mount(取决于volume类型);
  • NoVolumeZoneConflict: 评估该节点上是否存在 Pod 请求的 volume;
  • PodFitsResources: 检查节点剩余资源(CPU、内存)是否能满足 Pod 的需求。剩余资源=总容量-所有 Pod 请求的资源;
  • MatchNodeSelector: 判断是否满足 Pod 设置的 NodeSelector;
  • CheckNodeMemoryPressure: 检查 Pod 是否可以调度到存在内存压力的节点;
  • CheckNodeDiskPressure: 检查 Pod 是否可以调度到存在硬盘压力的节点;

优选环节

同样的,在 PrioritizeNodes 环节也是并发执行,每个 goroutine 依次计算该 Pod 运行在该 Node 上的得分。具体的 Priorities Policy 对应的 PriorityFunc 都定义在 plugin/pkg/scheduler/algorithm/priorities/.go* 中,常见有:

  • LeastRequestedPriority:最低请求优先级,即 Node 使用率越低,得分越高;
  • BalancedResourceAllocation:资源平衡分配,即CPU/内存配比合适的 Node 得分更高;
  • SelectorSpreadPriority: 尽量将同一 RC/Replica 的多个 Pod 分配到不同的 Node 上;
  • CalculateAntiAffinityPriority: 尽量将同一 Service 下的多个相同 Label 的 Pod 分配到不同的 Node;
  • ImageLocalityPriority: Image本地优先,Node 上如果已经存在 Pod 需要的镜像,并且镜像越大,得分越高,从而减少 Pod 拉取镜像的开销(时间);
  • NodeAffinityPriority: 根据亲和性标签进行选择;

源码走读总结

通过上面的源码学习,可以总结 kube-scheduler 的整体流程如下:

  • kube-scheduler 初始化:如配置 DefaultProvider、解析命令行参数等;,
  • kube-scheduler 创建:通过工厂方法构造一个 Config 对象,然后运行这个 Config 对象,这中间包括:
    1. 构造资源缓存(scheduleCache):通过 kube-apiserver 提供的 RestFul API 实现对资源的监控(Watch)和缓存;
    2. 构造调度算法的数据结构(Algorithm):根据用户提供的 Policy File 或者内置的 DefaultProvider 生成;
    3. 其他逻辑:提供 kube-scheduler 的 HTTP 服务、 Leader 选举等;
  • kube-scheduler 运行时:
    1. kube-scheduler 中维护了一个 FIFO 类型的 PodQueue cache(scheduleCache的子集),待调度的 Pod 会被及时添加到该 PodQueue 中;
    2. 通过 Config.Next Pod 获取一个即将被调度的 Pod ;
    3. 通过 Config.Schedule 计算出目标 Node ,分为 Predicates 和 Priorities 两步;;
    4. 通过 Config.Bind 进行更新 Pod 和 Node 的绑定;

Scheduler 源码流程图

最后附上一张手绘流程图(点击放大查看):