Vert.x 是基于 Netty 封装的 Java 网络编程框架,相比直接使用 Netty 开发更为简单,并在其基础上提供了很多有用的功能,可以在较短的时间内开发出一个 HTTP 服务器或其他网络服务。本文展示如何为基于 Vert.x 的 HTTP 网关实现分布式 session,实现方式参考了 Spring Session,底层使用 Redis 存储 session。
1.SessionStore
Vert.x 对 session 的读写都是通过 SessionStore 接口完成的,因此我们可以实现自己的 SessionStore(RedisSessionStore)和 Session(RedisDataSessionImpl),代码如下:
/** Copyright 2014 Red Hat, Inc.** All rights reserved. This program and the accompanying materials* are made available under the terms of the Eclipse Public License v1.0* and Apache License v2.0 which accompanies this distribution.** The Eclipse Public License is available at* http://www.eclipse.org/legal/epl-v10.html** The Apache License v2.0 is available at* http://www.opensource.org/licenses/apache2.0.php** You may elect to redistribute this code under either of these licenses.*/package com.ly.session;import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import org.springframework.data.redis.core.RedisTemplate;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;/*** @author <a href="http://tfox.org">Tim Fox</a>*/
public class RedisSessionStore implements SessionStore {/*** Default of how often, in ms, to check for expired sessions*/private static final long DEFAULT_REAPER_INTERVAL = 1000;/*** Default name for map used to store sessions*/private static final String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions";private VertxContextPRNG random;protected Vertx vertx;private SaveMode saveMode = SaveMode.ON_IMMEDIATE;public void setSaveMode(SaveMode saveMode) {this.saveMode = saveMode;}private RedisTemplate<String,Object> redisTemplate;public RedisTemplate<String, Object> getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic Session createSession(long timeout) {return new RedisDataSessionImpl(random, timeout, DEFAULT_SESSIONID_LENGTH);}@Overridepublic Session createSession(long timeout, int length) {return new RedisDataSessionImpl(random, timeout, length);}@Overridepublic SessionStore init(Vertx vertx, JsonObject options) {// initialize a secure randomthis.random = VertxContextPRNG.current(vertx);this.vertx = vertx;return this;}@Overridepublic long retryTimeout() {return 0;}@Overridepublic void get(String id, Handler<AsyncResult<Session>> resultHandler) {Map<Object,Object> sessionInfoMap = redisTemplate.boundHashOps(id).entries();if(sessionInfoMap != null && !sessionInfoMap.isEmpty()){RedisDataSessionImpl redisDataSession = new RedisDataSessionImpl(sessionInfoMap);resultHandler.handle(Future.succeededFuture(redisDataSession));}else{resultHandler.handle(Future.succeededFuture());}}@Overridepublic void delete(String id, Handler<AsyncResult<Void>> resultHandler) {redisTemplate.delete(id);resultHandler.handle(Future.succeededFuture());}@Overridepublic void put(Session session, Handler<AsyncResult<Void>> resultHandler) {Map<Object,Object> sessionMap = 
((RedisDataSessionImpl)session).writeToMap();redisTemplate.boundHashOps(session.id()).putAll(sessionMap);redisTemplate.expire(session.id(),session.timeout(),TimeUnit.SECONDS);resultHandler.handle(Future.succeededFuture());}@Overridepublic void clear(Handler<AsyncResult<Void>> resultHandler) {}@Overridepublic void size(Handler<AsyncResult<Integer>> resultHandler) {}@Overridepublic synchronized void close() {}public class RedisDataSessionImpl extends AbstractSession {private final String ID_FIELD_NAME= "id";private final String EXPIRED_NAME_TIME_NAME = "expiredAtTime";private final String LAST_ACCESS_TIME_NAME = "lastAccessTime";private final String CREATE_TIME_NAME = "createTime";/*** 其他session 属性 key的前缀*/private final String ATTR_PREFIX = "attr_";private final String TIME_OUT = "timeout";/*** 保存session中更改的属性*/private Map<String, Object> deltaMap = new HashMap<>();public RedisDataSessionImpl(Map<Object, Object> sessionInfoMap) {super();Map<String,Object> data = new HashMap<>();for(Map.Entry<Object,Object> entry:sessionInfoMap.entrySet()){String key = (String) entry.getKey();Object val = entry.getValue();if(key.equals(ID_FIELD_NAME)){setId((String) val);}else if(key.equals(TIME_OUT)){setTimeout((Long) val);}else if(key.startsWith(ATTR_PREFIX)){data.put(key.substring(ATTR_PREFIX.length()),val);}}deltaMap.put(LAST_ACCESS_TIME_NAME,System.currentTimeMillis());setLastAccessed(System.currentTimeMillis());setData(data);}@Overrideprotected void setId(String id) {super.setId(id);}@Overrideprotected void setData(Map<String, Object> data) {super.setData(data);}@Overrideprotected void setData(JsonObject data) {super.setData(data);}/*** Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. 
Do* not remove.*/public RedisDataSessionImpl() {super();}public RedisDataSessionImpl(VertxContextPRNG random) {super(random);}public RedisDataSessionImpl(VertxContextPRNG random, long timeout, int length) {super(random, timeout, length);}@Overridepublic <T> T get(String key) {return super.get(key);}/*** put 是否立即刷新到redis中** @param key* @param obj* @return*/@Overridepublic Session put(String key, Object obj) {Session result = super.put(key, obj);putAndFlush(key,obj);return result;}void putAndFlush(String key,Object val){deltaMap.put(ATTR_PREFIX+key, val);saveIfRequired();}private void saveIfRequired() {if (saveMode.equals(SaveMode.ON_IMMEDIATE)){redisTemplate.boundHashOps(this.id()).putAll(deltaMap);deltaMap = new HashMap<>();}}@Overridepublic Session putIfAbsent(String key, Object obj) {return super.putIfAbsent(key, obj);}@Overridepublic Session computeIfAbsent(String key, Function<String, Object> mappingFunction) {return super.computeIfAbsent(key, mappingFunction);}@Overridepublic <T> T remove(String key) {putAndFlush(key,null);return super.remove(key);}public Map<Object, Object> writeToMap() {Map<Object,Object> sessionMap = new HashMap<>();sessionMap.put(ID_FIELD_NAME,id());sessionMap.put(LAST_ACCESS_TIME_NAME,lastAccessed());sessionMap.put(TIME_OUT,timeout());Map<String,Object> data = data();for(Map.Entry<String,Object> entry:data.entrySet()){sessionMap.put(ATTR_PREFIX+entry.getKey(),entry.getValue());}return sessionMap;}}
}
/**
 * Controls when session attribute changes are written to Redis.
 */
