当前位置: 首页 > news >正文

volcano源码阅读——action/enqueue

enqueue
将pending状态的job的PodGroup状态设置为inqueue,当会话关闭的时候会更新PodGroup状态。PodGroup状态变为inqueue后,controller会为其创建Pod。
 
pkg\scheduler\actions\enqueue\enqueue.go
 1 func (enqueue *Action) Execute(ssn *framework.Session) {
 2     klog.V(5).Infof("Enter Enqueue ...")
 3     defer klog.V(5).Infof("Leaving Enqueue ...")
 4 
 5     queues := util.NewPriorityQueue(ssn.QueueOrderFn) // 临时队列
 6     queueSet := sets.NewString()                      // 临时集合
 7     jobsMap := map[api.QueueID]*util.PriorityQueue{}  // 临时字典:queueID与job队列对应关系
 8 
 9     for _, job := range ssn.Jobs {
10         // 检查调度开始时间戳,如果是零则为其设置当前时间
11         if job.ScheduleStartTimestamp.IsZero() {
12             ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
13                 Time: time.Now(),
14             }
15         }
16         if queue, found := ssn.Queues[job.Queue]; !found {
17             klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
18                 job.Queue, job.Namespace, job.Name)
19             continue
20         } else if !queueSet.Has(string(queue.UID)) {
21             klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
22                 queue.Name, job.Namespace, job.Name)
23 
24             queueSet.Insert(string(queue.UID))
25             queues.Push(queue)
26         }
27 
28         // 检查job的状态是不是pending,如果是则将其放到jobsMap的队列里
29         if job.IsPending() {
30             if _, found := jobsMap[job.Queue]; !found {
31                 jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
32             }
33             klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
34             jobsMap[job.Queue].Push(job)
35         }
36     }
37 
38     klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
39 
40     // 遍历PodGroup,将PodGroup状态改为inqueue,并在ssn.Jobs里记录job
41     for {
42         if queues.Empty() {
43             break
44         }
45 
46         queue := queues.Pop().(*api.QueueInfo)
47 
48         // skip the Queue that has no pending job
49         jobs, found := jobsMap[queue.UID]
50         if !found || jobs.Empty() { // 如果当前队列不存在或者job为空,则遍历下个队列
51             continue
52         }
53         job := jobs.Pop().(*api.JobInfo)
54 
55         // 如果未设置最小资源或判定job可以入队,则对job进行处理:
56         // 1、对job执行插件的JobEnqueue处理
57         // 2、将PodGroup的状态改为inqueue
58         // 3、将job记录到ssn上
59         if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
60             ssn.JobEnqueued(job)
61             job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
62             ssn.Jobs[job.UID] = job
63         }
64 
65         // Added Queue back until no job in Queue.
66         queues.Push(queue)
67     }
68 }
 
调度周期完毕后,关闭会话时会更信PodGroup
 1 func closeSession(ssn *Session) {
 2     ju := NewJobUpdater(ssn)
 3     ju.UpdateAll() // 更新所有
 4 
 5     updateQueueStatus(ssn)
 6 
 7     ssn.Jobs = nil
 8     ssn.Nodes = nil
 9     ssn.RevocableNodes = nil
10     ssn.plugins = nil
11     ssn.eventHandlers = nil
12     ssn.jobOrderFns = nil
13     ssn.queueOrderFns = nil
14     ssn.clusterOrderFns = nil
15     ssn.NodeList = nil
16     ssn.TotalResource = nil
17 
18     klog.V(3).Infof("Close Session %v", ssn.UID)
19 }
20 
21 // updateJob update specified job
22 func (ju *JobUpdater) updateJob(index int) {
23     job := ju.jobQueue[index]
24     ssn := ju.ssn
25 
26     job.PodGroup.Status = jobStatus(ssn, job)
27     oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
28     updatePGStatus := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
29     updatePGAnnotations := ju.isJobAllocatedHyperNodeChanged(job)
30     if _, err := ssn.cache.UpdateJobStatus(job, updatePGStatus, updatePGAnnotations); err != nil {
31         klog.Errorf("Failed to update job <%s/%s>: %v",
32             job.Namespace, job.Name, err)
33     }
34 }
35 
36 // UpdateJobStatus update the status of job and its tasks.
37 func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePGStatus, updatePGAnnotations bool) (*schedulingapi.JobInfo, error) {
38     if updatePGStatus || updatePGAnnotations {
39         if updatePGAnnotations {
40             sc.updateJobAnnotations(job)
41         }
42         pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
43         if err != nil {
44             return nil, err
45         }
46         job.PodGroup = pg
47     }
48     sc.RecordJobStatusEvent(job, updatePGStatus)
49 
50     return job, nil
51 }

 

 

