Part III: Distributing MapReduce tasks
要干什么?
前面的都是串行执行MRTask, 这次我们要用多线程模拟分布式环境, 来进行分布式MR.
具体的任务是: 完善schedule.go: schedule()
:
- 从registerChan读取已注册worker, 它会返回一个包含worker的rpc地址的字符串.
- 给每一个worker分配一系列任务
- 等待所有Task完成后, 返回
- schedule()应该使用全部worker, 有一些worker可能在schedule() 执行时才启动.
- schedul()通过
Worker.DoTask()
让worker执行任务.
前置条件
因为设计到并发编程, 所以我们可能要用到:
goroutine
channel
- go的RPC库, 用来和Worker通信
sync.WaitGroup
- Go的race detector.
-
select
语句, 用来检查超时
我们还要了解如下文件:
mapreduce/common_rpc.go
mapreduce/master.go
mapreduce/worker.go
代码构思
为每个worker分配若干个task
这是作者最初的思路:
- 创建布尔数组追踪每个job是否完成
- 对每个worker, 用goroutine调用call()来分配Task
- 对于每个call(), 设定timeout, 如果timeout内返回true, 则标记该Task完成; 否则重新分配该Task给另一个worker.
- 如果所有Task完成, 则break.
- 注意:
registerChan
返回的是已注册的worker的RPC地址,
不等y于空闲的worker
!要自己管理这些worker!
这里笔者把问题想复杂了, 导致代码一团糟, 且出现很多多线程bug. 实验P3前提是分布式无差错环境, 不用考虑容错。
为每个task分配一个worker
参考了这篇博客。
一个重要的思路: 每个worker完成task后, 将其名字放入registerChan
, 日后再用.
一个小坑: 最后一个goroutine中把名字放入chan, 这时没人来取它了, 会导致阻塞。
通过把:
registerChan <- workerName // 阻塞, 会导致任务完成但goroutine阻塞不返回
改为:
// 最后一个task时会阻塞但是没问题, 主线程退出, 它也就结束了。
go func(){
registerChan <- workerName
}
代码:
RPCMethodName := "Worker.DoTask"
var wg sync.WaitGroup
// For each task, assign it to a worker.
// Not for each worker , assign many tasks to it.
for i := 0; i < ntasks; i++ {
workerName := <-registerChan
go func(TaskIndex int, waitG *sync.WaitGroup) {
waitG.Add(1)
defer waitG.Done()
args := DoTaskArgs{
JobName: jobName,
File: mapFiles[TaskIndex],
Phase: phase,
TaskNumber: TaskIndex,
NumOtherPhase: nOther,
}
call(workerName, RPCMethodName, args, nil)
// For the last task, the goroutine will block. But it will be killed while main exiting.
go func() {
registerChan <- workerName
}()
return
}(i, &wg)
}
wg.Wait()
可以通过测试.