摘要:關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為前提上篇文章寫完了流程啟動的一部分,方法都入口,以及創(chuàng)建運行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點了。的創(chuàng)建看下新建節(jié)點的代碼代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實在不好
關(guān)注我
轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/12/es-code03/
前提上篇文章寫完了 ES 流程啟動的一部分,main 方法都入口,以及創(chuàng)建 Elasticsearch 運行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點了。
Node 的創(chuàng)建看下新建節(jié)點的代碼:(代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實在不好拆開這段代碼,300 多行代碼)
public Node(Environment environment) { this(environment, Collections.emptyList()); //執(zhí)行下面的代碼 } protected Node(final Environment environment, Collection> classpathPlugins) { final List resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error boolean success = false; { // use temp logger just to say we are starting. we can"t use it later on because the node name might not be set Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings())); logger.info("initializing ..."); } try { originalSettings = environment.settings(); Settings tmpSettings = Settings.builder().put(environment.settings()) .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build(); // create the node environment as soon as possible, to recover the node id and enable logging try { nodeEnvironment = new NodeEnvironment(tmpSettings, environment); //1、創(chuàng)建節(jié)點環(huán)境,比如節(jié)點名稱,節(jié)點ID,分片信息,存儲元,以及分配內(nèi)存準(zhǔn)備給節(jié)點使用 resourcesToClose.add(nodeEnvironment); } catch (IOException ex) { throw new IllegalStateException("Failed to create node environment", ex); } final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings); final String nodeId = nodeEnvironment.nodeId(); tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId); final Logger logger = Loggers.getLogger(Node.class, tmpSettings); // this must be captured after the node name is possibly added to the settings final String nodeName = NODE_NAME_SETTING.get(tmpSettings); if (hadPredefinedNodeName == false) { logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey()); } else { logger.info("node name [{}], node ID [{}]", nodeName, nodeId); } //2、打印出JVM相關(guān)信息 final JvmInfo jvmInfo = JvmInfo.jvmInfo(); logger.info( "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]", Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()), jvmInfo.pid(), Build.CURRENT.flavor().displayName(), Build.CURRENT.type().displayName(), Build.CURRENT.shortHash(), Build.CURRENT.date(), Constants.OS_NAME, Constants.OS_VERSION, Constants.OS_ARCH,Constants.JVM_VENDOR,Constants.JVM_NAME, Constants.JAVA_VERSION,Constants.JVM_VERSION); logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments())); //檢查當(dāng)前版本是不是 pre-release 版本(Snapshot), warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger); 。。。 this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins); //3、利用PluginsService加載相應(yīng)的模塊和插件 this.settings = pluginsService.updatedSettings(); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId()); // create the environment based on the finalized (processed) view of the settings // this is just to makes sure that people get the same settings, no matter where they ask them from this.environment = new Environment(this.settings, environment.configFile()); Environment.assertEquivalent(environment, this.environment); final List > executorBuilders = pluginsService.getExecutorBuilders(settings); //線程池 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); // adds the context to the DeprecationLogger so that it does not need to be injected everywhere DeprecationLogger.setThreadContext(threadPool.getThreadContext()); resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext())); final List > additionalSettings = new ArrayList<>(pluginsService.getPluginSettings()); //額外配置 final List additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter()); for (final ExecutorBuilder> builder : threadPool.builders()) { //4、加載一些額外配置 additionalSettings.addAll(builder.getRegisteredSettings()); } client = new NodeClient(settings, threadPool);//5、創(chuàng)建一個節(jié)點客戶端 //6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class)); AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class)); // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool so we might be late here already final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter); scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings()); resourcesToClose.add(resourceWatcherService); final NetworkService networkService = new NetworkService( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))); List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); clusterService.addStateApplier(scriptModule.getScriptService()); resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, listener::onNewInfo); final UsageService usageService = new UsageService(settings); ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... for (Module pluginModule : pluginsService.createGuiceModules()) { modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); modules.add(new GatewayModule()); PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); resourcesToClose.add(bigArrays); modules.add(settingsModule); List namedWriteables = Stream.of( NetworkModule.getNamedWriteables().stream(), indicesModule.getNamedWriteables().stream(), searchModule.getNamedWriteables().stream(), pluginsService.filterPlugins(Plugin.class).stream() .flatMap(p -> p.getNamedWriteables().stream()), ClusterModule.getNamedWriteables().stream()) .flatMap(Function.identity()).collect(Collectors.toList()); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of( NetworkModule.getNamedXContents().stream(), searchModule.getNamedXContents().stream(), pluginsService.filterPlugins(Plugin.class).stream() .flatMap(p -> p.getNamedXContent().stream()), ClusterModule.getNamedXWriteables().stream()) .flatMap(Function.identity()).collect(toList())); modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry)); final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry); final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(), clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),client, metaStateService); Collection
上面代碼真的很多,這里再說下上面這么多代碼主要干了什么吧:(具體是哪行代碼執(zhí)行的如下流程,上面代碼中也標(biāo)記了)
1、創(chuàng)建節(jié)點環(huán)境,比如節(jié)點名稱,節(jié)點 ID,分片信息,存儲元,以及分配內(nèi)存準(zhǔn)備給節(jié)點使用
2、打印出 JVM 相關(guān)信息
3、利用 PluginsService 加載相應(yīng)的模塊和插件,具體哪些模塊可以去 modules 目錄下查看
4、加載一些額外的配置參數(shù)
5、創(chuàng)建一個節(jié)點客戶端
6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule
7、獲取 RestController,用于處理各種 Elasticsearch 的 rest 命令,如 _cat, _all, _cat/health, _clusters 等 rest命令
8、綁定處理各種服務(wù)的實例
9、利用 Guice 將各種模塊以及服務(wù)(xxxService)注入到 Elasticsearch 環(huán)境中
10、初始化工作完成(打印日志)
JarHell 報錯解釋前一篇閱讀源碼環(huán)境搭建的文章寫過用 JDK 1.8 編譯 ES 源碼是會遇到如下異常:
org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: jar hell!
這里說下就是 setup 方法中的如下代碼導(dǎo)致的
try { // look for jar hell final Logger logger = ESLoggerFactory.getLogger(JarHell.class); JarHell.checkJarHell(logger::debug); } catch (IOException | URISyntaxException e) { throw new BootstrapException(e); }
所以你如果是用 JDK 1.8 編譯的,那么就需要把所有的有這塊的代碼給注釋掉就可以編譯成功的。
我自己試過用 JDK 10 編譯是沒有出現(xiàn)這里報錯的。
正式啟動 ES 節(jié)點回到上面 Bootstrap 中的靜態(tài) init 方法中,接下來就是正式啟動 elasticsearch 節(jié)點了:
INSTANCE.start(); //調(diào)用下面的 start 方法 private void start() throws NodeValidationException { node.start(); //正式啟動 Elasticsearch 節(jié)點 keepAliveThread.start(); }
接下來看看這個 start 方法里面的 node.start() 方法源碼:
public Node start() throws NodeValidationException { if (!lifecycle.moveToStarted()) { return this; } Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("starting ..."); pluginLifecycleComponents.forEach(LifecycleComponent::start); //1、利用Guice獲取上述注冊的各種模塊以及服務(wù) //Node 的啟動其實就是 node 里每個組件的啟動,同樣的,分別調(diào)用不同的實例的 start 方法來啟動這個組件, 如下: injector.getInstance(MappingUpdatedAction.class).setClient(client); injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(SnapshotsService.class).start(); injector.getInstance(SnapshotShardsService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); nodeService.getMonitorService().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); nodeConnectionsService.start(); clusterService.setNodeConnectionsService(nodeConnectionsService); injector.getInstance(ResourceWatcherService.class).start(); injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); clusterService.getMasterService().setClusterStatePublisher(discovery::publish); // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class)); transportService.start(); assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; final MetaData onDiskMetadata; try { // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {//根據(jù)配置文件看當(dāng)前節(jié)點是master還是data節(jié)點 onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState(); } else { onDiskMetadata = MetaData.EMPTY_META_DATA; } assert onDiskMetadata != null : "metadata is null but shouldn"t"; // this is never null } catch (IOException e) { throw new UncheckedIOException(e); } validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService .filterPlugins(Plugin .class) .stream() .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList())); //2、將當(dāng)前節(jié)點加入到一個集群簇中去,并啟動當(dāng)前節(jié)點 clusterService.addStateApplier(transportService.getTaskManager()); // start after transport service so the local disco is known discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService clusterService.start(); assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); discovery.startInitialJoin(); // tribe nodes don"t have a master so we shouldn"t register an observer s final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings); if (initialStateTimeout.millis() > 0) { final ThreadPool thread = injector.getInstance(ThreadPool.class); ClusterState clusterState = clusterService.state(); ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext()); if (clusterState.nodes().getMasterNodeId() == null) { logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout); final CountDownLatch latch = new CountDownLatch(1); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { latch.countDown(); } @Override public void onClusterServiceClose() { latch.countDown(); } @Override public void onTimeout(TimeValue timeout) { logger.warn("timed out while waiting for initial discovery state - timeout: {}", initialStateTimeout); latch.countDown(); } }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout); try { latch.await(); } catch (InterruptedException e) { throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); } } } if (NetworkModule.HTTP_ENABLED.get(settings)) { injector.getInstance(HttpServerTransport.class).start(); } if (WRITE_PORTS_FILE_SETTING.get(settings)) { if (NetworkModule.HTTP_ENABLED.get(settings)) { HttpServerTransport http = injector.getInstance(HttpServerTransport.class); writePortsFile("http", http.boundAddress()); } TransportService transport = injector.getInstance(TransportService.class); writePortsFile("transport", transport.boundAddress()); } logger.info("started"); pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted); return this; }
上面代碼主要是:
1、利用 Guice 獲取上述注冊的各種模塊以及服務(wù),然后啟動 node 里每個組件(分別調(diào)用不同的實例的 start 方法來啟動)
2、打印日志(啟動節(jié)點完成)
總結(jié)這篇文章主要把大概啟動流程串通了,講了下 node 節(jié)點的創(chuàng)建和正式啟動 ES 節(jié)點了。因為篇幅較多所以拆開成兩篇,先不扣細(xì)節(jié)了,后面流程啟動文章寫完后我們再單一的扣細(xì)節(jié)。
相關(guān)文章1、渣渣菜雞為什么要看 ElasticSearch 源碼?
2、渣渣菜雞的 ElasticSearch 源碼解析 —— 環(huán)境搭建
3、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(上)
4、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(下)
5、Elasticsearch 系列文章(一):Elasticsearch 默認(rèn)分詞器和中分分詞器之間的比較及使用方法
6、Elasticsearch 系列文章(二):全文搜索引擎 Elasticsearch 集群搭建入門教程
7、Elasticsearch 系列文章(三):ElasticSearch 集群監(jiān)控
8、Elasticsearch 系列文章(四):ElasticSearch 單個節(jié)點監(jiān)控
9、Elasticsearch 系列文章(五):ELK 實時日志分析平臺環(huán)境搭建
10、教你如何在 IDEA 遠(yuǎn)程 Debug ElasticSearch
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76904.html
摘要:總結(jié)這篇文章主要先把大概啟動流程串通,因為篇幅較多所以拆開成兩篇,先不扣細(xì)節(jié)了,后面流程啟動文章寫完后我們再單一的扣細(xì)節(jié)。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/11/...
摘要:注意這個版本需要和下面的源碼版本一致下載源碼從上下載相應(yīng)版本的源代碼,這里建議用,這樣的話后面你可以隨意切換到的其他版本去。我們看下有哪些版本的找到了目前源碼版本最新的版本的穩(wěn)定版為切換到該版本于是就可以切換到該穩(wěn)定版本了。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載...
閱讀 2151·2021-10-14 09:43
閱讀 2207·2019-08-30 15:55
閱讀 739·2019-08-30 14:23
閱讀 2031·2019-08-30 13:21
閱讀 1247·2019-08-30 12:50
閱讀 2210·2019-08-29 18:46
閱讀 2292·2019-08-29 17:28
閱讀 2378·2019-08-29 17:21