国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

使用Protostuff序列化

ephererid / 3398人閱讀

摘要:序調(diào)用,有多種序列化的方式,通用如,使用的方面的,比如默認(rèn)的序列化,比如還有跨語言的,比如。所以也一直在尋找運(yùn)行效率與開發(fā)效率兼得的序列化方式。偶爾在網(wǎng)上看到,覺得找到了一直在找的這種序列化方式。

rpc調(diào)用,有多種序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默認(rèn)的序列化,比如hessian;還有跨語言的,比如thrift、protocolbuf。thrift和pb的好處是序列化后size比較小,但是缺點(diǎn)是得生成java代碼,這個(gè)挺雞肋的,所以不管二者運(yùn)行時(shí)效率有多高,開發(fā)效率相對(duì)比較低的。像hessian,是有一些在用,但是感覺不如pb那樣強(qiáng)大。所以也一直在尋找運(yùn)行效率與開發(fā)效率兼得的序列化方式。偶爾在網(wǎng)上看到protostuff,覺得找到了一直在找的這種序列化方式。

protostuff簡介

protobuf的一個(gè)缺點(diǎn)是需要數(shù)據(jù)結(jié)構(gòu)的預(yù)編譯過程,首先要編寫.proto格式的配置文件,再通過protobuf提供的工具生成各種語言響應(yīng)的代碼。由于java具有反射和動(dòng)態(tài)代碼生成的能力,這個(gè)預(yù)編譯過程不是必須的,可以在代碼執(zhí)行時(shí)來實(shí)現(xiàn)。有protostuff已經(jīng)實(shí)現(xiàn)了這個(gè)功能。

protostuff效率

Ser Time+Deser Time (ns)

Size, Compressed size [light] in bytes

使用 pom依賴
        
            com.dyuproject.protostuff
            protostuff-core
            1.0.8
        
        
            com.dyuproject.protostuff
            protostuff-runtime
            1.0.8
        
工具類
public class SerializationUtil {

    private static Map, Schema> cachedSchema = new ConcurrentHashMap, Schema>();

    private static Objenesis objenesis = new ObjenesisStd(true);

    private static  Schema getSchema(Class clazz) {
        @SuppressWarnings("unchecked")
        Schema schema = (Schema) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static  byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class clazz = (Class) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static  T deserializer(byte[] data, Class clazz) {
        try {
            T obj = objenesis.newInstance(clazz);
            Schema schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
基于netty的rpc

NettyServer

public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private int ioThreadNum;

    //內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù),對(duì)于給定的監(jiān)聽套接口,內(nèi)核要維護(hù)兩個(gè)隊(duì)列,未鏈接隊(duì)列和已連接隊(duì)列大小總和最大值
    private int backlog;

    private int port;

    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyServer(int ioThreadNum, int backlog, int port) {
        this.ioThreadNum = ioThreadNum;
        this.backlog = backlog;
        this.port = port;
    }

    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup(this.ioThreadNum);
        final Map demoService = new HashMap();
        demoService.put("com.codecraft.service.HelloService", new HelloServiceImpl());

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, backlog)
                //注意是childOption
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new RpcDecoder(RpcRequest.class))
                                .addLast(new RpcEncoder(RpcResponse.class))
                                .addLast(new ServerRpcHandler(demoService));
                    }
                });

        channel = serverBootstrap.bind("127.0.0.1",port).sync().channel();

        logger.info("NettyRPC server listening on port "+ port + " and ready for connections...");

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run(){
                //do shutdown staff
            }
        });
    }

    public void stop() {
        if (null == channel) {
            throw new ServerStopException();
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
        bossGroup = null;
        workerGroup = null;
        channel = null;
    }
}

ServerRpcHandler

public class ServerRpcHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);

    private final Map serviceMapping;

    public ServerRpcHandler(Map serviceMapping) {
        this.serviceMapping = serviceMapping;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
        RpcResponse response = new RpcResponse();
        response.setTraceId(rpcRequest.getTraceId());
        try {
            logger.info("server handle request:{}",rpcRequest);
            Object result = handle(rpcRequest);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        channelHandlerContext.writeAndFlush(response);
    }

    private Object handle(RpcRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = serviceMapping.get(className);

        Class serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(cause.getMessage(), cause);
        RpcResponse response = new RpcResponse();
        if(cause instanceof ServerException){
            response.setTraceId(((ServerException) cause).getTraceId());
        }
        response.setError(cause);
        ctx.writeAndFlush(response);
    }
}

NettyClient

public class NettyClient implements IClient {

    private EventLoopGroup workerGroup;
    private Channel channel;

    private int workerGroupThreads;

    private ClientRpcHandler clientRpcHandler;

    private final Optional> NO_TIMEOUT = Optional.>absent();

    public NettyClient(int workerGroupThreads) {
        this.workerGroupThreads = workerGroupThreads;
    }

    public void connect(InetSocketAddress socketAddress) {
        workerGroup = new NioEventLoopGroup(workerGroupThreads);
        clientRpcHandler = new ClientRpcHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new RpcDecoder(RpcResponse.class))
                                .addLast(new RpcEncoder(RpcRequest.class))
                                .addLast(clientRpcHandler);
                    }
                });
        channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())
                .syncUninterruptibly()
                .channel();
    }

    public RpcResponse syncSend(RpcRequest request) throws InterruptedException {
        System.out.println("send request:"+request);
        channel.writeAndFlush(request).sync();
        return clientRpcHandler.send(request,NO_TIMEOUT);
    }

    public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {
        channel.writeAndFlush(request);
        return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));
    }

    public InetSocketAddress getRemoteAddress() {
        SocketAddress remoteAddress = channel.remoteAddress();
        if (!(remoteAddress instanceof InetSocketAddress)) {
            throw new RuntimeException("Get remote address error, should be InetSocketAddress");
        }
        return (InetSocketAddress) remoteAddress;
    }

    public void close() {
        if (null == channel) {
            throw new ClientCloseException();
        }
        workerGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
        workerGroup = null;
        channel = null;
    }
}

ClientRpcHandler

@ChannelHandler.Sharable
public class ClientRpcHandler extends SimpleChannelInboundHandler {

    //用blocking queue主要是用阻塞的功能,省的自己加鎖
    private final ConcurrentHashMap> responseMap = new ConcurrentHashMap>();


    //messageReceived
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {
        System.out.println("receive response:"+rpcResponse);
        BlockingQueue queue = responseMap.get(rpcResponse.getTraceId());
        queue.add(rpcResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
    }

    public RpcResponse send(RpcRequest request,Optional> timeout) throws InterruptedException {
        responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue(1));
        RpcResponse response = null;
        try {
            BlockingQueue queue = responseMap.get(request.getTraceId());
            if(timeout == null || !timeout.isPresent()){
                response = queue.take();
            }else{
                response = queue.poll(timeout.get().getKey(),timeout.get().getValue());
            }
        } finally {
            responseMap.remove(request.getTraceId());
        }
        return response;
    }
}

decoder

public class RpcDecoder extends ByteToMessageDecoder {

    private Class genericClass;

    public RpcDecoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (dataLength < 0) {
            channelHandlerContext.close();
        }
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        list.add(obj);
    }
}

encoder

public class RpcEncoder extends MessageToByteEncoder {

    private Class genericClass;

    public RpcEncoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
        if (genericClass.isInstance(obj)) {
            byte[] data = SerializationUtil.serializer(obj);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}
參考

jvm-serializers

protostuff

java序列化/反序列化之xstream、protobuf、protostuff 的比較與使用例子

Protostuff序列化

protostuff介紹

Protostuff詳解

序列化框架 kryo VS hessian VS Protostuff VS java

Protostuff序列化和反序列化

eishay/jvm-serializers

Protostuff 序列化

使用Netty實(shí)現(xiàn)多路復(fù)用的client

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/65730.html

相關(guān)文章

  • java常用列化與反列化方法

    摘要:序列化工具類序列化工具的序列化與反序列化使用實(shí)現(xiàn)序列化和反序列化反序列化時(shí),必須要有默認(rèn)構(gòu)造函數(shù),否則報(bào)錯(cuò)使用序列化緩存此類分別包含序列化序列化序列化三種序列化方式。 序列化工具類 序列化即將對(duì)象序列化為字節(jié)數(shù)組,反序列化就是將字節(jié)數(shù)組恢復(fù)成對(duì)象。主要的目的是方便傳輸和存儲(chǔ)。 序列化工具類: public class SerializeUtil { private stati...

    zhkai 評(píng)論0 收藏0
  • java并發(fā)編程學(xué)習(xí)20--基于springboot的秒殺系統(tǒng)實(shí)現(xiàn)2--redis緩存

    摘要:在查詢的服務(wù)方法上添加如下注解表明該方法的返回值需要緩存。當(dāng)被緩存的數(shù)據(jù)發(fā)生改變,緩存需要被清理或者修改,這里使用如下注解清除指定的緩存。事務(wù)是一個(gè)原子操作,所有的緩存,消息,這種非強(qiáng)一致性要求的操作,都應(yīng)該在事務(wù)成功提交后執(zhí)行。 【為什么使用redis 性能極高,redis能讀的速度是110000次/s,寫的速度是81000次/s 豐富的數(shù)據(jù)類型,redis支持二進(jìn)制案例的 Str...

    bovenson 評(píng)論0 收藏0
  • Java高并發(fā)秒殺系統(tǒng)【觀后總結(jié)】

    摘要:項(xiàng)目簡介在慕課網(wǎng)上發(fā)現(xiàn)了一個(gè)項(xiàng)目,內(nèi)容講的是高并發(fā)秒殺,覺得挺有意思的,就進(jìn)去學(xué)習(xí)了一番。比如重復(fù)秒殺,秒殺關(guān)閉這些都是屬于秒殺的業(yè)務(wù)。秒殺操作是與數(shù)據(jù)庫的事務(wù)相關(guān)的,不能使用緩存來替代了。 項(xiàng)目簡介 在慕課網(wǎng)上發(fā)現(xiàn)了一個(gè)JavaWeb項(xiàng)目,內(nèi)容講的是高并發(fā)秒殺,覺得挺有意思的,就進(jìn)去學(xué)習(xí)了一番。 記錄在該項(xiàng)目中學(xué)到了什么玩意.. 該項(xiàng)目源碼對(duì)應(yīng)的gitHub地址(由觀看其視頻的人編寫...

    mengbo 評(píng)論0 收藏0
  • 180918-JDK之Deflater壓縮與Inflater解壓

    摘要:之壓縮與解壓解壓壓縮壓縮與解壓工具類在實(shí)際的應(yīng)用場景中,特別是對(duì)外傳輸數(shù)據(jù)時(shí),將原始數(shù)據(jù)壓縮之后丟出去,可以說是非常常見的一個(gè)了,平常倒是沒有直接使用原生的壓縮工具類,使用和的機(jī)會(huì)較多正好在實(shí)際的工作場景中遇到了,現(xiàn)在簡單的看下使用姿 title: 180918-JDK之Deflater壓縮與Inflater解壓tags: JDK categories: Java JDK dat...

    chemzqm 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

ephererid

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<