资讯专栏INFORMATION COLUMN

vertx实现redis版session共享

fancyLuo / 3083人阅读

摘要:今天分享的是里的共享问题。主要考虑到清除的时候使用,因为数据主要以保存在为主,本地保存是辅助作用。

现在越来越流行微服务架构了,提到微服务架构的话,大家能想到的是spring boot和vertx吧!前者大家听的比交多些,但是今天我给大家分享的是后者vertx。想要了解更多请阅读vertx官网http://vertx.io/docs/vertx-we...

废话不多说了,直接进主题。今天分享的是vertx web里的session共享问题。在公司我用vertx开发了一个web平台,但是需要防止宕机无法继续提供服务这种情况,所以部署了两台机器,这里就开始涉及到了session共享了。为了性能考虑,我就想把session放入redis里来达到目的,可是在vertx官网没有这种实现,当时我就用Hazelcast(网友说,性能不怎么好)将就先用着。前几天我抽时间看了底层代码,自己动手封装了下,将session放入redis里。github地址: https://github.com/robin0909/...

原生vertx session 设计

下面给出 LocalSessionStoreImpl 和 ClusteredSessionStoreImpl 的结构关系:

LocalSession:

ClusteredSession:

从上面的结构中我们能找到一个继承实现关系,顶级接口是SessionStore,
而SessionStore是什么接口呢?在vertx里,session有一个专门的设计,这里的SessionStore就是专门为存储session而定义接口,看看这个接口里定义了哪些方法吧!

</>复制代码

  1. public interface SessionStore {
  2. //主要在分布式session共享时会用到的属性,从store里获取session的重试时间
  3. long retryTimeout();
  4. Session createSession(long timeout);
  5. //根据sessionId从store里获取Session
  6. void get(String id, Handler> resultHandler);
  7. //删除
  8. void delete(String id, Handler> resultHandler);
  9. //增加session
  10. void put(Session session, Handler> resultHandler);
  11. //清空
  12. void clear(Handler> resultHandler);
  13. //store的size
  14. void size(Handler> resultHandler);
  15. //关闭,释放资源操作
  16. void close();
  17. }

上面很多会用到有一个属性,就是sessionId(id)。在session机制里,还需要依靠浏览器端的cookie。当服务器端session生成后,服务器会在cookie里设置一个vertx-web.session=4d9db69d-7577-4b17-8a66-4d6a2472cd33 返回给浏览器。想必大家也看出来了,就是一个uuid码,也就是sessionId。

接下来,我们可以看下二级子接口。二级子接口的作用,其实很简单,直接上代码,大家就懂了。

</>复制代码

  1. public interface LocalSessionStore extends SessionStore {
  2. long DEFAULT_REAPER_INTERVAL = 1000;
  3. String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions";
  4. static LocalSessionStore create(Vertx vertx) {
  5. return new LocalSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_REAPER_INTERVAL);
  6. }
  7. static LocalSessionStore create(Vertx vertx, String sessionMapName) {
  8. return new LocalSessionStoreImpl(vertx, sessionMapName, DEFAULT_REAPER_INTERVAL);
  9. }
  10. static LocalSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) {
  11. return new LocalSessionStoreImpl(vertx, sessionMapName, reaperInterval);
  12. }
  13. }

这里主要为了方面在使用和构造时很优雅,router.route().handler(SessionHandler.create(LocalSessionStore.create(vertx))); 有点类似工厂,创造对象。在这个接口里,也可以初始化一些专有参数。所以没有什么难度。

对官方代码我们也理解的差不多了,接下来开始动手封装自己的RedisSessionStore吧!

自己的RedisSessionStore封装

首先我们定义一个RedisSessionStore接口, 接口继承SessionStore接口。

</>复制代码

  1. /**
  2. * Created by robinyang on 2017/3/13.
  3. */
  4. public interface RedisSessionStore extends SessionStore {
  5. long DEFAULT_RETRY_TIMEOUT = 2 * 1000;
  6. String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions";
  7. static RedisSessionStore create(Vertx vertx) {
  8. return new RedisSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_RETRY_TIMEOUT);
  9. }
  10. static RedisSessionStore create(Vertx vertx, String sessionMapName) {
  11. return new RedisSessionStoreImpl(vertx, sessionMapName, DEFAULT_RETRY_TIMEOUT);
  12. }
  13. static RedisSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) {
  14. return new RedisSessionStoreImpl(vertx, sessionMapName, reaperInterval);
  15. }
  16. RedisSessionStore host(String host);
  17. RedisSessionStore port(int port);
  18. RedisSessionStore auth(String pwd);
  19. }

接着创建一个RedisSessionStoreImpl类, 这里我先给出一个已经写好的RedisSessionStoreImpl, 稍后解释。

</>复制代码

  1. public class RedisSessionStoreImpl implements RedisSessionStore {
  2. private static final Logger logger = LoggerFactory.getLogger(RedisSessionStoreImpl.class);
  3. private final Vertx vertx;
  4. private final String sessionMapName;
  5. private final long retryTimeout;
  6. private final LocalMap localMap;
  7. //默认值
  8. private String host = "localhost";
  9. private int port = 6379;
  10. private String auth;
  11. RedisClient redisClient;
  12. // 清除所有时使用
  13. private List localSessionIds;
  14. public RedisSessionStoreImpl(Vertx vertx, String defaultSessionMapName, long retryTimeout) {
  15. this.vertx = vertx;
  16. this.sessionMapName = defaultSessionMapName;
  17. this.retryTimeout = retryTimeout;
  18. localMap = vertx.sharedData().getLocalMap(sessionMapName);
  19. localSessionIds = new Vector<>();
  20. redisManager();
  21. }
  22. @Override
  23. public long retryTimeout() {
  24. return retryTimeout;
  25. }
  26. @Override
  27. public Session createSession(long timeout) {
  28. return new SessionImpl(new PRNG(vertx), timeout, DEFAULT_SESSIONID_LENGTH);
  29. }
  30. @Override
  31. public Session createSession(long timeout, int length) {
  32. return new SessionImpl(new PRNG(vertx), timeout, length);
  33. }
  34. @Override
  35. public void get(String id, Handler> resultHandler) {
  36. redisClient.getBinary(id, res->{
  37. if(res.succeeded()) {
  38. Buffer buffer = res.result();
  39. if(buffer != null) {
  40. SessionImpl session = new SessionImpl(new PRNG(vertx));
  41. session.readFromBuffer(0, buffer);
  42. resultHandler.handle(Future.succeededFuture(session));
  43. } else {
  44. resultHandler.handle(Future.succeededFuture(localMap.get(id)));
  45. }
  46. } else {
  47. resultHandler.handle(Future.failedFuture(res.cause()));
  48. }
  49. });
  50. }
  51. @Override
  52. public void delete(String id, Handler> resultHandler) {
  53. redisClient.del(id, res->{
  54. if (res.succeeded()) {
  55. localSessionIds.remove(id);
  56. resultHandler.handle(Future.succeededFuture(true));
  57. } else {
  58. resultHandler.handle(Future.failedFuture(res.cause()));
  59. logger.error("redis里删除sessionId: {} 失败", id, res.cause());
  60. }
  61. });
  62. }
  63. @Override
  64. public void put(Session session, Handler> resultHandler) {
  65. //put 之前判断session是否存在,如果存在的话,校验下
  66. redisClient.getBinary(session.id(), res1->{
  67. if (res1.succeeded()) {
  68. //存在数据
  69. if(res1.result()!=null) {
  70. Buffer buffer = res1.result();
  71. SessionImpl oldSession = new SessionImpl(new PRNG(vertx));
  72. oldSession.readFromBuffer(0, buffer);
  73. SessionImpl newSession = (SessionImpl)session;
  74. if(oldSession.version() != newSession.version()) {
  75. resultHandler.handle(Future.failedFuture("Version mismatch"));
  76. return;
  77. }
  78. newSession.incrementVersion();
  79. writeSession(session, resultHandler);
  80. } else {
  81. //不存在数据
  82. SessionImpl newSession = (SessionImpl)session;
  83. newSession.incrementVersion();
  84. writeSession(session, resultHandler);
  85. }
  86. } else {
  87. resultHandler.handle(Future.failedFuture(res1.cause()));
  88. }
  89. });
  90. }
  91. private void writeSession(Session session, Handler> resultHandler) {
  92. Buffer buffer = Buffer.buffer();
  93. SessionImpl sessionImpl = (SessionImpl)session;
  94. //将session序列化到 buffer里
  95. sessionImpl.writeToBuffer(buffer);
  96. SetOptions setOptions = new SetOptions().setPX(session.timeout());
  97. redisClient.setBinaryWithOptions(session.id(), buffer, setOptions, res->{
  98. if (res.succeeded()) {
  99. logger.debug("set key: {} ", session.data());
  100. localSessionIds.add(session.id());
  101. resultHandler.handle(Future.succeededFuture(true));
  102. } else {
  103. resultHandler.handle(Future.failedFuture(res.cause()));
  104. }
  105. });
  106. }
  107. @Override
  108. public void clear(Handler> resultHandler) {
  109. localSessionIds.stream().forEach(id->{
  110. redisClient.del(id, res->{
  111. //如果在localSessionIds里存在,但是在redis里过期不存在了, 只要通知下就行
  112. localSessionIds.remove(id);
  113. });
  114. });
  115. resultHandler.handle(Future.succeededFuture(true));
  116. }
  117. @Override
  118. public void size(Handler> resultHandler) {
  119. resultHandler.handle(Future.succeededFuture(localSessionIds.size()));
  120. }
  121. @Override
  122. public void close() {
  123. redisClient.close(res->{
  124. logger.debug("关闭 redisClient ");
  125. });
  126. }
  127. private void redisManager() {
  128. RedisOptions redisOptions = new RedisOptions();
  129. redisOptions.setHost(host).setPort(port).setAuth(auth);
  130. redisClient = RedisClient.create(vertx, redisOptions);
  131. }
  132. @Override
  133. public RedisSessionStore host(String host) {
  134. this.host = host;
  135. return this;
  136. }
  137. @Override
  138. public RedisSessionStore port(int port) {
  139. this.port = port;
  140. return this;
  141. }
  142. @Override
  143. public RedisSessionStore auth(String pwd) {
  144. this.auth = pwd;
  145. return this;
  146. }
  147. }

首先,从get()和put()这两个方法开始,这两方法比较核心。

get(), 创建Cookie的时候会生成一个uuid,用这个id取session,第一次我们发现无法取到, 第56行代码就会根据这个id去生成一个session。

每次发送请求的时候,我们都会重置session过期时间,所以每次get完后,返回给浏览器之前都会有一个put操作,也就是更新数据。这里的put就稍微复杂一点点,在put之前,我们需要先根据传过来的session里的id从redis里取到session。如果获取不到,说明之前通过get获取的session不是同一个对象,就出异常,这就相当于设置了一道安全的门槛吧!当获取到了,再比较两个session的版本是不是一致的,如果不一致,说明session被破环了,算是第二个安全门槛设置吧!都没有问题了,就可以put session了,并且重新设置时间。

这里依赖vertx提供的redisClient来操作数据的,所以我们必须引入这个依赖:io.vertx:vertx-redis-client:3.4.1 。

接下来还有一点需要提的是序列化问题。这里我使用的是vertx封装的一种序列化,将数据序列化到Buffer里,而SessiomImpl类里又已经实现好了序列化,从SessionImple序列化成Buffer和Buffer反序列化。

</>复制代码

  1. public class SessionImpl implements Session, ClusterSerializable, Shareable {
  2. //...
  3. @Override
  4. public void writeToBuffer(Buffer buff) {
  5. byte[] bytes = id.getBytes(UTF8);
  6. buff.appendInt(bytes.length).appendBytes(bytes);
  7. buff.appendLong(timeout);
  8. buff.appendLong(lastAccessed);
  9. buff.appendInt(version);
  10. Buffer dataBuf = writeDataToBuffer();
  11. buff.appendBuffer(dataBuf);
  12. }
  13. @Override
  14. public int readFromBuffer(int pos, Buffer buffer) {
  15. int len = buffer.getInt(pos);
  16. pos += 4;
  17. byte[] bytes = buffer.getBytes(pos, pos + len);
  18. pos += len;
  19. id = new String(bytes, UTF8);
  20. timeout = buffer.getLong(pos);
  21. pos += 8;
  22. lastAccessed = buffer.getLong(pos);
  23. pos += 8;
  24. version = buffer.getInt(pos);
  25. pos += 4;
  26. pos = readDataFromBuffer(pos, buffer);
  27. return pos;
  28. }
  29. //...
  30. }

以上就是序列化和反序列化的实现。

localSessionIds 主要考虑到清除session的时候使用,因为数据主要以保存在session为主,本地localSessionIds 保存sessionId是辅助作用。

用法

用法很简单,一行代码就说明。

</>复制代码

  1. router.route().handler(SessionHandler.create(RedisSessionStore.create(vertx).host("127.0.0.1").port(6349)));

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70057.html

相关文章

  • Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程

    摘要:本文章是蓝图系列的第一篇教程。是事件驱动的,同时也是非阻塞的。是一组负责分发和处理事件的线程。注意,我们绝对不能去阻塞线程,否则事件的处理过程会被阻塞,我们的应用就失去了响应能力。每个负责处理请求并且写入回应结果。 本文章是 Vert.x 蓝图系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程 Vert.x Blueprint 系...

    frank_fun 评论0 收藏0
  • Vert.x Blueprint 系列教程(二) | 开发基于消息的应用 - Vert.x Kue

    摘要:本文章是蓝图系列的第二篇教程。这就是请求回应模式。好多属性我们一个一个地解释一个序列,作为的地址任务的编号任务的类型任务携带的数据,以类型表示任务优先级,以枚举类型表示。默认优先级为正常任务的延迟时间,默认是任务状态,以枚举类型表示。 本文章是 Vert.x 蓝图系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程 Vert.x B...

    elina 评论0 收藏0
  • vertx的一些问题

    摘要:但经过一段使用后,发现的一些问题。这样产生了一系列问题。部署的是异步的多线程环境,这个方法必须是线程安全的。小结的体系结构无疑是非常先进的,多线程异步结构,内置,支持,支持高可用度,这些都不是轻易能够提供的。 最近想选高效,简洁,扩充性强的web框做为移动平台后台,在对一系列框架对比后,选择了vertx。但经过一段使用后,发现vertx的一些问题。 1.vertx使用共享资源产生的重复...

    MRZYD 评论0 收藏0
  • 【小项目】全栈开发培训手册 | 后端(1) vert.x框架理解

    摘要:二来,给大家新开坑的项目一个参考。因此,本系列以主要以官方文档为基础,将尽可能多的特性融入本项目,并标注官网原文出处,有兴趣的小伙伴可点击深入了解。可以通过一些特殊协议例如将消息作为统一消息服务导出。下载完成后自行修改和。 开坑前言 我给这个专栏的名气取名叫做小项目,听名字就知道,这个专题最终的目的是带领大家完成一个项目。为什么要开这么大一个坑呢,一来,虽然网上讲IT知识点的书籍铺天盖...

    hightopo 评论0 收藏0

发表评论

0条评论

fancyLuo

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<