1. vertx 自身基于redis的分布式session的不足
vertx 本身有redis的分布式session的实现。只需要引入如下依赖
<dependency><groupId>io.vertx</groupId><artifactId>vertx-web-sstore-redis</artifactId><version>4.1.4</version></dependency>
// Configure a Redis client and back the Vert.x Web session handler with it.
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
// Use the public factory instead of instantiating the client implementation class directly.
Redis redis = Redis.createClient(vertx, redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx, redis);
// The store this replaces: SessionStore sessionStore = SessionStore.create(vertx);

// Static resources are served without a session.
router.routeWithRegex("/static.*").handler(StaticHandler.create());

// Service routes get a Redis-backed session; requests whose path contains "initSession" are rejected with 401.
router.routeWithRegex(".*service.*")
    .handler(SessionHandler.create(redisSessionStore))
    .handler(ctx -> {
      if (ctx.request().path().contains("initSession")) {
        ctx.request().response()
            .setStatusCode(401)
            .putHeader("Content-Type", "application/json")
            .end(new JsonObject().put("msg", "illegal request").encode());
      } else {
        ctx.next();
      }
    });
只需要将原本的SessionStore换成RedisSessionStore即可。
其本身的Redis客户端是基于vertx实现的,对redis的读写是异步的,不会阻塞io线程。但是其目前的RedisSessionStore实现有个很致命的问题:不能直接对自定义对象或者jdk自带的常用对象(如ArrayList、HashMap)做序列化和反序列化。除8种基本类型的包装类以及String、Buffer、byte[]之外,其他的复杂对象都必须实现其序列化接口ClusterSerializable:
package io.vertx.core.shareddata.impl;import io.vertx.core.buffer.Buffer;/*** Objects implementing this interface will be write to and read from a {@link Buffer} when respectively* stored and read from an {@link io.vertx.core.shareddata.AsyncMap}.* <p>* Implementations must have a public no-argument constructor.** @author <a href="http://tfox.org">Tim Fox</a>*/
public interface ClusterSerializable {

  /**
   * Writes this object's state into the given buffer.
   *
   * @param buffer the buffer to write to
   */
  void writeToBuffer(Buffer buffer);

