Part I: Map/Reduce input and output
添加代码:
common_map.go: doMap()
common_reduce.go: doReduce()
完成后测试:
cd "<project_position>/src/mapreduce"
go test -run Sequential
我们需要事先掌握如下:
- 保存读取json文件
- 文件输入输出.
- defer语句, 用于释放资源.
common_map.go: doMap()
要干什么?
- doMap()管理一个Map Task.
- 调用mapF()将输入文件的内容变成一个
[]KeyValue
. - 用划分函数将mapF()的输出划分到
nReduce
个中间文件. - 每个中间文件命由
<JobName>-<mapTask>-<reduceTask>
三部分组成.
代码构思:
- 创建
nReduce
个json文件, 将其句柄放入jsonArr
这个数组中, 其类型为[][]KeyValue
. - 读取文件将其内容转为String, 输入mapF()得到输出kvList.
- 对于kvList中每个kv, 用
index = ihash(kv.Key) % nReduce
来将其加入数组jsonArr[index]
中. - 把
jsonArr
每一个数组jsonArr[i]
保存到文件jobName-mapTask-i
中(调用reduceName(..)
). - 中间文件的格式由自己设计, 只要
doReduce()
能够正常decode就好.
实现:
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
contentsByte, err := ioutil.ReadFile(inFile)
if err != nil {
log.Fatal(err)
}
contents := string(contentsByte)
kvList := mapF(inFile, contents)
jsonArr := make([][]KeyValue, nReduce)
for _, kv := range kvList {
index := ihash(kv.Key) % nReduce // Key or Value ?
jsonArr[index] = append(jsonArr[index], kv)
}
// Save as json file.
for i := 0; i < nReduce; i++ {
filename := reduceName(jobName, mapTask, i)
file, err := os.Create(filename)
if err != nil {
log.Fatal(err)
}
enc := json.NewEncoder(file)
ok := enc.Encode(&jsonArr[i])
if ok != nil {
log.Fatal(ok)
}
file.Close()
}
}
common_reduce.go: doReduce()
要干什么?
-
doReduce()
管理一个reduce任务r
. - 从每一个MapTask的
ReduceTask
编号为r
的中间文件中(共nMap
个), 读取kv对, 转为String. - 将kv对按照key排序.
- 对每个不同的key调用
reduceF()
, 将输出结果写入磁盘
调用sort
包来排序, 教程.
实现
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
table := make(map[string][]string)
// Read nMap immediate files.
for i := 0; i < nMap; i++ {
file, err := os.Open(reduceName(jobName, i, reduceTask))
if err != nil {
log.Fatal(err)
}
enc := json.NewDecoder(file)
var tmp []KeyValue
decErr := enc.Decode(&tmp)
if decErr != nil {
log.Fatal(decErr)
}
for _, kv := range tmp {
table[kv.Key] = append(table[kv.Key], kv.Value)
}
file.Close()
}
// Sort them by Key
sortedKeyArr := make([]string, 0)
for k := range table {
sortedKeyArr = append(sortedKeyArr, k)
}
sort.Strings(sortedKeyArr)
outputFile, err := os.Create(outFile)
if err != nil {
log.Fatal(err)
}
defer outputFile.Close()
outputJSON := json.NewEncoder(outputFile)
for i := 0; i < len(sortedKeyArr); i++ {
key := sortedKeyArr[i]
val := table[key]
kv := KeyValue{key, reduceF(key, val)}
encErr := outputJSON.Encode(kv)
if encErr != nil {
log.Fatal(encErr)
}
}
}
可通过测试.