资讯专栏INFORMATION COLUMN

购物网站的redis相关实现

_Zhao / 2168人阅读

摘要:购物网站的相关实现需求登录和缓存对于一个大型网上商店,假设每天都会有大约万不同的用户,这些用户会给网站带来亿次点击,并从网站购买超过万件商品。根据给定的令牌查找与之相应的用户,检查用户是否登录,并返回该用户的。

购物网站的redis相关实现
需求: (1)登录和cookie缓存

对于一个大型网上商店,假设每天都会有大约500万不同的用户,这些用户会给网站带来1亿次点击,并从网站购买超过10万件商品。

我们需要存储用户登录信息,用户的访问时长和已浏览商品的数量,如果将其保存到数据库中,会导致大量的数据库写入。

大多数关系数据库在每台数据库服务器上面每秒只能插入、更新或者删除200~2000个数据行,尽管批量操作可以以更快的速度执行,但客户点每次浏览网页都只更新少数几行数据,所以高速的批量插入在这里并不适用。

而对于负载量相对比较大的系统,譬如平均情况下每秒大约1200次写入,高峰时期每秒接近6000次写入,所以它必须部署10台关系数据库服务器才能应对高峰期的负载量。

为了提升系统的处理速度,降低资源的占用量,可以将传统数据库的一部分数据处理任务以及存储任务转交给Redis来完成。

(2)使用redis实现购物车

我们把购物车的信息也存储到Redis,并且使用与用户会话令牌一样的cookie id来引用购物车。

将用户和购物车都存储到Redis里面,这种做法除了可以减少请求体积外,我们可以根据用户浏览过的商品,用户放入购物车的商品以及用户最终购买的商品进行统计计算,并构建起很多大型网络零售上都在提供的”在查看过这件商品的用户当中,有X%的用户最终购买了这件商品“”购买了这件商品的用户也购买了某某其他商品“等功能,这些功能可以帮助用户查找其他相关的商品,并最终提升网站的销售业绩。

(3)网页缓存

购物网站上多数页面实际上并不会经常发生大变化,虽然会向分类中添加新商品、移除旧商品、有时候特价促销、有时甚至还有”热卖商品“页面,但是在一般情况下,网站只有账号设置、以往订单、购物车(结账信息)以及其他少数几个页面才包含需要每次载入都要动态生成的内容。

对于不需要动态生成的页面,我们需要尽量不再生成,减少网站在动态生成内容上面所花的时间,可以降低网站处理相同负载所需的服务器数量,让网站速度加快。

python应用框架大都存在中间件,我们创建中间件来调用Redis缓存函数:对于不能被缓存的请求,直接生成并返回页面,对于可以被缓存的请求,先从缓存取出缓存页面,如果缓存页面不存在,那么会生成页面并将其缓存在Redis,最后将页面返回给函数调用者。

这样的方式可以让网站在5分钟之内无需再为他们动态地生成视图页面。

(4) 数据行缓存

为了清空旧库存和吸引客户消费,决定开始新一轮的促销活动,每天都会推出一些特价商品供用户抢购,所有特价商品的数量都是限定的,卖完为止。在这种情况下,网站是不能对整个促销页面进行缓存,这会导致用户看到错误的特价商品和商品剩余数,但每次载入页面都从数据库中取出特价商品的剩余数量的话,又会给数据库带来巨大的压力。

为了应付促销活动带来的大量负载,需要对数据行进行缓存,可以编写一个持续运行的守护进程函数,让这个函数将指定的数据行缓存到Redis里面,并不定期地对这些缓存进行更新。缓存函数将数据和编码为json字典并存储在Redis的字符串中。

我们还需要使用两个有序集合来记录应该在何时对缓存进行更新,第一个有序集合为调度有序集合,成员为数据行的ID,分值为时间戳,记录应该在何时将制定的数据行缓存到Redis里面。第二个有序集合为延时有序集合,成员为数据行的ID,而分值记录指定数据行的缓存需要每隔多少秒更新一次。

对于更新频率,如果数据行记录的是特价促销商品的剩余数量,并且参与促销活动的用户非常多,那么我么最好每隔几秒更新一次数据行缓存,如果数据并不经常改变,或者商品缺货是可以接受的,我们可以每分钟更新一次缓存。

(5)网页分析

之前对于网页的缓存,如果网站总共包含100000件商品,贸然缓存所有商品页面将耗尽整个网站的全部内存,所以我们可以只针对那些浏览量较高的商品页面进行缓存。

每个用户都有一个相应的记录用户浏览商品历史的有序集合,我们在记录的过程中,我们也痛死记录所有商品的浏览次数,根据浏览次数对商品进行排序,被浏览得最多的商品放到有序集合的索引0位置上,并且具有整个有序集合最少的分值。

除了缓存最常被浏览的商品外,我们还需要发现那些变得越来越流畅的新商品,于是我们需要定期修剪有序集合的长度并调整已有元素的分值,才能使得新流行的商品在排行榜中占据一席之地。

Redis数据结构设计

(1)登录令牌与用户映射关系的散列 "login:"
(2)记录最近登录用户的有序集合 "recent:"
(3)记录各个用户最近浏览商品的有序集合 "viewed:94233rhsYRIq3yi3qryrye"
(4)每个用户的购物车散列,存储商品ID与商品订购数量之间的映射。"cart:94233rhsYRIq3yi3qryrye"
(5)请求页面缓存集合 "cache:wre9w3rieruerwe3" (wre9w3rieruerwe3代表请求ID)

(94233rhsYRIq3yi3qryrye假设为某个用户的令牌)
(6)数据行缓存字符串,数据列(column)的名字会被映射为json字典的键,而数据行的值会被映射为json字典的值,"inv:273" (其中273为数据行id)。
(7)数据行缓存调度有序集合,成员为数据行的ID,分值为时间戳,记录应该在何时将制定的数据行缓存到Redis里面,"schedule:"。
(8)数据行缓存延时有序集合,成员为数据行的ID,而分值记录指定数据行的缓存需要每隔多少秒更新一次,"delay:"。

(9)商品浏览次数有序集合,成员为商品,分值为浏览次数负值,方便保持在有序集合的较前的索引位置,"viewed"。

Redis实现

(1)使用散列来存储登录cookie令牌与已登录用户之前的映射。根据给定的令牌查找与之相应的用户,检查用户是否登录,并返回该用户的ID。

"""
获取并返回令牌对应的用户

@param {object}
@param {string} token

@return {string} 用户id
"""
def checkToken(conn, token):
    return conn.hget("login:", token)

(2)用户每次浏览页面的时候,需要更新“登录令牌与用户映射关系的散列”里面的信息,
并将用户的令牌和当前时间戳添加到 “记录最近登录用户的有序集合” 里面,
将浏览商品添加到记录“记录各个用户最近浏览商品的有序集合”中,如果记录的商品数量超过25个,对这个有序集合进行修剪。

"""
更新令牌时,需要更改用户令牌信息,将用户记录到最近登录用户的有序集合中,
如果用户浏览的是商品,则需要将浏览商品写入该用户浏览过商品的有序集合中,并保证该集合不超过25个

@param {object}
@param {string} token
@param {string} user
@param {string} item

"""
def updateToken(conn, token, user, item = None):
    timestamp = time.time()
    # 更新用户令牌登录对应的用户信息
    conn.hset("login:", token, user)
    # 增加最近访问的用户到有序集合
    conn.zadd("recent:", token, timestamp)

    # 如果浏览产品,记录该用户最近访问的25个产品
    if item:
        conn.zadd("viewed:" + token, item, timestamp)
        conn.zremrangebyrank("viewed:" + token, 0, -26)
        # 记录每个商品的浏览量
        conn.zincrby("viewed:", item, -1)

(3)存储会话的内存会随着时间的推移而不断增加,需要定期清理会话数据,我们决定只保留最新的1000万个会话。

