1. 程式人生 > >【kubernetes/k8s原始碼分析】kube-controller-manager之cronjob原始碼分析

【kubernetes/k8s原始碼分析】kube-controller-manager之cronjob原始碼分析

crontab的基本格式

    支援 , - * / 四個字元

          *:表示匹配任意值,如果在Minutes 中使用表示每分鐘

          /: 表示起始時間開始觸發,然後每隔固定時間觸發一次,例如在Minutes中使用 */5 表示每5分鐘觸發一次

      1    2    3    4    5  command

      分  時     日  月  周  命令

  • 1代表分鐘範圍為0~59:為*表示每分鐘都要執行;為*/n表示每n分鐘執行一次;為a-b表示從第a分鐘到第b分鐘這段時間要執行;為a,b,c,...表示第a,b,c分鐘要執行
  • 2代表小時範圍為0~23(0表示凌晨):為*表示每小時都要執行;為*/n表示每n小時執行一次;為a-b表示從第a小時到第b小時這段時間要執行;為a,b,c,...表示第a,b,c小時要執行
  • 3代表日1~31:含義如上所示
  • 4代表月1~12:含義如上所示
  • 5代表星期0~6(0表示星期天):含義如上所示
  • 第六列command代表要執行的命令

 

CronJob

cronJob是基於時間進行任務的定時管理:

  • 在特定的時間點執行任務
  • 反覆在指定的時間點執行任務

CronJob Spec

  • .spec.schedule指定任務執行週期,格式同Cron
  • .spec.jobTemplate指定需要執行的任務,格式同Job
  • .spec.startingDeadlineSeconds指定任務開始的截止期限
  • .spec.concurrencyPolicy指定任務的併發策略,支援Allow、Forbid和Replace三個選項

      Allow(預設):允許併發執行 Job

      Forbid:禁止併發執行,如果前一個還沒有完成,則直接跳過下一個

      Replace:取消當前正在執行的 Job,用一個新的來替換

 

0. 入口NewControllerInitializers函式

    註冊cronjob,controllers["cronjob"] = startCronJobController

// NewControllerInitializers returns the public table of named controller
// groups, each paired with the InitFunc that starts it (a single InitFunc may
// start more than one controller). Exposing the table allows structured
// downstream composition and subdivision.
//
// The loopMode argument is accepted but not consulted by this body.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	return map[string]InitFunc{
		"statefulset": startStatefulSetController,
		"cronjob":     startCronJobController,
	}
}

  1.1 startCronJobController函式

    路徑:pkg/controller/cronjob/cronjob_controller.go

  • 呼叫NewCronJobController函式建立物件
  • 呼叫其Run方法執行主要邏輯
// startCronJobController starts the CronJob controller when the
// batch/v1beta1 "cronjobs" resource is listed as available in ctx.
//
// The second result is false only when that resource is unavailable; in every
// other case it is true. The http.Handler result is always nil. A non-nil
// error is returned when the controller cannot be constructed.
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
	cronJobs := schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}
	if available := ctx.AvailableResources[cronJobs]; !available {
		return nil, false, nil
	}

	client := ctx.ClientBuilder.ClientOrDie("cronjob-controller")
	controller, err := cronjob.NewCronJobController(client)
	if err != nil {
		return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
	}

	// The controller loop runs on its own goroutine and is handed ctx.Stop as
	// its stop channel.
	go controller.Run(ctx.Stop)
	return nil, true, nil
}

2. NewCronJobController函式

    初始化CronJobController物件

// NewCronJobController builds a CronJobController that uses kubeClient for all
// of its API access.
//
// It starts an event broadcaster that logs through glog and records to the
// core/v1 events sink, registers rate-limiter usage metrics under the name
// "cronjob_controller" when the client's REST client exposes a rate limiter,
// and wires the real job, cronjob and pod control implementations into the
// returned controller. The only error path is a failed metrics registration.
func NewCronJobController(kubeClient clientset.Interface) (*CronJobController, error) {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(glog.Infof)
	sink := &v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}
	broadcaster.StartRecordingToSink(sink)

	// NOTE(review): kubeClient has already been dereferenced above, so the nil
	// guard below cannot protect against a nil client — confirm whether it is
	// still needed.
	if kubeClient != nil {
		if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
			err := metrics.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
			if err != nil {
				return nil, err
			}
		}
	}

	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"})
	controller := &CronJobController{
		kubeClient: kubeClient,
		jobControl: realJobControl{KubeClient: kubeClient},
		sjControl:  &realSJControl{KubeClient: kubeClient},
		podControl: &realPodControl{KubeClient: kubeClient},
		recorder:   eventRecorder,
	}
	return controller, nil
}

3. Run函式

    負責watching以及syncing jobs:每隔10s呼叫一次syncAll,列出所有Job與CronJob並進行調和(reconcile)

// Run is the controller's main goroutine. It schedules syncAll to be invoked
// periodically on a separate goroutine and then blocks until stopCh is closed.
func (jm *CronJobController) Run(stopCh <-chan struct{}) {
	// Interval between two consecutive syncAll passes.
	const syncPeriod = 10 * time.Second

	defer utilruntime.HandleCrash()

	glog.Infof("Starting CronJob Manager")
	go wait.Until(jm.syncAll, syncPeriod, stopCh)

	// Park here until shutdown is requested.
	<-stopCh
	glog.Infof("Shutting down CronJob Manager")
}

