摘要:对于某个原本带有生存时间的键来说,当命令成功在这个键上执行时,这个键原有的将被清除。设置键的过期时间为毫秒。只在键已经存在时,才对键进行设置操作。
顺风车运营研发团队 李乐
1.命令执行过程
1.1命令请求格式
当用户在客户端键入一条命令请求时,客户端会将其按照特定协议转换为字符串,发送给服务器;服务器解析字符串,获取命令请求;
例如,当用户执行 set key value 时,转换后的字符串为 *3rn3rnset3rnkey$5rnvaluern
其中,*3表示当前命令请求参数数目(set命令也是一个参数);rn用于分隔每个参数;3、5等表示参数字符串长度;
1.2 服务端读取命令请求
1.2.1客户端处理函数简介:
服务器启动时,会监听socket,并创建对应文件事件,监听此fd上的可读事件;(server.c/initServer)
//监听socket if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); //为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { } }
当客户端连接到服务器时,会调用acceptTcpHandler处理函数,服务器会为每个链接创建一个client对象,并创建相应文件事件,监听此链接fd的可读事件,并指定事件处理函数
//接收客户端链接请求 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { } //创建客户端 if ((c = createClient(fd)) == NULL) { close(fd); /* May be already closed, just ignore errors */ return; } client *createClient(int fd) { client *c = zmalloc(sizeof(client)); //设置fd为非阻塞;非延迟(epoll等坚挺的额socket必须是非阻塞;延迟的话发送的数据会先存储在tcp缓冲区,等到一定事件或数据量大时才会发送) //创建文件事件,监听可读事件,事件处理函数为readQueryFromClient if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ……………………//初始化client结构体各字段 }
1.2.2 读取命令请求到输入缓冲区
命令请求字符串会先读入client的输入缓冲区中
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; ………… qblen = sdslen(c->querybuf); //统计 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //读取 nread = read(fd, c->querybuf+qblen, readlen); //处理输入缓冲区 processInputBuffer(c); ………… }
1.2.3 解析输入缓冲区数据
首先调用processMultibulkBuffer:解析*3获取行数,循环获取每一行参数(会先解析$3获取参数长度),构造为一个redisObject对象,存储在客户端结构体的argv和argc字段
其次调用processCommand处理命令请求
void processInputBuffer(client *c) { while(sdslen(c->querybuf)) { //判断命令请求类型;telnet发送的命令和redis-cli发送的命令请求格式不同 if (!c->reqtype) { if (c->querybuf[0] == "*") { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; } } if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); } //参数个数为0时 if (c->argc == 0) { resetClient(c); } else { //处理命令 if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) resetClient(c); } if (server.current_client == NULL) break; } } } //resetClient()函数会释放client结构体arg字段中的各参数,重置argc为0
解析缓冲区字符串逻辑如下:
int processMultibulkBuffer(client *c) { char *newline = NULL; long pos = 0; int ok; long long ll; if (c->multibulklen == 0) { //定位到第一行结束 newline = strchr(c->querybuf," "); //解析行数,即参数个数 ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; //argv村春命令参数,解析之前先清空 if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } //循环解析所有参数 while(c->multibulklen) { //读取$3字符串,解析数值3 if (c->bulklen == -1) { newline = strchr(c->querybuf+pos," "); if (c->querybuf[pos] != "$") { return C_ERR; } ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); c->bulklen = ll; } //读取参数 c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; c->bulklen = -1; c->multibulklen--; } /* We"re done when c->multibulk == 0 */ if (c->multibulklen == 0) return C_OK; /* Still not ready to process the command */ return C_ERR; }
1.2.4 补充:什么是redisObject
redis底层有多种数据结构:sds,intset,ziplist,linkedlist,skiplist,dict等;redis对外提供的数据类型有字符串、列表、哈希表、有需集合、集合;
redis会根据实际情况选择合适的数据结构来存储某一种数据类型;而同一种数据类型可能使用不同的数据结构存储;
redisObject是对底层多种数据结构的进一步封装;看看结构体:
typedef struct redisObject { unsigned type:4; //数据类型:字符串、列表、哈希表、集合、有序集合 unsigned encoding:4; //存储数据类型使用哪种数据结构,如列表的实现可能是ziplist或linkedlist unsigned lru:LRU_BITS; //lru,数据淘汰 int refcount; //引用计数 void *ptr; //指向具体的数据结构 } robj;
1.2.5 处理命令
processInputBuffer解析输入缓冲区中命令请求字符串,将各个参数转换为redisObject存储在client结构体的argv字段中,argc存储参数个数,下一步调用processCommand处理命令
1.2.5.1命令结构体redisCommand
struct redisCommand { char *name; //命令名称 redisCommandProc *proc; //命令处理函数指针 int arity; //命令参数个数,用于检查命令请求参数是否正确;当取值-N时,表示参数个数大于等于N char *sflags; //标识命令属性;如读命令或写命令等 int flags; //sflags的二进制表示 //没搞明白 redisGetKeysProc *getkeys_proc; int firstkey; int lastkey; int keystep; //命令执行总耗时和执行次数 long long microseconds, calls; };
1.2.5.2命令执行前校验
int processCommand(client *c) { //quit命令直接返回 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={} c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { //没有查找到命令 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { //命令请求参数个数错误 } //是否通过认证;没有通过且必须认证时,只接受认证命令 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){ } //开启最大内存限制 if (server.maxmemory) { //释放内存 int retval = freeMemoryIfNeeded(); //CMD_DENYOOM表示内存不够时,禁止执行此命令 if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { } //当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令 if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR) && server.masterhost == NULL && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)){ } //当此服务器时master时:如果配置了repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令 if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write){ } //当此服务器是slave,且配置了只读时,如果客户端不是master,则拒绝执行写命令 if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE){ } //当客户端正在订阅频道时,只会执行以下命令 if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { } //服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)){ } //正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { } //当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝 if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == "n") && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == "k")){ } //执行命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){ //开启了事务,命令只会入队列; queueMultiCommand(c); addReply(c,shared.queued); } else { //直接执行命令 call(c,CMD_CALL_FULL); } }
1.2.5.3 命令执行
执行命令时,会需要做很多额外的操作,统计,记录慢查询日志,传播命令道monitor、slave,aof持久化等
//flags=CMD_CALL_FULL=(CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) //表示需要记录慢查询日志,统计,广播命令 void call(client *c, int flags) { //dirty记录数据库修改次数;start记录命令开始执行时间us;duration记录命令执行花费时间 long long dirty, start, duration; int client_old_flags = c->flags; //有监视器的话,需要将命令发送给监视器 if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); } //处理命令,调用命令处理函数 dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; //记录慢查询日志 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } //统计 if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++; } //广播命令 if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; //dirty大于0时,需要广播命令给slave和aof if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; //广播命令,写如aof,发送命令到slave if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } }
2.命令表
redis有个命令表redisCommandTable存储每个命令的详细属性
//命令名称,命令处理函数,命令参数(-3表示参数数目大于等于3个),命令属性标志,………… struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}, {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, ………… }
3.set命令执行
3.1 set命令介绍
SET key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值 value 关联到 key 。
如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
EX、PX、NX/XX可选参数含义如下:
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX keymillisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。
3.2 set命令执行函数
3.2.1 解析set命令请求
/* SET key value [NX] [XX] [EX] [PX ] */ void setCommand(client *c) { int j; robj *expire = NULL; //过期时间 int unit = UNIT_SECONDS; //时间单位 int flags = OBJ_SET_NO_FLAGS; //标志命令是否携带nx、xx、ex、px可选参数 for (j = 3; j < c->argc; j++) { char *a = c->argv[j]->ptr; robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; //最后一个参数可能是过期时间 if ((a[0] == "n" || a[0] == "N") && (a[1] == "x" || a[1] == "X") && a[2] == " " && !(flags & OBJ_SET_XX)) { flags |= OBJ_SET_NX; //NX标志 } else if ((a[0] == "x" || a[0] == "X") && (a[1] == "x" || a[1] == "X") && a[2] == " " && !(flags & OBJ_SET_NX)) { flags |= OBJ_SET_XX; //XX标志 } else if ((a[0] == "e" || a[0] == "E") && (a[1] == "x" || a[1] == "X") && a[2] == " " && !(flags & OBJ_SET_PX) && next) { flags |= OBJ_SET_EX; //EX标志 unit = UNIT_SECONDS; expire = next; j++; } else if ((a[0] == "p" || a[0] == "P") && (a[1] == "x" || a[1] == "X") && a[2] == " " && !(flags & OBJ_SET_EX) && next) { flags |= OBJ_SET_PX; //PX标志 unit = UNIT_MILLISECONDS; expire = next; j++; } else { addReply(c,shared.syntaxerr); return; } } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); //处理命令 }
3.2.2 命令处理
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ //设置了过期时间;expire是robj类型,获取整数值 if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } //NX,key存在时直接返回;XX,key不存在时直接返回 if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } //添加都数据库字典 setKey(c->db,key,val); server.dirty++; //过期时间添加到过期字典 if (expire) setExpire(c,c->db,key,mstime()+milliseconds); //键空间通知 notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/36710.html
摘要:安装系统下的安装在系统下安装可以通过进行安装并可以减少大量的安装和配置的工作量安装方法打开终端运行命令系统下安装下载解压进入解压目录编译确保服务器上有配置启动操作关闭可执行文件文件名描述命令行客户端服务端基准测试的持久 1.安装 1.1 Mac系统下的安装: 在mac系统下安装redis可以通过brewHome进行安装并可以减少大量的安装和配置的工作量. $ brew inst...
摘要:本章将按照单个键遍历键数据库管理三个维度对一些通用命令进行介绍单键管理针对单个键的命令前面几节已经介绍过一部分了例如等下面介绍几个重要命令键重命名例如一个键名为值为下面操作将键改为如果在之前键已经存在那么它的值也将被覆盖为了防止被强行提供了 本章将按照单个键,遍历键,数据库管理三个维度对一些通用命令进行介绍. 1. 单键管理 针对单个键的命令,前面几节已经介绍过一部分了,例如type,...
摘要:支持和两种持久化机制持久化功能有效地避免因进程退出造成的数据丢失问题当下次重启时利用之前持久化的文件即可实现数据恢复掌握持久化机制对于学习非常重要持久化是把当前进程数据生成快照保存到硬盘的过程出发持久化过程分发手动触发和自动触发一触发机制手 Redis支持RDB和AOF两种持久化机制,持久化功能有效地避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复....
摘要:触发机制命令将阻塞当前服务器,知道过程完成为止。进程执行操作创建子进程,持久化过程由子进程负责,完成后自动结束。高效但是数据不会被持久化。 RDB 基本概念 RDB持久化是在指定的时间间隔内将内存中的数据集快照写入磁盘,相当于Snapshot快照,它恢复时是将快照文件直接读到内存里。 配置文件 showImg(https://segmentfault.com/img/bVbhN4V?w...
摘要:用于为做基准性能测试。设置请求总量代表客户端的请求总量,默认为。连接事务和基本概念提供了简单的事务功能以及集成脚本来解决问题。事务提供简单的事务功能,将一组需要一起执行的命令放到和两个命令之间。将结果保存在中。命令添加添加成功返回。 慢查询 基本概念 慢查询日志记录命令执行前后的超时的执行时间。(只记录命令执行时间) 慢查询的两个配置 Redis提供了slowlog-log-slowe...
阅读 1824·2021-10-08 10:04
阅读 2883·2021-09-22 10:02
阅读 2028·2019-08-30 15:56
阅读 649·2019-08-30 15:54
阅读 684·2019-08-30 15:54
阅读 1136·2019-08-30 15:53
阅读 2392·2019-08-30 11:21
阅读 3456·2019-08-30 10:56