我们可以用 守护进程的方式来运行或者定义一个cron job每隔一段时间运行
检查最近 “记录最近登录用户的有序集合” 大小是否超过了限制,超过限制每秒从集合中删除最旧的100个令牌,并且移除相应的“登录令牌与用户映射关系的散列”的信息和对应的“记录各个用户最近浏览商品的有序集合”,对应的”美国用户的购物车散列“。

我们也可以使用EXPIRE命令,为用户令牌设记录用户商品浏览记录的有序集合设置过期时间,让Redis在一段时间之后自动删除它们,这样就不用使用有序集合来记录最近出现的令牌了,但是这样我们就没办法将会话数限制在1000万之内了。

"""
定期清理会话数据,只保留最新的1000万个会话。

使用 *守护进程的方式来运行或者定义一个cron job每隔一段时间运行* ,
检查最近 “记录最近登录用户的有序集合” 大小是否超过了限制,超过限制每秒从集合中删除最旧的100个令牌,
并且移除相应的“登录令牌与用户映射关系的散列”的信息和对应的“记录各个用户最近浏览商品的有序集合”。

@param {object}
"""

# 循环判断,如果是cron job可以不用循环
QUIT = False
# 限制保留的最大会话数据
LIMIT = 10000000

def cleanFullSession(conn):
    # 循环判断,如果是cron job可以不用循环
    while not QUIT:
        # 查询最近登录用户会话数
        size = conn.zcard("recent:")
        # 没有超过限制,休眠1秒再继续执行
        if size <= LIMIT:
            time.sleep(1)
            continue
        
        # 查询最旧登录的最多100个令牌范围
        end_index = min(size - LIMIT, 100)
        tokens = conn.zrange("recent:", 0, end_index - 1)
        
        # 将要删除的key都推入到数组中,要时候一起删除
        session_keys = []
        for token in tokens:
            session_keys.append("viewed:" + token)
            session_keys.append("cart:" + token)
        
        # 批量删除相应的用户最近浏览商品有序集合,用户的购物车,登录令牌与用户映射关系的散列和记录最近登录用户的有序集合
        conn.delete(*session_keys)
        conn.hdel("login:", *tokens)
        conn.zrem("recent:", *tokens)

(4)对购物车进行更新,如果用户订购某件商品数量大于0,将商品信息添加到 “用户的购物车散列”中,如果购买商品已经存在,那么更新购买数量。

"""
对购物车进行更新,如果用户订购某件商品数量大于0,将商品信息添加到 “用户的购物车散列”中,如果购买商品已经存在,那么更新购买数量

@param {object}
@param {string} session
@param {string} item
@param {float}  count

"""
def addToCart(conn, session, item, count):
    if count <= 0:
        # 从购物车移除指定商品
        conn.hrem("cart:" + session, item)
    else:
        # 将指定商品添加到对应的购物车中
        conn.hset("cart:" + session, item, count)

(5)在用户请求页面时,对于不能被缓存的请求,直接生成并返回页面,对于可以被缓存的请求,先从缓存取出缓存页面,如果缓存页面不存在,那么会生成页面并将其缓存在Redis,最后将页面返回给函数调用者。

"""
在用户请求页面时,对于不能被缓存的请求,直接生成并返回页面,
对于可以被缓存的请求,先从缓存取出缓存页面,如果缓存页面不存在,那么会生成页面并将其缓存在Redis,最后将页面返回给函数调用者。

@param {object} conn
@param {string} request
@param {callback}

@return 
"""
def cacheRequest(conn, request, callback):
    # 判断请求是否能被缓存,不能的话直接调用回调函数
    if not canCache(conn, request):
        return callback(request)
    
    # 将请求转换为一个简单的字符串健,方便之后进行查找
    page_key = "cache:" + hashRequest(request)
    content = conn.get(page_key)
    
    # 没有缓存的页面,调用回调函数生成页面,并缓存到redis中
    if not content:
        content = callback(request)
        conn.setex(page_key, content, 300)

    return content

