Overview
This time we deal with worker failures.
- The master assigns tasks by calling call() in common_rpc.go. When the call times out, the function returns false, and the master must reassign the task to another worker.
- call() returning false does not necessarily mean the task failed: the worker may simply be slow enough to hit the timeout. As a result, several workers may actually be executing the same task at once (while the master believes there is only one).
- Besides, MapReduce relies on GFS to guarantee that a task's output is complete and atomic (either all output files are written or none are). The lab does not implement this guarantee; it simply lets the worker crash.
Problem
Facing worker failures, how should we adjust the code of schedule()?
schedule() should return only once every task has completed, so how do we track the state of each task?
Unlike the paper, the failures that can occur in the lab are (grouped by phase):
- A worker crashes while executing a map task: reassign the task.
- The map phase has finished and the intermediate output is already on the local disk, but the mapper becomes unreachable. Since the lab simulates a distributed environment with multiple threads in one process, everything is visible, so the intermediate results remain accessible.
- A reduce task fails during execution: simply reassign it.
- The reducer fails after its reduce task has completed: the results are still accessible, so this case needs no handling.
- A worker fails while writing output (intermediate files or the job result) to disk. There is no GFS here; the worker simply crashes, so this case is equivalent to a failure during task execution.
So the only case we need to handle is: what to do when a task fails to execute.
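For orientation before the real implementations: the simplest way to track per-task state is a mutex-guarded bitmap like the sketch below (hypothetical names, not the lab's required structure). The implementations later in this post route the same updates through a single tracker goroutine instead of a lock.

    import "sync"

    // tracker marks tasks done at most once and reports when all are done.
    type tracker struct {
        mu   sync.Mutex
        done []bool
        left int
    }

    func newTracker(n int) *tracker {
        return &tracker{done: make([]bool, n), left: n}
    }

    // markDone records task i as finished; it is idempotent because, as noted
    // above, a "failed" task may in fact be finished later by a slow worker.
    // It returns true once every task is done.
    func (t *tracker) markDone(i int) bool {
        t.mu.Lock()
        defer t.mu.Unlock()
        if !t.done[i] {
            t.done[i] = true
            t.left--
        }
        return t.left == 0
    }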
Implementation
Let's first look at the test_test.go file:
func TestOneFailure(t *testing.T) {
    mr := setup()
    // Start 2 workers that fail after 10 tasks
    go RunWorker(mr.address, port("worker"+strconv.Itoa(0)),
        MapFunc, ReduceFunc, 10, nil)
    go RunWorker(mr.address, port("worker"+strconv.Itoa(1)),
        MapFunc, ReduceFunc, -1, nil)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}
func TestManyFailures(t *testing.T) {
    mr := setup()
    i := 0
    done := false
    for !done {
        select {
        case done = <-mr.doneChannel:
            check(t, mr.files)
            cleanup(mr)
            break
        default:
            // Start 2 workers each sec. The workers fail after 10 tasks
            w := port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            w = port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            time.Sleep(1 * time.Second)
        }
    }
}
TestOneFailure() makes worker0 die after 10 RPCs, while TestManyFailures() starts two new workers every second, each of which also crashes after 10 RPCs, simulating frequent failures.
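The fifth argument of RunWorker is the failure switch: a positive value makes the worker stop serving after that many RPCs, while -1 (as for worker1 above) means it never fails. Conceptually this is just a countdown in the worker's accept loop, along the lines of the sketch below (illustrative, with hypothetical names; not the lab's actual worker.go):

    import (
        "net"
        "net/rpc"
    )

    // serveN accepts RPC connections until an nRPC budget runs out; a negative
    // budget never reaches zero, so the worker runs forever.
    func serveN(l net.Listener, srv *rpc.Server, nRPC int) {
        for nRPC != 0 {
            conn, err := l.Accept()
            if err != nil {
                return
            }
            if nRPC > 0 {
                nRPC--
            }
            go srv.ServeConn(conn)
        }
        // Budget exhausted: stop answering RPCs. From the master's point of
        // view, this worker has crashed.
        l.Close()
    }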
With the Part III scheduler, the first test fails like this:
Schedule: mapPhase done
Schedule: 10 reducePhase tasks (20 I/Os)
/var/tmp/824-1000/mr28342-worker1: given reducePhase task #0 on file 824-mrinput-0.txt (nios: 20)
/var/tmp/824-1000/mr28342-worker1: reducePhase task #0 done
Schedule: reducePhase done
Master: RPC /var/tmp/824-1000/mr28342-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
2019/09/29 15:38:28 Merge: open mrtmp.test-res-1: no such file or directory
We can see that after the master prints the worker0 shutdown error, it moves straight on to Merge(). The message comes from KillWorkers() in master.go, whose key part is:
var reply ShutdownReply
ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
if ok == false {
    fmt.Printf("Master: RPC %s shutdown error\n", w)
} else {
    ntasks = append(ntasks, reply.Ntasks)
}
When call() returns false here, shutting the worker down failed; on success, the reply carries the number of tasks the worker completed (reply.Ntasks). The shutdown error for worker0 is therefore expected, since worker0 crashed after its 10 RPCs and can no longer answer. The real failure is that Merge() cannot find mrtmp.test-res-1, i.e. reduce task #1 never ran.
Debugging shows that during the reduce phase, apart from the single task assigned to worker1, every loop iteration reads worker0's RPC address from registerChan. The Part III implementation calls call() without handling its return value, so even when call() fails, the scheduling goroutine returns normally and puts worker0's address back into registerChan. Worker0's dead address therefore keeps occupying registerChan, and the tasks assigned to it are silently lost.
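Put together, the Part III loop behaves roughly like the sketch below (paraphrased, not verbatim; makeArgs is a hypothetical helper). The return value of call() is dropped and the worker is always handed back, so a dead worker's address keeps circulating:

    // Sketch of the buggy Part III pattern described above.
    for i := 0; i < ntasks; i++ {
        wg.Add(1)
        go func(task int) {
            defer wg.Done()
            worker := <-registerChan
            call(worker, "Worker.DoTask", makeArgs(task), nil) // result ignored!
            go func() { registerChan <- worker }()             // dead worker re-enters the pool
        }(i)
    }
    wg.Wait() // returns even though some tasks never ran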
Implementation 1
In the code below, a []bool tracks which tasks have completed, and a goroutine receives the indices of finished tasks over a channel.
Implementation 1 still does not reassign tasks: when call() returns false the failure is simply dropped, so it cannot pass the tests.
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    // ... (ntasks and nOther are computed as in Part III; see Implementation 2)
    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // should be protected by a mutex
        inform        = make(chan int)       // receives indices of finished tasks
    )
    // ------------------------------------------------------------------ init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // ------------------------------------------------------------------ tracker
    go func(informChan chan int) {
        NumTaskDone := 0
        for {
            i := <-informChan // index of a finished task
            if !TaskTracker[i] {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All tasks done.")
    }(inform)
    for i := 0; i < ntasks; i++ {
        if !TaskTracker[i] {
            workerName := <-registerChan
            wg.Add(1) // Add before the goroutine starts, or wg.Wait() may return too early
            go func(TaskIndex int, InformChan chan int) {
                defer wg.Done()
                args := DoTaskArgs{
                    JobName:       jobName,
                    File:          mapFiles[TaskIndex],
                    Phase:         phase,
                    TaskNumber:    TaskIndex,
                    NumOtherPhase: nOther,
                }
                isDone := call(workerName, RPCMethodName, args, nil)
                if isDone {
                    go func() {
                        registerChan <- workerName
                        InformChan <- TaskIndex // ---------------------- New
                    }()
                }
                // A false return is silently dropped here, which is why this
                // version cannot survive worker crashes.
            }(i, inform)
        }
    }
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}
Implementation 2
Implementation 2 adds task reassignment on top of Implementation 1.
The idea is as follows: no mutex is used. main spawns two goroutines, a dispatcher and a tracker; together with the per-task controller goroutines they communicate over synchronous channels, forming MPSC (multi-producer, single-consumer) queues through which TaskTracker is accessed.
The code:
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var nOther int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        nOther = nReduce
    case reducePhase:
        ntasks = nReduce
        nOther = len(mapFiles)
    }
    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // written by the tracker, read by the dispatcher
        TaskDoneChan  = make(chan int)       // receives indices of finished tasks
        ToDoChan      = make(chan int)       // receives indices of tasks to (re)assign
        NumTaskDone   = 0
    )
    // init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // --------------------------------------------------------------- tracker
    go func() {
        for {
            i := <-TaskDoneChan // index of a finished task
            if !TaskTracker[i] {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All tasks done.")
    }()
    // --------------------------------------------------------------- dispatcher
    go func() {
        for {
            i := <-ToDoChan
            // Strictly speaking this read races with the tracker's write and
            // should be locked; the channels serialize most, but not all, accesses.
            if !TaskTracker[i] {
                workerName := <-registerChan
                wg.Add(1) // Add before starting the controller, or wg.Wait() may return too early
                // ----------------------------------------------- controller
                go func(TaskIndex int) {
                    defer wg.Done()
                    args := DoTaskArgs{
                        JobName:       jobName,
                        File:          mapFiles[TaskIndex],
                        Phase:         phase,
                        TaskNumber:    TaskIndex,
                        NumOtherPhase: nOther,
                    }
                    isDone := call(workerName, RPCMethodName, args, nil)
                    if isDone {
                        go func() {
                            registerChan <- workerName // hand the worker back
                            TaskDoneChan <- TaskIndex  // tell the tracker
                        }()
                    } else { // the worker may have crashed: requeue the task
                        go func() {
                            ToDoChan <- TaskIndex
                        }()
                    }
                }(i)
            }
        }
    }()
    for i := 0; i < ntasks; i++ {
        ToDoChan <- i
    }
    // Give requeued tasks a chance to re-enter the dispatcher before wg.Wait()
    // can observe a zero counter.
    time.Sleep(time.Duration(100) * time.Millisecond)
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}
Implementation 2 passes the tests:
/var/tmp/824-1000/mr10741-master: Map/Reduce task completed
PASS
ok mapreduce 3.255s
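As a closing design note: the dispatcher/tracker split is not the only way to pass these tests. A common, more compact alternative is one retry loop per task inside the same schedule() skeleton, retiring a task only when its call() succeeds. A minimal sketch, hedged as one possible shape rather than the required one:

    // Alternative sketch: one goroutine per task retries until call() succeeds.
    var wg sync.WaitGroup
    for i := 0; i < ntasks; i++ {
        wg.Add(1)
        go func(task int) {
            defer wg.Done()
            args := DoTaskArgs{JobName: jobName, File: mapFiles[task],
                Phase: phase, TaskNumber: task, NumOtherPhase: nOther}
            for {
                worker := <-registerChan
                if call(worker, "Worker.DoTask", args, nil) {
                    // Hand the worker back without blocking this goroutine.
                    go func() { registerChan <- worker }()
                    return
                }
                // call() failed: assume the worker crashed and try another one.
            }
        }(i)
    }
    wg.Wait()

The key property both versions share is the rule stated at the start: a task is only marked done when call() reports success.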