  /**
   * Restores this object's state from the given buffer, starting at {@code pos}.
   *
   * @param pos    the index in {@code buffer} at which reading starts
   * @param buffer the buffer to read from
   * @return the position at which the caller continues reading (callers assign the result back to their read
   *         cursor)
   */
  int readFromBuffer(int pos, Buffer buffer);
}
/** Copyright 2014 Red Hat, Inc.** All rights reserved. This program and the accompanying materials* are made available under the terms of the Eclipse Public License v1.0* and Apache License v2.0 which accompanies this distribution.** The Eclipse Public License is available at* http://www.eclipse.org/legal/epl-v10.html** The Apache License v2.0 is available at* http://www.opensource.org/licenses/apache2.0.php** You may elect to redistribute this code under either of these licenses.*/package io.vertx.ext.web.sstore.impl;import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author <a href="http://tfox.org">Tim Fox</a>*/
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {private static final Charset UTF8 = StandardCharsets.UTF_8;private static final byte TYPE_LONG = 1;private static final byte TYPE_INT = 2;private static final byte TYPE_SHORT = 3;private static final byte TYPE_BYTE = 4;private static final byte TYPE_DOUBLE = 5;private static final byte TYPE_FLOAT = 6;private static final byte TYPE_CHAR = 7;private static final byte TYPE_BOOLEAN = 8;private static final byte TYPE_STRING = 9;private static final byte TYPE_BUFFER = 10;private static final byte TYPE_BYTES = 11;private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;/*** Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do* not remove.*/public SharedDataSessionImpl() {super();}public SharedDataSessionImpl(VertxContextPRNG random) {super(random);}public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {super(random, timeout, length);}@Overridepublic void writeToBuffer(Buffer buff) {byte[] bytes = id().getBytes(UTF8);buff.appendInt(bytes.length).appendBytes(bytes);buff.appendLong(timeout());buff.appendLong(lastAccessed());buff.appendInt(version());// use cacheBuffer dataBuf = writeDataToBuffer();buff.appendBuffer(dataBuf);}@Overridepublic int readFromBuffer(int pos, Buffer buffer) {int len = buffer.getInt(pos);pos += 4;byte[] bytes = buffer.getBytes(pos, pos + len);pos += len;setId(new String(bytes, UTF8));setTimeout(buffer.getLong(pos));pos += 8;setLastAccessed(buffer.getLong(pos));pos += 8;setVersion(buffer.getInt(pos));pos += 4;pos = readDataFromBuffer(pos, buffer);return pos;}private Buffer writeDataToBuffer() {Buffer buffer = Buffer.buffer();if (isEmpty()) {buffer.appendInt(0);} else {final Map<String, Object> data = data();buffer.appendInt(data.size());for (Map.Entry<String, Object> entry : data.entrySet()) {String key = entry.getKey();byte[] keyBytes = 
key.getBytes(UTF8);buffer.appendInt(keyBytes.length).appendBytes(keyBytes);Object val = entry.getValue();if (val instanceof Long) {buffer.appendByte(TYPE_LONG).appendLong((long) val);} else if (val instanceof Integer) {buffer.appendByte(TYPE_INT).appendInt((int) val);} else if (val instanceof Short) {buffer.appendByte(TYPE_SHORT).appendShort((short) val);} else if (val instanceof Byte) {buffer.appendByte(TYPE_BYTE).appendByte((byte) val);} else if (val instanceof Double) {buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);} else if (val instanceof Float) {buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);} else if (val instanceof Character) {buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());} else if (val instanceof Boolean) {buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));} else if (val instanceof String) {byte[] bytes = ((String) val).getBytes(UTF8);buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);} else if (val instanceof Buffer) {Buffer buff = (Buffer) val;buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);} else if (val instanceof byte[]) {byte[] bytes = (byte[]) val;buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);} else if (val instanceof ClusterSerializable) {buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);String className = val.getClass().getName();byte[] classNameBytes = className.getBytes(UTF8);buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);((ClusterSerializable) val).writeToBuffer(buffer);} else {if (val != null) {throw new IllegalStateException("Invalid type for data in session: " + val.getClass());}}}}return buffer;}private int readDataFromBuffer(int pos, Buffer buffer) {try {int entries = buffer.getInt(pos);pos += 4;if (entries > 0) {final Map<String, Object> data = new ConcurrentHashMap<>(entries);for (int i = 0; i < entries; i++) {int keylen = buffer.getInt(pos);pos += 4;byte[] keyBytes = 
buffer.getBytes(pos, pos + keylen);pos += keylen;String key = new String(keyBytes, UTF8);byte type = buffer.getByte(pos++);Object val;switch (type) {case TYPE_LONG:val = buffer.getLong(pos);pos += 8;break;case TYPE_INT:val = buffer.getInt(pos);pos += 4;break;case TYPE_SHORT:val = buffer.getShort(pos);pos += 2;break;case TYPE_BYTE:val = buffer.getByte(pos);pos++;break;case TYPE_FLOAT:val = buffer.getFloat(pos);pos += 4;break;case TYPE_DOUBLE:val = buffer.getDouble(pos);pos += 8;break;case TYPE_CHAR:short s = buffer.getShort(pos);pos += 2;val = (char) s;break;case TYPE_BOOLEAN:byte b = buffer.getByte(pos);pos++;val = b == 1;break;case TYPE_STRING:int len = buffer.getInt(pos);pos += 4;byte[] bytes = buffer.getBytes(pos, pos + len);val = new String(bytes, UTF8);pos += len;break;case TYPE_BUFFER:len = buffer.getInt(pos);pos += 4;bytes = buffer.getBytes(pos, pos + len);val = Buffer.buffer(bytes);pos += len;break;case TYPE_BYTES:len = buffer.getInt(pos);pos += 4;val = buffer.getBytes(pos, pos + len);pos += len;break;case TYPE_CLUSTER_SERIALIZABLE:int classNameLen = buffer.getInt(pos);pos += 4;byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);pos += classNameLen;String className = new String(classNameBytes, UTF8);Class<?> clazz = Utils.getClassLoader().loadClass(className);if (!ClusterSerializable.class.isAssignableFrom(clazz)) {throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");}ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();pos = obj.readFromBuffer(pos, buffer);val = obj;break;default:throw new IllegalStateException("Invalid serialized type: " + type);}data.put(key, val);}setData(data);}return pos;} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {throw new VertxException(e);}}
}
复杂对象如果没有实现ClusterSerializable接口,序列化时就会抛出IllegalStateException。
2. 改写SharedDataSessionImpl
所以我们在原有的序列化类型上加上PB类型:原有类型无法处理的复杂对象,都采用pb(这里使用protostuff)进行序列化和反序列化。
pb序列化工具类
package com.ly;import com.ly.entity.Good;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Protostuff-based serialization helpers.
 *
 * <p>Types listed in {@code WRAPPER_CLASS_SET}, as well as every {@link Collection} and {@link Map}
 * implementation, are serialized inside a single-field wrapper message; all other types are serialized with a
 * runtime schema derived from their fields. Runtime schemas are cached per class.
 */
public class ProtoStuffUtil {

  /** Cache of runtime schemas, keyed by the class they describe. */
  private static final Map<Class<?>, Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();

  private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
      SerializeDeserializeWrapperObj.class;

  private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
      RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);

  /** Types that are always serialized through the wrapper message. */
  private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();

  static {
    WRAPPER_CLASS_SET.add(List.class);
    WRAPPER_CLASS_SET.add(Integer.class);
    WRAPPER_CLASS_SET.add(Boolean.class);
    WRAPPER_CLASS_SET.add(Character.class);
    WRAPPER_CLASS_SET.add(Double.class);
    WRAPPER_CLASS_SET.add(int.class);
    WRAPPER_CLASS_SET.add(boolean.class);
    WRAPPER_CLASS_SET.add(char.class);
    WRAPPER_CLASS_SET.add(double.class);
    WRAPPER_CLASS_SET.add(ArrayList.class);
    WRAPPER_CLASS_SET.add(Set.class);
    WRAPPER_CLASS_SET.add(Map.class);
    WRAPPER_CLASS_SET.add(HashMap.class);
    WRAPPER_CLASS_SET.add(Date.class);
  }

  /**
   * Serializes {@code o}, wrapping it first when its type requires the wrapper message.
   *
   * @param o the object to serialize, must not be {@code null}
   * @return the serialized bytes
   * @throws NullPointerException if {@code o} is {@code null}
   */
  public static <T> byte[] serializer(T o) {
    Objects.requireNonNull(o, "object to serialize must not be null");
    LinkedBuffer buffer = LinkedBuffer.allocate(1024);
    if (needsWrapper(o.getClass())) {
      return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o), WRAPPER_SCHEMA, buffer);
    }
    return ProtostuffIOUtil.toByteArray(o, getSchema(o.getClass()), buffer);
  }

  /**
   * Serializes {@code o} with the runtime schema of its class, never using the wrapper message.
   *
   * @param o the object to serialize
   * @return the serialized bytes
   */
  public static <T> byte[] serializerV1(T o) {
    Schema<T> schema = getSchema(o.getClass());
    return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate(1024));
  }

  /**
   * Returns the cached runtime schema for {@code clazz}, creating it on first use.
   *
   * @param clazz the class to describe
   * @return the schema for {@code clazz}
   */
  @SuppressWarnings("rawtypes") // the raw return type is part of the existing public signature
  public static <T> Schema getSchema(Class<T> clazz) {
    // computeIfAbsent makes the lookup-or-create step atomic.
    return SCHEMA_MAP.computeIfAbsent(clazz, type -> RuntimeSchema.createFrom(type));
  }

  /**
   * Deserializes bytes produced by {@link #serializer(Object)} for the given type.
   *
   * @param bytes the serialized form
   * @param clazz the type to restore
   * @return the restored object, or {@code null} when {@code clazz} cannot be instantiated through a no-argument
   *         constructor
   */
  public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
    if (needsWrapper(clazz)) {
      SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
      ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
      return obj.getData();
    }
    @SuppressWarnings("unchecked") // the cache stores the schema created for exactly this class
    Schema<T> schema = getSchema(clazz);
    T obj;
    try {
      obj = instantiate(clazz);
    } catch (InstantiationException | IllegalAccessException ignored) {
      // Existing contract: an uninstantiable type yields null instead of an exception.
      return null;
    }
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
  }

  /**
   * Untyped variant of {@link #deserializer(byte[], Class)}; it shares the same schema cache.
   *
   * @param bytes the serialized form
   * @param clazz the type to restore
   * @return the restored object, or {@code null} when {@code clazz} cannot be instantiated
   */
  @SuppressWarnings("rawtypes") // the raw parameter type is part of the existing public signature
  public static Object deserializerToObj(byte[] bytes, Class clazz) {
    return deserializer(bytes, (Class<?>) clazz);
  }

  /**
   * Deserializes bytes produced by {@link #serializerV1(Object)}.
   *
   * @param bytes the serialized form
   * @param clazz the type to restore
   * @return the restored object
   * @throws IllegalAccessException if the no-argument constructor is not accessible
   * @throws InstantiationException if {@code clazz} cannot be instantiated
   */
  public static <T> T deserializerV1(byte[] bytes, Class<T> clazz) throws IllegalAccessException, InstantiationException {
    @SuppressWarnings("unchecked") // the cache stores the schema created for exactly this class
    Schema<T> schema = getSchema(clazz);
    T obj = instantiate(clazz);
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
  }

  /** Tells whether values of {@code type} are serialized through the wrapper message. */
  private static boolean needsWrapper(Class<?> type) {
    // Collection and map implementations keep their content in transient fields, so a field-based runtime
    // schema would not capture their elements; route all of them through the wrapper.
    return WRAPPER_CLASS_SET.contains(type)
        || Collection.class.isAssignableFrom(type)
        || Map.class.isAssignableFrom(type);
  }

  /** Creates an instance through the no-argument constructor, replacing the deprecated Class.newInstance(). */
  private static <T> T instantiate(Class<T> clazz) throws InstantiationException, IllegalAccessException {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (InstantiationException | IllegalAccessException e) {
      throw e;
    } catch (ReflectiveOperationException e) {
      // A missing or throwing constructor is reported through the exception type callers already handle.
      InstantiationException failure = new InstantiationException(clazz.getName());
      failure.initCause(e);
      throw failure;
    }
  }

  /** Single-field message used to carry values that are not serialized directly. */
  private static class SerializeDeserializeWrapperObj<T> {

    private T data;

    public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
      SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
      wrapper.setData(data);
      return wrapper;
    }

    public T getData() {
      return data;
    }

    public void setData(T data) {
      this.data = data;
    }
  }
}
改写SharedDataSessionImpl
/** Copyright 2014 Red Hat, Inc.** All rights reserved. This program and the accompanying materials* are made available under the terms of the Eclipse Public License v1.0* and Apache License v2.0 which accompanies this distribution.** The Eclipse Public License is available at* http://www.eclipse.org/legal/epl-v10.html** The Apache License v2.0 is available at* http://www.opensource.org/licenses/apache2.0.php** You may elect to redistribute this code under either of these licenses.*/package com.ly.session;import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;/*** @author <a href="http://tfox.org">Tim Fox</a>*/
public class ReactiveRedisSessionStore implements SessionStore {private Redis redis;private VertxContextPRNG random;private long retryTimeout;@Overridepublic SessionStore init(Vertx vertx, JsonObject options) {Objects.requireNonNull(options, "options are required");long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);Redis redis = Redis.createClient(vertx, new RedisOptions(options));return init(vertx, timeout, redis);}public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {random = VertxContextPRNG.current(vertx);this.retryTimeout = retryTimeout;this.redis = Objects.requireNonNull(redis, "redis is required");return this;}@Overridepublic long retryTimeout() {return retryTimeout;}@Overridepublic Session createSession(long timeout) {return createSession(timeout, DEFAULT_SESSIONID_LENGTH);}@Overridepublic Session createSession(long timeout, int length) {return new RedisShardDataSessionImpl(random, timeout, length);}public ReactiveRedisSessionStore() {super();}@Overridepublic void get(String id, Handler<AsyncResult<Session>> resultHandler) {redis.send(cmd(GET).arg(id), resGet -> {if (resGet.failed()) {resultHandler.handle(Future.failedFuture(resGet.cause()));return;}Response response = resGet.result();if (response != null) {RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);session.readFromBuffer(0, response.toBuffer());// postpone expiration time, this cannot be done in a single frame with GET cmd
// redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
// if (resExpire.failed()) {
// resultHandler.handle(Future.failedFuture(resExpire.cause()));
// } else {
// resultHandler.handle(Future.succeededFuture(session));
// }
// });resultHandler.handle(Future.succeededFuture(session));} else {resultHandler.handle(Future.succeededFuture());}});}@Overridepublic void delete(String id, Handler<AsyncResult<Void>> resultHandler) {redis.send(cmd(DEL).arg(id), res -> {if (res.failed()) {resultHandler.handle(Future.failedFuture(res.cause()));} else {resultHandler.handle(Future.succeededFuture());}});}@Overridepublic void put(Session session, Handler<AsyncResult<Void>> resultHandler) {Buffer buffer = Buffer.buffer();RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;sessionImpl.writeToBuffer(buffer);// submit with all session data & expiration TO in msRequest rq = cmd(SET).arg(session.id()).arg(buffer).arg("PX").arg(session.timeout());redis.send(rq, res -> {if (res.failed()) {resultHandler.handle(Future.failedFuture(res.cause()));} else {resultHandler.handle(Future.succeededFuture());}});}@Overridepublic void clear(Handler<AsyncResult<Void>> resultHandler) {}@Overridepublic void size(Handler<AsyncResult<Integer>> resultHandler) {}@Overridepublic void close() {}public static class RedisShardDataSessionImpl extends AbstractSession{private static final Charset UTF8 = StandardCharsets.UTF_8;private static final byte TYPE_LONG = 1;private static final byte TYPE_INT = 2;private static final byte TYPE_SHORT = 3;private static final byte TYPE_BYTE = 4;private static final byte TYPE_DOUBLE = 5;private static final byte TYPE_FLOAT = 6;private static final byte TYPE_CHAR = 7;private static final byte TYPE_BOOLEAN = 8;private static final byte TYPE_STRING = 9;private static final byte TYPE_BUFFER = 10;private static final byte TYPE_BYTES = 11;private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;private static final byte TYPE_PB_SERIALIZABLE = 14;/*** Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. 
Do* not remove.*/public RedisShardDataSessionImpl() {super();}public RedisShardDataSessionImpl(VertxContextPRNG random) {super(random);}public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {super(random, timeout, length);}public void writeToBuffer(Buffer buff) {byte[] bytes = id().getBytes(UTF8);buff.appendInt(bytes.length).appendBytes(bytes);buff.appendLong(timeout());buff.appendLong(lastAccessed());buff.appendInt(version());// use cacheBuffer dataBuf = writeDataToBuffer();buff.appendBuffer(dataBuf);}public int readFromBuffer(int pos, Buffer buffer) {int len = buffer.getInt(pos);pos += 4;byte[] bytes = buffer.getBytes(pos, pos + len);pos += len;setId(new String(bytes, UTF8));setTimeout(buffer.getLong(pos));pos += 8;setLastAccessed(buffer.getLong(pos));pos += 8;setVersion(buffer.getInt(pos));pos += 4;pos = readDataFromBuffer(pos, buffer);return pos;}private Buffer writeDataToBuffer() {Buffer buffer = Buffer.buffer();if (isEmpty()) {buffer.appendInt(0);} else {final Map<String, Object> data = data();buffer.appendInt(data.size());for (Map.Entry<String, Object> entry : data.entrySet()) {String key = entry.getKey();byte[] keyBytes = key.getBytes(UTF8);buffer.appendInt(keyBytes.length).appendBytes(keyBytes);Object val = entry.getValue();if (val instanceof Long) {buffer.appendByte(TYPE_LONG).appendLong((long) val);} else if (val instanceof Integer) {buffer.appendByte(TYPE_INT).appendInt((int) val);} else if (val instanceof Short) {buffer.appendByte(TYPE_SHORT).appendShort((short) val);} else if (val instanceof Byte) {buffer.appendByte(TYPE_BYTE).appendByte((byte) val);} else if (val instanceof Double) {buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);} else if (val instanceof Float) {buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);} else if (val instanceof Character) {buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());} else if (val instanceof Boolean) 
{buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));} else if (val instanceof String) {byte[] bytes = ((String) val).getBytes(UTF8);buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);} else if (val instanceof Buffer) {Buffer buff = (Buffer) val;buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);} else if (val instanceof byte[]) {byte[] bytes = (byte[]) val;buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);} else if (val instanceof ClusterSerializable) {buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);String className = val.getClass().getName();byte[] classNameBytes = className.getBytes(UTF8);buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);((ClusterSerializable) val).writeToBuffer(buffer);} else {// 默认序列化成pbbuffer.appendByte(TYPE_PB_SERIALIZABLE);String className = val.getClass().getName();byte[] classNameBytes = className.getBytes(UTF8);buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);byte[] serializedBytes = ProtoStuffUtil.serializer(val);buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);}}}return buffer;}private int readDataFromBuffer(int pos, Buffer buffer) {try {int entries = buffer.getInt(pos);pos += 4;if (entries > 0) {final Map<String, Object> data = new ConcurrentHashMap<>(entries);for (int i = 0; i < entries; i++) {int keylen = buffer.getInt(pos);pos += 4;byte[] keyBytes = buffer.getBytes(pos, pos + keylen);pos += keylen;String key = new String(keyBytes, UTF8);byte type = buffer.getByte(pos++);Object val;switch (type) {case TYPE_LONG:val = buffer.getLong(pos);pos += 8;break;case TYPE_INT:val = buffer.getInt(pos);pos += 4;break;case TYPE_SHORT:val = buffer.getShort(pos);pos += 2;break;case TYPE_BYTE:val = buffer.getByte(pos);pos++;break;case TYPE_FLOAT:val = buffer.getFloat(pos);pos += 4;break;case TYPE_DOUBLE:val = buffer.getDouble(pos);pos += 8;break;case TYPE_CHAR:short s = buffer.getShort(pos);pos += 2;val 
= (char) s;break;case TYPE_BOOLEAN:byte b = buffer.getByte(pos);pos++;val = b == 1;break;case TYPE_STRING:int len = buffer.getInt(pos);pos += 4;byte[] bytes = buffer.getBytes(pos, pos + len);val = new String(bytes, UTF8);pos += len;break;case TYPE_BUFFER:len = buffer.getInt(pos);pos += 4;bytes = buffer.getBytes(pos, pos + len);val = Buffer.buffer(bytes);pos += len;break;case TYPE_BYTES:len = buffer.getInt(pos);pos += 4;val = buffer.getBytes(pos, pos + len);pos += len;break;case TYPE_CLUSTER_SERIALIZABLE:int classNameLen = buffer.getInt(pos);pos += 4;byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);pos += classNameLen;String className = new String(classNameBytes, UTF8);Class<?> clazz = Utils.getClassLoader().loadClass(className);if (!ClusterSerializable.class.isAssignableFrom(clazz)) {throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");}ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();pos = obj.readFromBuffer(pos, buffer);val = obj;break;case TYPE_PB_SERIALIZABLE:classNameLen = buffer.getInt(pos);pos += 4;classNameBytes = buffer.getBytes(pos, pos + classNameLen);pos += classNameLen;className = new String(classNameBytes, UTF8);Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);int serializeObjLength = buffer.getInt(pos);pos+=4;byte[] serializeObjBytes = buffer.getBytes(pos,pos+serializeObjLength);pos+=serializeObjLength;val = ProtoStuffUtil.deserializerToObj(serializeObjBytes,clazzByPb);break;default:throw new IllegalStateException("Invalid serialized type: " + type);}data.put(key, val);}setData(data);}return pos;} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {throw new VertxException(e);}}}
}
序列化时,原有类型无法处理的对象默认采用pb做序列化;反序列化时,如果类型标记是pb,就采用pb做反序列化。
完整代码请看github:
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx