Distributed sessions for Vert.x HTTP applications by adapting vertx-web-sstore-redis

article/2025/8/21 3:54:58

1. Shortcomings of Vert.x's built-in Redis session store

Vert.x ships its own Redis-backed distributed session store. Using it only requires the following dependency:

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-web-sstore-redis</artifactId>
  <version>4.1.4</version>
</dependency>

RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
// Create the client through the public factory method
Redis redis = Redis.createClient(vertx, redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx, redis);
// SessionStore sessionStore = SessionStore.create(vertx);
router.routeWithRegex("/static.*").handler(StaticHandler.create());
router.routeWithRegex(".*service.*")
  .handler(SessionHandler.create(redisSessionStore))
  .handler(ctx -> {
    if (ctx.request().path().contains("initSession")) {
      ctx.request().response()
        .setStatusCode(401)
        .putHeader("Content-Type", "application/json")
        .end(new JsonObject().put("msg", "illegal request").encode());
    } else {
      ctx.next();
    }
  });

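All you have to do is replace the original SessionStore with RedisSessionStore.
Its Redis client is built on Vert.x, so reads and writes to Redis are asynchronous and do not block the I/O (event loop) threads. The current RedisSessionStore implementation has one serious limitation, though: it cannot directly serialize or deserialize custom objects or common JDK classes such as ArrayList and HashMap. Apart from the eight primitive wrapper types, String, Buffer and byte[], every other object has to implement Vert.x's own serialization interface, ClusterSerializable: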

package io.vertx.core.shareddata.impl;

import io.vertx.core.buffer.Buffer;

/**
 * Objects implementing this interface will be write to and read from a {@link Buffer} when respectively
 * stored and read from an {@link io.vertx.core.shareddata.AsyncMap}.
 * <p>
 * Implementations must have a public no-argument constructor.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public interface ClusterSerializable {

  void writeToBuffer(Buffer buffer);

  int readFromBuffer(int pos, Buffer buffer);
}
/*
 * Copyright 2014 Red Hat, Inc.
 *
 *  All rights reserved. This program and the accompanying materials
 *  are made available under the terms of the Eclipse Public License v1.0
 *  and Apache License v2.0 which accompanies this distribution.
 *
 *  The Eclipse Public License is available at
 *  http://www.eclipse.org/legal/epl-v10.html
 *
 *  The Apache License v2.0 is available at
 *  http://www.opensource.org/licenses/apache2.0.php
 *
 *  You may elect to redistribute this code under either of these licenses.
 */
package io.vertx.ext.web.sstore.impl;

import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;

import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {

  private static final Charset UTF8 = StandardCharsets.UTF_8;

  private static final byte TYPE_LONG = 1;
  private static final byte TYPE_INT = 2;
  private static final byte TYPE_SHORT = 3;
  private static final byte TYPE_BYTE = 4;
  private static final byte TYPE_DOUBLE = 5;
  private static final byte TYPE_FLOAT = 6;
  private static final byte TYPE_CHAR = 7;
  private static final byte TYPE_BOOLEAN = 8;
  private static final byte TYPE_STRING = 9;
  private static final byte TYPE_BUFFER = 10;
  private static final byte TYPE_BYTES = 11;
  private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;

  /**
   * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do
   * not remove.
   */
  public SharedDataSessionImpl() {
    super();
  }

  public SharedDataSessionImpl(VertxContextPRNG random) {
    super(random);
  }

  public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
    super(random, timeout, length);
  }

  @Override
  public void writeToBuffer(Buffer buff) {
    byte[] bytes = id().getBytes(UTF8);
    buff.appendInt(bytes.length).appendBytes(bytes);
    buff.appendLong(timeout());
    buff.appendLong(lastAccessed());
    buff.appendInt(version());
    // use cache
    Buffer dataBuf = writeDataToBuffer();
    buff.appendBuffer(dataBuf);
  }

  @Override
  public int readFromBuffer(int pos, Buffer buffer) {
    int len = buffer.getInt(pos);
    pos += 4;
    byte[] bytes = buffer.getBytes(pos, pos + len);
    pos += len;
    setId(new String(bytes, UTF8));
    setTimeout(buffer.getLong(pos));
    pos += 8;
    setLastAccessed(buffer.getLong(pos));
    pos += 8;
    setVersion(buffer.getInt(pos));
    pos += 4;
    pos = readDataFromBuffer(pos, buffer);
    return pos;
  }

  private Buffer writeDataToBuffer() {
    Buffer buffer = Buffer.buffer();
    if (isEmpty()) {
      buffer.appendInt(0);
    } else {
      final Map<String, Object> data = data();
      buffer.appendInt(data.size());
      for (Map.Entry<String, Object> entry : data.entrySet()) {
        String key = entry.getKey();
        byte[] keyBytes = key.getBytes(UTF8);
        buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
        Object val = entry.getValue();
        if (val instanceof Long) {
          buffer.appendByte(TYPE_LONG).appendLong((long) val);
        } else if (val instanceof Integer) {
          buffer.appendByte(TYPE_INT).appendInt((int) val);
        } else if (val instanceof Short) {
          buffer.appendByte(TYPE_SHORT).appendShort((short) val);
        } else if (val instanceof Byte) {
          buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
        } else if (val instanceof Double) {
          buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
        } else if (val instanceof Float) {
          buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
        } else if (val instanceof Character) {
          buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
        } else if (val instanceof Boolean) {
          buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
        } else if (val instanceof String) {
          byte[] bytes = ((String) val).getBytes(UTF8);
          buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
        } else if (val instanceof Buffer) {
          Buffer buff = (Buffer) val;
          buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
        } else if (val instanceof byte[]) {
          byte[] bytes = (byte[]) val;
          buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
        } else if (val instanceof ClusterSerializable) {
          buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
          String className = val.getClass().getName();
          byte[] classNameBytes = className.getBytes(UTF8);
          buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
          ((ClusterSerializable) val).writeToBuffer(buffer);
        } else {
          if (val != null) {
            throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
          }
        }
      }
    }
    return buffer;
  }

  private int readDataFromBuffer(int pos, Buffer buffer) {
    try {
      int entries = buffer.getInt(pos);
      pos += 4;
      if (entries > 0) {
        final Map<String, Object> data = new ConcurrentHashMap<>(entries);
        for (int i = 0; i < entries; i++) {
          int keylen = buffer.getInt(pos);
          pos += 4;
          byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
          pos += keylen;
          String key = new String(keyBytes, UTF8);
          byte type = buffer.getByte(pos++);
          Object val;
          switch (type) {
            case TYPE_LONG:
              val = buffer.getLong(pos);
              pos += 8;
              break;
            case TYPE_INT:
              val = buffer.getInt(pos);
              pos += 4;
              break;
            case TYPE_SHORT:
              val = buffer.getShort(pos);
              pos += 2;
              break;
            case TYPE_BYTE:
              val = buffer.getByte(pos);
              pos++;
              break;
            case TYPE_FLOAT:
              val = buffer.getFloat(pos);
              pos += 4;
              break;
            case TYPE_DOUBLE:
              val = buffer.getDouble(pos);
              pos += 8;
              break;
            case TYPE_CHAR:
              short s = buffer.getShort(pos);
              pos += 2;
              val = (char) s;
              break;
            case TYPE_BOOLEAN:
              byte b = buffer.getByte(pos);
              pos++;
              val = b == 1;
              break;
            case TYPE_STRING:
              int len = buffer.getInt(pos);
              pos += 4;
              byte[] bytes = buffer.getBytes(pos, pos + len);
              val = new String(bytes, UTF8);
              pos += len;
              break;
            case TYPE_BUFFER:
              len = buffer.getInt(pos);
              pos += 4;
              bytes = buffer.getBytes(pos, pos + len);
              val = Buffer.buffer(bytes);
              pos += len;
              break;
            case TYPE_BYTES:
              len = buffer.getInt(pos);
              pos += 4;
              val = buffer.getBytes(pos, pos + len);
              pos += len;
              break;
            case TYPE_CLUSTER_SERIALIZABLE:
              int classNameLen = buffer.getInt(pos);
              pos += 4;
              byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
              pos += classNameLen;
              String className = new String(classNameBytes, UTF8);
              Class<?> clazz = Utils.getClassLoader().loadClass(className);
              if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
                throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
              }
              ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
              pos = obj.readFromBuffer(pos, buffer);
              val = obj;
              break;
            default:
              throw new IllegalStateException("Invalid serialized type: " + type);
          }
          data.put(key, val);
        }
        setData(data);
      }
      return pos;
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
      throw new VertxException(e);
    }
  }
}

If a complex object does not implement ClusterSerializable, writing the session fails with an IllegalStateException.
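The failure mode can be reproduced without Vert.x. The following is a minimal sketch, not the library's code: TaggedValueCodec is a hypothetical class that mimics only three branches of the type dispatch in writeDataToBuffer() with plain JDK streams, to show that a value outside the supported set ends up in the final else branch and is rejected.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

// Hypothetical stand-in for the type-tag dispatch in SharedDataSessionImpl.writeDataToBuffer().
// Only three of the tags are reproduced; the tag values match the constants shown above.
class TaggedValueCodec {
  static final byte TYPE_LONG = 1;
  static final byte TYPE_INT = 2;
  static final byte TYPE_STRING = 9;

  // Encodes one value as [type tag][payload]; unsupported types are rejected.
  static byte[] encode(Object val) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      if (val instanceof Long) {
        out.writeByte(TYPE_LONG);
        out.writeLong((Long) val);
      } else if (val instanceof Integer) {
        out.writeByte(TYPE_INT);
        out.writeInt((Integer) val);
      } else if (val instanceof String) {
        byte[] utf8 = ((String) val).getBytes(StandardCharsets.UTF_8);
        out.writeByte(TYPE_STRING);
        out.writeInt(utf8.length); // length prefix, then the raw bytes
        out.write(utf8);
      } else if (val != null) {
        // Same message as the original else branch
        throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
      }
    } catch (IOException e) {
      throw new IllegalStateException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(encode(42).length);    // 5: one tag byte + a 4-byte int
    System.out.println(encode("abc").length); // 8: tag + 4-byte length + 3 bytes
    try {
      encode(new ArrayList<String>());
    } catch (IllegalStateException e) {
      // prints "Invalid type for data in session: class java.util.ArrayList"
      System.out.println(e.getMessage());
    }
  }
}
```

An ArrayList stored in the session takes exactly this path in the real class, which is why plain collections cannot be put into a session backed by the stock store.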

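2. Rewriting SharedDataSessionImpl

So we add a PB type to the existing set of serialization types: every complex object is serialized and deserialized with pb (Protostuff).
The pb serialization utility class: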

package com.ly;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ProtoStuffUtil {

  // Cache of runtime schemas, one per serialized class
  private static final Map<Class<?>, Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();

  private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
    SerializeDeserializeWrapperObj.class;

  private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
    RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);

  // Classes that are not serialized directly but wrapped in SerializeDeserializeWrapperObj first
  private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();

  static {
    WRAPPER_CLASS_SET.add(List.class);
    WRAPPER_CLASS_SET.add(Integer.class);
    WRAPPER_CLASS_SET.add(Boolean.class);
    WRAPPER_CLASS_SET.add(Character.class);
    WRAPPER_CLASS_SET.add(Double.class);
    WRAPPER_CLASS_SET.add(int.class);
    WRAPPER_CLASS_SET.add(boolean.class);
    WRAPPER_CLASS_SET.add(char.class);
    WRAPPER_CLASS_SET.add(double.class);
    WRAPPER_CLASS_SET.add(ArrayList.class);
    WRAPPER_CLASS_SET.add(Set.class);
    WRAPPER_CLASS_SET.add(Map.class);
    WRAPPER_CLASS_SET.add(HashMap.class);
    WRAPPER_CLASS_SET.add(Date.class);
  }

  // Serializes o, wrapping it first if its runtime class is in WRAPPER_CLASS_SET
  public static <T> byte[] serializer(T o) {
    if (WRAPPER_CLASS_SET.contains(o.getClass())) {
      return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o), WRAPPER_SCHEMA, LinkedBuffer.allocate(1024));
    } else {
      return ProtostuffIOUtil.toByteArray(o, getSchema(o.getClass()), LinkedBuffer.allocate(1024));
    }
  }

  // Serializes o directly with its own schema, without wrapping
  public static <T> byte[] serializerV1(T o) {
    Schema<T> schema = getSchema(o.getClass());
    return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate(1024));
  }

  // Returns the cached schema for clazz, creating it on first use
  public static <T> Schema getSchema(Class<T> clazz) {
    if (SCHEMA_MAP.containsKey(clazz)) {
      return SCHEMA_MAP.get(clazz);
    } else {
      Schema<T> schema = RuntimeSchema.createFrom(clazz);
      SCHEMA_MAP.put(clazz, schema);
      return schema;
    }
  }

  // Returns null if clazz cannot be instantiated through its no-argument constructor
  public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
    if (WRAPPER_CLASS_SET.contains(clazz)) {
      SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
      ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
      return obj.getData();
    } else {
      Schema<T> schema = getSchema(clazz);
      T obj = null;
      try {
        obj = clazz.newInstance();
      } catch (InstantiationException | IllegalAccessException e) {
        return null;
      }
      ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
      return obj;
    }
  }

  // Untyped variant used by the session implementation, where only the class name is known
  public static Object deserializerToObj(byte[] bytes, Class clazz) {
    if (WRAPPER_CLASS_SET.contains(clazz)) {
      SerializeDeserializeWrapperObj obj = new SerializeDeserializeWrapperObj<>();
      ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
      return obj.getData();
    } else {
      // Reuse the cached schema instead of building a new one on every call
      Schema schema = getSchema(clazz);
      Object obj = null;
      try {
        obj = clazz.newInstance();
      } catch (InstantiationException | IllegalAccessException e) {
        return null;
      }
      ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
      return obj;
    }
  }

  public static <T> T deserializerV1(byte[] bytes, Class<T> clazz) throws IllegalAccessException, InstantiationException {
    Schema<T> schema = getSchema(clazz);
    T obj = clazz.newInstance();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
  }

  // Holder that gives collections, boxed values and dates a single field to serialize through
  private static class SerializeDeserializeWrapperObj<T> {

    private T data;

    public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
      SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
      wrapper.setData(data);
      return wrapper;
    }

    public T getData() {
      return data;
    }

    public void setData(T data) {
      this.data = data;
    }
  }
}

The rewritten SharedDataSessionImpl, together with a session store that uses it:

/*
 * Copyright 2014 Red Hat, Inc.
 *
 *  All rights reserved. This program and the accompanying materials
 *  are made available under the terms of the Eclipse Public License v1.0
 *  and Apache License v2.0 which accompanies this distribution.
 *
 *  The Eclipse Public License is available at
 *  http://www.eclipse.org/legal/epl-v10.html
 *
 *  The Apache License v2.0 is available at
 *  http://www.opensource.org/licenses/apache2.0.php
 *
 *  You may elect to redistribute this code under either of these licenses.
 */
package com.ly.session;

import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;

import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;

/**
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class ReactiveRedisSessionStore implements SessionStore {

  private Redis redis;
  private VertxContextPRNG random;
  private long retryTimeout;

  @Override
  public SessionStore init(Vertx vertx, JsonObject options) {
    Objects.requireNonNull(options, "options are required");
    long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
    Redis redis = Redis.createClient(vertx, new RedisOptions(options));
    return init(vertx, timeout, redis);
  }

  public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
    random = VertxContextPRNG.current(vertx);
    this.retryTimeout = retryTimeout;
    this.redis = Objects.requireNonNull(redis, "redis is required");
    return this;
  }

  @Override
  public long retryTimeout() {
    return retryTimeout;
  }

  @Override
  public Session createSession(long timeout) {
    return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
  }

  @Override
  public Session createSession(long timeout, int length) {
    return new RedisShardDataSessionImpl(random, timeout, length);
  }

  public ReactiveRedisSessionStore() {
    super();
  }

  @Override
  public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
    redis.send(cmd(GET).arg(id), resGet -> {
      if (resGet.failed()) {
        resultHandler.handle(Future.failedFuture(resGet.cause()));
        return;
      }
      Response response = resGet.result();
      if (response != null) {
        RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
        session.readFromBuffer(0, response.toBuffer());
        // postpone expiration time, this cannot be done in a single frame with GET cmd
//                redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
//                    if (resExpire.failed()) {
//                        resultHandler.handle(Future.failedFuture(resExpire.cause()));
//                    } else {
//                        resultHandler.handle(Future.succeededFuture(session));
//                    }
//                });
        resultHandler.handle(Future.succeededFuture(session));
      } else {
        resultHandler.handle(Future.succeededFuture());
      }
    });
  }

  @Override
  public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
    redis.send(cmd(DEL).arg(id), res -> {
      if (res.failed()) {
        resultHandler.handle(Future.failedFuture(res.cause()));
      } else {
        resultHandler.handle(Future.succeededFuture());
      }
    });
  }

  @Override
  public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
    Buffer buffer = Buffer.buffer();
    RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;
    sessionImpl.writeToBuffer(buffer);
    // submit with all session data & expiration TO in ms
    Request rq = cmd(SET).arg(session.id()).arg(buffer).arg("PX").arg(session.timeout());
    redis.send(rq, res -> {
      if (res.failed()) {
        resultHandler.handle(Future.failedFuture(res.cause()));
      } else {
        resultHandler.handle(Future.succeededFuture());
      }
    });
  }

  // clear, size and close are left unimplemented; the handlers are never invoked
  @Override
  public void clear(Handler<AsyncResult<Void>> resultHandler) {
  }

  @Override
  public void size(Handler<AsyncResult<Integer>> resultHandler) {
  }

  @Override
  public void close() {
  }

  public static class RedisShardDataSessionImpl extends AbstractSession {

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    private static final byte TYPE_LONG = 1;
    private static final byte TYPE_INT = 2;
    private static final byte TYPE_SHORT = 3;
    private static final byte TYPE_BYTE = 4;
    private static final byte TYPE_DOUBLE = 5;
    private static final byte TYPE_FLOAT = 6;
    private static final byte TYPE_CHAR = 7;
    private static final byte TYPE_BOOLEAN = 8;
    private static final byte TYPE_STRING = 9;
    private static final byte TYPE_BUFFER = 10;
    private static final byte TYPE_BYTES = 11;
    private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
    private static final byte TYPE_PB_SERIALIZABLE = 14;

    /**
     * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do
     * not remove.
     */
    public RedisShardDataSessionImpl() {
      super();
    }

    public RedisShardDataSessionImpl(VertxContextPRNG random) {
      super(random);
    }

    public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
      super(random, timeout, length);
    }

    public void writeToBuffer(Buffer buff) {
      byte[] bytes = id().getBytes(UTF8);
      buff.appendInt(bytes.length).appendBytes(bytes);
      buff.appendLong(timeout());
      buff.appendLong(lastAccessed());
      buff.appendInt(version());
      // use cache
      Buffer dataBuf = writeDataToBuffer();
      buff.appendBuffer(dataBuf);
    }

    public int readFromBuffer(int pos, Buffer buffer) {
      int len = buffer.getInt(pos);
      pos += 4;
      byte[] bytes = buffer.getBytes(pos, pos + len);
      pos += len;
      setId(new String(bytes, UTF8));
      setTimeout(buffer.getLong(pos));
      pos += 8;
      setLastAccessed(buffer.getLong(pos));
      pos += 8;
      setVersion(buffer.getInt(pos));
      pos += 4;
      pos = readDataFromBuffer(pos, buffer);
      return pos;
    }

    private Buffer writeDataToBuffer() {
      Buffer buffer = Buffer.buffer();
      if (isEmpty()) {
        buffer.appendInt(0);
      } else {
        final Map<String, Object> data = data();
        buffer.appendInt(data.size());
        for (Map.Entry<String, Object> entry : data.entrySet()) {
          String key = entry.getKey();
          byte[] keyBytes = key.getBytes(UTF8);
          buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
          Object val = entry.getValue();
          if (val instanceof Long) {
            buffer.appendByte(TYPE_LONG).appendLong((long) val);
          } else if (val instanceof Integer) {
            buffer.appendByte(TYPE_INT).appendInt((int) val);
          } else if (val instanceof Short) {
            buffer.appendByte(TYPE_SHORT).appendShort((short) val);
          } else if (val instanceof Byte) {
            buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
          } else if (val instanceof Double) {
            buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
          } else if (val instanceof Float) {
            buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
          } else if (val instanceof Character) {
            buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
          } else if (val instanceof Boolean) {
            buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
          } else if (val instanceof String) {
            byte[] bytes = ((String) val).getBytes(UTF8);
            buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
          } else if (val instanceof Buffer) {
            Buffer buff = (Buffer) val;
            buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
          } else if (val instanceof byte[]) {
            byte[] bytes = (byte[]) val;
            buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
          } else if (val instanceof ClusterSerializable) {
            buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
            String className = val.getClass().getName();
            byte[] classNameBytes = className.getBytes(UTF8);
            buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
            ((ClusterSerializable) val).writeToBuffer(buffer);
          } else {
            // Default: serialize with pb
            buffer.appendByte(TYPE_PB_SERIALIZABLE);
            String className = val.getClass().getName();
            byte[] classNameBytes = className.getBytes(UTF8);
            buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
            byte[] serializedBytes = ProtoStuffUtil.serializer(val);
            buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
          }
        }
      }
      return buffer;
    }

    private int readDataFromBuffer(int pos, Buffer buffer) {
      try {
        int entries = buffer.getInt(pos);
        pos += 4;
        if (entries > 0) {
          final Map<String, Object> data = new ConcurrentHashMap<>(entries);
          for (int i = 0; i < entries; i++) {
            int keylen = buffer.getInt(pos);
            pos += 4;
            byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
            pos += keylen;
            String key = new String(keyBytes, UTF8);
            byte type = buffer.getByte(pos++);
            Object val;
            switch (type) {
              case TYPE_LONG:
                val = buffer.getLong(pos);
                pos += 8;
                break;
              case TYPE_INT:
                val = buffer.getInt(pos);
                pos += 4;
                break;
              case TYPE_SHORT:
                val = buffer.getShort(pos);
                pos += 2;
                break;
              case TYPE_BYTE:
                val = buffer.getByte(pos);
                pos++;
                break;
              case TYPE_FLOAT:
                val = buffer.getFloat(pos);
                pos += 4;
                break;
              case TYPE_DOUBLE:
                val = buffer.getDouble(pos);
                pos += 8;
                break;
              case TYPE_CHAR:
                short s = buffer.getShort(pos);
                pos += 2;
                val = (char) s;
                break;
              case TYPE_BOOLEAN:
                byte b = buffer.getByte(pos);
                pos++;
                val = b == 1;
                break;
              case TYPE_STRING:
                int len = buffer.getInt(pos);
                pos += 4;
                byte[] bytes = buffer.getBytes(pos, pos + len);
                val = new String(bytes, UTF8);
                pos += len;
                break;
              case TYPE_BUFFER:
                len = buffer.getInt(pos);
                pos += 4;
                bytes = buffer.getBytes(pos, pos + len);
                val = Buffer.buffer(bytes);
                pos += len;
                break;
              case TYPE_BYTES:
                len = buffer.getInt(pos);
                pos += 4;
                val = buffer.getBytes(pos, pos + len);
                pos += len;
                break;
              case TYPE_CLUSTER_SERIALIZABLE:
                int classNameLen = buffer.getInt(pos);
                pos += 4;
                byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
                pos += classNameLen;
                String className = new String(classNameBytes, UTF8);
                Class<?> clazz = Utils.getClassLoader().loadClass(className);
                if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
                  throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
                }
                ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
                pos = obj.readFromBuffer(pos, buffer);
                val = obj;
                break;
              case TYPE_PB_SERIALIZABLE:
                classNameLen = buffer.getInt(pos);
                pos += 4;
                classNameBytes = buffer.getBytes(pos, pos + classNameLen);
                pos += classNameLen;
                className = new String(classNameBytes, UTF8);
                Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
                int serializeObjLength = buffer.getInt(pos);
                pos += 4;
                byte[] serializeObjBytes = buffer.getBytes(pos, pos + serializeObjLength);
                pos += serializeObjLength;
                val = ProtoStuffUtil.deserializerToObj(serializeObjBytes, clazzByPb);
                break;
              default:
                throw new IllegalStateException("Invalid serialized type: " + type);
            }
            data.put(key, val);
          }
          setData(data);
        }
        return pos;
      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
        throw new VertxException(e);
      }
    }
  }
}

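On write, any value that matches none of the built-in types is serialized with pb by default; on read, a value tagged with the pb type is deserialized with pb.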
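The framing used by the pb branch (type tag, length-prefixed class name, length-prefixed payload) can be tried out on its own. The sketch below is an illustration under stated assumptions: FallbackFraming is a hypothetical class, and JDK object serialization stands in for ProtoStuffUtil so that the example runs without Protostuff or Vert.x on the classpath. Only the framing mirrors the session code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the frame: [tag][classNameLen][className][payloadLen][payload]
class FallbackFraming {
  static final byte TYPE_PB_SERIALIZABLE = 14;

  // Assumption: JDK serialization replaces ProtoStuffUtil.serializer for this sketch
  static byte[] serializePayload(Object val) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(val);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Assumption: JDK deserialization replaces ProtoStuffUtil.deserializerToObj
  static Object deserializePayload(byte[] payload) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  static byte[] write(Object val) {
    byte[] className = val.getClass().getName().getBytes(StandardCharsets.UTF_8);
    byte[] payload = serializePayload(val);
    ByteBuffer buf = ByteBuffer.allocate(1 + 4 + className.length + 4 + payload.length);
    buf.put(TYPE_PB_SERIALIZABLE)
      .putInt(className.length).put(className)
      .putInt(payload.length).put(payload);
    return buf.array();
  }

  static Object read(byte[] frame) {
    ByteBuffer buf = ByteBuffer.wrap(frame);
    byte type = buf.get();
    if (type != TYPE_PB_SERIALIZABLE) {
      throw new IllegalStateException("Invalid serialized type: " + type);
    }
    byte[] className = new byte[buf.getInt()];
    buf.get(className); // the stand-in ignores it; Protostuff needs it to pick the schema
    byte[] payload = new byte[buf.getInt()];
    buf.get(payload);
    return deserializePayload(payload);
  }

  public static void main(String[] args) {
    List<String> cart = new ArrayList<>(Arrays.asList("book", "pen"));
    Object restored = read(write(cart));
    System.out.println(cart.equals(restored));
  }
}
```

The class name travels with the payload because the reader has no other way of knowing which class to instantiate; in the session code it is what loadClass and ProtoStuffUtil.deserializerToObj work from.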
The complete code is available on GitHub:
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx

