Spark原始碼系列(三)作業執行過程
作業執行
上一章講了RDD的轉換,但是沒講作業的執行,它和Driver Program的關係是啥,和RDD的關係是啥?
官方給的例子裡面,一執行collect方法就能出結果,那我們就從collect開始看吧,進入RDD,找到collect方法。
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
它進行了兩個操作:
1、呼叫SparkContext的runJob方法,把自身的引用傳入去,再傳了一個匿名函式(把Iterator轉換成Array陣列)
2、把result結果合併成一個Array,注意results是一個Array[Array[T]]型別,所以第二句的那個寫法才會那麼奇怪。這個操作是很重的一個操作,如果結果很大的話,這個操作是會報OOM的,因為它是把結果儲存在Driver程式的記憶體當中的result數組裡面。
我們點進去runJob這個方法吧。
val callSite = getCallSite val cleanedFunc = clean(func) dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) rdd.doCheckpoint()
追蹤下去,我們會發現經過多個不同的runJob同名函式呼叫之後,執行job作業靠的是dagScheduler,最後把結果通過resultHandler儲存返回。
DAGScheduler如何劃分作業
好的,我們繼續看DAGScheduler的runJob方法,提交作業,然後等待結果,成功什麼都不做,失敗丟擲錯誤,我們接著看submitJob方法。
val jobId = nextJobId.getAndIncrement() val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] // 記錄作業成功與失敗的資料結構,一個作業的Task數量是和分片的數量一致的,Task成功之後呼叫resultHandler儲存結果。 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
走到這裡,感覺有點兒繞了,為什麼到了這裡,還不直接執行呢,還要給eventProcessActor傳送一個JobSubmitted請求呢,new一個執行緒和這個區別有多大?
不管了,搜尋一下eventProcessActor吧,結果發現它是一個DAGSchedulerEventProcessActor,它的定義也在DAGScheduler這個類裡面。它的receive方法裡面定義了12種事件的處理方法,這裡我們只需要看
JobSubmitted的就行,它也是呼叫了自身的handleJobSubmitted方法。但是這裡很奇怪,沒辦法打斷點除錯,但是它的結果倒是能返回的,因此我們得用另外一種方式,開啟test工程,找到scheduler目錄下的DAGSchedulerSuite這個類,我們自己寫一個test方法,首先我們要在import那裡加上import org.apache.spark.SparkContext._ ,然後加上這一段測試程式碼。
test("run shuffle") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
val rdd4 = rdd3.reduceByKey(_ + _)
submit(rdd4, Array(0,1,2,3))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
complete(taskSets(1), Seq((Success, 42)))
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
complete(taskSets(3), Seq((Success, 68)))
}
這個例子的重點還是shuffle那塊,另外也包括了map的多個轉換,大家可以按照這個例子去測試下。
我們接著看handleJobSubmitted吧。
var finalStage: Stage = null
try {
finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
} catch {
// 錯誤處理,告訴監聽器作業失敗,返回....
}
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// 很短、沒有父stage的本地操作,比如 first() or take() 的操作本地執行.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
// collect等操作走的是這個過程,更新相關的關係對映,用監聽器監聽,然後提交作業
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
// 提交stage
submitStage(finalStage)
}
}
// 提交stage
submitWaitingStages()
從上面這個方法來看,我們應該重點關注newStage方法、submitStage方法和submitWaitingStages方法。
我們先看newStage,它得到的結果叫做finalStage,挺奇怪的哈,為啥?先看吧
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stageToInfos(stage) = StageInfo.fromStage(stage)
stage
可以看出來Stage也沒有太多的東西可言,它就是把rdd給傳了進去,tasks的數量,shuffleDep是空,parentStage。
那它的parentStage是啥呢?
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// 在visit函式裡面,只有存在ShuffleDependency的,parent才通過getShuffleMapStage計算出來
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
它是通過不停的遍歷它之前的rdd,如果碰到有依賴是ShuffleDependency型別的,就通過getShuffleMapStage方法計算出來它的Stage來。
那我們就開始看submitStage方法吧。
private def submitStage(stage: Stage) {
//...
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
// 沒有父stage,執行這stage的tasks
submitMissingTasks(stage, jobId.get)
runningStages += stage
} else {
// 提交父stage的task,這裡是個遞迴,真正的提交在上面的註釋的地方
for (parent <- missing) {
submitStage(parent)
}
// 暫時不能提交的stage,先新增到等待佇列
waitingStages += stage
}
}
}
這個提交stage的過程是一個遞迴的過程,它是先要把父stage先提交,然後把自己新增到等待佇列中,直到沒有父stage之後,就提交該stage中的任務。等待佇列在最後的submitWaitingStages方法中提交。
這裡我引用一下上一章當中我所畫的那個圖來表示這個過程哈。
從getParentStages方法可以看出來,RDD當中存在ShuffleDependency的Stage才會有父Stage, 也就是圖中的虛線的位置!
所以我們只需要記住凡是涉及到shuffle的作業都會至少有兩個Stage,即shuffle前和shuffle後。
TaskScheduler提交Task
那我們接著看submitMissingTasks方法,下面是主體程式碼。
private def submitMissingTasks(stage: Stage, jobId: Int) {
val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
myPending.clear()
var tasks = ArrayBuffer[Task[_]]()
if (stage.isShuffleMap) {
// 這是shuffle stage的情況
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else {
// 這是final stage的情況
val job = resultStageToJob(stage)
for (id <- 0 until job.numPartitions if !job.finished(id)) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
if (tasks.size > 0) {
myPending ++= tasks
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
} else {
runningStages -= stage
}
}
Task也是有兩類的,一種是ShuffleMapTask,一種是ResultTask,我們需要注意這兩種Task的runTask方法。最後Task是通過taskScheduler.submitTasks來提交的。
我們找到TaskSchedulerImpl裡面看這個方法。
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasksthis.synchronized {
val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
hasReceivedTask = true
}
backend.reviveOffers()
}
排程器有兩種模式,FIFO和FAIR,預設是FIFO, 可以通過spark.scheduler.mode來設定,schedulableBuilder也有相應的兩種FIFOSchedulableBuilder和FairSchedulableBuilder。
那backend是啥?據說是為了給TaskSchedulerImpl提供外掛式的排程服務的。
它是怎麼例項化出來的,這裡我們需要追溯回到SparkContext的createTaskScheduler方法,下面我直接把常用的3中型別的TaskScheduler給列出來了。
mode Scheduler Backend
cluster TaskSchedulerImpl SparkDeploySchedulerBackend
yarn-cluster YarnClusterScheduler CoarseGrainedSchedulerBackend
yarn-client YarnClientClusterScheduler YarnClientSchedulerBackend
好,我們回到之前的程式碼上,schedulableBuilder.addTaskSetManager比較簡單,把作業集新增到排程器的隊列當中。
我們接著看backend的reviveOffers,裡面只有一句話driverActor ! ReviveOffers。真是頭暈,搞那麼多Actor,只是為了接收訊息。。。
照舊吧,找到它的receive方法,找到ReviveOffers這個case,發現它呼叫了makeOffers方法,我們繼續追殺!
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
從executorHost中隨機抽出一些來給排程器,然後排程器返回TaskDescription,executorHost怎麼來的,待會兒再說,我們接著看resourceOffers方法。
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)
// 遍歷worker提供的資源,更新executor相關的對映
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
}
}
// 從worker當中隨機選出一些來,防止任務都堆在一個機器上
val shuffledOffers = Random.shuffle(offers)
// worker的task列表
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// 隨機遍歷抽出來的worker,通過TaskSetManager的resourceOffer,把本地性最高的Task分給Worker
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
// 把本地性最高的Task分給Worker
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
}
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
resourceOffers主要做了3件事:
1、從Workers裡面隨機抽出一些來執行任務。
2、通過TaskSetManager找出和Worker在一起的Task,最後編譯打包成TaskDescription返回。
3、將Worker-->Array[TaskDescription]的對映關係返回。
我們繼續看TaskSetManager的resourceOffer,看看它是怎麼找到和host再起的Task,並且包裝成TaskDescription。
通過檢視程式碼,我發現之前我解釋的和它具體實現的差別比較大,它所謂的本地性是根據當前的等待時間來確定的任務本地性的級別。
它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
currentLocalityIndex < myLocalityLevels.length - 1)
{
// 成立條件是當前時間-上次釋出任務的時間 > 當前本地性級別的,條件成立就跳到下一個級別
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
}
myLocalityLevels(currentLocalityIndex)
}
等待時間是可以通過引數去設定的,具體的自己查下面的程式碼。
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
}
下面繼續看TaskSetManager的resourceOffer的方法,通過findTask來從Task集合裡面找到相應的Task。
findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
val task = tasks(index)
val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = clock.getTime() - startTime
addRunningTask(taskId)
val taskName = "task %s:%d".format(taskSet.id, index)
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
它的findTask方法如下:
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
// 同一個Executor,通過execId來查詢相應的等待的task
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
// 通過主機名找到相應的Task,不過比之前的多了一步判斷
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
// 通過Rack的名稱查詢Task
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
// 查詢那些preferredLocations為空的,不指定在哪裡執行的Task來執行
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
// 查詢那些preferredLocations為空的,不指定在哪裡執行的Task來執行
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// 最後沒辦法了,拖的時間太長了,只能啟動推測執行了
findSpeculativeTask(execId, host, locality)
}
從這個方面可以看得出來,Spark對執行時間還是很注重的,等待的時間越長,它就可能越飢不擇食,從PROCESS_LOCAL一直讓步到ANY,最後的最後,推測執行都用到了。
找到任務之後,它就呼叫dagScheduler.taskStarted方法,通知dagScheduler任務開始了,taskStarted方法就不詳細講了,它觸發dagScheduler的BeginEvent事件,裡面只做了2件事:
1、檢查Task序列化的大小,超過100K就警告。
2、提交等待的Stage。
好,我們繼續回到釋出Task上面來,中間過程講完了,我們應該是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
它的方法體是:
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}
通過executorId找到相應的executorActor,然後傳送LaunchTask過去,一個Task佔用一個Cpu。
註冊Application
那這個executorActor是怎麼來的呢?找唄,最後發現它是在receive方法裡面接受到RegisterExecutor訊息的時候註冊的。通過搜尋,我們找到CoarseGrainedExecutorBackend這個類,在它的preStart方法裡面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores) 帶的這三個引數都是在初始化的時候傳入的,那是誰例項化的它呢,再逆向搜尋找到SparkDeploySchedulerBackend!之前的backend一直都是它,我們看reviveOffers是在它的父類CoarseGrainedSchedulerBackend裡面。
關係清楚了,在這個backend的start方法裡面啟動了一個AppClient,AppClient的其中一個引數ApplicationDescription就是封裝的執行CoarseGrainedExecutorBackend的命令。AppClient內部啟動了一個ClientActor,這個ClientActor啟動之後,會嘗試向Master傳送一個指令actor ! RegisterApplication(appDescription) 註冊一個Application。
別廢話了,Ctrl +Shift + N吧,定位到Master吧。
case RegisterApplication(description) => {
val app = createApplication(description, sender)
registerApplication(app)
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}
它做了5件事:
1、createApplication為這個app構建一個描述App資料結構的ApplicationInfo。
2、註冊該Application,更新相應的對映關係,新增到等待佇列裡面。
3、用persistenceEngine持久化Application資訊,預設是不儲存的,另外還有兩種方式,儲存在檔案或者Zookeeper當中。
4、通過傳送方註冊成功。
5、開始作業排程。
關於排程的問題,在第一章《spark-submit提交作業過程》已經介紹過了,建議回去再看看,搞清楚Application和Executor之間的關係。
Application一旦獲得資源,Master會發送launchExecutor指令給Worker去啟動Executor。
進到Worker裡面搜尋LaunchExecutor。
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
}
原來ExecutorRunner還不是傳說中的Executor,它內部是執行了appDesc內部的那個命令,啟動了CoarseGrainedExecutorBackend,它才是我們的真命天子Executor。
啟動之後ExecutorRunner報告ExecutorStateChanged事件給Master。
Master幹了兩件事:
1、轉發給Driver,這個Driver是之前註冊Application的那個AppClient
2、如果是Executor執行結束,從相應的對映關係裡面刪除
釋出Task
上面又花了那麼多時間講Task的執行環境ExecutorRunner是怎麼註冊,那我們還是回到我們的主題,Task的釋出。
釋出任務是傳送LaunchTask指令給CoarseGrainedExecutorBackend,接受到指令之後,讓它內部的executor來發布這個任務。
這裡我們看一下Executor的launchTask。
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
TaskRunner是這裡的重頭戲啊!看它的run方法吧。
override def run() {
// 準備工作若干...那天我們放學回家經過一片玉米地,以上省略一百字
try {
// 反序列化Task
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
// 命令為嘗試執行,和hadoop的mapreduce作業是一致的
attemptedTask = Some(task)
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// 執行Task, 具體可以去看之前讓大家關注的ResultTask和ShuffleMapTask
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
// 對結果進行序列化
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
// 更新任務的相關監控資訊,會反映到監控頁面上的
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = taskStart - startTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
}
val accumUpdates = Accumulators.values
// 對結果進行再包裝,包裝完再進行序列化
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
// 如果中間結果的大小超過了spark.akka.frameSize(預設是10M)的大小,就要提升序列化級別了,超過記憶體的部分要儲存到硬碟的
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
serializedDirectResult
}
}
// 返回結果
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
// 這部分是錯誤處理,被我省略掉了,主要內容是通關相關負責人處理後事
} finally {
// 清理為ResultTask註冊的shuffle記憶體,最後把task從正在執行的列表當中刪除
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
runningTasks.remove(taskId)
}
}
}
以上程式碼被我這些了,但是建議大家看看註釋吧。
最後結果是通過statusUpdate返回的。
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
這回這個Driver又不是剛才那個AppClient,而是它的家長SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend接受了這個StatusUpdate訊息。
這關係真他娘夠亂的。。
繼續,Task裡面走的是TaskSchedulerImpl這個方法。
scheduler.statusUpdate(taskId, state, data.value)
到這裡,一個Task就執行結束了,後面就不再擴充套件了,作業執行這塊是Spark的核心,再擴充套件基本就能寫出來一本書了,限於文章篇幅,這裡就不再深究了。
以上的過程應該是和下面的圖一致的。
看完這篇文章,估計大家會雲裡霧裡的,在下一章《作業生命週期》會把剛才描述的整個過程重新梳理出來,便於大家記憶,敬請期待!