
Kube-Scheduler Source Code Analysis

2019/12/02

[toc]

Main Functionality

The main job of kube-scheduler is to select a suitable Node for each Pod that needs to run. The work is split into two phases:

  1. Predicates (pre-selection)

    Filter out the list of Nodes that satisfy the scheduling constraints.

  2. Prioritizing (scoring)

    From the filtered Node list, use the scoring algorithms to pick the best-matching Node and set the Pod's NodeName.

Users can supply their own config file, or simply use the default predicate and priority algorithms shipped with the system; users can also implement their own scheduler by plugging in extension algorithms. A conceptual sketch of the two phases follows below.
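To make the two phases concrete, here is a minimal, self-contained toy sketch of the filter-then-score pipeline. The types and the scoring rule are invented purely for illustration; they are not the scheduler's real API.

package main

import "fmt"

// Toy types used only for illustration; the real scheduler works on *v1.Pod and *v1.Node.
type node struct {
    name    string
    freeCPU int
}

type pod struct {
    name       string
    requestCPU int
}

// Predicates phase: keep only the nodes that can run the pod.
func filterNodes(p pod, nodes []node) []node {
    var fit []node
    for _, n := range nodes {
        if n.freeCPU >= p.requestCPU {
            fit = append(fit, n)
        }
    }
    return fit
}

// Prioritizing phase: score the remaining nodes and pick the best one.
func pickBest(p pod, nodes []node) (string, bool) {
    best, bestScore := "", -1
    for _, n := range nodes {
        score := n.freeCPU - p.requestCPU // toy score: prefer the node with the most room left
        if score > bestScore {
            best, bestScore = n.name, score
        }
    }
    return best, best != ""
}

func main() {
    nodes := []node{{"node-a", 2}, {"node-b", 8}, {"node-c", 4}}
    p := pod{name: "web-0", requestCPU: 3}
    if host, ok := pickBest(p, filterNodes(p, nodes)); ok {
        fmt.Printf("pod %s -> node %s\n", p.name, host) // pod web-0 -> node node-b
    }
}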


The code analysis below is based on Kubernetes v1.12.6.

Core Flow

Startup

k8s.io/kubernetes/cmd/kube-scheduler/scheduler.go

import (
    // ...
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    // ...
    command := app.NewSchedulerCommand()

    if err := command.Execute(); err != nil {
        // ...
    }
}

k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    // ...

    cmd := &cobra.Command{
        Use:  "kube-scheduler",
        Long: `...`,
        Run: func(cmd *cobra.Command, args []string) {
            // ...

            stopCh := make(chan struct{})
            if err := Run(c.Complete(), stopCh); err != nil {
            }
        },
    }
    return cmd
}

The Run() Entry Point

k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

The main logic of kube-scheduler lives in the Run function, whose work consists of:

  1. Initializing the scheduling algorithms
  2. Starting the informers that sync the required data from kube-apiserver
  3. Starting the scheduling work itself; the main scheduling function sched.scheduleOne, as its name suggests, schedules one Pod at a time, serially
// Run runs the Scheduler.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {

    // Register the algorithms; see the "Algorithm Initialization" section
    algorithmprovider.ApplyFeatureGates()

    // Configuration initialization
    schedulerConfig, err := NewSchedulerConfig(c)

    // Create the scheduler.
    sched := scheduler.NewFromConfig(schedulerConfig)

    // ...

    // Start the informers
    go c.PodInformer.Informer().Run(stopCh)
    c.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    c.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)

    // Prepare a reusable run function.
    run := func(ctx context.Context) {
        sched.Run() // main entry point
        <-ctx.Done()
    }

    // ...

    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

k8s.io/kubernetes/pkg/scheduler/scheduler.go

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    // Main entry point: scheduleOne
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

Configuration Initialization: config.CompletedConfig

If you are not interested in how the configuration is initialized, you can skip ahead to the scheduling algorithms.

The configuration initialization flow is roughly:

args -> options.Options -> config.Config -> config.CompletedConfig (merely wraps config.Config) -> scheduler.Config

config.Config embeds the config.KubeSchedulerConfiguration settings (scheduler name, algorithm source, whether preemption is disabled, and so on).

Or, more briefly:

options.Options -> KubeSchedulerConfiguration -> config.CompletedConfig -> scheduler.Config

The c.Complete() call fills in the remaining fields of the config structure.

// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()

    cmd := &cobra.Command{
        Use:  "kube-scheduler",
        Long: ``,
        Run: func(cmd *cobra.Command, args []string) {
            c, err := opts.Config() // build the configuration

            stopCh := make(chan struct{})
            Run(c.Complete(), stopCh) // complete the configuration, yielding a config.CompletedConfig
        },
    }

    return cmd
}

k8s.io/kubernetes/cmd/kube-scheduler/app/options/options.go

The configuration-building function opts.Config() mainly turns the option values into a fully initialized config object:

// Config return a scheduler config object
func (o *Options) Config() (*schedulerappconfig.Config, error) {
    c := &schedulerappconfig.Config{}
    // ApplyTo copies the option values (flags or config file) into the schedulerappconfig.Config{} object
    if err := o.ApplyTo(c); err != nil {
        return nil, err
    }

    // Prepare kube clients.
    client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)

    // Prepare event clients.
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})

    // Set up leader election if enabled.
    var leaderElectionConfig *leaderelection.LeaderElectionConfig
    if c.ComponentConfig.LeaderElection.LeaderElect {
        leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
    }

    c.Client = client
    c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
    c.PodInformer = factory.NewPodInformer(client, 0)
    c.EventClient = eventClient
    c.Recorder = recorder
    c.Broadcaster = eventBroadcaster
    c.LeaderElection = leaderElectionConfig

    return c, nil
}

The resulting structure is config.CompletedConfig, which embeds the config.Config structure:

k8s.io/kubernetes/cmd/kube-scheduler/app/config/config.go

type completedConfig struct {
    *Config
}

// CompletedConfig same as Config, just to swap private object.
type CompletedConfig struct {
    // Embed a private pointer that cannot be instantiated outside of this package.
    *completedConfig
}

// Config has all the context to run a Scheduler
type Config struct {
    // config is the scheduler server's configuration object.
    ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration

    // Authentication/authorization settings
    Authentication apiserver.AuthenticationInfo
    Authorization  apiserver.AuthorizationInfo

    Client          clientset.Interface             // Kubernetes client
    InformerFactory informers.SharedInformerFactory // informer factory
    PodInformer     coreinformers.PodInformer       // pod informer
    EventClient     v1core.EventsGetter
    Recorder        record.EventRecorder
    Broadcaster     record.EventBroadcaster

    // LeaderElection is optional.
    LeaderElection *leaderelection.LeaderElectionConfig
}

config.Config in turn embeds a KubeSchedulerConfiguration value, defined as follows:

k8s.io/kubernetes/pkg/scheduler/apis/config/types.go

// KubeSchedulerConfiguration configures a scheduler
type KubeSchedulerConfiguration struct {
    metav1.TypeMeta

    // SchedulerName is name of the scheduler, used to select which pods
    // will be processed by this scheduler, based on pod's "spec.SchedulerName".
    SchedulerName string

    // AlgorithmSource specifies the scheduler algorithm source.
    AlgorithmSource SchedulerAlgorithmSource

    // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
    // corresponding to every RequiredDuringScheduling affinity rule.
    // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
    HardPodAffinitySymmetricWeight int32

    // LeaderElection defines the configuration of leader election client.
    LeaderElection KubeSchedulerLeaderElectionConfiguration

    // ClientConnection specifies the kubeconfig file and client connection
    // settings for the proxy server to use when communicating with the apiserver.
    ClientConnection apimachineryconfig.ClientConnectionConfiguration

    // HealthzBindAddress is the IP address and port for the health check server to serve on,
    // defaulting to 0.0.0.0:10251
    HealthzBindAddress string

    // MetricsBindAddress is the IP address and port for the metrics server to
    // serve on, defaulting to 0.0.0.0:10251.
    MetricsBindAddress string

    // DisablePreemption disables the pod preemption feature.
    DisablePreemption bool

    // PercentageOfNodesToScore is the fraction of the cluster's Nodes examined for each Pod
    // scheduling attempt; it is handled by numFeasibleNodesToFind and is mainly useful in large
    // clusters. Clusters with fewer than 100 Nodes always search all Nodes; with 500 Nodes and a
    // value of 30, the search stops after 500 * 30/100 = 150 Nodes have been examined.
    PercentageOfNodesToScore int32

    BindTimeoutSeconds *int64
}

The config.Complete function performs the wrapping of the structure:

k8s.io/kubernetes/cmd/kube-scheduler/app/config/config.go

// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
    cc := completedConfig{c}

    if c.InsecureServing != nil {
        c.InsecureServing.Name = "healthz"
    }
    if c.InsecureMetricsServing != nil {
        c.InsecureMetricsServing.Name = "metrics"
    }

    return CompletedConfig{&cc}
}

Configuration Initialization: scheduler.Config

Switching back to the big picture, return to the initialization call in Run: schedulerConfig, err := NewSchedulerConfig(c).

  • configFactory is the factory of the factory pattern: it produces a config from the given settings and parameters, preparing beforehand all the data the config needs
  • config is the most important component of the scheduler; it wires together the logic of all the scheduling components
  • scheduler uses the capabilities provided by config to perform the actual scheduling

This function converts config.CompletedConfig into scheduler.Config and initializes it:

// NewSchedulerConfig creates the scheduler configuration.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {

    // Set up the configurator which can create schedulers from configs.
    // It returns a configFactory, defined in k8s.io/kubernetes/pkg/scheduler/factory/factory.go.
    // NewConfigFactory() -> initializes the informers, most importantly the PodInformer,
    // which feeds the unscheduled pod queue:
    /*
        args.PodInformer.Informer().AddEventHandler(
            cache.FilteringResourceEventHandler{
                FilterFunc: func(obj interface{}) bool { // filters for Pods whose pod.Spec.NodeName is empty, etc.
                    switch t := obj.(type) {
                    case *v1.Pod:
                        return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
                    case cache.DeletedFinalStateUnknown:
                        if pod, ok := t.Obj.(*v1.Pod); ok {
                            return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                        }
                        runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                        return false
                    default:
                        runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                        return false
                    }
                },
                Handler: cache.ResourceEventHandlerFuncs{
                    AddFunc:    c.addPodToSchedulingQueue,
                    UpdateFunc: c.updatePodInSchedulingQueue,
                    DeleteFunc: c.deletePodFromSchedulingQueue,
                },
            },
        )
    */
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  s.ComponentConfig.SchedulerName,
        Client:                         s.Client,
        NodeInformer:                   s.InformerFactory.Core().V1().Nodes(),
        PodInformer:                    s.PodInformer,
        PvInformer:                     s.InformerFactory.Core().V1().PersistentVolumes(),
        PvcInformer:                    s.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ReplicationControllerInformer:  s.InformerFactory.Core().V1().ReplicationControllers(),
        ReplicaSetInformer:             s.InformerFactory.Apps().V1().ReplicaSets(),
        StatefulSetInformer:            s.InformerFactory.Apps().V1().StatefulSets(),
        ServiceInformer:                s.InformerFactory.Core().V1().Services(),
        PdbInformer:                    s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache:    utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        DisablePreemption:              s.ComponentConfig.DisablePreemption,
        PercentageOfNodesToScore:       s.ComponentConfig.PercentageOfNodesToScore,
        BindTimeoutSeconds:             *s.ComponentConfig.BindTimeoutSeconds,
    })

    // https://kubernetes.io/docs/reference/command-line-tools-reference/kube-scheduler/
    source := s.ComponentConfig.AlgorithmSource // from KubeSchedulerConfiguration
    var config *scheduler.Config
    switch {
    // The scheduling algorithms can come from two sources: a named Provider, or a user-supplied Policy (file or ConfigMap).
    // --algorithm-provider string
    //   DEPRECATED: the scheduling algorithm provider to use, one of:
    //   ClusterAutoscalerProvider | DefaultProvider
    case source.Provider != nil:
        // Create the config from a named algorithm provider
        // (ClusterAutoscalerProvider | DefaultProvider), using the configurator created above.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        config = sc

    // --policy-config-file string
    //   DEPRECATED: file with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            // Use a policy serialized in a file.
            policyFile := source.Policy.File.Path
            data, err := ioutil.ReadFile(policyFile)

            err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)

        // Initialize from a ConfigMap
        case source.Policy.ConfigMap != nil:
            // same flow as above
        }

        sc, err := configurator.CreateFromConfig(*policy)
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = s.Recorder

    config.DisablePreemption = s.ComponentConfig.DisablePreemption
    return config, nil
}

Both CreateFromProvider and CreateFromConfig ultimately call func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error).

This function takes predicateKeys/priorityKeys/SchedulerExtenders as input and returns a scheduler.Config object:

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
    // ...

    predicateFuncs, err := c.GetPredicates(predicateKeys)
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
    priorityMetaProducer, err := c.GetPriorityMetadataProducer()
    predicateMetaProducer, err := c.GetPredicateMetadataProducer()

    // Init equivalence class cache
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache = equivalence.NewCache()
        glog.Info("Created equivalence class cache")
    }

    // The actual algorithm initialization
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )

    podBackoff := util.CreateDefaultPodBackoff()
    return &scheduler.Config{
        SchedulerCache: c.schedulerCache,
        Ecache:         c.equivalencePodCache,
        // The scheduler only needs to consider schedulable nodes.
        NodeLister:          &nodeLister{c.nodeLister},
        Algorithm:           algo, // the scheduling algorithm
        GetBinder:           c.getBinderFunc(extenders),
        PodConditionUpdater: &podConditionUpdater{c.client},
        PodPreemptor:        &podPreemptor{c.client},
        WaitForCacheSync: func() bool {
            return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
        },
        NextPod: func() *v1.Pod { // NextPod pops the next pod from the scheduling queue
            return c.getNextPod()
        },
        Error:           c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
        StopEverything:  c.StopEverything,
        VolumeBinder:    c.volumeBinder,
        SchedulingQueue: c.podQueue,
    }, nil
}

At this point the core data of scheduler.Config has been initialized, most notably the scheduling algorithm (Algorithm: algo).

scheduleOne

k8s.io/kubernetes/pkg/scheduler/scheduler.go

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    // Pop the next pod to schedule from the queue; ultimately this calls configFactory.getNextPod()
    pod := sched.config.NextPod()

    // Skip the pod if it has been marked for deletion

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()

    // Find the host that the pending Pod fits on
    suggestedHost, err := sched.schedule(pod) // analyzed below
    if err != nil {
        // schedule() may fail because no Node fits the pod; in that case try to preempt
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
        }
        return
    }
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()

    // Check whether the VolumeScheduling feature is needed.
    // Assume volumes first before assuming the pod. If all volumes are completely bound, then allBound is true and binding will be skipped. Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. This function modifies 'assumedPod' if volume binding is required.
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    if err != nil {
        return
    }

    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    // Write the host name into the Pod's NodeName and store it in the cache
    err = sched.assume(assumedPod, suggestedHost)

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    // The final binding goes to the apiserver asynchronously and is persisted to etcd
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err = sched.bindVolumes(assumedPod)
        }

        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
    }()
}
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    // Delegate to the configured algorithm, which is a GenericScheduler
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    // ...
}

// sched.config.Algorithm = core.NewGenericScheduler()
// k8s.io/kubernetes/pkg/scheduler/factory/factory.go
// See func (c *configFactory) CreateFromKeys(){...} for how it is initialized.
//
// k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
    cache schedulercache.Cache,
    eCache *equivalence.Cache,
    podQueue SchedulingQueue,
    predicates map[string]algorithm.FitPredicate,
    predicateMetaProducer algorithm.PredicateMetadataProducer,
    prioritizers []algorithm.PriorityConfig,
    priorityMetaProducer algorithm.PriorityMetadataProducer,
    extenders []algorithm.SchedulerExtender,
    volumeBinder *volumebinder.VolumeBinder,
    pvcLister corelisters.PersistentVolumeClaimLister,
    alwaysCheckAllPredicates bool,
    disablePreemption bool,
    percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
    return &genericScheduler{
        cache:                    cache,
        equivalenceCache:         eCache,
        schedulingQueue:          podQueue,
        predicates:               predicates,
        predicateMetaProducer:    predicateMetaProducer,
        prioritizers:             prioritizers,
        priorityMetaProducer:     priorityMetaProducer,
        extenders:                extenders,
        cachedNodeInfoMap:        make(map[string]*schedulercache.NodeInfo),
        volumeBinder:             volumeBinder,
        pvcLister:                pvcLister,
        alwaysCheckAllPredicates: alwaysCheckAllPredicates,
        disablePreemption:        disablePreemption,
        percentageOfNodesToScore: percentageOfNodesToScore,
    }
}

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

The place where predicates and priorities finally come together is genericScheduler.Schedule:

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {

    nodes, err := nodeLister.List()

    // Used for all fit and priority funcs.
    err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)

    // 1. Predicates: compute the filteredNodes list
    // 1.1 If no predicates are configured, return the full Node list
    // 1.2 Otherwise call checkNode for each Node to see whether the Pod can be scheduled onto it
    // 1.3 After the predicate filtering, if extender algorithms are configured, filter again and return the final matching Node list
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)

    // 2. Priorities: score the filtered nodes
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)

    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)

    // Pick the host with the highest score and return it
    return g.selectHost(priorityList)
}
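selectHost only has to walk the returned HostPriorityList and hand back one of the highest-scoring hosts. Below is a simplified, self-contained sketch of that idea (toy types; the real selectHost in generic_scheduler.go additionally round-robins among hosts that tie on the top score):

package main

import "fmt"

// hostPriority is a simplified stand-in for schedulerapi.HostPriority.
type hostPriority struct {
    Host  string
    Score int
}

// selectBestHost returns a host with the maximum score, or an error if the list is empty.
// The real selectHost also rotates among equally scored hosts so ties do not always
// land on the same node.
func selectBestHost(list []hostPriority) (string, error) {
    if len(list) == 0 {
        return "", fmt.Errorf("empty priority list")
    }
    best := list[0]
    for _, hp := range list[1:] {
        if hp.Score > best.Score {
            best = hp
        }
    }
    return best.Host, nil
}

func main() {
    hosts := []hostPriority{{"node-a", 7}, {"node-b", 9}, {"node-c", 9}}
    h, _ := selectBestHost(hosts)
    fmt.Println("selected:", h) // "node-b" (the first of the two top-scoring hosts)
}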

Predicates: Main Flow

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    // If there are no predicates, return the full node list
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes)
        // Avoid returning too many Nodes at once (and an overly large array). There is a hard-coded
        // threshold of 100: clusters of up to 100 Nodes generally return everything; above that,
        // percentageOfNodesToScore controls what fraction of the cluster is examined per scheduling
        // attempt, to improve scheduling performance. A value of 0 or >= 100 returns all Nodes.
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, numNodesToFind)

        // checkNode tests whether the Pod can be scheduled onto one Node
        checkNode := func(i int) {
            var nodeCache *equivalence.NodeCache
            nodeName := g.cache.NodeTree().Next()

            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.cache,
                nodeCache,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
                equivClass,
            )

            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            }

            // ...
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        // ...
    }

    // If extender algorithms are configured, keep filtering with them
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)

            // ...
            filtered = filteredList
        }
    }
    return filtered, failedPredicateMap, nil
}
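Based on the comments above, the feasible-node cutoff derived from PercentageOfNodesToScore can be sketched as follows. This follows the behaviour described in the comments (including the 100-node floor and the 500 * 30/100 = 150 example); see the real numFeasibleNodesToFind in generic_scheduler.go for the authoritative rules.

package main

import "fmt"

// minFeasibleNodesToFind mirrors the hard-coded floor described above: clusters at or
// below this size are always searched completely.
const minFeasibleNodesToFind = 100

// feasibleNodesToFind returns how many feasible nodes the filtering step should collect
// before it stops searching, following the rules described in the comments above.
func feasibleNodesToFind(allNodes, percentageOfNodesToScore int32) int32 {
    // Small clusters, or a percentage of 0 / >= 100, mean "examine every node".
    if allNodes <= minFeasibleNodesToFind ||
        percentageOfNodesToScore <= 0 || percentageOfNodesToScore >= 100 {
        return allNodes
    }
    numNodes := allNodes * percentageOfNodesToScore / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}

func main() {
    // The example from the text: 500 nodes with a percentage of 30 -> 500*30/100 = 150.
    fmt.Println(feasibleNodesToFind(500, 30)) // 150
    fmt.Println(feasibleNodesToFind(80, 30))  // 80 (small cluster, search everything)
}

Back in findNodesThatFit, each candidate node is checked by podFitsOnNode: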
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    cache schedulercache.Cache,
    nodeCache *equivalence.NodeCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )

    podsAdded := false
    // We run predicates twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to meta and nodeInfo.
    // If all predicates succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // predicates such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // predicates fail, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // those are the current "pod" must yield to them and not take a space opened
    // for running them. It is ok if the current "pod" take resources freed for
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: predicates like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while predicates like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        // First pass: update meta and nodeInfo with the NominatedPods and run the predicates against that view.
        // Second pass: leave meta and nodeInfo unchanged, so that the pod does not depend entirely on the
        // NominatedPods (mainly because of things like pod affinity).
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // Bypass eCache if node has any nominated pods.
        // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
        // when pods are nominated or their nominations change.
        eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )

            if predicate, exist := predicateFuncs[predicateKey]; exist {
                if eCacheAvailable {
                    fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }

                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.

                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}

Priorities: Main Flow

The priorities phase uses the same kind of parallel fan-out as the predicates phase, organized in a MapReduce style: each Map step computes a per-Node score for one priority algorithm, and the Reduce step aggregates the final result.
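Before walking through PrioritizeNodes itself, the Map/Reduce contract can be illustrated with a toy priority: Map produces a per-node intermediate score, and Reduce post-processes the whole list, for example normalizing it into the 0-10 range the way normalizing reduce functions such as CalculateNodeAffinityPriorityReduce do. The snippet below is only a sketch with simplified types; it does not use the real algorithm.PriorityMapFunction / algorithm.PriorityReduceFunction signatures.

package main

import "fmt"

// scoredNode is a simplified stand-in for schedulerapi.HostPriority.
type scoredNode struct {
    Host  string
    Score int
}

// mapFreeMemory is a toy "Map" step: score one node by its free memory in GiB.
// freeMemGiB stands in for the per-node information normally carried by NodeInfo.
func mapFreeMemory(freeMemGiB map[string]int, host string) scoredNode {
    return scoredNode{Host: host, Score: freeMemGiB[host]}
}

// reduceNormalize is a toy "Reduce" step: rescale all scores into the 0..10 range,
// similar to what normalizing reduce functions do in the real scheduler.
func reduceNormalize(list []scoredNode) {
    max := 0
    for _, s := range list {
        if s.Score > max {
            max = s.Score
        }
    }
    if max == 0 {
        return
    }
    for i := range list {
        list[i].Score = list[i].Score * 10 / max
    }
}

func main() {
    free := map[string]int{"node-a": 4, "node-b": 16, "node-c": 8}
    var list []scoredNode
    for _, h := range []string{"node-a", "node-b", "node-c"} {
        list = append(list, mapFreeMemory(free, h)) // Map: one intermediate score per node
    }
    reduceNormalize(list) // Reduce: normalize to 0..10
    fmt.Println(list)     // [{node-a 2} {node-b 10} {node-c 5}]
}

The real flow, with weights, legacy Function-style priorities, and extenders, is below.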

// PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Function != nil {
            wg.Add(1)
            go func(index int, config algorithm.PriorityConfig) {
                defer wg.Done()
                var err error
                results[index], err = config.Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i, priorityConfig)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }

    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            // Map step: compute the intermediate per-node score
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
        }
    }
    workqueue.Parallelize(16, len(nodes), processNode)
    for i, priorityConfig := range priorityConfigs {
        wg.Add(1)
        go func(index int, config algorithm.PriorityConfig) {
            defer wg.Done()
            // Reduce step: aggregate the final result
            if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
        }(i, priorityConfig)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    // Continue scoring with the extenders, if any are configured
    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for _, extender := range extenders {
            wg.Add(1)
            go func(ext algorithm.SchedulerExtender) {
                defer wg.Done()
                prioritizedList, weight, err := ext.Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(extender)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    return result, nil
}

Algorithm Initialization

k8s.io/kubernetes/pkg/scheduler/algorithmprovider/plugins.go, function ApplyFeatureGates

package algorithmprovider

import (
    "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)

// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
    defaults.ApplyFeatureGates()
}

k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go

func init() {
    // Register functions that extract metadata used by predicates and priorities computations.
    factory.RegisterPredicateMetadataProducerFactory(
        func(args factory.PluginFactoryArgs) algorithm.PredicateMetadataProducer {
            return predicates.NewPredicateMetadataFactory(args.PodLister)
        })

    factory.RegisterPriorityMetadataProducerFactory(
        func(args factory.PluginFactoryArgs) algorithm.PriorityMetadataProducer {
            return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
        })

    // Register the default predicate and priority algorithms
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())

    // IMPORTANT NOTES for predicate developers:
    // We are using cached predicate result for pods belonging to the same equivalence class.
    // So when implementing a new predicate, you are expected to check whether the result
    // of your predicate function can be affected by related API object change (ADD/DELETE/UPDATE).
    // If yes, you are expected to invalidate the cached predicate result for related API object change.
    // For example:
    // https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422

    // Registers predicates and priorities that are not enabled by default, but user can pick when creating their
    // own set of priorities/predicates.

    // PodFitsPorts has been replaced by PodFitsHostPorts for better user understanding.
    // For backwards compatibility with 1.0, PodFitsPorts is registered as well.
    factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
    // Fit is defined based on the absence of port conflicts.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
    // Fit is determined by resource availability.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
    // Fit is determined by the presence of the Host parameter and a string match
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
    // Fit is determined by node selector query.
    factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

    // ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
    // the number of pods (belonging to the same service) on the same node.
    // Register the factory so that it's available, but do not include it as part of the default priorities
    // Largely replaced by "SelectorSpreadPriority", but registered for backward compatibility with 1.0
    factory.RegisterPriorityConfigFactory(
        "ServiceSpreadingPriority",
        factory.PriorityConfigFactory{
            MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
                return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
            },
            Weight: 1,
        },
    )
    // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
    // Register the priority function so that its available
    // but do not include it as part of the default priorities
    factory.RegisterPriorityFunction2("EqualPriority", core.EqualPriorityMap, nil, 1)
    // Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
    factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
    factory.RegisterPriorityFunction2(
        "RequestedToCapacityRatioPriority",
        priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
        nil,
        1)
}

Predicate Algorithms

k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go

// IMPORTANT NOTE: this list contains the ordering of the predicates, if you develop a new predicate
// it is mandatory to add its name to this list.
// Otherwise it won't be processed, see generic_scheduler#podFitsOnNode().
// The order is based on the restrictiveness & complexity of predicates.
// Design doc: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md

// The order in which predicates are evaluated
var (
    predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
        CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
        MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
        CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go

func defaultPredicates() sets.String {
    return sets.NewString(
        // ...

        // Fit is determined by node conditions: not ready,
        // network unavailable or out of disk.
        factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate), // see the function analysis below

        // ...
    )
}

k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go

The CheckNodeConditionPredicate function checks whether the Node's conditions are acceptable:

// CheckNodeConditionPredicate checks if a pod can be scheduled on a node reporting out of disk,
// network unavailable and not ready condition. Only node conditions are accounted in this predicate.
func CheckNodeConditionPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    reasons := []algorithm.PredicateFailureReason{}

    if nodeInfo == nil || nodeInfo.Node() == nil {
        return false, []algorithm.PredicateFailureReason{ErrNodeUnknownCondition}, nil
    }

    node := nodeInfo.Node()
    for _, cond := range node.Status.Conditions {
        // We consider the node for scheduling only when its:
        // - NodeReady condition status is ConditionTrue,
        // - NodeOutOfDisk condition status is ConditionFalse,
        // - NodeNetworkUnavailable condition status is ConditionFalse.
        if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
            reasons = append(reasons, ErrNodeNotReady)
        } else if cond.Type == v1.NodeOutOfDisk && cond.Status != v1.ConditionFalse {
            reasons = append(reasons, ErrNodeOutOfDisk)
        } else if cond.Type == v1.NodeNetworkUnavailable && cond.Status != v1.ConditionFalse {
            reasons = append(reasons, ErrNodeNetworkUnavailable)
        }
    }

    if node.Spec.Unschedulable {
        reasons = append(reasons, ErrNodeUnschedulable)
    }

    return len(reasons) == 0, reasons, nil
}

RegisterFitPredicate itself is a thin wrapper around RegisterFitPredicateFactory:

// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
    return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
}

Priority Algorithms (Key Section)

k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go

The priority policies are summarized below:

  1. SelectorSpreadPriority: tries to spread Pods belonging to the same Service, StatefulSet, or ReplicaSet across hosts.
  2. InterPodAffinityPriority: scores nodes by pod affinity; it iterates over the weightedPodAffinityTerm entries and, for each PodAffinityTerm the node satisfies, adds that term's weight to a running sum. The node with the highest sum is the most preferred.
  3. LeastRequestedPriority: computes the Pod's requested CPU and memory as a percentage of each Node's available resources; the node with the smallest percentage is best.
  4. BalancedResourceAllocation: scores nodes by how balanced their resource usage (CPU, memory) would be.
  5. NodePreferAvoidPodsPriority: prioritizes nodes according to the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods; it can be used to hint that two different Pods should not run on the same node.
  6. NodeAffinityPriority: prioritizes nodes according to the node-affinity preferences expressed in PreferredDuringSchedulingIgnoredDuringExecution; see "Assigning Pods to Nodes" for details.
  7. TaintTolerationPriority: builds a priority list of all nodes based on the number of taints on each node that the Pod cannot tolerate, and adjusts the ranking accordingly.
  8. ImageLocalityPriority: favors nodes that already have the container images requested by the Pod.

The eight policies above are registered as the default priority functions. The following are registered as well but are not part of the defaults:

  9. ServiceSpreadingPriority: for a given Service, favors scheduling its Pods onto nodes that do not yet run a Pod of that Service; the overall effect is that the Service becomes more resilient to single-node failures. Largely replaced by SelectorSpreadPriority.
  10. EqualPriority: gives all nodes an equal weight.
  11. MostRequestedPriority: favors nodes with the most requested resources; this packs the scheduled Pods onto the smallest number of nodes needed to run the overall workload.
  12. RequestedToCapacityRatioPriority: creates a requestedToCapacity-based ResourceAllocationPriority using the default resource scoring function shape.
func defaultPriorities() sets.String {
    return sets.NewString(
        // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
        factory.RegisterPriorityConfigFactory(
            "SelectorSpreadPriority",
            factory.PriorityConfigFactory{
                MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
                    return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
                },
                Weight: 1,
            },
        ),
        // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
        // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
        factory.RegisterPriorityConfigFactory(
            "InterPodAffinityPriority",
            factory.PriorityConfigFactory{
                Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
                    return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
                },
                Weight: 1,
            },
        ),

        // Prioritize nodes by least requested utilization.
        factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),

        // Prioritizes nodes to help achieve balanced resource usage
        factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),

        // Set this weight large enough to override all other priority functions.
        // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
        factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),

        // Prioritizes nodes that have labels matching NodeAffinity
        factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),

        // Prioritizes nodes that marked with taint which pod can tolerate.
        factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),

        // ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
        factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
    )
}

(Image source: https://ggaaooppeenngg.github.io/zh-CN/2017/09/26/kubernetes-%E6%8C%87%E5%8C%97/Schedule.jpeg)

Preempt

When the normal scheduling flow fails to find a suitable node (mainly because no node passes the predicates), the scheduler decides whether preemption is needed. The code lives under pkg/scheduler/scheduler.go, in the preempt method.

Custom Schedulers

At startup, kube-scheduler can be pointed at a scheduling policy file via the --policy-config-file flag, letting users assemble the predicates and priority functions they need. For running multiple schedulers, see the documentation: Configure Multiple Schedulers.

{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "NoVolumeZoneConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
  ],
  "priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
  ],
  "hardPodAffinitySymmetricWeight" : 10
}

Of course, we can also write our own predicate or priority functions and register them:

// predicate signature
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

// register it
factory.RegisterFitPredicate("MyFunc", predicates.PodFitsHostPorts)

// then reference it from a custom policy file
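For example, a hypothetical predicate that only admits nodes carrying a particular label might look like the sketch below. The function signature follows the v1.12 snippets above; the package name, label key, and failure-reason type are made up for illustration, and the import paths are the ones used elsewhere in this article (adjust them to your vendoring).

package mypredicates

import (
    "fmt"

    "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// labelMissingReason implements algorithm.PredicateFailureReason
// (assumed here, per the v1.12 tree, to require only GetReason()).
type labelMissingReason struct{ key string }

func (r *labelMissingReason) GetReason() string {
    return "node does not carry required label " + r.key
}

// RequireDedicatedLabel is a hypothetical FitPredicate: the Pod only fits on nodes
// that carry the (made-up) label "example.com/dedicated".
func RequireDedicatedLabel(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if _, ok := node.Labels["example.com/dedicated"]; ok {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{&labelMissingReason{key: "example.com/dedicated"}}, nil
}

// Registration would then mirror the snippet above, e.g. in an init() of a custom
// scheduler build:
//
//     factory.RegisterFitPredicate("RequireDedicatedLabel", RequireDedicatedLabel)
//
// after which "RequireDedicatedLabel" can be referenced from the policy file's
// "predicates" list.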

A Pod can select a particular scheduler through its spec.schedulerName field.

Scheduler names are not centrally stored or managed in the apiserver; instead, each scheduler fetches from the apiserver the pods whose schedulerName matches its own and schedules them. In other words, schedulers manage their own names, so avoiding conflicts and keeping the logic correct is each scheduler's own responsibility.

Below is a very simple shell scheduler. It uses kubectl to fetch unscheduled pods from the apiserver (spec.schedulerName is my-scheduler and spec.nodeName is empty), likewise uses kubectl to fetch the nodes, then picks a random node as the scheduling result and writes it back to the apiserver. For more details see https://github.com/kelseyhightower/scheduler and "Writing custom Kubernetes schedulers", as well as https://github.com/martonsereg/random-scheduler.

#!/bin/bash
SERVER='localhost:8001'
while true;
do
    for PODNAME in $(kubectl --server $SERVER get pods -o json | jq '.items[] | select(.spec.schedulerName == "my-scheduler") | select(.spec.nodeName == null) | .metadata.name' | tr -d '"');
    do
        NODES=($(kubectl --server $SERVER get nodes -o json | jq '.items[].metadata.name' | tr -d '"'))
        NUMNODES=${#NODES[@]}
        CHOSEN=${NODES[$[ $RANDOM % $NUMNODES ]]}
        curl --header "Content-Type:application/json" --request POST \
            --data '{"apiVersion":"v1", "kind": "Binding", "metadata": {"name": "'$PODNAME'"}, "target": {"apiVersion": "v1", "kind": "Node", "name": "'$CHOSEN'"}}' \
            http://$SERVER/api/v1/namespaces/default/pods/$PODNAME/binding/
        echo "Assigned $PODNAME to $CHOSEN"
    done
    sleep 1
done

References

  1. Kubernetes Scheduler
  2. Kubernetes源码分析之kube-scheduler
  3. kubelet scheduler 源码分析:调度器的工作原理
  4. Kubernetes Scheduler 源码全解析(附流程图)
  5. k8s-调度算法
  6. A toy kubernetes scheduler
  7. Kubernetes 调度器介绍