资讯专栏INFORMATION COLUMN

【Redis学习笔记】2018-06-28 redis命令源码学习1

KevinYan / 2996人阅读

摘要:检查是否存在创建序列化负载传输给客户端命令的核心内容是创建序列化负载,该功能的实现是调用函数。可以看出,四个命令的实现原理都是调用函数。

顺风车运营研发团队 谭淼
1、dump
dump命令可以序列化给定 key ,并返回被序列化的值,使用 RESTORE命令可以将这个值反序列化为 Redis 键。

/* DUMP keyname
 * DUMP is actually not used by Redis Cluster but it is the obvious
 * complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
    robj *o, *dumpobj;
    rio payload;
 
    /* 检查key是否存在 */
    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }
 
    /* 创建序列化负载 */
    createDumpPayload(&payload,o);
 
    /* 传输给客户端 */
    dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
    addReplyBulk(c,dumpobj);
    decrRefCount(dumpobj);
    return;
}

dump命令的核心内容是创建序列化负载,该功能的实现是调用createDumpPayload()函数。

/* Generates a DUMP-format representation of the object "o", adding it to the
 * io stream pointed by "rio". This function can"t fail. */
void createDumpPayload(rio *payload, robj *o) {
    unsigned char buf[2];
    uint64_t crc;
 
    /* Serialize the object in a RDB-like format. It consist of an object type
     * byte followed by the serialized object. This is understood by RESTORE. */
    rioInitWithBuffer(payload,sdsempty());
    /* 在负载中添加对象的类型 */
    serverAssert(rdbSaveObjectType(payload,o));
    /* 根据不同的对象类型序列号对象 */
    serverAssert(rdbSaveObject(payload,o));
 
    /* Write the footer, this is how it looks like:
     * ----------------+---------------------+---------------+
     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
     * ----------------+---------------------+---------------+
     * RDB version and CRC are both in little endian.
     */
 
    /* RDB的版本,这部分被分成了两个字节存储,可以表示0-65535*/
    buf[0] = RDB_VERSION & 0xff;
    buf[1] = (RDB_VERSION >> 8) & 0xff;
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
    /* 计算CRC64校验码,共8字节 */
    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                sdslen(payload->io.buffer.ptr));
    memrev64ifbe(&crc);
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}

根据上面的代码可以看出序列化后的内容由下面几部分组成:
+-------------+---------------------+---------------+
| RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+-------------+---------------------+---------------+
浩含给出的更加详细的介绍wiki链接:dump与restore

2、exists
exists命令可以检查给定 key 是否存在。

/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing. */
void existsCommand(client *c) {
    long long count = 0;
    int j;
 
    for (j = 1; j < c->argc; j++) {
        /* 判断所给的key是否过期 */
        expireIfNeeded(c->db,c->argv[j]);
        /* 若不过期,则将计数器count自增1 */
        if (dbExists(c->db,c->argv[j])) count++;
    }
    /* 最后返回计数器的值 */
    addReplyLongLong(c,count);
}

示例如下:

127.0.0.1:7777> exists k1
(integer) 1
127.0.0.1:7777> exists k2
(integer) 1
127.0.0.1:7777> exists k1 k2
(integer) 2

3、expire、expireat、pexpire、pexpireat
这四个命令的作用是指定一个key的过期时间,它们的原理也都相同,最后都会转换为pexpireat命令来执行。

/* EXPIRE key seconds */
void expireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
}
 
/* EXPIREAT key time */
void expireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_SECONDS);
}
 
/* PEXPIRE key milliseconds */
void pexpireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
}
 
/* PEXPIREAT key ms_time */
void pexpireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_MILLISECONDS);
}

可以看出,四个命令的实现原理都是调用pexpireatCommand()函数。

/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the commad second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds. */
void expireGenericCommand(client *c, long long basetime, int unit) {
    /* 获取参数 */
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
 
    /* 将输入的参数保存在变量when之中 */
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;
 
    /* 将when转换为毫秒时间戳 */
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
 
    /* 查询要设置的key是否存在 */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }
 
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
 
    /* 如果设置的时间已经过期,且没有正在加载AOF或者RDB文件,且执行命令的服务器是主服务器,
       则需要对过期key进行删除 */
    if (when <= mstime() && !server.loading && !server.masterhost) {
        robj *aux;
 
        int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
                                                    dbSyncDelete(c->db,key);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;
 
        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        /* 如果没有过期,则设置过期时间 */
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

设置过期时间,主要是调用setExpire()函数

/* Set an expire to the specified key. If the expire is set in the context
 * of an user calling a command "c" is the client, otherwise "c" is set
 * to NULL. The "when" parameter is the absolute unix time in milliseconds
 * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;
 
    /* 首先在redis的dict中找到key,共用该key的sds */
    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);
    /* 在过期dict中添加或找到该key */
    de = dictAddOrFind(db->expires,dictGetKey(kde));
    /* 为这个key设置过期时间 */
    dictSetSignedIntegerVal(de,when);
 
    int writable_slave = server.masterhost && server.repl_slave_ro == 0;
    if (c && writable_slave && !(c->flags & CLIENT_MASTER))
        rememberSlaveKeyWithExpire(db,key);
}

该原理如下:

对于redis的每个数据库,都有一个统一的数据结构:

/* Redis database representation. There are multiple databases identified
 * by integers from 0 (the default database) up to the max configured
 * database. The database number is the "id" field in the structure. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

其中dict是DB的键空间,expires记录了键空间的超时时间,需要注意的是dict和expires使用的是同一个key的sds。在设置过期时间的时候,首先会在dict中找到需要设置的key,找到后,在expires中添加或找到该key的sds,最后为key添加long long类型的过期时间。

4、keys
keys命令的作用是查找所有符合给定模式 pattern 的 key 。虽然keys速度很快,但是还是禁止在线上使用,因为会极大地消耗redis的资源。

void keysCommand(client *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);
    /* 安全迭代器 */
    di = dictGetSafeIterator(c->db->dict);
    allkeys = (pattern[0] == "*" && pattern[1] == "");
    /* 遍历dict的entry */
    while((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);
        robj *keyobj;
        /* 判断key是否与正则匹配,若匹配且key没有过期则在回复给客户端的内容中记录 */
        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
            keyobj = createStringObject(key,sdslen(key));
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}

遍历与迭代:

迭代(iterate) - 按顺序访问线性结构中的每一项
遍历(traversal) - 按规则访问非线性结构中的每一项
参考链接:https://www.zhihu.com/questio...

洪宝的关于keys命令的wiki:2018.06.27日记(redis keys命令)

5、migrate
migrate的作用是将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上。

这个命令是一个原子操作,它在执行的时候会阻塞进行迁移的两个实例,直到迁移成功,迁移失败或等待超时。

该命令使用的场景不多,故没有详细分析,该命令的实现函数是migratecommand()函数,其原理是在当前实例对给定 key 执行DUMP命令 ,将它序列化,然后通过socket传送到目标实例,目标实例再使用RESTORE命令对数据进行反序列化,并将反序列化所得的数据添加到数据库中。

6、move
move命令也是对key进行转移,不过是将key从当前数据库转移到指定数据库中。

void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;
 
    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }
 
    /* 获取源数据库和目标数据库 */
    src = c->db;
    srcid = c->db->id;
 
    /* 判断DB是否合法 */
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */
 
    /* 源和目标数据库不能相同 */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }
 
    /* 检查目标数据库是否存在需要移动的key */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);
 
    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    /* 向目标数据库添加数据 */
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    incrRefCount(o);
 
    /* 删除源数据库的数据 */
    dbDelete(src,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}

7、object
object命令允许从内部察看给定 key 的 Redis 对象,可以查看对象的refcount、encoding、idletime和freq。主要的思路是找到Redis对象,返回对象结构体中的相关参数(refcount和encoding)或者通过相关参数计算出需要查询的内容(idletime和freq)。

/* Object command allows to inspect the internals of an Redis Object.
 * Usage: OBJECT   */
void objectCommand(client *c) {
    robj *o;
 
    if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
        void *blenp = addDeferredMultiBulkLength(c);
        int blen = 0;
        blen++; addReplyStatus(c,
        "OBJECT  key. Subcommands:");
        blen++; addReplyStatus(c,
        "refcount -- Return the number of references of the value associated with the specified key.");
        blen++; addReplyStatus(c,
        "encoding -- Return the kind of internal representation used in order to store the value associated with a key.");
        blen++; addReplyStatus(c,
        "idletime -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.");
        blen++; addReplyStatus(c,
        "freq -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.");
        setDeferredMultiBulkLength(c,blenp,blen);
    } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        addReplyLongLong(c,o->refcount);
    } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        addReplyBulkCString(c,strEncoding(o->encoding));
    } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
    } else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
            addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        /* LFUDecrAndReturn should be called
         * in case of the key has not been accessed for a long time,
         * because we update the access time only
         * when the key is read or overwritten. */
        addReplyLongLong(c,LFUDecrAndReturn(o));
    } else {
        addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for "%s". Try OBJECT help",
            (char *)c->argv[1]->ptr);
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/36724.html

相关文章

  • Redis学习笔记2018-06-14 Redis源码学习之sentinel

    摘要:顺风车运营研发团队方波是的高可用解决方案,由一个或多个实例组成的系统可以同时监听多组实例后面简称一组,当发现进入下线状态时进行故障转移。 顺风车运营研发团队 方波sentinel是redis的高可用解决方案,由一个或多个sentinel实例组成的系统可以同时监听多组master-slave实例(后面简称一组),当发现master进入下线状态时进行故障转移。一:启动并初始化redis-s...

    hsluoyz 评论0 收藏0
  • Redis学习笔记2018-07-05 Redis指令学习1

    摘要:返回值不存在比字符串长度大时返回否则返回对应位上的值。返回值返回列表的尾部元素,不存在时返回时间复杂度源码实现查找中没有元素返回结果键空间通知通知正在监听这个的具体实现为将的开启脏标志加参考 顺风车运营研发团队 施洪宝 getbit, getrange, blpop, brpop, rpop 一. getbit1.命令说明使用方式: getbit key offset 功能: 对key...

    omgdog 评论0 收藏0
  • Laravel学习笔记之Session源码解析(上)

    摘要:然后中间件使用方法来启动获取实例,使用类来管理主要分为两步获取实例,主要步骤是通过该实例从存储介质中读取该次请求所需要的数据,主要步骤是。 说明:本文主要通过学习Laravel的session源码学习Laravel是如何设计session的,将自己的学习心得分享出来,希望对别人有所帮助。Laravel在web middleware中定义了session中间件IlluminateSess...

    NervosNetwork 评论0 收藏0
  • 大数据系列——Redis学习笔记

    摘要:数据类型之集合的是类型的无序集合。集合中添加元素集合中的元素是不能重复如果添加的已经存在在中则返回不存在则返回数据类型之有序集合和一样也是类型元素的集合且不允许重复的成员。不同的是每个元素都会关联一个类型的分数。 1. Redis的简介    Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理 它支持字符串、哈希表、列表、集合、有序集合...

    Mr_houzi 评论0 收藏0
  • Redis学习笔记2018-06-08 主从复制实现

    摘要:方案一的复杂性较低,方案二则内存利用率更高,采用的是方案一。主服务收到后,则执行命令,异步启动进程,生成文件,新收到的写命令则往缓冲区写入。部分主从复制方案实现源码实现待补充 顺风车运营研发团队 黄桃1、主从复制问题 方案讨论 方案一: 主收到从服务的同步请求,则立马生成一个数据副本+缓冲区,再异步启动BGSAVE进程,读取数据副本并生成RDB文件,新收到的写命令则往缓冲区写入。待RD...

    Cympros 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<