Notes on working through the MIT 6.5840 labs.
Series links: lab1:
My implementation, for reference only.
The main task of this lab is to implement some basic map operations while handling lost RPC messages, duplicate requests, and memory release.
Lab content: the work is split across three files: common.go, client.go, and server.go.
1. server.go implements the actual request handling: deduplicating requests and releasing memory (following the lab hint, the cache entry for the previous requestId is freed on each call). The core is a map (map[int]PutAppendReply) from request id to its reply, which serves double duty: detecting duplicate requests and recording the correct reply to return for them.
type KVServer struct {
	mu              sync.Mutex
	kvMap           map[string]string
	pushAppendCache map[int]PutAppendReply
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	value, ok := kv.kvMap[args.Key]
	if ok {
		reply.Value = value
	} else {
		reply.Value = ""
	}
}

func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	// duplicate request: return the cached reply without re-applying
	if cachedReply, ok := kv.pushAppendCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	kv.kvMap[args.Key] = args.Value
	kv.pushAppendCache[args.RequestId] = *reply
	// free the previous request's cache entry
	if args.RequestId > 0 {
		delete(kv.pushAppendCache, args.RequestId-1)
	}
}

// appends arg to key's value and returns the old value
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	// duplicate request: return the cached reply without re-applying
	if cachedReply, ok := kv.pushAppendCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	key := args.Key
	reply.Value = kv.kvMap[key] // the old value, before appending
	kv.kvMap[key] += args.Value
	reply.ReplyId = args.RequestId
	kv.pushAppendCache[args.RequestId] = *reply
	// free the previous request's cache entry
	if args.RequestId > 0 {
		delete(kv.pushAppendCache, args.RequestId-1)
	}
}
2. client.go mainly issues calls to the server and maintains a globally unique id that identifies each request (a random starting point per Clerk, incremented after every successful call).
type Clerk struct {
	server    *labrpc.ClientEnd
	requestId int
}

// nrand returns a random id in [0, 2^62) drawn from crypto/rand
func nrand() int {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return int(bigx.Int64())
}

func MakeClerk(server *labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.server = server
	ck.requestId = nrand()
	return ck
}
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
	args := GetArgs{Key: key}
	for {
		// use a fresh reply for every attempt
		reply := GetReply{}
		ok := ck.server.Call("KVServer.Get", &args, &reply)
		if ok {
			return reply.Value
		}
	}
}
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer."+op, &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string {
	args := PutAppendArgs{Key: key, Value: value, RequestId: ck.requestId}
	for {
		// use a fresh reply for every attempt
		reply := PutAppendReply{}
		ok := ck.server.Call("KVServer."+op, &args, &reply)
		if ok {
			ck.requestId++
			return reply.Value
		}
		time.Sleep(100 * time.Millisecond) // back off briefly before retrying
	}
}
Problems encountered during the lab:
The first problem I ran into was:
--- FAIL: TestMemGet2 (0.47s)
test_test.go:404: error: server using too much memory 10
Memory kept exceeding the limit on a single Get request. The code at the time:
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if cachedReply, ok := kv.getCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	value, ok := kv.kvMap[args.Key]
	if ok {
		reply.Value = value
	} else {
		reply.Value = ""
	}
	kv.getCache[args.RequestId] = *reply
	if len(kv.getCache) > 10 {
		for k := range kv.getCache {
			delete(kv.getCache, k)
			if len(kv.getCache) <= 10 {
				break
			}
		}
	}
}
My first reaction was to look at the actual memory usage, so I started a pprof endpoint inside the test file (this requires importing net/http plus the side-effect import _ "net/http/pprof"):

go func() {
	log.Println("Starting pprof server on http://localhost:6060")
	if err := http.ListenAndServe("localhost:6060", nil); err != nil {
		log.Fatalf("pprof server failed to start: %v", err)
	}
}()

and then inspected the heap with:

go tool pprof http://localhost:6060/debug/pprof/heap
The profile showed that gob.Decoder's decString method accounted for 10240kB, about 95.24% of total memory use, clearly the dominant allocation site.
decString's cost depends on the number and types of fields being decoded, and it turned out the Get RPC arguments carried a redundant field: Get is read-only, so even a duplicated Get can simply be re-executed; it needs no request id for deduplication or reply caching. After removing the id field from the Get-related structs, the test passed.