4. syncAll函式

  •   呼叫Jobs的list方法取出所有Job
  •   呼叫CronJobs的list方法取出所有CronJob
  •   對於每一個CronJob呼叫syncOne函式同步一次,再呼叫cleanupFinishedJobs
// syncAll performs one reconciliation pass: it lists every Job and every
// CronJob across all namespaces, groups the Jobs by parent, and then runs
// syncOne followed by cleanupFinishedJobs for each CronJob. A failed list call
// is reported through utilruntime.HandleError and aborts the pass.
func (jm *CronJobController) syncAll() {
	// Ordering matters: children (Jobs) are listed before parents (CronJobs).
	// That way, any Job seen as orphaned by the GC orphan finalizer implies the
	// parent CronJob is seen with a non-nil DeletionTimestamp (see #42639).
	// This only holds because no caches are involved here.
	jobList, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
		return
	}
	allJobs := jobList.Items
	glog.V(4).Infof("Found %d jobs", len(allJobs))

	cronJobList, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
		return
	}
	cronJobs := cronJobList.Items
	glog.V(4).Infof("Found %d cronjobs", len(cronJobs))

	jobsByParent := groupJobsByParent(allJobs)
	glog.V(4).Infof("Found %d groups", len(jobsByParent))

	for i := range cronJobs {
		// Hand the callees a pointer to a copy, not to the slice element.
		cronJob := cronJobs[i]
		syncOne(&cronJob, jobsByParent[cronJob.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
		cleanupFinishedJobs(&cronJob, jobsByParent[cronJob.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
	}
}

5. syncOne函式

  程式碼比較長,分開講解

  5.1 驗證機制

    使用types.UID判斷cronjob和job關係,每個cronjob擁有唯一的UID,遍歷job的來判斷job應該屬於哪個cronjob

  • 遍歷所有jobs,記錄到 childrenJobs  map中,表示當前屬於該 cronJob 的所有 Jobs
  • 該 job 是否在 cronJob.Status.Active 的列表中
  • if不存在且job未完成,則報告異常event
  • else if存在且Job 已經 finished,從 active list 中刪除
	// Reconcile the Jobs passed in (js) against the CronJob's active list.
	// childrenJobs collects the UID of every Job in js.
	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*sj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			// Unfinished Job that is not in the active list: only a warning
			// event is emitted, the active list is left untouched.
			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission.  It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			// Finished Job that is still listed as active: drop it from the
			// active list and emit a normal event.
			deleteFromActiveList(sj, j.ObjectMeta.UID)
			// TODO: event to call out failure vs success.
			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
		}
	}

  5.2 遍歷cronjob的active list(sj.Status.Active)

    在childrenJobs沒有找到則從active list刪除掉

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	// NOTE(review): deleteFromActiveList is called while ranging over
	// sj.Status.Active; this presumably relies on the helper replacing the
	// slice rather than editing it in place — confirm against the helper.
	for _, j := range sj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(sj, j.UID)
		}
	}

  5.3 更新cronjob狀態資訊

	// Write the CronJob status back through sjc. On failure the error is
	// logged and processing of this CronJob stops.
	updatedSJ, err := sjc.UpdateStatus(sj)
	if err != nil {
		glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
		return
	}
	// Continue with the object returned by UpdateStatus.
	*sj = *updatedSJ

  5.4 刪除狀態的,或者暫停狀態的不做處理

	// Stop here for CronJobs that are being deleted or are suspended; no new
	// Job is started for them.
	if sj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}

	// Spec.Suspend is a pointer: nil is treated the same as false.
	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
		glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

  5.5 取得尚未執行的排程時間(getRecentUnmetScheduleTimes)

	// Ask getRecentUnmetScheduleTimes for the unmet schedule times relative to
	// now. An error is surfaced both as a warning event and in the log, and
	// ends processing of this CronJob.
	times, err := getRecentUnmetScheduleTimes(*sj, now)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		// Nothing is due.
		glog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		// Only logged here; processing continues past this point.
		glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}

  5.6 scheduledTime為最後一次未執行的排程時間,若已超過StartingDeadlineSeconds則跳過

	// Use the last entry of times as the run to start.
	scheduledTime := times[len(times)-1]
	tooLate := false
	if sj.Spec.StartingDeadlineSeconds != nil {
		// Too late when scheduledTime plus the deadline (in seconds) lies before now.
		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		glog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
		// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
		// and event the next time we process it, and also so the user looking at the status
		// can see easily that there was a missed execution.
		return
	}

  5.7 併發策略

  •     Forbid:禁止併發執行,如果前一個還沒有完成,則直接return
  •     Replace:取消當前正在執行的 Job
	// Apply the concurrency policy. Forbid: return without starting anything
	// while the active list is non-empty.
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
		// Regardless which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one.
		// (because we haven't seen the status update to the SJ or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long the as the invocations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
		return
	}
	// Replace: fetch and delete every Job in the active list first. Any
	// failure to get or delete a Job ends processing of this CronJob.
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range sj.Status.Active {
			// TODO: this should be replaced with server side job deletion
			// currently this mimics JobReaper from pkg/kubectl/stop.go
			glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(sj, job, jc, pc, recorder, "") {
				return
			}
		}
	}

  5.8 建立job

  獲得配置建立job

	// Build the Job from the CronJob's template for scheduledTime and create
	// it in the CronJob's namespace through jc.
	jobReq, err := getJobFromTemplate(sj, scheduledTime)
	if err != nil {
		// A template failure is only logged; no event is recorded.
		glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
	if err != nil {
		// A create failure is reported as a warning event on the CronJob.
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return
	}
	glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)