摘要:前言此博客所述項目代碼已在開源歡迎大家一起貢獻點此進入最近一次寫博客還是年底謝謝大家持久以來的關注本篇博文將會教大家如何從到搭建一個簡單高效且拓展性強的框架什么是相信大家都或多或少使用過框架比如阿里的谷歌的的等等那么究竟什么是翻譯成中文
Cool-Rpc 前言
此博客所述項目代碼已在github開源,歡迎大家一起貢獻!
點此進入:Cool-RPC
最近一次寫博客還是17年底,謝謝大家持久以來的關注
本篇博文將會教大家如何從0到1,搭建一個簡單、高效且拓展性強的rpc框架.
相信大家都或多或少使用過RPC框架,比如阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等
那么究竟什么是rpc?
rpc翻譯成中文叫做遠程過程調用,通俗易懂點:將單應用架構成分布式系統架構后,多個系統間數據怎么交互,這就是rpc的職責.
從服務的角度來看,rpc分為服務提供者(provider)和服務消費者(consumer)兩大類,中間會有一些共用java接口,叫做開放api接口
也就是說,接口服務實現類所處的地方叫做provider,接口服務調用類所處的地方叫consumer
因為處于分布式環境中,那consumer調用provider時,如何知道對方服務器的IP和開放端口呢?
這時需要一個組件叫做注冊中心,consumer通過服務名后,去注冊中心上查找該服務的IP+Port,拿到地址數據后,再去請求該地址的服務
如圖:
Cool-Rpc技術簡介此項目基于傳輸層(TCP/IP協議)進行通訊,傳輸層框架使用netty編寫,github上會有mina版本
提供多套序列化框架,默認使用Protostuff序列化,可配置使用java序列化等
注冊中心默認zookeeper,可配置使用redis(只要有節點數據存儲和消息通知功能的組件即可)
consumer通過java動態代理的方式使用執行遠程調用
將所要執行的類名,方法,參數等通知provider,之后provider拿著數據調用本地實現類,將處理后得到的結果通知給consumer
廢話了那么多,開始上干貨,建議大家從github克隆完整代碼,本篇博文只講重點代碼
注冊中心以api接口名為key,IP+Port為value,將數據持久化,以供消費者查詢調用
以zookeeper為例:
為了更靈活地實現服務注冊者和發現者,這里添加一個注冊中心適配器
public abstract class ServiceCenterAdapter implements ServiceCenter{ String host; int port = 0; String passWord; ServiceCenterAdapter(){} ServiceCenterAdapter(String host){ this.host = host; } ServiceCenterAdapter(String host, int port) { this.host = host; this.port = port; } @Override public String discover(String serviceName) { return null; } @Override public void register(String serviceName, String serviceAddress) {} @Override public void setHost(String host){ this.host = host; }; @Override public void setPort(int port){ this.port = port; }; @Override public void setPassWord(String passWord){ this.passWord = passWord; }; //獲取 IP:端口 @Override public String getAddress(){ if ("".equals(host) || host == null || port == 0){ throw new RuntimeException("the zookeeper host or port error"); } return host+":"+String.valueOf(port); }; }
zookeeper的服務注冊(provider使用):
在實際項目中,需要構造此類,并注入相應的IP和端口,最后以bean的形式注入到IOC容器中
public class ZooKeeperServiceRegistry extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class); private ZkClient zkClient; { this.port = 2181; zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.info("connect zookeeper"); } public ZooKeeperServiceRegistry(String zkHost) { super(zkHost); } public ZooKeeperServiceRegistry(String zkHost, int zkPort) { super(zkHost, zkPort); } // 注冊服務 serviceName=接口名 serviceAddress=IP+Port @Override public void register(String serviceName, String serviceAddress) { // create cool node permanent String registryPath = CoolConstant.ZK_REGISTRY_PATH; if (!zkClient.exists(registryPath)) { zkClient.createPersistent(registryPath); log.info("create registry node: {}", registryPath); } // create service node permanent String servicePath = registryPath + "/" + serviceName; if (!zkClient.exists(servicePath)) { zkClient.createPersistent(servicePath); log.info("create service node: {}", servicePath); } // create service address node temp String addressPath = servicePath + "/address-"; String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress); log.info("create address node: {}", addressNode); } }
zookeeper的服務發現者(consumer使用):
同上,也需要配置相應的IP和端口,并以bean注入到項目ioc容器中
public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class); { super.port = 2181; } public ZooKeeperServiceDiscovery(){}; public ZooKeeperServiceDiscovery(String zkHost){ super(zkHost); } public ZooKeeperServiceDiscovery(String zkHost, int zkPort){ super(zkHost, zkPort); } // 服務發現 name=api接口名 @Override public String discover(String name) { ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.debug("connect zookeeper"); try { String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name; if (!zkClient.exists(servicePath)) { throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath)); } List服務端TCP處理器addressList = zkClient.getChildren(servicePath); if (addressList.size() == 0) { throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath)); } String address; int size = addressList.size(); if (size == 1) { address = addressList.get(0); log.debug("get only address node: {}", address); } else { address = addressList.get(ThreadLocalRandom.current().nextInt(size)); log.debug("get random address node: {}", address); } String addressPath = servicePath + "/" + address; return zkClient.readData(addressPath); } finally { zkClient.close(); } } }
此篇博文的TCP數據(包括編解碼器、處理器)全部以netty編寫
服務端的netty引導類:
public class CoolRpcServer implements ApplicationContextAware { private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class); private Channel channel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap bootstrap; private HandlerInitializer handlerInitializer; private ServiceCenter serviceRegistry; private String serviceIP; private int port; public static MapservicesMap ; { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); handlerInitializer = new HandlerInitializer(); servicesMap = new HashMap<>(16); } public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){ this.serviceRegistry = serviceRegistry; this.serviceIP = serviceIP; this.port = port; } /** * start and init tcp server if ioc contain is booting */ @SuppressWarnings("unchecked") public void initServer() throws InterruptedException { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(handlerInitializer); bootstrap.option(ChannelOption.SO_BACKLOG, 128); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // the most send bytes ( 256KB ) bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256); // the most receive bytes ( 2048KB ) bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2); channel = bootstrap.bind(serviceIP,port).sync().channel(); if (servicesMap != null && servicesMap.size() > 0){ for (String beanName: servicesMap.keySet()){ serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port)); log.info("register service name = {}", beanName); } } log.info("TCP server started successfully, port:{}", port); channel.closeFuture().sync(); } /** * close ioc contain and stop tcp server */ public void stopServer(){ if (channel != null && channel.isActive()) { channel.close(); } if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("TCP server stopped successfully, port: {}", port); } /** * scan Annotation of CoolService */ @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map beans = ctx.getBeansWithAnnotation(CoolService.class); if (beans != null && beans.size()>0){ for (Object bean : beans.values()){ String name = bean.getClass().getAnnotation(CoolService.class).value().getName(); servicesMap.put(name, bean); } } } }
此項目的開放api接口實現類需要用@CoolService注解標識,服務端容器啟動時,會掃描所有帶有此注解的實現類,并注入到注冊中心
服務端處理器(netty handler):
@ChannelHandler.Sharable public class CoolServerHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse response = new CoolResponse(); CoolRequest request = (CoolRequest) msg; try { Object result = invoke(request); response.setRequestID(request.getRequestID()); response.setResult(result); } catch (Throwable error) { response.setError(error); } ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private Object invoke(CoolRequest request) throws Throwable{ if (request == null){ throw new Throwable("cool rpc request not found"); } String className = request.getClassName(); String methodName = request.getMethodName(); Object[] parameters = request.getParameters(); Object service = CoolRpcServer.servicesMap.get(className); if (service == null){ throw new Throwable("cool rpc service not exist"); } Class> serviceClass = service.getClass(); Class>[] parameterTypes = request.getParameterTypes(); FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); return fastMethod.invoke(service, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("server caught exception", cause); ctx.close(); } }
將客戶端傳輸過來的請求數據(類名,方法,參數)在本地以cglib的方式反射調用
調用成功后,將處理完畢的結果編碼返回給客戶端,并且關閉TCP連接
consumer只有api接口,并沒有其實現類,所以我們可以用java動態代理的方式去自定義方法實現,代理的方法實現便是建立TCP握手連接,有provider來執行方法,將得到的結果返回給代理類,由此造成一種單憑接口就能調用實現類方法的假象
第一步: 使用java動態代理new出代理對象
public class CoolProxy { private static Logger log = LoggerFactory.getLogger(CoolProxy.class); private ServiceCenter serviceDiscovery; public CoolProxy(ServiceCenter serviceDiscovery){ this.serviceDiscovery = serviceDiscovery; } @SuppressWarnings("unchecked") publicT getInstance(Class cls){ return (T)Proxy.newProxyInstance(cls.getClassLoader(), new Class>[]{cls}, (proxy, method, args) -> { CoolRequest request = new CoolRequest(); request.setRequestID(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setParameterTypes(method.getParameterTypes()); String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2); CoolRpcClient client = new CoolRpcClient(addr[0], Integer.parseInt(addr[1])); CoolResponse response = client.send(request); if (response.getError()!=null){ throw response.getError(); } else { return response.getResult(); } }); } }
第二步: 在代理方法中,使用遠程過程調用(rpc)
客戶端引導類:
public class CoolRpcClient { private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class); private CountDownLatch countDownLatch; private EventLoopGroup group; private Bootstrap bootstrap; private CoolResponse response; private String serviceIP; private int port; { response = new CoolResponse(); countDownLatch = new CountDownLatch(1); group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); } public CoolRpcClient(String serviceIP, int port){ this.serviceIP = serviceIP; this.port = port; } public CoolResponse send(CoolRequest request){ try { bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new CoolRpcDecoder(CoolResponse.class)) .addLast(new CoolRpcEncoder(CoolRequest.class)) .addLast(new CoolClientHandler(countDownLatch, response)); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); Channel channel = bootstrap.connect(serviceIP, port).sync().channel(); channel.writeAndFlush(request).sync(); countDownLatch.await(); channel.closeFuture().sync(); return response; } catch (Exception e){ e.printStackTrace(); return null; } finally { group.shutdownGracefully(); } } }
客戶端處理器(handler):
@ChannelHandler.Sharable public class CoolClientHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class); private CountDownLatch latch; private CoolResponse response; public CoolClientHandler(CountDownLatch latch, CoolResponse response){ this.latch = latch; this.response = response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse enResponse = (CoolResponse) msg; this.response.sync(enResponse); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { latch.countDown(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("api caught exception", cause); ctx.close(); } }
最后使用CountDownLatch同步通知調用者,rpc調用完畢
結束語以上便是Cool-Rpc的簡單講解,如有更好的想法請聯系我
熱烈歡迎大家一起維護此項目Cool-RPC
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76893.html
摘要:在文章微服務調用鏈追蹤中心搭建一文中模擬出來的調用鏈就是一個遠程調用的例子,只不過這篇文章里是通過這種同步調用方式,利用的是協議在應用層完成的,這種方法雖然奏效,但有時效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠程過程調...
摘要:在文章微服務調用鏈追蹤中心搭建一文中模擬出來的調用鏈就是一個遠程調用的例子,只不過這篇文章里是通過這種同步調用方式,利用的是協議在應用層完成的,這種方法雖然奏效,但有時效率并不高。 showImg(https://segmentfault.com/img/remote/1460000014858219); 一、概述 RPC(Remote Procedure Call)即 遠程過程調...
摘要:與文章框架實踐之一文中實踐的另一種通用框架能通過自動生成對應語言的接口類似,也能自動地生成和的存根,我們只需要一個命令就能快速搭建起運行環境。類似于之前對于框架的實踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開源的通用高性能RPC框架,它支持的是使用P...
摘要:與文章框架實踐之一文中實踐的另一種通用框架能通過自動生成對應語言的接口類似,也能自動地生成和的存根,我們只需要一個命令就能快速搭建起運行環境。類似于之前對于框架的實踐步驟,下面一一闡述。 showImg(https://segmentfault.com/img/remote/1460000014946557); 概述 gRPC是Google開源的通用高性能RPC框架,它支持的是使用P...
Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個簡單分布式服務 實戰之前,先來看幾個重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構 什么是 RPC? 為什么要用 Dubbo? 開始實戰 1 ...
閱讀 2782·2021-10-11 11:08
閱讀 1498·2021-09-30 09:48
閱讀 1059·2021-09-22 15:29
閱讀 1044·2019-08-30 15:54
閱讀 986·2019-08-29 15:19
閱讀 537·2019-08-29 13:12
閱讀 3172·2019-08-26 13:53
閱讀 971·2019-08-26 13:28