http://www.zskr.cn/news/22803.html

相关文章:

  • 2025年工业大吊扇厂家权威推荐榜:大型厂房通风降温设备源头企业综合实力与客户口碑深度解析
  • 【左扬精讲】SRE 别慌!我用 故障预测与诊断,性能评估与优化,资源分配与规划 讲概率与贝叶斯算法的实战应用,都是咱运维人能懂的话(含代码)
  • 学校社团招新的题目(莫队+树状数组统计区间逆序对个数)(蒟蒻被薄纱QAQ)
  • 2025 年 PP 管厂家最新推荐榜:全面甄选优质 pp 风管、PP 喷淋塔等产品厂家,助力实验室场景精准选型
  • MyEMS:衔接 “双控” 政策与企业实践的开源能源管理利器
  • 2025 电动轮椅厂家最新推荐榜:深度解析智能轻便 / 长续航 / 高安全国产优质品牌核心优势
  • 2025年信息流代运营服务商权威推荐榜单:专业投放策略与高效转化服务口碑之选
  • 一些框架
  • 微调 - Lora
  • 2025年轮胎厂家权威推荐榜:舒适轮胎,耐磨轮胎,高性能轮胎与静音轮胎全系列选购指南
  • RTP推流测试
  • 2025 年板材厂家最新推荐排行榜:涵盖环保、密度、净化、零醛添加等类型,胖胖熊等优质品牌详细解析
  • 告别客服焦虑!用 PandaWiki 打造永不下班的 AI 在线客服
  • 2025 年修补剂厂家最新推荐排行榜:金属 / 陶瓷 / 橡胶等多材质适配品牌深度解析,助力企业精准选型
  • 2025 工业电子胶粘剂厂家最新推荐榜单发布:国产实力品牌深度解析,选购指南全攻略高端工业/进口国产工业/工业电子胶粘剂胶水厂家推荐
  • Elasticsearch安装和Kibana安装
  • three自带的框选工具SelectionBox、SelectionHelper
  • 10 17
  • 2025年铝单板厂家推荐排行榜,氟碳铝单板,木纹铝单板,冲孔铝单板,外墙铝单板,雕花铝单板,异形铝单板,双曲铝单板公司推荐!
  • 2025 年最新推荐热熔胶源头厂家榜:覆盖书刊装订 / 包装等场景,助企业选高性价比产品
  • C++中的new操作符:new operator、operator new、placement new
  • C++20 协程的简单示例
  • 同一设备多账号登录,如何避免消息推送“串门”?
  • Character Animator 2025下载安装教程:2D角色动画软件零基础入门,附最新下载安装教程及激活方法
  • 2025年彩钢瓦/镀锌板/折弯件/C型钢/Z型钢/压型瓦/楼承板/次檩条厂家推荐排行榜,专业钢结构安装与定制加工实力解析
  • 完整教程:display ospf peer 概念及题目
  • 开源数据采集工具 logstash(收集日志)/telegraf(收集指标)
  • 2025 年堆高车厂家最新推荐排行榜:聚焦专利技术、华为等大牌合作案例及国内优质品牌解析手动液压/手动液压/卷筒/油桶堆高车厂家推荐
  • Excelize 开源基础库发布 2.10.0 版本更新
  • 高效搞定outlook大附件怎么发送的方法与技巧