"""
判断页面是否能被缓存,检查商品是否被缓存以及页面是否为商品页面,根据商品排名来判断是否需要缓存

@param {object} conn
@param {string} request

@return {boolean}
"""
def canCache(conn, request):
    # 根据请求的URL,得到商品ID
    item_id = extractItemId(request)
    # 检查这个页面能否被缓存以及这个页面是否为商品页面
    if not item_id or isDynamic(request):
        return False

    # 商品的浏览排名
    rank = conn.zrank("viewed:", item_id)
    return rank is not None and rank < 10000

"""
解析请求的URL,取得query中的item id

@param {string} request

@return {string}
"""
def extractItemId(request):
    parsed = urlparse.urlparse(request)
    # 返回query字典
    query  = urlparse.parse_qs(parsed.query)
    return (query.get("item") or [None])[0]

"""
判断请求的页面是否动态页面

@param {string} request

@return {boolean}
"""
def isDynamic(request):
    parsed = urlparse.urlparse(request)
    query = urlparse.parse_qs(parsed.query)
    return "_" in query

"""
将请求转换为一个简单的字符串健,方便之后进行查找
@param {string} request

@return {string}
"""
def hashRequest(request):
    return str(hash(request))

(6)为了让缓存函数定期地缓存数据行,首先需要将行ID和给定的延迟值添加到延迟有序集合中,再将行ID和当前时间的时间戳添加到调度有序集合中。如果某个数据行的延迟值不存在,那么程序将取消对这个数据行的调度。如果我们想要移除某个数据行已有的缓存并且不再缓存那个数据行,只需要把那个数据行的延迟值设置为小于或等于0即可。

"""
设置数据行缓存的延迟值和调度时间

@param {object} conn
@param {int}    row id
@param {int}    delay

"""
def scheduleRowCache(conn, row_id, delay):
    conn.zadd("delay:", row_id, delay)
    conn.zadd("schedule:", row_id, time.time())

(7)尝试读取”数据行缓存调度有序集合“的第一个元素以及该元素的分支,如果”数据行缓存调度有序集合“没有包含任何元素,或者分值存储的时间戳所指定的时间尚未来临,那么函数先休眠50毫秒,然后再重新进行检查。

当发现一个需要立即进行更新的数据行时,如果数据行的延迟值小于或者等于0,会从”数据行缓存延时有序集合“和”数据行缓存调度有序集合“移除这个数据行的ID,并从缓存里面删除这个数据行已有的缓存,再重新进行检查。

对于延迟值大于0的数据行来说,从数据库里面取出这些行,将他们编码为json格式并存储到Redis里面,然后更新这些行的调度时间。

"""
守护进程,根据调度时间有序集合和延迟值缓存数据行

@param {object} conn

"""
def cacheRow(conn):
    while not QUIT:
        # 需要读取”数据行缓存调度有序集合“的第一个元素,如果没有包含任何元素,或者分值存储的时间戳所指定的时间尚未来临,那么函数先休眠50毫秒,然后再重新进行检查
        next = conn.zrange("schedule:", 0, 0, withscores=True)
        now = time.time()
        if not next or next[0][1] > now:
            time.sleep(.05)
            continue
        
        row_id = next[0][0]
        # 取出延迟值
        delay = conn.zscore("delay:", row_id)
        # 如果延迟值小于等于0,则不再缓存该数据行
        if delay <= 0:
            conn.zrem("schedule:", row_id)
            conn.zrem("delay:", row_id)
            conn.delete("inv:" + row_id) 
            continue;

        # 需要缓存的,更新缓存调度的有序集合,并缓存该数据行
        row = Inventory.get(row_id)
        conn.zadd("schedule:", row_id, now + delay)
        conn.set("inv:" + row_id, json.dumps(row.toDict()))
        
