摘要:序调用,有多种序列化的方式,通用如,使用的方面的,比如默认的序列化,比如还有跨语言的,比如。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到,觉得找到了一直在找的这种序列化方式。
序
rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到protostuff,觉得找到了一直在找的这种序列化方式。
protostuff简介protobuf的一个缺点是需要数据结构的预编译过程,首先要编写.proto格式的配置文件,再通过protobuf提供的工具生成各种语言响应的代码。由于java具有反射和动态代码生成的能力,这个预编译过程不是必须的,可以在代码执行时来实现。有protostuff已经实现了这个功能。
protostuff效率Ser Time+Deser Time (ns)
Size, Compressed size [light] in bytes
使用 pom依赖工具类com.dyuproject.protostuff protostuff-core 1.0.8 com.dyuproject.protostuff protostuff-runtime 1.0.8
public class SerializationUtil {
private static Map, Schema>> cachedSchema = new ConcurrentHashMap, Schema>>();
private static Objenesis objenesis = new ObjenesisStd(true);
private static Schema getSchema(Class clazz) {
@SuppressWarnings("unchecked")
Schema schema = (Schema) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class clazz = (Class) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static T deserializer(byte[] data, Class clazz) {
try {
T obj = objenesis.newInstance(clazz);
Schema schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
基于netty的rpc
NettyServer
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private int ioThreadNum;
//内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值
private int backlog;
private int port;
private Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(int ioThreadNum, int backlog, int port) {
this.ioThreadNum = ioThreadNum;
this.backlog = backlog;
this.port = port;
}
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(this.ioThreadNum);
final Map demoService = new HashMap();
demoService.put("com.codecraft.service.HelloService", new HelloServiceImpl());
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlog)
//注意是childOption
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new RpcDecoder(RpcRequest.class))
.addLast(new RpcEncoder(RpcResponse.class))
.addLast(new ServerRpcHandler(demoService));
}
});
channel = serverBootstrap.bind("127.0.0.1",port).sync().channel();
logger.info("NettyRPC server listening on port "+ port + " and ready for connections...");
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run(){
//do shutdown staff
}
});
}
public void stop() {
if (null == channel) {
throw new ServerStopException();
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
bossGroup = null;
workerGroup = null;
channel = null;
}
}
ServerRpcHandler
public class ServerRpcHandler extends SimpleChannelInboundHandler{ private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class); private final Map serviceMapping; public ServerRpcHandler(Map serviceMapping) { this.serviceMapping = serviceMapping; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception { RpcResponse response = new RpcResponse(); response.setTraceId(rpcRequest.getTraceId()); try { logger.info("server handle request:{}",rpcRequest); Object result = handle(rpcRequest); response.setResult(result); } catch (Throwable t) { response.setError(t); } channelHandlerContext.writeAndFlush(response); } private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); Object serviceBean = serviceMapping.get(className); Class> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(cause.getMessage(), cause); RpcResponse response = new RpcResponse(); if(cause instanceof ServerException){ response.setTraceId(((ServerException) cause).getTraceId()); } response.setError(cause); ctx.writeAndFlush(response); } }
NettyClient
public class NettyClient implements IClient {
private EventLoopGroup workerGroup;
private Channel channel;
private int workerGroupThreads;
private ClientRpcHandler clientRpcHandler;
private final Optional> NO_TIMEOUT = Optional.>absent();
public NettyClient(int workerGroupThreads) {
this.workerGroupThreads = workerGroupThreads;
}
public void connect(InetSocketAddress socketAddress) {
workerGroup = new NioEventLoopGroup(workerGroupThreads);
clientRpcHandler = new ClientRpcHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new RpcDecoder(RpcResponse.class))
.addLast(new RpcEncoder(RpcRequest.class))
.addLast(clientRpcHandler);
}
});
channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())
.syncUninterruptibly()
.channel();
}
public RpcResponse syncSend(RpcRequest request) throws InterruptedException {
System.out.println("send request:"+request);
channel.writeAndFlush(request).sync();
return clientRpcHandler.send(request,NO_TIMEOUT);
}
public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {
channel.writeAndFlush(request);
return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));
}
public InetSocketAddress getRemoteAddress() {
SocketAddress remoteAddress = channel.remoteAddress();
if (!(remoteAddress instanceof InetSocketAddress)) {
throw new RuntimeException("Get remote address error, should be InetSocketAddress");
}
return (InetSocketAddress) remoteAddress;
}
public void close() {
if (null == channel) {
throw new ClientCloseException();
}
workerGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
workerGroup = null;
channel = null;
}
}
ClientRpcHandler
@ChannelHandler.Sharable public class ClientRpcHandler extends SimpleChannelInboundHandler{ //用blocking queue主要是用阻塞的功能,省的自己加锁 private final ConcurrentHashMap > responseMap = new ConcurrentHashMap >(); //messageReceived @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { System.out.println("receive response:"+rpcResponse); BlockingQueue queue = responseMap.get(rpcResponse.getTraceId()); queue.add(rpcResponse); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); } public RpcResponse send(RpcRequest request,Optional > timeout) throws InterruptedException { responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue (1)); RpcResponse response = null; try { BlockingQueue queue = responseMap.get(request.getTraceId()); if(timeout == null || !timeout.isPresent()){ response = queue.take(); }else{ response = queue.poll(timeout.get().getKey(),timeout.get().getValue()); } } finally { responseMap.remove(request.getTraceId()); } return response; } }
decoder
public class RpcDecoder extends ByteToMessageDecoder {
private Class> genericClass;
public RpcDecoder(Class> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
encoder
public class RpcEncoder extends MessageToByteEncoder {
private Class> genericClass;
public RpcEncoder(Class> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
if (genericClass.isInstance(obj)) {
byte[] data = SerializationUtil.serializer(obj);
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
}
参考
jvm-serializers
protostuff
java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子
Protostuff序列化
protostuff介绍
Protostuff详解
序列化框架 kryo VS hessian VS Protostuff VS java
Protostuff序列化和反序列化
eishay/jvm-serializers
Protostuff 序列化
使用Netty实现多路复用的client
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/65730.html
摘要:序列化工具类序列化工具的序列化与反序列化使用实现序列化和反序列化反序列化时,必须要有默认构造函数,否则报错使用序列化缓存此类分别包含序列化序列化序列化三种序列化方式。 序列化工具类 序列化即将对象序列化为字节数组,反序列化就是将字节数组恢复成对象。主要的目的是方便传输和存储。 序列化工具类: public class SerializeUtil { private stati...
摘要:在查询的服务方法上添加如下注解表明该方法的返回值需要缓存。当被缓存的数据发生改变,缓存需要被清理或者修改,这里使用如下注解清除指定的缓存。事务是一个原子操作,所有的缓存,消息,这种非强一致性要求的操作,都应该在事务成功提交后执行。 【为什么使用redis 性能极高,redis能读的速度是110000次/s,写的速度是81000次/s 丰富的数据类型,redis支持二进制案例的 Str...
摘要:项目简介在慕课网上发现了一个项目,内容讲的是高并发秒杀,觉得挺有意思的,就进去学习了一番。比如重复秒杀,秒杀关闭这些都是属于秒杀的业务。秒杀操作是与数据库的事务相关的,不能使用缓存来替代了。 项目简介 在慕课网上发现了一个JavaWeb项目,内容讲的是高并发秒杀,觉得挺有意思的,就进去学习了一番。 记录在该项目中学到了什么玩意.. 该项目源码对应的gitHub地址(由观看其视频的人编写...
摘要:之压缩与解压解压压缩压缩与解压工具类在实际的应用场景中,特别是对外传输数据时,将原始数据压缩之后丢出去,可以说是非常常见的一个了,平常倒是没有直接使用原生的压缩工具类,使用和的机会较多正好在实际的工作场景中遇到了,现在简单的看下使用姿 title: 180918-JDK之Deflater压缩与Inflater解压tags: JDK categories: Java JDK dat...
阅读 4865·2021-11-25 09:43
阅读 2817·2021-11-18 13:11
阅读 2511·2019-08-30 15:55
阅读 3445·2019-08-26 11:58
阅读 3026·2019-08-26 10:47
阅读 2459·2019-08-26 10:20
阅读 1433·2019-08-23 17:59
阅读 3369·2019-08-23 15:54