MIT6824-Lab1-P4

概述

这次我们要处理worker故障.

  • master通过调用common_rpc.go: call()来分配任务.当超时发生, 该函数 返回false, 这时候master要把该任务重新分配给另一个worker.
  • common_rpc.go: call()返回false不等于执行失败, worker可能执行得比较慢导致超时.这样可能导致, 实际上有多个worker在执行同一个task(对于master来说, 只有一个).
  • 此外, mapreduce通过GFS保证task的输出是完整的和原子的(要么全部文件都输出, 要么都不输出), 这个实验并没有实现这个保证, 之间简单地令worker crash.

问题

面对worker故障, 我们应该如何调整schedule()的代码?

当所有task都被完成了, schedule就结束, 那么如何跟踪每个task的状态?

有别于论文, 实验中可能出现的故障有(按阶段分类):

  • 执行map阶段发现worker crash, 重新分配task
  • map阶段结束, 中间输出已写入到本地disk, mapper不可访问, 但由于实验是多线程模拟分布式环境, 所有内容都是可见的, 这些中间结果也是可访问的.
  • 执行reduce任务出错, 重新分配该任务即可。
  • reduce执行完毕, reducer出故障, 结果也是可访问的, 所以不予考虑.
  • worker向disk输出结果(包括中间文件、job result)出错。这里没有GFS, 只是简单地让worker crash。所以也等价于执行task中出错。

所以我们只需要考虑: task执行失败如何处理。

实现

我们首先看test_test.go文件:

func TestOneFailure(t *testing.T) {
    mr := setup()
    // Start 2 workers that fail after 10 tasks
    go RunWorker(mr.address, port("worker"+strconv.Itoa(0)),
        MapFunc, ReduceFunc, 10, nil)
    go RunWorker(mr.address, port("worker"+strconv.Itoa(1)),
        MapFunc, ReduceFunc, -1, nil)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

func TestManyFailures(t *testing.T) {
    mr := setup()
    i := 0
    done := false
    for !done {
        select {
        case done = <-mr.doneChannel:
            check(t, mr.files)
            cleanup(mr)
            break
        default:
            // Start 2 workers each sec. The workers fail after 10 tasks
            w := port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            w = port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            time.Sleep(1 * time.Second)
        }
    }
}

TestOneFailure() 使得worker0在10个rpc后dump掉,

TestManyFailure()每秒启动2两个worker, 都是在10个rpc后崩掉, 模拟频繁故障的场景.

第一个测试导致如下错误:

Schedule: mapPhase done
Schedule: 10 reducePhase tasks (20 I/Os)
/var/tmp/824-1000/mr28342-worker1: given reducePhase task #0 on file 824-mrinput-0.txt (nios: 20)
/var/tmp/824-1000/mr28342-worker1: reducePhase task #0 done
Schedule: reducePhase done
Master: RPC /var/tmp/824-1000/mr28342-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
2019/09/29 15:38:28 Merge: open mrtmp.test-res-1: no such file or directory

我们看到, Master提示worker0 shutdown errot后, 就开始调用Merge().
这句提示来自master.go: KillWorkers(), 其关键部分如下:

var reply ShutdownReply
ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
if ok == false {
    fmt.Printf("Master: RPC %s shutdown error\n", w)
} else {
    ntasks = append(ntasks, reply.Ntasks)
}

call()返回false时, 就表示Worker关闭出错. 正常情况下它返回Worker完成的task数量.

通过调试发现, 在reduce阶段, 除了worker1分配到了一个任务外, 每次迭代从registerChan中取到的都是worker0的rpc地址.

根据P3的实现, 直接调用call()而没有处理返回值, 所以即使call()发现超时, 也只会正常返回并且把worker0的地址放入registerChan, 所以worker0的名字一直霸占着registerChan.

实现1

如下代码, 用一个[]bool跟踪task完成情况, 通过goroutine和chan来接受完成的Task的索引.
实现1没有实现如何重新分配task, 所以不能通过测试。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {

    // ...

    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // should be protected by mutex
        inform        = make(chan int)       // get index of task done .
    )
    // ------------------------------------------------------------------ init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // ------------------------------------------------------------------ tracker.
    go func(tracker *[]bool, informChan chan int) {
        NumTaskDone := 0
        for {
            i := <-informChan // get index of task finished.
            if !TaskTracker[i] {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All task done.")

    }(&TaskTracker, inform)

    for i := 0; i < ntasks; i++ {
        if !TaskTracker[i] {
            workerName := <-registerChan
            go func(TaskIndex int, waitG *sync.WaitGroup, InformChan chan int) {
                waitG.Add(1)
                defer waitG.Done()
                args := DoTaskArgs{
                    JobName:       jobName,
                    File:          mapFiles[TaskIndex],
                    Phase:         phase,
                    TaskNumber:    TaskIndex,
                    NumOtherPhase: nOther,
                }
                isDone := call(workerName, RPCMethodName, args, nil)
                if isDone {
                    go func() {
                        registerChan <- workerName
                        InformChan <- TaskIndex   // ---------------------- New
                    }()
                } 
                return
            }(i, &wg, inform)
        }
    }

    wg.Wait()

    fmt.Printf("Schedule: %v done\n", phase)
}

实现2

在实现1基础上添加重新分配Task功能.

构思如下:

不需要采用互斥锁, main派生出dispatcher和tracker两个goroutine。这两个线程通过同步chan与Controller形成MPSC队列,来对TaskTracker进行访问。

代码:

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var nOther int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        nOther = nReduce
    case reducePhase:
        ntasks = nReduce
        nOther = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //

    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // should be protected by mutex
        TaskDoneChan  = make(chan int)       // get index of task done .
        ToDoChan      = make(chan int)
        NumTaskDone   = 0

    )
    // init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // --------------------------------------------------------------- tracker.
    go func() {
        for {

            i := <-TaskDoneChan // get index of task finished.
            TaskState := TaskTracker[i]
            if TaskState == false {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All task done.")

    }()

    // --------------------------------------------------------------- Dispatcher.
    go func() {
        for {
            i := <-ToDoChan
            // We should locking it.
            TaskState := TaskTracker[i]
            if TaskState == false {
                workerName := <-registerChan
                // Controller.
                go func(TaskIndex int, waitG *sync.WaitGroup) {
                    waitG.Add(1)
                    defer waitG.Done()
                    //log.Println("Assign task", i, " to ", workerName)
                    args := DoTaskArgs{
                        JobName:       jobName,
                        File:          mapFiles[TaskIndex],
                        Phase:         phase,
                        TaskNumber:    TaskIndex,
                        NumOtherPhase: nOther,
                    }
                    isDone := call(workerName, RPCMethodName, args, nil)
                    if isDone {
                        go func() {
                            registerChan <- workerName
                            TaskDoneChan <- TaskIndex
                        }()
                        // set TaskTracker is done.
                        //go func(){
                    } else { // The worker may crashed.
                        go func() {
                            ToDoChan <- i
                        }()
                    }

                    return
                }(i, &wg)
            }
        }
    }()

    for i := 0; i < ntasks; i++ {
        ToDoChan <- i
    }

    // In case of main exit before dispatcher and tracker start working.
    time.Sleep(time.Duration(100) * time.Millisecond)

    wg.Wait()

    fmt.Printf("Schedule: %v done\n", phase)
}

通过测试:

/var/tmp/824-1000/mr10741-master: Map/Reduce task completed
PASS
ok      mapreduce    3.255s
    原文作者:Tsukami
    原文地址: https://segmentfault.com/a/1190000020548111
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