"""
库存类,库存的商品信息
"""
class Inventory(object):
    def __init__(self, id):
        self.id = id

    @classmethod
    def get(cls, id):
        return Inventory(id)
    
    def toDict(self):
        return {"id":self.id, "data":"data to cache...","cached":time.time()}

(8)我们需要在用户浏览页面时,“商品浏览次数有序集合”对应的商品中需要减一,使得保持在有序集合较前的索引位置。

同时我们需要开启一个守护进程,每隔5分钟,删除所有排名在20000名之后的商品浏览数,并使用ZINTERSTORE将删除之后剩余的所有商品的浏览次数减半。

而判断页面是否需要缓存,我们需要通过ZRANK取出商品的浏览次数排名,如果排名在10000内,那么说明该页面需要缓存。

"""
守护进程,删除所有排名在20000名之后的商品,并将删除之后剩余的所有商品浏览次数减半,5分钟执行一次

@param {object} conn

"""
def rescaleViewed(conn):
    while not QUIT:
        conn.zremrangebyrank("viewed:", 20000, -1)
        conn.zinterstore("viewed:", {"viewed:", .5})
        time.sleep(300)
测试代码
"""
测试
"""
import time
import urlparse
import uuid
import threading
import unittest
import json

class TestShoppingWebsite(unittest.TestCase):
    def setUp(self):
        import redis
        self.conn = redis.Redis(db=15)
    
    def tearDown(self):
        conn = self.conn
        to_del = (
            conn.keys("login:*") + conn.keys("recent:*") + conn.keys("viewed:*") +
            conn.keys("cart:*") + conn.keys("cache:*") + conn.keys("delay:*") + 
            conn.keys("schedule:*") + conn.keys("inv:*"))

        if to_del:
            conn.delete(*to_del)

        del self.conn

        global QUIT, LIMIT
        QUIT = False
        LIMIT = 10000000
        print
        print

    def testLoginCookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        updateToken(conn, token, "username", "itemX")
        print "We just logged-in/updated token:", token
        print "For user:", "username"
        print

        print "What username do we get when we look-up that tokan?"
        r = checkToken(conn, token)
        print r
        print
        self.assertTrue(r)

        print "Let"s drop the maximun number of cookies to 0 to clear them out"
        print "We will start a thread to do the cleaning, while we stop it later"

        LIMIT = 0
        t = threading.Thread(target = cleanFullSession, args = (conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still slive?!?")

        s = conn.hlen("login:")
        print "The current number of session still available is:", s
        self.assertFalse(s)

    def testShoppingCartCookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        print "We"ll refresh our session..."
        updateToken(conn, token, "username", "itemX")
        print "And add an item to the shopping cart"
        addToCart(conn, token, "itemY", 3)
        r = conn.hgetall("cart:" + token)
        print "Our Shopping cart currently has:", r
        print

        self.assertTrue(len(r) >= 1)

        print "Let"s clean out our sessions an carts"
        LIMIT = 0
        t = threading.Thread(target=cleanFullSession, args=(conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still alive?!?")

        r = conn.hgetall("cart:" + token)
        print "Our shopping cart now contains:", r

        self.assertFalse(r)

    def testCacheRequest(self):
        conn = self.conn
        token = str(uuid.uuid4())

        def callback(request):
            return "content for " + request

        updateToken(conn, token, "username", "itemX")
        url = "http://test.com/?item=itemX"
        print "We are going to cache a simple request against", url
        result = cacheRequest(conn, url, callback)
        print "We got initial content:", repr(result)
        print

        self.assertTrue(result)

        print "To test that we"ve cached the request, we"ll pass a bad callback"
        result2 = cacheRequest(conn, url, None)
        print "We ended up getting the same response!", repr(result2)

        self.assertEquals(result, result2)

        self.assertFalse(canCache(conn, "http://test.com/"))
        self.assertFalse(canCache(conn, "http://test.com/?item=itemX&_=1234567"))

    def testCacheRows(self):
        import pprint
        conn = self.conn
        global  QUIT

        print "First, let"s schedule caching of itemX every 5 seconds"
        scheduleRowCache(conn, "itemX", 5)
        print "Our schedule looks like:"
        s = conn.zrange("schedule:", 0, -1, withscores = True)
        pprint.pprint(s)
        self.assertTrue(s)

        print "We"ll start a caching thread that will cache the data..."
        t = threading.Thread(target=cacheRow, args=(conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        print "Our cached data looks like:"
        r = conn.get("inv:itemX")
        print repr(r)
        self.assertTrue(r)
        print
        print "We"ll check again in 5 seconds..."
        time.sleep(5)
        print "Notice that the data has changed..."
        r2 = conn.get("inv:itemX")
        print repr(r2)
        print
        self.assertTrue(r2)
        self.assertTrue(r != r2)

        print "Let"s force un-caching"
        scheduleRowCache(conn, "itemX", -1)
        time.sleep(1)
        r = conn.get("inv:itemX")
        print "The cache was cleared?", not r
        print
        self.assertFalse(r)

        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The database caching thread is still alive?!?")


if __name__ == "__main__":
    unittest.main()

完整示例代码地址:https://github.com/NancyLin/r...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/36457.html

相关文章

  • 购物网站redis相关实现

    摘要:购物网站的相关实现需求登录和缓存对于一个大型网上商店,假设每天都会有大约万不同的用户,这些用户会给网站带来亿次点击,并从网站购买超过万件商品。根据给定的令牌查找与之相应的用户,检查用户是否登录,并返回该用户的。 购物网站的redis相关实现 需求: (1)登录和cookie缓存 对于一个大型网上商店,假设每天都会有大约500万不同的用户,这些用户会给网站带来1亿次点击,并从网站购买超...

    twohappy 评论0 收藏0
  • 购物网站redis相关实现(Java)

    摘要:处理器根据取出的数据对模板进行渲染处理器向客户端返回渲染后的内容作为请求的相应。于此相反,如果令牌的数量没有超过限制,那么程序会先休眠一秒,之后在重新进行检查。找出目前已有令牌的数量。 购物网站的redis相关实现 1、使用Redis构建文章投票网站(Java) 本文主要内容: 1、登录cookie 2、购物车cookie 3、缓存数据库行 4、测试 必备知识点 WEB应用就是通...

    big_cat 评论0 收藏0
  • 购物网站redis相关实现(Java)

    摘要:处理器根据取出的数据对模板进行渲染处理器向客户端返回渲染后的内容作为请求的相应。于此相反,如果令牌的数量没有超过限制,那么程序会先休眠一秒,之后在重新进行检查。找出目前已有令牌的数量。 购物网站的redis相关实现 1、使用Redis构建文章投票网站(Java) 本文主要内容: 1、登录cookie 2、购物车cookie 3、缓存数据库行 4、测试 必备知识点 WEB应用就是通...

    zsy888 评论0 收藏0
  • 购物网站redis相关实现(Java)

    摘要:处理器根据取出的数据对模板进行渲染处理器向客户端返回渲染后的内容作为请求的相应。于此相反,如果令牌的数量没有超过限制,那么程序会先休眠一秒,之后在重新进行检查。找出目前已有令牌的数量。 购物网站的redis相关实现 1、使用Redis构建文章投票网站(Java) 本文主要内容: 1、登录cookie 2、购物车cookie 3、缓存数据库行 4、测试 必备知识点 WEB应用就是通...

    lunaticf 评论0 收藏0
  • 文章投票网站redis相关实现

    摘要:需求要构建一个文章投票网站,文章需要在一天内至少获得张票,才能优先显示在当天文章列表前列。根据评分或者发布时间对群组文章进行排序和分页文章添加的群组移除的群组群组有序集合名以上就是一个文章投票网站的相关实现。 需求: 要构建一个文章投票网站,文章需要在一天内至少获得200张票,才能优先显示在当天文章列表前列。 但是为了避免发布时间较久的文章由于累计的票数较多而一直停留在文章列表前列,我...

    沈俭 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<