Trino中Task源碼解析(tars源碼分析)
我們知道,在Trino中一個(gè)Query會(huì)拆分成多個(gè)Stage,一個(gè)Stage又會(huì)拆分成多個(gè)Task,Task是跑在Worker上的具體任務(wù),那一個(gè)Task周圍有哪些息息相關(guān)的類和方法呢,需要我們?nèi)ラ喿x源碼分析。
整體框架
和Task關(guān)系密切的幾個(gè)類以及關(guān)系如下圖所示
TaskResource
Task的創(chuàng)建,刪除,更新都是通過Http請(qǐng)求來完成,由TaskResource這個(gè)類來接受請(qǐng)求, 但具體的實(shí)現(xiàn)方法都封裝在SqlTaskManager中,TaskResource接收到請(qǐng)求后調(diào)用SqlTaskManager中對(duì)應(yīng)的方法,以創(chuàng)建Task為例:
SqlTaskManager
該類中有許多對(duì)Task進(jìn)行操作的方法,比如創(chuàng)建,更新,取消,中止等等
上圖中的類屬性tasks就是用來保存所有task相關(guān)信息的,本質(zhì)是一個(gè)不可驅(qū)逐的緩存,緩存中key是taskId(每個(gè)task的專屬標(biāo)識(shí)), value是對(duì)應(yīng)創(chuàng)建的sqlTask對(duì)象。
因此SqlTaskManager中對(duì)Task操作就是從tasks中根據(jù)taskId拿到SqlTask對(duì)象,再調(diào)用對(duì)應(yīng)的方法。
StucksplitTasksInterrupter
SqlTaskManager中還有個(gè)很重要的特性就是StuckSplitTasksInterrupter,他會(huì)定時(shí)的去檢查是否有task卡住,如果卡住則被標(biāo)記為stuck并會(huì)被kill,至于是否開啟這個(gè)功能以及多長(zhǎng)時(shí)間沒響應(yīng)才算卡住,都可以通過參數(shù)去配置。
可以看到代碼中會(huì)去遍歷當(dāng)前所有的runningSplit,如果該runningSplit執(zhí)行時(shí)間大于設(shè)置的閾值,則會(huì)被篩選出來拿到對(duì)應(yīng)的TaskId, 再調(diào)用sqlTask的fail方法,結(jié)束這個(gè)卡住的Task
SqlTask
每一個(gè)Task都對(duì)應(yīng)于一個(gè)SqlTask對(duì)象,其中比較重要的幾個(gè)屬性:
- TaskStateMachine: 用來記錄Task的狀態(tài),在調(diào)用sqlTask的cancel, abort等接口時(shí),其實(shí)就是修改狀態(tài)機(jī)的狀態(tài),并且狀態(tài)機(jī)會(huì)有一個(gè)Listener監(jiān)聽狀態(tài)機(jī)的變化,一旦有更新,就會(huì)執(zhí)行相應(yīng)的操作
- SqlTaskExecution: 在創(chuàng)建SqlTask對(duì)象時(shí),同時(shí)也會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的SqlTaskExecution。 SqlTaskExecution主要是負(fù)責(zé)Split到Driver的調(diào)度,并把split和TaskExecutor關(guān)聯(lián)起來, 通過創(chuàng)建一個(gè)TaskHandle放到TaskExecutor的隊(duì)列中去等待執(zhí)行。在創(chuàng)建TaskHandle時(shí),會(huì)添加一個(gè)Listener,如果這個(gè)Task的狀態(tài)被設(shè)置為Terminating或者Done時(shí),就會(huì)調(diào)用TaskExecutor中的removeTask方法來真正的取消這個(gè)task任務(wù)
TaskExecutor
TaskExecutor是具體執(zhí)行Task任務(wù)的地方。他有一個(gè)線程池,線程池的大小可以通過參數(shù)配置,初始化的時(shí)候就會(huì)創(chuàng)建出對(duì)應(yīng)個(gè)數(shù)的線程,每個(gè)線程執(zhí)行一個(gè)TaskRunner,在TaskRunner中,while循環(huán)從waitingSplit中拿到Split,放到runningSplit中并執(zhí)行這個(gè)split。前面提到的StuckSplitTasksInterrupter中獲取的runningSplit信息就是從這個(gè)地方更新的。而waitingSplit中的split就是在上面提到的SqlTaskExecution中放進(jìn)去的。