摘要:序調(diào)用,有多種序列化的方式,通用如,使用的方面的,比如默認(rèn)的序列化,比如還有跨語言的,比如。所以也一直在尋找運(yùn)行效率與開發(fā)效率兼得的序列化方式。偶爾在網(wǎng)上看到,覺得找到了一直在找的這種序列化方式。
序
rpc調(diào)用,有多種序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默認(rèn)的序列化,比如hessian;還有跨語言的,比如thrift、protocolbuf。thrift和pb的好處是序列化后size比較小,但是缺點(diǎn)是得生成java代碼,這個(gè)挺雞肋的,所以不管二者運(yùn)行時(shí)效率有多高,開發(fā)效率相對(duì)比較低的。像hessian,是有一些在用,但是感覺不如pb那樣強(qiáng)大。所以也一直在尋找運(yùn)行效率與開發(fā)效率兼得的序列化方式。偶爾在網(wǎng)上看到protostuff,覺得找到了一直在找的這種序列化方式。
protostuff簡介protobuf的一個(gè)缺點(diǎn)是需要數(shù)據(jù)結(jié)構(gòu)的預(yù)編譯過程,首先要編寫.proto格式的配置文件,再通過protobuf提供的工具生成各種語言響應(yīng)的代碼。由于java具有反射和動(dòng)態(tài)代碼生成的能力,這個(gè)預(yù)編譯過程不是必須的,可以在代碼執(zhí)行時(shí)來實(shí)現(xiàn)。有protostuff已經(jīng)實(shí)現(xiàn)了這個(gè)功能。
protostuff效率Ser Time+Deser Time (ns)
Size, Compressed size [light] in bytes
使用 pom依賴工具類com.dyuproject.protostuff protostuff-core 1.0.8 com.dyuproject.protostuff protostuff-runtime 1.0.8
public class SerializationUtil { private static Map基于netty的rpc, Schema>> cachedSchema = new ConcurrentHashMap , Schema>>(); private static Objenesis objenesis = new ObjenesisStd(true); private static Schema getSchema(Class clazz) { @SuppressWarnings("unchecked") Schema schema = (Schema ) cachedSchema.get(clazz); if (schema == null) { schema = RuntimeSchema.getSchema(clazz); if (schema != null) { cachedSchema.put(clazz, schema); } } return schema; } /** * 序列化 * * @param obj * @return */ public static byte[] serializer(T obj) { @SuppressWarnings("unchecked") Class clazz = (Class ) obj.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema schema = getSchema(clazz); return ProtostuffIOUtil.toByteArray(obj, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } /** * 反序列化 * * @param data * @param clazz * @return */ public static T deserializer(byte[] data, Class clazz) { try { T obj = objenesis.newInstance(clazz); Schema schema = getSchema(clazz); ProtostuffIOUtil.mergeFrom(data, obj, schema); return obj; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
NettyServer
public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private int ioThreadNum; //內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù),對(duì)于給定的監(jiān)聽套接口,內(nèi)核要維護(hù)兩個(gè)隊(duì)列,未鏈接隊(duì)列和已連接隊(duì)列大小總和最大值 private int backlog; private int port; private Channel channel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; public NettyServer(int ioThreadNum, int backlog, int port) { this.ioThreadNum = ioThreadNum; this.backlog = backlog; this.port = port; } public void start() throws InterruptedException { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(this.ioThreadNum); final MapdemoService = new HashMap (); demoService.put("com.codecraft.service.HelloService", new HelloServiceImpl()); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, backlog) //注意是childOption .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer () { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new RpcDecoder(RpcRequest.class)) .addLast(new RpcEncoder(RpcResponse.class)) .addLast(new ServerRpcHandler(demoService)); } }); channel = serverBootstrap.bind("127.0.0.1",port).sync().channel(); logger.info("NettyRPC server listening on port "+ port + " and ready for connections..."); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run(){ //do shutdown staff } }); } public void stop() { if (null == channel) { throw new ServerStopException(); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workerGroup = null; channel = null; } }
ServerRpcHandler
public class ServerRpcHandler extends SimpleChannelInboundHandler{ private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class); private final Map serviceMapping; public ServerRpcHandler(Map serviceMapping) { this.serviceMapping = serviceMapping; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception { RpcResponse response = new RpcResponse(); response.setTraceId(rpcRequest.getTraceId()); try { logger.info("server handle request:{}",rpcRequest); Object result = handle(rpcRequest); response.setResult(result); } catch (Throwable t) { response.setError(t); } channelHandlerContext.writeAndFlush(response); } private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); Object serviceBean = serviceMapping.get(className); Class> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(cause.getMessage(), cause); RpcResponse response = new RpcResponse(); if(cause instanceof ServerException){ response.setTraceId(((ServerException) cause).getTraceId()); } response.setError(cause); ctx.writeAndFlush(response); } }
NettyClient
public class NettyClient implements IClient { private EventLoopGroup workerGroup; private Channel channel; private int workerGroupThreads; private ClientRpcHandler clientRpcHandler; private final Optional> NO_TIMEOUT = Optional. >absent(); public NettyClient(int workerGroupThreads) { this.workerGroupThreads = workerGroupThreads; } public void connect(InetSocketAddress socketAddress) { workerGroup = new NioEventLoopGroup(workerGroupThreads); clientRpcHandler = new ClientRpcHandler(); Bootstrap bootstrap = new Bootstrap(); bootstrap .group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer () { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new RpcDecoder(RpcResponse.class)) .addLast(new RpcEncoder(RpcRequest.class)) .addLast(clientRpcHandler); } }); channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()) .syncUninterruptibly() .channel(); } public RpcResponse syncSend(RpcRequest request) throws InterruptedException { System.out.println("send request:"+request); channel.writeAndFlush(request).sync(); return clientRpcHandler.send(request,NO_TIMEOUT); } public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException { channel.writeAndFlush(request); return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit))); } public InetSocketAddress getRemoteAddress() { SocketAddress remoteAddress = channel.remoteAddress(); if (!(remoteAddress instanceof InetSocketAddress)) { throw new RuntimeException("Get remote address error, should be InetSocketAddress"); } return (InetSocketAddress) remoteAddress; } public void close() { if (null == channel) { throw new ClientCloseException(); } workerGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); workerGroup = null; channel = null; } }
ClientRpcHandler
@ChannelHandler.Sharable public class ClientRpcHandler extends SimpleChannelInboundHandler{ //用blocking queue主要是用阻塞的功能,省的自己加鎖 private final ConcurrentHashMap > responseMap = new ConcurrentHashMap >(); //messageReceived @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { System.out.println("receive response:"+rpcResponse); BlockingQueue queue = responseMap.get(rpcResponse.getTraceId()); queue.add(rpcResponse); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); } public RpcResponse send(RpcRequest request,Optional > timeout) throws InterruptedException { responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue (1)); RpcResponse response = null; try { BlockingQueue queue = responseMap.get(request.getTraceId()); if(timeout == null || !timeout.isPresent()){ response = queue.take(); }else{ response = queue.poll(timeout.get().getKey(),timeout.get().getValue()); } } finally { responseMap.remove(request.getTraceId()); } return response; } }
decoder
public class RpcDecoder extends ByteToMessageDecoder { private Class> genericClass; public RpcDecoder(Class> genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
encoder
public class RpcEncoder extends MessageToByteEncoder { private Class> genericClass; public RpcEncoder(Class> genericClass) { this.genericClass = genericClass; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception { if (genericClass.isInstance(obj)) { byte[] data = SerializationUtil.serializer(obj); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } } }參考
jvm-serializers
protostuff
java序列化/反序列化之xstream、protobuf、protostuff 的比較與使用例子
Protostuff序列化
protostuff介紹
Protostuff詳解
序列化框架 kryo VS hessian VS Protostuff VS java
Protostuff序列化和反序列化
eishay/jvm-serializers
Protostuff 序列化
使用Netty實(shí)現(xiàn)多路復(fù)用的client
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/65730.html
摘要:序列化工具類序列化工具的序列化與反序列化使用實(shí)現(xiàn)序列化和反序列化反序列化時(shí),必須要有默認(rèn)構(gòu)造函數(shù),否則報(bào)錯(cuò)使用序列化緩存此類分別包含序列化序列化序列化三種序列化方式。 序列化工具類 序列化即將對(duì)象序列化為字節(jié)數(shù)組,反序列化就是將字節(jié)數(shù)組恢復(fù)成對(duì)象。主要的目的是方便傳輸和存儲(chǔ)。 序列化工具類: public class SerializeUtil { private stati...
摘要:在查詢的服務(wù)方法上添加如下注解表明該方法的返回值需要緩存。當(dāng)被緩存的數(shù)據(jù)發(fā)生改變,緩存需要被清理或者修改,這里使用如下注解清除指定的緩存。事務(wù)是一個(gè)原子操作,所有的緩存,消息,這種非強(qiáng)一致性要求的操作,都應(yīng)該在事務(wù)成功提交后執(zhí)行。 【為什么使用redis 性能極高,redis能讀的速度是110000次/s,寫的速度是81000次/s 豐富的數(shù)據(jù)類型,redis支持二進(jìn)制案例的 Str...
摘要:項(xiàng)目簡介在慕課網(wǎng)上發(fā)現(xiàn)了一個(gè)項(xiàng)目,內(nèi)容講的是高并發(fā)秒殺,覺得挺有意思的,就進(jìn)去學(xué)習(xí)了一番。比如重復(fù)秒殺,秒殺關(guān)閉這些都是屬于秒殺的業(yè)務(wù)。秒殺操作是與數(shù)據(jù)庫的事務(wù)相關(guān)的,不能使用緩存來替代了。 項(xiàng)目簡介 在慕課網(wǎng)上發(fā)現(xiàn)了一個(gè)JavaWeb項(xiàng)目,內(nèi)容講的是高并發(fā)秒殺,覺得挺有意思的,就進(jìn)去學(xué)習(xí)了一番。 記錄在該項(xiàng)目中學(xué)到了什么玩意.. 該項(xiàng)目源碼對(duì)應(yīng)的gitHub地址(由觀看其視頻的人編寫...
摘要:之壓縮與解壓解壓壓縮壓縮與解壓工具類在實(shí)際的應(yīng)用場景中,特別是對(duì)外傳輸數(shù)據(jù)時(shí),將原始數(shù)據(jù)壓縮之后丟出去,可以說是非常常見的一個(gè)了,平常倒是沒有直接使用原生的壓縮工具類,使用和的機(jī)會(huì)較多正好在實(shí)際的工作場景中遇到了,現(xiàn)在簡單的看下使用姿 title: 180918-JDK之Deflater壓縮與Inflater解壓tags: JDK categories: Java JDK dat...
閱讀 3718·2021-11-25 09:43
閱讀 2605·2021-11-18 13:11
閱讀 2218·2019-08-30 15:55
閱讀 3277·2019-08-26 11:58
閱讀 2830·2019-08-26 10:47
閱讀 2235·2019-08-26 10:20
閱讀 1278·2019-08-23 17:59
閱讀 3013·2019-08-23 15:54