6.824 Lab1

6.824 的 Lab1 是关于 MapReduct 的一个实验。 Lab1 的详细内容可以参考这篇文章。另外,关于如何 Git 和 Go 的环境搭建比较简单,不再赘述。

6.824 的 Lab 提供了一些骨架代码,可以省去整个程序框架的搭建以及非核心代码的编写。可以通过如下命令 clone 到本地:

git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824

Lab 提供给了一个非常详细的指导说明,建议在实验前完整的阅读一遍。

代码中的 mapreduce 包提供了一个简单的 Map/Reduce 库,应用可以通过调用 Distributed() (master.go) 函数来启动一个作业,也可以通过调用 Sequential() (master.go) 函数进行顺序化的执行,方便进行 debug 。

Part I

在这一部分,我们需要实现几个函数来完成 Map/Reduce 过程,主要修改 common_map.go 中的 doMap() 函数、 common_reduct.go 中的 doReduce() 函数。

doMap() 函数读取 inFile 文件中的内容,调用 mapF() 函数生成一个 KeyValue 数组,并将 KeyValue 内容写到输出的中间文件,每个 reducer 都会生成一个对应的中间文件。

func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
bs, err := ioutil.ReadFile(inFile)
if err != nil {
log.Fatal(err)
}

content := string(bs)
keyValues := mapF(inFile, content)

reduceFiles := make([]*os.File, nReduce)
for i := 0; i < nReduce; i++ {
reduceFileName := reduceName(jobName, mapTask, i)
reduceFiles[i], _ = os.Create(reduceFileName)
defer reduceFiles[i].Close()
}

for _, keyValue := range keyValues {
index := ihash(keyValue.Key) % nReduce
enc := json.NewEncoder(reduceFiles[index])
err := enc.Encode(&keyValue)
if err != nil {
log.Fatal(err)
}
}
}

doReduce() 函数读取其对应的中间文件,并按 key 对文件中的 KeyValue 对进行排序,然后调用用户定义的 reduceF() 函数,并将最终结果写到文件中。

func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {

outResults := make(map[string] []string)
for i := 0; i < nMap; i++ {
reduceFileName := reduceName(jobName, i, reduceTask)
file, err := os.Open(reduceFileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()

dec := json.NewDecoder(file)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break;
}
_, ok := outResults[kv.Key]
if !ok {
outResults[kv.Key] = make([]string, 0)
}
outResults[kv.Key] = append(outResults[kv.Key], kv.Value)
}
}

var keys []string
for k, _ := range outResults {
keys = append(keys, k)
}
sort.Strings(keys)

mergedFile, _ := os.Create(outFile)
defer mergedFile.Close()

enc := json.NewEncoder(mergedFile)
for _, k := range keys {
enc.Encode(&KeyValue{k, reduceF(k, outResults[k])})
}
}

完成 doMap()doReduce() 两个函数之后,可以通过执行 test_test.go 中提供的测试用例,来验证正确性。

$ cd 6.824
$ export "GOPATH=$PWD" # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok mapreduce 2.694s

Part II

接下来要实现一个非常知名的 Map/Reduce 例子 Word Count。这步主要修改 main/wc.go 文件,需要实现 mapF()reduceF() 两个函数。

func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
words := strings.FieldsFunc(contents, func(r rune) bool {
return !unicode.IsLetter(r)
})
res := make([]mapreduce.KeyValue, 0)
for _, v := range words {
kv := mapreduce.KeyValue{v, "1"}
res = append(res, kv)
}
return res
}

func reduceF(key string, values []string) string {
// Your code here (Part II).
return strconv.Itoa(len(values))
}

之后执行 main/test-wc.sh ,验证正确性。

$ cd "$GOPATH/src/main"
$ bash ./test-wc.sh
master: Starting Map/Reduce task wcseq
Merge phaseMerge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
Passed test

Part III

在 PartII 中实现了一个单节点的 Map/Reduce 任务,实际上,Map/Reduce 最大的卖点就是其分布式特性,可以在使用者无感知下将任务分布在集群中多个节点中并行执行。在 Part III 中实现分布式这一特性,主要是将任务在多核 CPU 上并行执行来模拟分布在多台机器。

这一部分主要修改 mapreduce/schedule.go 文件中的 schedule() 函数。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
//
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
doTaskArgs := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}
go func (doTaskArgs DoTaskArgs, registerChan chan string) {
worker := <- registerChan
ok := call(worker, "Worker.DoTask", doTaskArgs, nil)
if ok {
go func() {
registerChan <- worker
} ()
}
wg.Done()
} (doTaskArgs, registerChan)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}

Part IV

这部分是错误处理,当 call 函数的调用返回 false 的时候,需要把当前 task 交给其他的 worker 处理。替换上面 schedule 函数中 call 部分:

...
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
doTaskArgs := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}
go func (doTaskArgs DoTaskArgs, registerChan chan string) {
worker := <- registerChan
for !call(worker, "Worker.DoTask", doTaskArgs, nil) {
worker = <-registerChan
}
go func() {
registerChan <- worker
} ()
wg.Done()
} (doTaskArgs, registerChan)
}
wg.Wait()
...

Part V

这部分是实现一个倒排索引,修改 ii.go 这个文件。

func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
wordsSet := make(map[string]bool)
for _, word := range words {
wordsSet[word] = true
}
for word, _ := range wordsSet {
res = append(res, mapreduce.KeyValue{Key: word, Value:document})
}
return
}

func reduceF(key string, values []string) string {
// Your code here (Part V).
sort.Strings(values)
return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}