当前位置: 首页 > news >正文

实现kvstore的持久化功能:全量持久化和增量持久化

目录

一、设计思路

二、核心代码

三、测试功能


一、设计思路

数据结构:使用哈希表(链式地址法解决哈希冲突)存储键值对
全量持久化和增量持久化的核心流程:
增删改操作:先写 WAL 日志 → 再更新内存哈希表
Checkpoint:全量持久化内存数据 → 截断 WAL 日志(清空)
重启恢复:加载全量文件 → 重放 WAL 日志 → 恢复增量数据
异步 Checkpoint:后台线程执行全量持久化,避免阻塞前台写操作。

全量持久化(Checkpoint)的触发条件:
后台线程checkpoint_worker循环检查触发条件,满足任一条件则执行 Checkpoint:
定时触发:每隔CHECKPOINT_INTERVAL(30 秒)执行一次。
条件触发:WAL 文件大小≥10MB。

二、核心代码

实现全量持久化的两个函数:

int kvs_hash_persist(hashtable_t *hash) { if (hash == NULL) return -1; FILE *fp = fopen(PERSIST_FILE, "w"); if (fp == NULL) return -1; // 遍历所有哈希桶 for (size_t i = 0; i < hash->max_slots; i++) { hashnode_t *curr = hash->nodes[i]; while (curr != NULL) { // 写入格式:key value\n if (fprintf(fp, "%s %s\n", curr->key, curr->value) < 0) { fclose(fp); return -1; } curr = curr->next; } } fflush(fp);//用户态缓冲区→内核缓冲区 的刷新,参数是流指针FILE * fclose(fp); return 0; } int kvs_hash_load(hashtable_t *hash) { if (hash == NULL) return -1; FILE *fp = fopen(PERSIST_FILE, "r"); if (fp == NULL) return -1; char line[4096]; // 每行最大长度(可根据需求调整) while (fgets(line, sizeof(line), fp) != NULL) { // 去除换行符 int len = strlen(line); if(len==0)continue;//跳过空行 if (len > 0 && line[len - 1] == '\n') { line[len - 1] = '\0'; } // 分割键和值(空格" "为分隔符) char *key = strtok(line, " "); char *value = strtok(NULL, " "); if (key == NULL || value == NULL) { /*fclose(fp); return -1;*/ continue; } // 插入到KVStore,hash存储引擎先初始化才能调用如下函数,否则报段错误 if (kvs_hash_set(hash, key, value) != 0) { /*fclose(fp); return -1;*/ continue; } } fclose(fp); return 0; }

实现增量持久化的两个函数:

int kvs_hash_logpersist(char *op_type,char *key,char *value){ FILE *fp=fopen(WAL_FILE,"a");//权限a:只追加写;权限a+:追加写+可读 if(!fp)return -1; if(value){//增/改操作 fprintf(fp,"%s %s %s\n",op_type,key,value); }else {//删除操作 fprintf(fp,"%s %s\n",op_type,key); } fflush(fp); //fsync(fp); fclose(fp); } int kvs_hash_logreplay(hashtable_t *hash){ FILE *fp=fopen(WAL_FILE,"r"); if(!fp)return -1; char line[1024];//行缓冲区 int count=0;//重放计数器 //逐行读取WAL日志文件 while(fgets(line,sizeof(line),fp)){ // 去除行末的换行符 line[strcspn(line,"\n")]='\0'; //跳过空行 if(strlen(line)==0)continue; char *data=strdup(line);//strtok会改变原字符串 char *op_type=strtok(data," "); char *key=strtok(NULL," "); char *value=strtok(NULL," "); if(!op_type||!key)continue; if(strcmp(op_type,"HSET")==0||strcmp(op_type,"HMOD")==0){ if(value){ kvs_hash_set(hash,key,value); count++; } }else if(strcmp(op_type,"HDEL")==0){ kvs_hash_del(hash,key); count++; } free(data); } fclose(fp); return count; }

后台线程实现全量持久化的入口函数:

// -------------------------- Checkpoint后台线程 -------------------------- //获取文件大小 static int get_file_size(char *filename){ struct stat stat_buf; if(stat(filename,&stat_buf)==-1)return -1; return (int)stat_buf.st_size; } // 检查Checkpoint触发条件 static int is_checkpoint_needed(hashtable_t *hash) { // 1. 检查WAL文件大小 int wal_size = get_file_size(WAL_FILE); if (wal_size >= WAL_SIZE_THRESHOLD) return 1; return 0; } // Checkpoint后台线程函数 void *checkpoint_worker(void *arg) { hashtable_t *hash = (hashtable_t *)arg; time_t last_checkpoint = time(NULL);//返回的单位是秒s while (ENABLE_PERSIST) { time_t now = time(NULL); //以下两者都是满足条件,进行全量持久化并截断WAL日志(清空) // 触发条件1:定时(间隔30秒) if (difftime(now, last_checkpoint) >= CHECKPOINT_INTERVAL) { kvs_hash_persist(hash); FILE *fp=fopen(WAL_FILE,"w+"); if(fp==NULL)return NULL; fclose(fp); last_checkpoint = now; sleep(1); // 避免频繁触发 continue; } // 触发条件2:WAL大小/ KV数量阈值 if (is_checkpoint_needed(hash)) { kvs_hash_persist(hash); FILE *fp=fopen(WAL_FILE,"w+"); if(fp==NULL)return NULL; fclose(fp); last_checkpoint = now; sleep(1); continue; } // 每秒检查一次 sleep(1); } return NULL; }

创建线程进行异步全量持久化,并在重新运行kvstore时恢复数据(加载全量和增量文件)

int init_persist(void){ //创建线程 pthread_t pid; pthread_create(&pid,NULL,checkpoint_worker,(void *)&Hash); pthread_detach(pid); //恢复数据(加载全量 + 重放WAL增量日志) kvstore_hash_load(); kvstore_hash_logreplay(); }

kvstore解析协议函数,增删改操作时先进行增量持久化再进行内存写操作

//“CRUD” 对应 Create(创建)、Read(读取)、Update(更新)、Delete(删除) 四个关键动作 int kvstore_parser_protocol(struct conn_item *item,char **tokens,int count){ if(item==NULL||tokens==NULL||count==0)return -1; int cmd; for(cmd=KVS_CMD_START;cmd<KVS_CMD_SIZE;cmd++){ if(strcmp(commands[cmd],tokens[0])==0){//strcmp当中传的就是地址,自动根据地址比较值 break; } } char *msg=item->wbuffer; //memset(msg,0,BUFFER_LENGTH); int len=strlen(msg); char *key=tokens[1]; char *value=tokens[2]; //printf("cmd=%d,commands=%s\n",cmd,commands[cmd]); switch(cmd){ 。。。。。。 //hash case KVS_CMD_HSET:{ //printf("set\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_set(key,value); //需要给客户端回信息 if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n");//指定缓冲区最大容量,避免缓冲区溢出 }else{ snprintf(msg+len,BUFFER_LENGTH-len,"FAILED\r\n"); } break; } case KVS_CMD_HGET:{ //printf("get\n"); char *val=kvstore_hash_get(key); if(val){//value不为空 snprintf(msg+len,BUFFER_LENGTH-len,"%s\r\n",val); }else{ snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } //LOG("get:%s\n",val); break; } case KVS_CMD_HDEL:{ //printf("del\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_del(key); if(res<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } break; } case KVS_CMD_HMOD:{ //printf("mod\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_mod(key,value); if(res<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } break; } case KVS_CMD_HCOUNT:{ int count=kvstore_hash_count(); if(count<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"%d\r\n",count);//"%d",count } break; } default:{ //LOG("cmd:%s\n",commands[cmd]); assert(0); } } }

三、测试功能

上次运行kvstore服务器是已通过协议存入key-value对并进行了持久化:1-2,2-3,3-4

重新运行,测试恢复数据情况

http://www.zskr.cn/news/83754.html

相关文章:

  • 如何找書
  • 面试必问:如何快速定位BUG?BUG定位技巧及N板斧!
  • 如何啓動一個本地服務
  • ROS2节点和话题
  • Wan2.2-T2V-A14B如何生成带有烟花绽放效果的节日庆典视频?
  • Jetson Secure Boot 完整实战指南:从 Fuse Key → Boot Chain → 验签代码路径的源码级解析
  • 5分钟快速上手MONAI 2D扩散模型:医学图像生成的终极指南
  • 程序员转行到大模型开发领域,以下是几个推荐的方向、推荐原因以
  • 机器学习基础(线性,逻辑回归)
  • Windows11制作docker linux-arm64镜像
  • Wsappx进程异常占用的深度解析与修复方案
  • 【2025必看】AI Agent技术全解析:从概念到开发框架的全面指南(建议收藏)
  • 2025年12月乌兹别克斯坦EAC认证,SGR认证,OTTC认证公司推荐,综合服务能力与资质解析 - 品牌鉴赏师
  • VS2022二次元背景板痛改教程!
  • 山西临汾卤制品制作技艺的技术路径分析
  • 2025最新的电子实验记录本软件,引领科研数字化变革的智能中枢
  • 12月11日日记
  • 【量子机器学习调试终极指南】:手把手教你用VSCode攻克QML代码难题
  • PyMe是一款面向大众的可视化低代码Python开发工具
  • Ubuntu系统火狐浏览器配置http代理
  • 1-Year XTOOL D9S PRO Update: Latest Diagnostics for European/American Mechanics Car Owners
  • 2025年苏州GEO排名产品TOP10,本地企业表现亮眼,企业短视频矩阵/ai排行榜/GEO排名/短视频矩阵/视频矩阵GEO排名厂商排行榜 - 品牌推荐师
  • 详细介绍:Chrome HSTS(HTTP Strict Transport Security)
  • 2025年12月上海别墅装修,上海极简风装修,上海新中式装修公司权威推荐,设计实力与市场口碑深度解析 - 品牌鉴赏师
  • 2025年机械手数控车床品牌价格排行:前十名多少钱一台?英伟达液冷数控车/车铣复合数控机床/医疗器械数控机床数控车床采购排行榜 - 品牌推荐师
  • 聚焦2025:这些数控车床品牌是军工精密加工的保障,液冷接头数控机床/军工配件数控机床/空调配件数控机床/动力刀塔数控车数控车床批发排行 - 品牌推荐师
  • 【笔记】队列
  • 【笔记】队列
  • ZROJ 3272. 地皮
  • Conda 常用命令总结01