enum SaveMode {

    /** Write each attribute change to Redis as soon as it is made. */
    ON_IMMEDIATE,

    /** Write changes only when the session store is asked to save the session. */
    ON_SAVE
}
在实例化 SessionHandler 时,传入 RedisStore:
package com.ly;import com.ly.entity.Good;
import com.ly.session.RedisStore;
import com.ly.session.SessionEntity;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.handler.SessionHandler;
import io.vertx.ext.web.sstore.SessionStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;@Component
public class VertxServer implements ApplicationListener<ContextRefreshedEvent> {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if(event.getApplicationContext().getParent() == null){VertxOptions vertxOptions = new VertxOptions();vertxOptions.setMaxEventLoopExecuteTime(10000*60);vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);Vertx vertx = Vertx.vertx(vertxOptions);HttpServerOptions options = new HttpServerOptions();options.setIdleTimeout(3600);options.setTcpKeepAlive(true);HttpServer server = vertx.createHttpServer(options);Router router = Router.router(vertx);SessionStore sessionStore = RedisStore.create(vertx,this.redisTemplate);//SessionStore sessionStore = SessionStore.create(vertx);router.routeWithRegex(".*service.*").handler(SessionHandler.create(sessionStore)).handler(ctx->{if(ctx.request().path().contains("initSession")){ctx.request().response().setStatusCode(401).putHeader("Content-Type","application/json").end((new JsonObject().put("msg","illegal request").encode()));}else{ctx.next();}}).handler(ctx->{HttpServerResponse response = ctx.response();Session session =ctx.session();if(session.get("wow") ==null){session.put("wow","wow");System.err.println((String) session.get("wow"));ctx.request().remoteAddress();List<Good> goodList = new ArrayList<>();goodList.add(Good.builder().id(1).stockNumber(100).build());session.put("goods",goodList);}else{System.out.println("goods:"+session.get("goods"));}SessionEntity sessionEntity = new SessionEntity();sessionEntity.setId("28947208947");session.put("typeTest",sessionEntity);String path = ctx.request().path();if(ctx.request().path().contains("service/removeCookie")){Cookie cookie = Cookie.cookie("vertx-web.session",new Date().getTime()+"");cookie.setPath("/");cookie.setSameSite(CookieSameSite.NONE);ctx.addCookie(cookie);ctx.request().response().addCookie(cookie);}else 
if(path.contains("clearSession")){ctx.session().destroy();}response.putHeader("content-type","text/plain");response.end("hello word!");}).failureHandler((ctx)->{System.out.println(ctx.failure());});server.requestHandler(router).listen(8081);System.err.println("vertxServer start success");}}
}
详细代码可以去github上下载
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx
启动前只需将配置中的 Redis 集群地址替换为自己的地址。
Vert.x 官方也提供了分布式 session 的实现,其中也包括 Redis 版本,有兴趣的读者可以了解一下:
https://vertx.io/docs/vertx-web/java/#_handling_sessions