摘要:今天分享的是里的共享問題。主要考慮到清除的時候使用,因為數(shù)據(jù)主要以保存在為主,本地保存是輔助作用。
現(xiàn)在越來越流行微服務架構(gòu)了,提到微服務架構(gòu)的話,大家能想到的是spring boot和vertx吧!前者大家聽的比交多些,但是今天我給大家分享的是后者vertx。想要了解更多請閱讀vertx官網(wǎng)http://vertx.io/docs/vertx-we...
廢話不多說了,直接進主題。今天分享的是vertx web里的session共享問題。在公司我用vertx開發(fā)了一個web平臺,但是需要防止宕機無法繼續(xù)提供服務這種情況,所以部署了兩臺機器,這里就開始涉及到了session共享了。為了性能考慮,我就想把session放入redis里來達到目的,可是在vertx官網(wǎng)沒有這種實現(xiàn),當時我就用Hazelcast(網(wǎng)友說,性能不怎么好)將就先用著。前幾天我抽時間看了底層代碼,自己動手封裝了下,將session放入redis里。github地址: https://github.com/robin0909/...
原生vertx session 設(shè)計下面給出 LocalSessionStoreImpl 和 ClusteredSessionStoreImpl 的結(jié)構(gòu)關(guān)系:
LocalSession:
ClusteredSession:
從上面的結(jié)構(gòu)中我們能找到一個繼承實現(xiàn)關(guān)系,頂級接口是SessionStore,
而SessionStore是什么接口呢?在vertx里,session有一個專門的設(shè)計,這里的SessionStore就是專門為存儲session而定義接口,看看這個接口里定義了哪些方法吧!
public interface SessionStore { //主要在分布式session共享時會用到的屬性,從store里獲取session的重試時間 long retryTimeout(); Session createSession(long timeout); //根據(jù)sessionId從store里獲取Session void get(String id, Handler> resultHandler); //刪除 void delete(String id, Handler > resultHandler); //增加session void put(Session session, Handler > resultHandler); //清空 void clear(Handler > resultHandler); //store的size void size(Handler > resultHandler); //關(guān)閉,釋放資源操作 void close(); }
上面很多會用到有一個屬性,就是sessionId(id)。在session機制里,還需要依靠瀏覽器端的cookie。當服務器端session生成后,服務器會在cookie里設(shè)置一個vertx-web.session=4d9db69d-7577-4b17-8a66-4d6a2472cd33 返回給瀏覽器。想必大家也看出來了,就是一個uuid碼,也就是sessionId。
接下來,我們可以看下二級子接口。二級子接口的作用,其實很簡單,直接上代碼,大家就懂了。
public interface LocalSessionStore extends SessionStore { long DEFAULT_REAPER_INTERVAL = 1000; String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions"; static LocalSessionStore create(Vertx vertx) { return new LocalSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_REAPER_INTERVAL); } static LocalSessionStore create(Vertx vertx, String sessionMapName) { return new LocalSessionStoreImpl(vertx, sessionMapName, DEFAULT_REAPER_INTERVAL); } static LocalSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) { return new LocalSessionStoreImpl(vertx, sessionMapName, reaperInterval); } }
這里主要為了方面在使用和構(gòu)造時很優(yōu)雅,router.route().handler(SessionHandler.create(LocalSessionStore.create(vertx))); 有點類似工廠,創(chuàng)造對象。在這個接口里,也可以初始化一些專有參數(shù)。所以沒有什么難度。
對官方代碼我們也理解的差不多了,接下來開始動手封裝自己的RedisSessionStore吧!
自己的RedisSessionStore封裝首先我們定義一個RedisSessionStore接口, 接口繼承SessionStore接口。
/** * Created by robinyang on 2017/3/13. */ public interface RedisSessionStore extends SessionStore { long DEFAULT_RETRY_TIMEOUT = 2 * 1000; String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions"; static RedisSessionStore create(Vertx vertx) { return new RedisSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_RETRY_TIMEOUT); } static RedisSessionStore create(Vertx vertx, String sessionMapName) { return new RedisSessionStoreImpl(vertx, sessionMapName, DEFAULT_RETRY_TIMEOUT); } static RedisSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) { return new RedisSessionStoreImpl(vertx, sessionMapName, reaperInterval); } RedisSessionStore host(String host); RedisSessionStore port(int port); RedisSessionStore auth(String pwd); }
接著創(chuàng)建一個RedisSessionStoreImpl類, 這里我先給出一個已經(jīng)寫好的RedisSessionStoreImpl, 稍后解釋。
public class RedisSessionStoreImpl implements RedisSessionStore { private static final Logger logger = LoggerFactory.getLogger(RedisSessionStoreImpl.class); private final Vertx vertx; private final String sessionMapName; private final long retryTimeout; private final LocalMaplocalMap; //默認值 private String host = "localhost"; private int port = 6379; private String auth; RedisClient redisClient; // 清除所有時使用 private List localSessionIds; public RedisSessionStoreImpl(Vertx vertx, String defaultSessionMapName, long retryTimeout) { this.vertx = vertx; this.sessionMapName = defaultSessionMapName; this.retryTimeout = retryTimeout; localMap = vertx.sharedData().getLocalMap(sessionMapName); localSessionIds = new Vector<>(); redisManager(); } @Override public long retryTimeout() { return retryTimeout; } @Override public Session createSession(long timeout) { return new SessionImpl(new PRNG(vertx), timeout, DEFAULT_SESSIONID_LENGTH); } @Override public Session createSession(long timeout, int length) { return new SessionImpl(new PRNG(vertx), timeout, length); } @Override public void get(String id, Handler > resultHandler) { redisClient.getBinary(id, res->{ if(res.succeeded()) { Buffer buffer = res.result(); if(buffer != null) { SessionImpl session = new SessionImpl(new PRNG(vertx)); session.readFromBuffer(0, buffer); resultHandler.handle(Future.succeededFuture(session)); } else { resultHandler.handle(Future.succeededFuture(localMap.get(id))); } } else { resultHandler.handle(Future.failedFuture(res.cause())); } }); } @Override public void delete(String id, Handler > resultHandler) { redisClient.del(id, res->{ if (res.succeeded()) { localSessionIds.remove(id); resultHandler.handle(Future.succeededFuture(true)); } else { resultHandler.handle(Future.failedFuture(res.cause())); logger.error("redis里刪除sessionId: {} 失敗", id, res.cause()); } }); } @Override public void put(Session session, Handler > resultHandler) { //put 之前判斷session是否存在,如果存在的話,校驗下 redisClient.getBinary(session.id(), res1->{ if (res1.succeeded()) { //存在數(shù)據(jù) if(res1.result()!=null) { Buffer buffer = res1.result(); SessionImpl oldSession = new SessionImpl(new PRNG(vertx)); oldSession.readFromBuffer(0, buffer); SessionImpl newSession = (SessionImpl)session; if(oldSession.version() != newSession.version()) { resultHandler.handle(Future.failedFuture("Version mismatch")); return; } newSession.incrementVersion(); writeSession(session, resultHandler); } else { //不存在數(shù)據(jù) SessionImpl newSession = (SessionImpl)session; newSession.incrementVersion(); writeSession(session, resultHandler); } } else { resultHandler.handle(Future.failedFuture(res1.cause())); } }); } private void writeSession(Session session, Handler > resultHandler) { Buffer buffer = Buffer.buffer(); SessionImpl sessionImpl = (SessionImpl)session; //將session序列化到 buffer里 sessionImpl.writeToBuffer(buffer); SetOptions setOptions = new SetOptions().setPX(session.timeout()); redisClient.setBinaryWithOptions(session.id(), buffer, setOptions, res->{ if (res.succeeded()) { logger.debug("set key: {} ", session.data()); localSessionIds.add(session.id()); resultHandler.handle(Future.succeededFuture(true)); } else { resultHandler.handle(Future.failedFuture(res.cause())); } }); } @Override public void clear(Handler > resultHandler) { localSessionIds.stream().forEach(id->{ redisClient.del(id, res->{ //如果在localSessionIds里存在,但是在redis里過期不存在了, 只要通知下就行 localSessionIds.remove(id); }); }); resultHandler.handle(Future.succeededFuture(true)); } @Override public void size(Handler > resultHandler) { resultHandler.handle(Future.succeededFuture(localSessionIds.size())); } @Override public void close() { redisClient.close(res->{ logger.debug("關(guān)閉 redisClient "); }); } private void redisManager() { RedisOptions redisOptions = new RedisOptions(); redisOptions.setHost(host).setPort(port).setAuth(auth); redisClient = RedisClient.create(vertx, redisOptions); } @Override public RedisSessionStore host(String host) { this.host = host; return this; } @Override public RedisSessionStore port(int port) { this.port = port; return this; } @Override public RedisSessionStore auth(String pwd) { this.auth = pwd; return this; } }
首先,從get()和put()這兩個方法開始,這兩方法比較核心。
get(), 創(chuàng)建Cookie的時候會生成一個uuid,用這個id取session,第一次我們發(fā)現(xiàn)無法取到, 第56行代碼就會根據(jù)這個id去生成一個session。
每次發(fā)送請求的時候,我們都會重置session過期時間,所以每次get完后,返回給瀏覽器之前都會有一個put操作,也就是更新數(shù)據(jù)。這里的put就稍微復雜一點點,在put之前,我們需要先根據(jù)傳過來的session里的id從redis里取到session。如果獲取不到,說明之前通過get獲取的session不是同一個對象,就出異常,這就相當于設(shè)置了一道安全的門檻吧!當獲取到了,再比較兩個session的版本是不是一致的,如果不一致,說明session被破環(huán)了,算是第二個安全門檻設(shè)置吧!都沒有問題了,就可以put session了,并且重新設(shè)置時間。
這里依賴vertx提供的redisClient來操作數(shù)據(jù)的,所以我們必須引入這個依賴:io.vertx:vertx-redis-client:3.4.1 。
接下來還有一點需要提的是序列化問題。這里我使用的是vertx封裝的一種序列化,將數(shù)據(jù)序列化到Buffer里,而SessiomImpl類里又已經(jīng)實現(xiàn)好了序列化,從SessionImple序列化成Buffer和Buffer反序列化。
public class SessionImpl implements Session, ClusterSerializable, Shareable { //... @Override public void writeToBuffer(Buffer buff) { byte[] bytes = id.getBytes(UTF8); buff.appendInt(bytes.length).appendBytes(bytes); buff.appendLong(timeout); buff.appendLong(lastAccessed); buff.appendInt(version); Buffer dataBuf = writeDataToBuffer(); buff.appendBuffer(dataBuf); } @Override public int readFromBuffer(int pos, Buffer buffer) { int len = buffer.getInt(pos); pos += 4; byte[] bytes = buffer.getBytes(pos, pos + len); pos += len; id = new String(bytes, UTF8); timeout = buffer.getLong(pos); pos += 8; lastAccessed = buffer.getLong(pos); pos += 8; version = buffer.getInt(pos); pos += 4; pos = readDataFromBuffer(pos, buffer); return pos; } //... }
以上就是序列化和反序列化的實現(xiàn)。
localSessionIds 主要考慮到清除session的時候使用,因為數(shù)據(jù)主要以保存在session為主,本地localSessionIds 保存sessionId是輔助作用。
用法用法很簡單,一行代碼就說明。
router.route().handler(SessionHandler.create(RedisSessionStore.create(vertx).host("127.0.0.1").port(6349)));
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70057.html
摘要:本文章是藍圖系列的第一篇教程。是事件驅(qū)動的,同時也是非阻塞的。是一組負責分發(fā)和處理事件的線程。注意,我們絕對不能去阻塞線程,否則事件的處理過程會被阻塞,我們的應用就失去了響應能力。每個負責處理請求并且寫入回應結(jié)果。 本文章是 Vert.x 藍圖系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務開發(fā)教程 Vert.x Blueprint 系...
摘要:本文章是藍圖系列的第二篇教程。這就是請求回應模式。好多屬性我們一個一個地解釋一個序列,作為的地址任務的編號任務的類型任務攜帶的數(shù)據(jù),以類型表示任務優(yōu)先級,以枚舉類型表示。默認優(yōu)先級為正常任務的延遲時間,默認是任務狀態(tài),以枚舉類型表示。 本文章是 Vert.x 藍圖系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務開發(fā)教程 Vert.x B...
摘要:但經(jīng)過一段使用后,發(fā)現(xiàn)的一些問題。這樣產(chǎn)生了一系列問題。部署的是異步的多線程環(huán)境,這個方法必須是線程安全的。小結(jié)的體系結(jié)構(gòu)無疑是非常先進的,多線程異步結(jié)構(gòu),內(nèi)置,支持,支持高可用度,這些都不是輕易能夠提供的。 最近想選高效,簡潔,擴充性強的web框做為移動平臺后臺,在對一系列框架對比后,選擇了vertx。但經(jīng)過一段使用后,發(fā)現(xiàn)vertx的一些問題。 1.vertx使用共享資源產(chǎn)生的重復...
摘要:二來,給大家新開坑的項目一個參考。因此,本系列以主要以官方文檔為基礎(chǔ),將盡可能多的特性融入本項目,并標注官網(wǎng)原文出處,有興趣的小伙伴可點擊深入了解??梢酝ㄟ^一些特殊協(xié)議例如將消息作為統(tǒng)一消息服務導出。下載完成后自行修改和。 開坑前言 我給這個專欄的名氣取名叫做小項目,聽名字就知道,這個專題最終的目的是帶領(lǐng)大家完成一個項目。為什么要開這么大一個坑呢,一來,雖然網(wǎng)上講IT知識點的書籍鋪天蓋...
閱讀 3962·2021-09-22 10:02
閱讀 3380·2019-08-30 15:52
閱讀 3071·2019-08-30 12:51
閱讀 770·2019-08-30 11:08
閱讀 2073·2019-08-29 15:18
閱讀 3116·2019-08-29 12:13
閱讀 3606·2019-08-29 11:29
閱讀 1883·2019-08-29 11:13