This article is a source-level analysis of the Kubernetes 1.5 Scheduler, covering the source directory structure, the kube-scheduler runtime mechanism, the overall code flow diagram, and a walkthrough of the core code. A basic understanding of the Kubernetes architecture is recommended before reading.
Kubernetes Scheduler Source Directory Structure
The Kubernetes Scheduler is designed as a plugin of Kubernetes. This pluggable design makes it easy to implement custom scheduling algorithms; scheduling requirements differ from company to company, so custom schedulers are common.
The Scheduler source lives mainly under k8s.io/kubernetes/plugin/. Two directories matter most: cmd/kube-scheduler, which wraps the kube-scheduler command's flags and app startup, and pkg/scheduler, which holds the scheduler's actual implementation. The directory structure breaks down as follows.
k8s.io/kubernetes/plugin/
.
├── cmd
│   └── kube-scheduler                       // code for the kube-scheduler command
│       ├── app                              // kube-scheduler app startup
│       │   ├── options
│       │   │   └── options.go               // wraps the SchedulerServer object and its AddFlags method
│       │   └── server.go                    // defines the SchedulerServer config wrapper and the Run method
│       └── scheduler.go                     // kube-scheduler main entry point
└── pkg
    ├── scheduler                            // core scheduler backend code
    │   ├── algorithm
    │   │   ├── doc.go
    │   │   ├── listers.go                   // defines interfaces such as NodeLister and PodLister
    │   │   ├── predicates                   // function implementations of the built-in Predicates Policies
    │   │   │   ├── error.go
    │   │   │   ├── metadata.go
    │   │   │   ├── predicates.go            // main implementation of the built-in Predicates Policies
    │   │   │   ├── predicates_test.go
    │   │   │   ├── utils.go
    │   │   │   └── utils_test.go
    │   │   ├── priorities                   // function implementations of the built-in Priorities Policies
    │   │   │   ├── balanced_resource_allocation.go       // defaultProvider - BalancedResourceAllocation
    │   │   │   ├── balanced_resource_allocation_test.go
    │   │   │   ├── image_locality.go                     // defaultProvider - ImageLocalityPriority
    │   │   │   ├── image_locality_test.go
    │   │   │   ├── interpod_affinity.go                  // defaultProvider - InterPodAffinityPriority
    │   │   │   ├── interpod_affinity_test.go
    │   │   │   ├── least_requested.go                    // defaultProvider - LeastRequestedPriority
    │   │   │   ├── least_requested_test.go
    │   │   │   ├── metadata.go                           // defines priorityMetadata
    │   │   │   ├── most_requested.go                     // defaultProvider - MostRequestedPriority
    │   │   │   ├── most_requested_test.go
    │   │   │   ├── node_affinity.go                      // defaultProvider - NodeAffinityPriority
    │   │   │   ├── node_affinity_test.go
    │   │   │   ├── node_label.go                         // registered only when policy.Argument.LabelPreference != nil
    │   │   │   ├── node_label_test.go
    │   │   │   ├── node_prefer_avoid_pods.go             // defaultProvider - NodePreferAvoidPodsPriority
    │   │   │   ├── node_prefer_avoid_pods_test.go
    │   │   │   ├── selector_spreading.go                 // defaultProvider - SelectorSpreadPriority
    │   │   │   ├── selector_spreading_test.go
    │   │   │   ├── taint_toleration.go                   // defaultProvider - TaintTolerationPriority
    │   │   │   ├── taint_toleration_test.go
    │   │   │   ├── test_util.go
    │   │   │   └── util                                  // utilities
    │   │   │       ├── non_zero.go
    │   │   │       ├── topologies.go
    │   │   │       └── util.go
    │   │   ├── scheduler_interface.go       // defines the SchedulerExtender and ScheduleAlgorithm interfaces
    │   │   ├── scheduler_interface_test.go
    │   │   └── types.go                     // defines the function types that Predicates and Priorities algorithms implement (FitPredicate, PriorityMapFunction)
    │   ├── algorithmprovider                // providers selectable via the algorithm-provider flag
    │   │   ├── defaults
    │   │   │   ├── compatibility_test.go
    │   │   │   └── defaults.go              // implementation of the "DefaultProvider"
    │   │   ├── plugins.go                   // empty; reserved for custom providers
    │   │   └── plugins_test.go
    │   ├── api                              // defines the Scheduler API and objects, used by SchedulerExtender to handle requests from an HTTPExtender
    │   │   ├── latest
    │   │   │   └── latest.go
    │   │   ├── register.go
    │   │   ├── types.go                     // defines Policy, PredicatePolicy, PriorityPolicy, etc.
    │   │   ├── v1
    │   │   │   ├── register.go
    │   │   │   └── types.go
    │   │   └── validation
    │   │       ├── validation.go            // validates that a Policy definition is legal
    │   │       └── validation_test.go
    │   ├── equivalence_cache.go
    │   ├── extender.go                      // defines how an HTTPExtender is created, plus its Filter and Prioritize methods that hook into the predicate and priority phases
    │   ├── extender_test.go
    │   ├── factory                          // registers and matches configured Policies to the corresponding predicate (FitPredicateFactory) and priority (PriorityFunctionFactory2) functions
    │   │   ├── factory.go                   // defines ConfigFactory, which assembles the scheduler from configuration; the key methods are CreateFromConfig and CreateFromKeys
    │   │   ├── factory_test.go
    │   │   ├── plugins.go                   // defines the methods for registering custom predicate and priority Policies
    │   │   └── plugins_test.go
    │   ├── generic_scheduler.go             // defines genericScheduler; its Schedule(...) method is where scheduling really starts
    │   ├── generic_scheduler_test.go
    │   ├── metrics                          // registers metrics with Prometheus
    │   │   └── metrics.go
    │   ├── scheduler.go                     // defines Scheduler and Run(); the core scheduleOne() method also lives here and covers one complete scheduling cycle: fetching the next Pod, scheduling, and binding
    │   ├── scheduler_test.go
    │   ├── schedulercache
    │   │   ├── cache.go                     // defines schedulerCache, which handles CRUD for Pods, Nodes, and Bindings, plus expiration maintenance
    │   │   ├── cache_test.go
    │   │   ├── interface.go                 // the interface schedulerCache must implement
    │   │   ├── node_info.go                 // defines NodeInfo and its related operations
    │   │   └── util.go
    │   └── testing
    │       ├── fake_cache.go
    │       └── pods_to_cache.go
Kube-scheduler Runtime Mechanism
- kube-scheduler runs as a standalone process on the Kubernetes master and provides the scheduling service. It is pointed at the kube-apiserver via --master, watches Pods and Nodes, and calls the apiserver's Bind API to bind a Pod to a Node.
- kube-scheduler maintains a FIFO-typed PodQueue cache. Newly created Pods are picked up by the ConfigFactory's watch and added to this PodQueue; every scheduling cycle takes the next Pod to schedule from the queue via getNextPod.
- Once a Pod to schedule is obtained, the Schedule method of the Algorithm configured by the AlgorithmProvider runs. The scheduling process has two key phases, Predicates and Priorities, and finally returns the Node best suited to host the Pod.
- The Pod's state in the SchedulerCache is updated (AssumePod), marking the Pod as scheduled and updating the corresponding NodeInfo.
- The apiserver's Bind API is called to bind the Pod to the Node; if the Bind fails, the Pod assumed in the previous step is removed from the SchedulerCache.
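Taken together, these steps form one loop. The sketch below is a deliberately simplified, self-contained illustration of that loop; every type in it is a hypothetical stand-in (the real types are *v1.Pod, the scheduler cache, and the Binder shown in the walkthrough later).

package main

import "fmt"

// Hypothetical stand-in types; the real scheduler works with *v1.Pod, a
// schedulercache.Cache, and an apiserver-backed Binder.
type pod struct{ name string }

type fifo struct{ pods []pod }

// next pops the oldest Pod, mirroring getNextPod on the scheduler's PodQueue.
func (q *fifo) next() pod {
    p := q.pods[0]
    q.pods = q.pods[1:]
    return p
}

// schedule stands in for Algorithm.Schedule: predicates filter, priorities rank.
func schedule(p pod, nodes []string) string { return nodes[0] }

func main() {
    q := &fifo{pods: []pod{{name: "nginx-1"}}}
    nodes := []string{"node-a", "node-b"}

    p := q.next()                                 // 1. take the next Pod from the FIFO PodQueue
    dest := schedule(p, nodes)                    // 2. Predicates + Priorities pick a Node
    fmt.Printf("assume %s on %s\n", p.name, dest) // 3. AssumePod in the SchedulerCache
    // 4. POST a Binding to the apiserver; 5. on failure, ForgetPod from the cache.
}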
Kubernetes Scheduler Code Flow Diagram
The diagram is large, so please download it and zoom in locally to view the details.
Kubernetes Scheduler Core Code Walkthrough
The Scheduler's main entry point is shown below; it creates the SchedulerServer and starts it.
plugin/cmd/kube-scheduler/scheduler.go

func main() {
    s := options.NewSchedulerServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s); err != nil {
        glog.Fatalf("scheduler app failed to run: %v", err)
    }
}
The kube-scheduler flags are defined in options as follows:
plugin/cmd/kube-scheduler/app/options/options.go

// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
    fs.Int32Var(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
    fs.StringVar(&s.Address, "address", s.Address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
    fs.StringVar(&s.AlgorithmProvider, "algorithm-provider", s.AlgorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
    fs.StringVar(&s.PolicyConfigFile, "policy-config-file", s.PolicyConfigFile, "File with scheduler policy configuration")
    fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
    fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")
    fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
    fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
    fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
    fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
    fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
    fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
    fs.IntVar(&s.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", api.DefaultHardPodAffinitySymmetricWeight,
        "RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
            "to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
    fs.StringVar(&s.FailureDomains, "failure-domains", api.DefaultFailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
    leaderelection.BindFlags(&s.LeaderElection, fs)
    config.DefaultFeatureGate.AddFlag(fs)
}
server.Run is the most important method in cmd/kube-scheduler:

- It generates the config.
- It creates the scheduler object from that config.
- It starts the HTTP service, exposing /debug/pprof for profiling and performance tuning and /metrics for Prometheus to scrape monitoring data (a minimal sketch of this wiring follows the code excerpt below).
- Once the kube-scheduler leader election completes, it immediately starts scheduling by running scheduler.Run in a loop.
plugin/cmd/kube-scheduler/app/server.go:75

// Run runs the specified SchedulerServer. This should never exit.
func Run(s *options.SchedulerServer) error {
    ...
    config, err := createConfig(s, kubecli)
    ...
    sched := scheduler.New(config)

    go startHTTP(s)

    run := func(_ <-chan struct{}) {
        sched.Run()
        select {}
    }
    ...
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                glog.Fatalf("lost master")
            },
        },
    })
    ...
}
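For reference, the /debug/pprof and /metrics endpoints mentioned above boil down to wiring the standard pprof handlers and the Prometheus handler onto a mux. This is only a minimal sketch, not the exact startHTTP code; the handler set is an assumption, and 10251 is the scheduler's default --port in this release.

package main

import (
    "net/http"
    "net/http/pprof"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    mux := http.NewServeMux()
    // Profiling endpoints for performance analysis.
    mux.HandleFunc("/debug/pprof/", pprof.Index)
    mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
    mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    // Metrics endpoint for Prometheus to scrape.
    mux.Handle("/metrics", prometheus.Handler())

    http.ListenAndServe(":10251", mux)
}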
Now we enter the Scheduler.Run logic: it starts a goroutine that runs Scheduler.scheduleOne over and over until the scheduler receives the shut-down signal.
Scheduler.scheduleOne is where the real scheduling logic begins; each invocation schedules a single Pod:

- Get a Pod from the PodQueue.
- Run the configured Algorithm's Schedule method, i.e. the predicate and priority phases.
- AssumePod.
- Bind the Pod; if the Bind fails, ForgetPod.
plugin/pkg/scheduler/scheduler.go:86

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
    go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}

func (s *Scheduler) scheduleOne() {
    pod := s.config.NextPod()
    ...
    dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)
    ...
    assumed := *pod
    assumed.Spec.NodeName = dest
    if err := s.config.SchedulerCache.AssumePod(&assumed); err != nil {
        ...
        return
    }

    go func() {
        ...
        b := &v1.Binding{
            ObjectMeta: v1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: dest,
            },
        }
        ...
        err := s.config.Binder.Bind(b)
        if err != nil {
            glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
            if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {
                ...
            }
            return
        }
    }()
}
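The Binder invoked above is injected by the ConfigFactory. Conceptually it is a thin wrapper around the clientset's pods/binding call, roughly as sketched below; the field names and the clientset call are assumptions based on the factory package, not code copied from factory.go.

// Rough sketch of a Binder (names assumed). Assumed imports: the v1 API types
// package and the generated Kubernetes clientset package from the 1.5 tree.
type binder struct {
    Client clientset.Interface // generated Kubernetes clientset
}

// Bind POSTs the Binding to the pods/binding subresource of the apiserver.
func (b *binder) Bind(binding *v1.Binding) error {
    return b.Client.Core().Pods(binding.Namespace).Bind(binding)
}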
Below is the Schedule interface that every Schedule Algorithm must implement:
plugin/pkg/scheduler/algorithm/scheduler_interface.go:41

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods onto machines.
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
}
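Any type implementing this interface can be plugged in as the scheduling algorithm. As an illustration of the contract only, here is a toy implementation that skips predicates and priorities and always returns the first listed node; the package paths are assumed from the 1.5 tree.

package toyscheduler

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api/v1"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
)

// firstFitScheduler is a hypothetical ScheduleAlgorithm used only to show the contract.
type firstFitScheduler struct{}

func (s *firstFitScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    if len(nodes) == 0 {
        return "", fmt.Errorf("no nodes available to schedule pod %s/%s", pod.Namespace, pod.Name)
    }
    // selectedMachine is simply the chosen node's name.
    return nodes[0].Name, nil
}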
genericScheduler, the default Scheduler, naturally implements this interface:
plugin/pkg/scheduler/generic_scheduler.go:89

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    // Get the schedulable Nodes from the cache.
    ...
    nodes, err := nodeLister.List()
    ...

    // Run the predicate (filtering) phase.
    trace.Step("Computing predicates")
    filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
    ...

    // Run the priority (scoring) phase.
    trace.Step("Prioritizing")
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    ...

    // If multiple Nodes tie for the best score, pick one of them as the best Node to return.
    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}

// findNodesThatFit is the entry point of the predicate phase.
func findNodesThatFit(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    nodes []*v1.Node,
    predicateFuncs map[string]algorithm.FitPredicate,
    extenders []algorithm.SchedulerExtender,
    metadataProducer algorithm.MetadataProducer,
) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(predicateFuncs) == 0 {
        filtered = nodes
    } else {
        ...
        // checkNode calls podFitsOnNode to run all configured Predicates Policies against the Node.
        checkNode := func(i int) {
            nodeName := nodes[i].Name
            fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
            ...
        }
        // Run checkNode with up to 16 goroutine workers, bounded by the number of nodes.
        workqueue.Parallelize(16, len(nodes), checkNode)
        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
        }
    }

    // If Extenders are configured, run each Extender's Filter logic to filter the nodes again.
    if len(filtered) > 0 && len(extenders) != 0 {
        for _, extender := range extenders {
            filteredList, failedMap, err := extender.Filter(pod, filtered)
            ...
        }
    }
    return filtered, failedPredicateMap, nil
}

// podFitsOnNode runs the predicateFunc of every configured Predicates Policy in turn.
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
    var failedPredicates []algorithm.PredicateFailureReason
    for _, predicate := range predicateFuncs {
        fit, reasons, err := predicate(pod, meta, info)
        ...
    }
    return len(failedPredicates) == 0, failedPredicates, nil
}

// PrioritizeNodes scores all nodes that passed the predicate phase using the configured Priorities Policies.
// Each Priorities Policy gives each node a score between 0 and 10; a higher score means a better fit.
func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    ...
    // processNode runs every Priorities Policy against a single node, filling a
    // two-dimensional results[policy][node] score matrix.
    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                return
            }
        }
    }
    // Run processNode with up to 16 goroutine workers, bounded by the number of nodes.
    workqueue.Parallelize(16, len(nodes), processNode)

    // For every configured Priorities Policy that defines a Reduce function, run it to
    // update the results[policy][node] scores.
    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int, config algorithm.PriorityConfig) {
            defer wg.Done()
            if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
        }(i, priorityConfig)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    ...

    // Compute each node's final score as the weighted sum of its per-policy scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    // TODO: Consider parallelizing it.
    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    // If Extenders are configured, also run each Extender's Prioritize scoring method.
    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for _, extender := range extenders {
            wg.Add(1)
            go func(ext algorithm.SchedulerExtender) {
                defer wg.Done()
                prioritizedList, weight, err := ext.Prioritize(pod, nodes)
                ...
            }(extender)
        }
        // wait for all go routines to finish
        wg.Wait()

        // Add the combined Extender scores onto each node's existing score so the
        // final ranking reflects both.
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }
    ...
}
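To make the weighted sum at the end of PrioritizeNodes concrete, here is a tiny standalone example with made-up scores: each policy scores every node 0-10, and the per-policy weight multiplies that score.

package main

import "fmt"

// Hypothetical scores in results[policy][node] plus per-policy weights,
// mirroring result[i].Score += results[j][i].Score * priorityConfigs[j].Weight.
func main() {
    nodes := []string{"node-a", "node-b"}
    results := [][]int{
        {10, 5}, // e.g. LeastRequestedPriority scores for node-a, node-b (made up)
        {4, 8},  // e.g. BalancedResourceAllocation scores (made up)
    }
    weights := []int{1, 1}

    for i, node := range nodes {
        score := 0
        for j := range results {
            score += results[j][i] * weights[j]
        }
        fmt.Printf("%s: %d\n", node, score) // node-a: 14, node-b: 13 -> node-a wins
    }
}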
The PredicateFunc for each concrete Predicate Policy is defined in plugin/pkg/scheduler/algorithm/predicates/predicates.go. Below is the definition of CheckNodeMemoryPressurePredicate.
plugin/pkg/scheduler/algorithm/predicates/predicates.go:1202

// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var podBestEffort bool
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podBestEffort = predicateMeta.podBestEffort
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podBestEffort = isPodBestEffort(pod)
    }

    // pod is not a BestEffort pod
    if !podBestEffort {
        return true, nil, nil
    }

    // is the node under memory pressure?
    if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
        return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
    }
    return true, nil, nil
}
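Every Predicates Policy follows this same FitPredicate signature, which is what keeps the predicate phase pluggable. As an illustration only, a hypothetical predicate that admits only nodes labeled disktype=ssd could sit alongside the built-ins in this package; it reuses the package's existing imports, and the failure reason is assumed to be ErrNodeLabelPresenceViolated from predicates/error.go.

// RequireSSDPredicate is a hypothetical FitPredicate: it rejects nodes that are
// not labeled disktype=ssd. It assumes the imports already present in the
// predicates package (fmt, v1, algorithm, schedulercache).
func RequireSSDPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if node.Labels["disktype"] != "ssd" {
        return false, []algorithm.PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
    }
    return true, nil, nil
}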
The PriorityFunc for each concrete Priorities Policy is defined in plugin/pkg/scheduler/algorithm/priorities/*.go. Below is the definition of MostRequestedPriority.
plugin/pkg/scheduler/algorithm/priorities/most_requested.go:33

// MostRequestedPriority is a priority function that favors nodes with most requested resources.
// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
// based on the maximum of the average of the fraction of requested to capacity.
// Details: (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
func MostRequestedPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    var nonZeroRequest *schedulercache.Resource
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        nonZeroRequest = priorityMeta.nonZeroRequest
    } else {
        // We couldn't parse metadata - fallback to computing it.
        nonZeroRequest = getNonZeroRequests(pod)
    }
    return calculateUsedPriority(pod, nonZeroRequest, nodeInfo)
}
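A quick worked example of the formula in the comment above, using made-up requests and capacities; the real code operates on the aggregated non-zero requests tracked in NodeInfo.

package main

import "fmt"

// Worked example of:
//   (cpu(10*sum(requested)/capacity) + memory(10*sum(requested)/capacity)) / 2
// with made-up numbers.
func main() {
    cpuRequested, cpuCapacity := int64(3000), int64(4000) // millicores
    memRequested, memCapacity := int64(2048), int64(8192) // MiB

    cpuScore := 10 * cpuRequested / cpuCapacity // 7
    memScore := 10 * memRequested / memCapacity // 2

    fmt.Println((cpuScore + memScore) / 2) // 4 -- the more utilized node scores higher
}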
Kubernetes configures kube-scheduler with the DefaultProvider by default. Which Predicates and Priorities Policies does the DefaultProvider register? They are all defined in plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go, as shown below:
plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go:205

// Default Predicates Policies registered by the DefaultProvider
func defaultPredicates() sets.String {
    return sets.NewString(
        // Fit is determined by volume zone requirements.
        factory.RegisterFitPredicateFactory(
            "NoVolumeZoneConflict",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
            },
        ),
        ...
        // Fit is determined by non-conflicting disk volumes.
        factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),
        // GeneralPredicates are the predicates that are enforced by all Kubernetes components
        // (e.g. kubelet and all schedulers)
        factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
        // Fit is determined based on whether a pod can tolerate all of the node's taints
        factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
        // Fit is determined by node memory pressure condition.
        factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),
        // Fit is determined by node disk pressure condition.
        factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
    )
}

// Default Priorities Policies registered by the DefaultProvider
func defaultPriorities() sets.String {
    return sets.NewString(
        // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
        factory.RegisterPriorityConfigFactory(
            "SelectorSpreadPriority",
            factory.PriorityConfigFactory{
                Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
                    return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
                },
                Weight: 1,
            },
        ),
        ...
        // TODO: explain what it does.
        factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
    )
}
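This is also where the pluggable design from the introduction pays off: a custom Policy can be registered with the same factory helpers the DefaultProvider uses, and then enabled through an algorithm provider or a --policy-config-file. The sketch below is hypothetical; it reuses the RequireSSDPredicate example from earlier and assumes the factory and priorities imports already present in defaults.go.

// Hypothetical registration of custom policies next to the built-in ones.
// Registering alone does not enable them -- they still have to be referenced
// by a provider or by a scheduler policy file.
func init() {
    factory.RegisterFitPredicate("RequireSSD", RequireSSDPredicate)
    factory.RegisterPriorityFunction2("MostRequestedPriority",
        priorities.MostRequestedPriorityMap, nil, 1)
}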
Please read the code walkthrough above alongside the Kubernetes Scheduler code flow diagram from the earlier section. By this point you should have a fair understanding of the scheduler code.
Summary
- kube-scheduler runs as a standalone process on the Kubernetes master and provides the scheduling service. It is pointed at the kube-apiserver via --master, watches Pods and Nodes, and calls the apiserver's Bind API to bind a Pod to a Node.
- kube-scheduler maintains a FIFO-typed PodQueue cache. Newly created Pods are picked up by the ConfigFactory's watch and added to this PodQueue; every scheduling cycle takes the next Pod to schedule from the queue via getNextPod.
- Once a Pod to schedule is obtained, the Schedule method of the Algorithm configured by the AlgorithmProvider runs. The scheduling process has two key phases, Predicates and Priorities, and finally returns the Node best suited to host the Pod.
- The Pod's state in the SchedulerCache is updated (AssumePod), marking the Pod as scheduled and updating the corresponding NodeInfo.
- The apiserver's Bind API is called to bind the Pod to the Node; if the Bind fails, the Pod assumed in the previous step is removed from the SchedulerCache.