您好,欢迎来到九壹网。
搜索
您的当前位置:首页Mit-6.5840(6.824)分布式系统课程 lab2 KV服务器

Mit-6.5840(6.824)分布式系统课程 lab2 KV服务器

来源:九壹网

在此记录一下对Mit-6.5840的lab学习

系列文章链接:lab1:

个人实现参考

本次实验的主要任务是在考虑RPC消息丢失,重复请求和内存释放的情况下,完成一些基础对map的基础操作。

实验内容:实验内容分为三个部分commen.go,client.go,server.go

1、server.go中实现了请求的具体操作,处理重复请求,处理内存释放(根据实验提示,每次释放上一个requestId的内存)。主要是维护一个map(map[int]PutAppendReply)记录请求id和其对应的value的关系,用这个map同时实现判重和记录正确返回值的任务

type KVServer struct {
	mu              sync.Mutex
	kvMap           map[string]string
	pushAppendCache map[int]PutAppendReply
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	value, ok := kv.kvMap[args.Key]
	if ok {
		reply.Value = value
	} else {
		reply.Value = ""
	}
}

func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if cachedReply, ok := kv.pushAppendCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	key := args.Key
	kv.kvMap[key] = args.Value
	kv.pushAppendCache[args.RequestId] = *reply
	if args.RequestId > 0 {
		delete(kv.pushAppendCache, args.RequestId-1)
	}
}

// appends arg to key's value and returns the old value,
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if cachedReply, ok := kv.pushAppendCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	key := args.Key
	reply.Value = kv.kvMap[key]
	kv.kvMap[key] += args.Value
	reply.ReplyId = args.RequestId
	kv.pushAppendCache[args.RequestId] = *reply
	if args.RequestId > 0 {
		delete(kv.pushAppendCache, args.RequestId-1) // 清理上一个请求
	}

}

2、client.go中主要是调用server和维护一个全局唯一的id来标识请求

type Clerk struct {
	server    *labrpc.ClientEnd
	requestId int
}

func nrand() int {
	max := big.NewInt(int(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	x := bigx.Int()
	return x
}

func MakeClerk(server *labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.server = server
	ck.requestId = nrand()
	return ck
}

// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
	args := GetArgs{Key: key}
	reply := GetReply{}
	for {
		ok := ck.server.Call("KVServer.Get", &args, &reply)
		if ok {
			return reply.Value
		}
	}
	return ""
}

// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer."+op, &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string {
	args := PutAppendArgs{Key: key, Value: value, RequestId: ck.requestId}
	reply := PutAppendReply{}
	for {
		ok := ck.server.Call("KVServer."+op, &args, &reply)
		if ok {
			ck.requestId++
			return reply.Value
		} else {
			time.Sleep(100 * time.Millisecond) // 添加100毫秒的休眠时间
		}
	}
	return ""
}

实验问题记录:

最开始遇到的问题是

--- FAIL: TestMemGet2 (0.47s)
    test_test.go:404: error: server using too much memory 10

单次get请求内存一直溢出,当时的代码:

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if cachedReply, ok := kv.getCache[args.RequestId]; ok {
		*reply = cachedReply
		return
	}
	value, ok := kv.kvMap[args.Key]
	if ok {
		reply.Value = value
	} else {
		reply.Value = ""
	}
	kv.getCache[args.RequestId] = *reply
	if len(kv.getCache) > 10 {
		for k := range kv.getCache {
			delete(kv.getCache, k)
			if len(kv.getCache) <= 10 {
				break
			}
		}
	}
}

遇到这个第一反应就是去看了下内存使用情况,在test.go文件中引入了pprof去查看内存情况:

go tool pprof http://localhost:6060/debug/pprof/heap

	go func() {
		log.Println("Starting pprof server on http://localhost:6060")
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Fatalf("pprof server failed to start: %v", err)
		}
	}()

可以看到gob.DecoderdecString 方法占用了 10240kB,约占总内存使用的 95.24%,这是一个显著的内存占用点。

gob.DecoderdecString 方法与字段的数量和类型有关,发现Get方法RPC参数是有冗余的,就算重复调用get方法也不需要Id标识去判重和存储信息,去掉Get相关结构中的Id变量后便能通过测试。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- 91gzw.com 版权所有 湘ICP备2023023988号-2

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务