摘要:需要對外提供配置接口,通過配置接口,動態配置,可接收的消息。遇到問題正式這種注解的方式,使得只能在代碼中寫死,沒法動態修改。而我想實現的效果是能夠動態的修改,動態的創建。因此能不能動態修改配置文件對應的變量,然后消費者動態注入。
背景
最近在寫一個Mqtt消息轉發的中間件,可通過ActvieMq接收消息。需要對外提供配置接口,通過配置接口,動態配置Queue,可接收Queue的消息。
整個項目依賴于SpringBoot ,通過SpringBoot實現隊列消費,只需要通過@JmsListener(destination = "queueName") 注解,就可以實現對特定隊列的消費。
遇到問題正式這種注解的方式,使得destination只能在代碼中寫死,沒法動態修改。 而我想實現的效果是能夠動態的修改destination,動態的創建Consumer。
解決方案繼續使用@JmsListener,找到某種方式,能夠動態修改destination
不使用@JmsListener,通過ActiveMq提供的Jar,手動實現連接、消費等。之前通過這種方式實現過RabbitMq的處理,因此該方案不會有難度,只是工作量多一些。
由于SpringBoot的過分簡單,因此開始嘗試通過第一種方式。
解決過程一開始想著通過反射修改注解destination但是嘗試失敗。后來想到@JmsListener(destination = "${xxx}")這種方式,根據配置文件,可以修改消費的隊列,但是需要從新啟動。 因此能不能動態修改配置文件對應的變量,然后消費者動態注入Spring。
自定義實現一個MapPropertySource,然后加入到Environment中
@Configuration
public class DynamicTestSetting {
public static final String DYNAMIC_CONFIG = "dynamic_settting"; @Autowired AbstractEnvironment environment; @PostConstruct public void init() { environment.getPropertySources().addFirst(new DynamicLoadPropertySource(DYNAMIC_CONFIG, null)); }
}
@Slf4j
public class DynamicLoadPropertySource extends MapPropertySource {
private static Mapmap = new ConcurrentHashMap (64); private static ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); static { scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { //測試:定時修改value //真正使用可能需要傳遞一個參數或者定時讀取配置文件 map.put("test", String.valueOf(System.currentTimeMillis())); } }, 1, 1, TimeUnit.SECONDS); } public DynamicLoadPropertySource(String name, Map source) { super(name, map); } @Override public Object getProperty(String name) { return map.get(name); }
}
2.消費者需要動態加入
@Slf4j
public class TestConsumer {
@JmsListener(destination = "${test}") public void receiveQueue(BytesMessage msg) { //接收消息后的處理邏輯 }
}
/*動態注入bean到spring,需要用到ApplicationContext/
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(TestConsumer.class); defaultListableBeanFactory.registerBeanDefinition("testConsumer",definitionBuilder.getBeanDefinition());
//調用getBeaan時Spring會創建一個TestConsumer實例,這個時候ActiveMq中會創建一個destination = "${test}"隊列,TestConsummer和其綁定消費
TestConsumer consumer = context.getBean("testConsumer", TestConsumer.class); System.out.println(consumer);動態關閉Queue的消費者
參見另一篇:https://segmentfault.com/n/13...
參考文章:https://blog.csdn.net/qq_3491...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/71482.html
摘要:本文主要講述消息服務在中的使用。所以需要一個監聽容器工廠的概念,即接口,它會引用上面創建好的與的連接工廠,由它來負責接收消息以及將消息分發給指定的監聽器。為了消費消息,訂閱者必須保持運行的狀態。 JMS 在 SpringBoot 中的使用 摘要:本文屬于原創,歡迎轉載,轉載請保留出處:https://github.com/jasonGeng88/blog> 本文所有服務均采用doc...
摘要:介紹它是出品,最流行的,能力強勁的開源消息總線。是一個完全支持和規范的實現,盡管規范出臺已經是很久的事情了,但是在當今的應用中間仍然扮演著特殊的地位。相關文章整合使用整合使用關注我轉載請務必注明原創地址為安裝同之前一樣,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...
摘要:安裝到官方網站下載最新的的安裝包,并解壓到本地目錄下,下載鏈接如下。修改消費者使用配置消費者監聽的隊列,其中是接收到的消息收到的報文為接收到的消息重新執行 安裝ActiveMQ 到Apache官方網站下載最新的ActiveMQ的安裝包,并解壓到本地目錄下,下載鏈接如下:http://activemq.apache.org/do...。showImg(https://segmentfau...
摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內容。因此的做法會大大降低性能,并且將大部分的時間都花在反復重建這些對象上。提供的可以讓使用避免頻繁創建的問題。至于使用的性能測試則留給同學自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...
摘要:還自動配置發送和接收消息所需的基礎設施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協議,使用通過協議進行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎設施異步接收消息,Spring AMQP為高級消息隊列協議提供了類似的特性集。Spring Boot還為RabbitTempla...
閱讀 1384·2021-11-15 18:11
閱讀 2512·2021-08-19 10:56
閱讀 680·2021-08-09 13:42
閱讀 795·2019-08-30 15:53
閱讀 2088·2019-08-30 10:55
閱讀 3145·2019-08-29 17:18
閱讀 1437·2019-08-29 13:45
閱讀 546·2019-08-29 13:15