摘要:整合到本文更加注重代碼實踐,對于配置相關的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導入。接口是預留給開發者根據不同事件處理業務邏輯的接口。改造筆記二優化邏輯
Moquette簡介
Mqtt作為物聯網比較流行的協議現在已經被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現的輕量級的MQTT BROKEN. Moquette基于Netty實現,性能問題至少前期可以不用考慮,在使用過程中還算穩定,沒有出現過較大的問題。github地址:https://github.com/andsel/moq...。
整合到SpringBoot本文更加注重代碼實踐,對于配置相關的知識會一筆帶過,不做過多的詳解。
假設已經搭建好SpringBoot環境,下載完Moquette。至于怎么引用Moquette,可以在原項目上修改,也可以達成Jar包添加到lib調用,也可以上傳到Maven私服后通過配置pom引用。筆者是上傳到Maven私服,然后通過maven導入。
自定義包裝類,實現io.moquette.server.Server的自動注入
@Slf4j @Service public class MoquetteServer { @Value("${mqtt-server.config-path}") private String configFilePath; @Autowired private IAuthorizator authorizator; /** * Safety相關的攔截器,如果有其它業務,可以再去實現一個攔截器處理其它業務 */ @Autowired @Qualifier("safetyInterceptHandler") private InterceptHandler safetyinterceptHandler; private Server mqttServer; public void startServer() throws IOException { IResourceLoader configFileResourceLoader = new ClasspathResourceLoader(configFilePath); final IConfig config = new ResourceLoaderConfig(configFileResourceLoader); mqttServer = new Server(); /**添加處理Safety相關的攔截器,如果有其它業務,可以再去實現一個攔截器處理其它業務,然后也添加上即可*/ ListinterceptHandlers = Arrays.asList(safetyinterceptHandler); /** * Authenticator 不顯示設置,Server會默認以password_file創建一個ResourceAuthenticator * 如果需要更靈活的連接驗證方案,可以繼承IAuthenticator接口,自定義實現 */ mqttServer.startServer(config, interceptHandlers, null, null, authorizator); } public void stop() { mqttServer.stopServer(); } }
解釋:
(1)添加@Service
(2)configFilePath是Moquette需要的moquette.conf的配置文件路徑,筆者將配置文件放到了resouces目錄下,在application.xml配置的路徑,因此通過@Value自動注入路徑值。
(3)因為是將配置文件放在了resources目錄下,所以用Moquette提供的ClasspathResourceLoader加載的配置文件
(4)IAuthorizator 是權限驗證接口
(5)InterceptHandler是Moquette訂閱、發布、建立連接等相關事件的攔截回調業務處理邏輯接口
2.實現IAuthorizator接口
@Component public class PermitAllAuthorizator implements IAuthorizator { @Override public boolean canWrite(Topic topic, String user, String client) { /**可以控制某個用戶的client,是否具有發布某個主題的權限,目前默認任何Client可以發布任主題*/ return true; } @Override public boolean canRead(Topic topic, String user, String client) { /**可以控制某個用戶的client,是否具有接收某個主題的權限,目前默認任何Client可以接收任何主題*/ return true; } }
解釋:實現另一個很簡單的授權接口,允許任何用戶所有的讀寫請求
3.實現InterceptHandler接口
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } }
解釋:
1.簡單實現InterceptHandler,繼承自AbstractInterceptHandler,并重寫了部分方法。可以根據業務需要實現不同的方法。InterceptHandler接口是Moquette預留給開發者根據不同事件處理業務邏輯的接口。
4.通過SpringBoot啟動Moquette
@SpringBootApplication public class MqttServiceApplication { public static void main(String[] args) throws IOException { SpringApplication application = new SpringApplication(MqttServiceApplication.class); final ApplicationContext context = application.run(args); MoquetteServer server = context.getBean(MoquetteServer.class); server.startServer(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { server.stop(); log.info("Moquette Server stopped"); } }); } }
如果發現MoquetteServer無法啟動,是否是SpringBoot默認的包掃描機制的問題,可以通過@ComponentScan解決。
通過以上操作,就可以在任何想要使用MoquetteServer的地方,通過@Autowired自動注入。
當然在MoquetteServer中筆者只是簡單實現了封裝,并沒有實現其它方法,讀者完全可以根據自己的需要在MoquetteServer中實現自己需要的功能,但前提是你要對Moquette的源碼熟悉。
moquette改造筆記(二):優化BrokerInterceptor notifyTopicPublished()邏輯
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77205.html
摘要:修改的實現,實現接口在改造筆記一整合到中修改實現,添加對的實現如下負載過大,處理不過來時,會回調該方法例如可以發生郵件通知相關人員改造筆記四解決中的調用兩次 發現問題 在io.moquette.spi.impl.BrokerInterceptor的構造函數中,新建了一個線程池,代碼如下: private BrokerInterceptor(int poolSize, List hand...
摘要:發現問題在使用中設備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會導致調用兩次解釋會在該從鏈中移除掉時被調用,一般的話沒有手動從鏈中刪除時,會在連接斷開后回調該方法。 發現問題 在使用中設備異常斷開,InterceptHandler中的onConnectionLost()。經過調試發現是MoquetteIdleTimeoutHandler中的代碼導致的,代碼...
摘要:優化邏輯優化方向向啟動方法一樣,每次調用的方法都是在線程池中新建一個任務具體代碼解釋新建一個用來實現調用方法。改造筆記三優化中的線程池 發現問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:發現問題在使用中發現在設備頻繁上下線和兩個設備一樣相互頂替連接的情況下,的和的方法調用沒有先后順序,如果在這兩個方法里面來記錄設備上下線狀態,會造成狀態不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。 發現問題 在moquette使用中發現在設備頻繁上下線和兩個設備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...
摘要:沒有顯式的通常調用需要指定域名才能定位到某個服務器上的某個具體應用,而通過的注冊中心,在調用時只需指定服務名稱,注冊中心會自動發現對應的具體服務。 起因 公司要做系統間的互通,所以需要程序之間互相調用接口,這塊一直是其他同事在做,但是今天一個新項目需要調用到其他系統的接口,所以看了下他們的調用方法,發現都是傳統的httpclient調用,外面做了一層封裝,類似這樣: HttpGet h...
閱讀 1268·2021-11-19 09:40
閱讀 3125·2021-11-02 14:47
閱讀 3091·2021-10-11 10:58
閱讀 3222·2019-08-30 15:54
閱讀 2677·2019-08-30 12:50
閱讀 1730·2019-08-29 16:54
閱讀 470·2019-08-29 15:38
閱讀 1242·2019-08-29 15:19