摘要:經(jīng)過琢磨,其實(shí)是要考慮安全性的。具體在以下幾個(gè)方面跨域連接協(xié)議升級前握手?jǐn)r截器消息信道攔截器對于跨域問題,我們可以通過方法來設(shè)置可連接的域名,防止跨站連接。
前言
大學(xué)的學(xué)習(xí)時(shí)光臨近尾聲,感嘆時(shí)光匆匆,三年一晃而過。同學(xué)們都忙著找工作,我也在這里拋一份簡歷吧,歡迎各位老板和獵手誠邀。我們進(jìn)入正題。直播行業(yè)是當(dāng)前火熱的行業(yè),誰都想從中分得一杯羹,直播養(yǎng)活了一大批人,一個(gè)平臺主播粗略估計(jì)就有幾千號人,但是實(shí)時(shí)在線觀看量有的居然到了驚人的百萬級別,特別是游戲主播,可想而知,直播間是一個(gè)磁鐵式的廣告?zhèn)鞑ッ浇?,也難怪這么多巨頭公司都搶著做直播。我不太清楚直播行業(yè)技術(shù)有多深,畢竟自己沒做過,但是咱們可以自己實(shí)現(xiàn)一個(gè)滿足幾百號人同時(shí)觀看的直播間呀。
演示地址(電腦端與移動端效果不同哦)
服務(wù)端項(xiàng)目地址
客戶端項(xiàng)目地址
手機(jī)端效果
這個(gè)場景很熟悉吧~~ 通過obs推流軟件來推流。
戶外直播,通過yasea手機(jī)端推流軟件,使用手機(jī)攝像頭推流。
電腦端效果
播放香港衛(wèi)視
直播畫面
項(xiàng)目總覽項(xiàng)目分為三個(gè)部分:
客戶端
直播間視頻拉流、播放和聊天室,炫酷的彈幕以及直播間信息
服務(wù)端
處理直播間、用戶的數(shù)據(jù)業(yè)務(wù),聊天室消息的處理
服務(wù)器部署
視頻服務(wù)器和web服務(wù)器
移動客戶端
VUE全家桶
UI層vonic
axios
視頻播放器: vue-video-player + videojs-contrib-hls
websocket客戶端: vue-stomp
彈幕插件: vue-barrage
打包工具:webpack
電腦端客戶端
項(xiàng)目架構(gòu): Jquery + BootStrap
視頻播放器: video.js
websocket客戶端: stomp.js + sockjs.js
彈幕插件: Jquery.danmu.js
模版引擎: thymeleaf
服務(wù)端
IDE: IntelliJ IDEA
項(xiàng)目架構(gòu): SpringBoot1.5.4 +Maven3.0
主數(shù)據(jù)庫: Mysql5.7
輔數(shù)據(jù)庫: redis3.2
數(shù)據(jù)庫訪問層: spring-boot-starter-data-jpa + spring-boot-starter-data-redis
websocket: spring-boot-starter-websocket
消息中間件: RabbitMQ/3.6.10
服務(wù)器部署
視頻直播模塊: nginx-rtmp-module
web應(yīng)用服務(wù)器: tomcat8.0
服務(wù)器: 騰訊云centos6.5
技術(shù)點(diǎn)講解 直播間主要涉及到兩個(gè)主要功能:第一是視頻直播、第二是聊天室。這兩個(gè)都是非常講究實(shí)時(shí)性。視頻直播
說到直播我們先了解下幾個(gè)常用的直播流協(xié)議,看了挺多的流媒體協(xié)議文章博客,但都是非常粗略,這里有個(gè)比較詳細(xì)的 流媒體協(xié)議介紹,如果想詳細(xì)了解協(xié)議內(nèi)容估計(jì)去要看看專業(yè)書籍了。這里我們用到的只是rtmp和hls,實(shí)踐后發(fā)現(xiàn):rtmp只能夠在電腦端播放,hls只能夠在手機(jī)端播放。而且rtmp是相當(dāng)快的盡管沒有rtsp那么快,延遲只有幾秒,我測試的就差不多2-5秒,但是hls大概有10幾秒。所以如果你體驗(yàn)過demo,就會發(fā)現(xiàn)手機(jī)延遲比較多。
直播的流程:
直播分為推流和拉流兩個(gè)過程,那么流推向哪里,拉流又從哪里拉取呢?那當(dāng)然需要視頻服務(wù)器啦,千萬不要以為視頻直播服務(wù)器很復(fù)雜,其實(shí)在nginx服務(wù)器中一切都變得簡單。后面我會講解如何部署Nginx服務(wù)器并配置視頻模塊(nginx-rtmp-module).
首先主播通過推流軟件,比如OBS Studio推流軟件,這個(gè)是比較專業(yè)級別的,很多直播平臺的推薦主播使用這個(gè)軟件來推送視頻流,這里我也推薦一個(gè)開源的安卓端推流工具Yasea,下載地址,文件很小,但是很強(qiáng)大。
直播內(nèi)容推送到服務(wù)器后,就可以在服務(wù)器端使用視頻編碼工具進(jìn)行轉(zhuǎn)碼了,可以轉(zhuǎn)換成各種高清,標(biāo)清,超清的分辨率視頻,也就是為什么我們在各個(gè)視頻網(wǎng)站都可以選擇視頻清晰度。這里我們沒有轉(zhuǎn)碼,只是通過前端視頻播放器(video.js)來拉取視頻.這樣整個(gè)視頻推流拉流過程就完成了。
聊天室
直播間里面的聊天室跟我們的群聊天差不多,只不過它變成了web端,web端的即時(shí)通信方案有很多,這里我們選擇websocket協(xié)議來與服務(wù)端通信,websocket是基于http之上的傳輸協(xié)議,客戶端向服務(wù)端發(fā)送http請求,并攜帶Upgrade:websocket升級頭信息表示轉(zhuǎn)換websocket協(xié)議,通過與服務(wù)端握手成功后就可以建立tcp通道,由此來傳遞消息,它與http最大的差別就是,服務(wù)端可以主動向客戶端發(fā)送消息。
既然建立了消息通道,那我們就需要往通道里發(fā)消息,但是總得需要一個(gè)東西來管控消息該發(fā)給誰吧,要不然全亂套了,所以我們選擇了消息中間件RabbitMQ.使用它來負(fù)責(zé)消息的路由去向。
源碼地址
工程結(jié)構(gòu)|—— build 構(gòu)建服務(wù)和webpack配置 |—— congfig 項(xiàng)目不同環(huán)境的配置 |—— dist build生成生產(chǎn)目錄 |—— static 靜態(tài)資源 |—— package.json 項(xiàng)目配置文件 |—— src 開發(fā)源代碼目錄 |—— api 通過axios導(dǎo)出的api目錄 |—— components 頁面和組件 |—— public 公有組件 |—— vuex 全局狀態(tài) |—— main.js 應(yīng)用啟動配置點(diǎn)功能模塊
拉取服務(wù)器的直播視頻流(hls)并播放直播畫面
與服務(wù)端創(chuàng)建websocket連接,收發(fā)聊天室消息
通過websocket獲取消息并發(fā)送到彈幕
通過websocket實(shí)時(shí)更新在線用戶
結(jié)合服務(wù)端獲取訪問歷史記錄
問題反饋模塊
效果圖 項(xiàng)目說明請參考源碼
服務(wù)端實(shí)操源碼地址
由于個(gè)人比較喜歡接觸新的東西,所以后端選擇了springboot,前端選擇了Vue.js年輕人嘛總得跟上潮流。SpringBoot實(shí)踐過后發(fā)現(xiàn)真的太省心了,不用再理會各種配置文件,全自動化裝配。
這里貼一下pom.xml
4.0.0 com.hushangjie rtmp-demo 0.0.1-SNAPSHOT jar rtmp-demo Demo project for Spring Boot org.springframework.boot spring-boot-starter-parent 1.5.4.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-devtools true org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-actuator-docs org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-data-redis org.springframework.boot spring-boot-starter-thymeleaf net.sourceforge.nekohtml nekohtml 1.9.22 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-websocket org.springframework.boot spring-boot-starter-test test org.webjars vue 2.1.3 mysql mysql-connector-java joda-time joda-time 2.9.2 io.projectreactor reactor-core 2.0.8.RELEASE io.projectreactor reactor-net 2.0.8.RELEASE io.netty netty-all 4.1.6.Final org.springframework.boot spring-boot-maven-plugin true
application.properties文件
spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&useSSL=false spring.datasource.username=username spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.thymeleaf.mode=LEGACYHTML5 server.port=8085 # REDIS (RedisProperties) # Redis數(shù)據(jù)庫索引(默認(rèn)為0) spring.redis.database=0 # Redis服務(wù)器地址 spring.redis.host=127.0.0.1 # Redis服務(wù)器連接端口 spring.redis.port=6379 # Redis服務(wù)器連接密碼(默認(rèn)為空) spring.redis.password= # 連接池最大連接數(shù)(使用負(fù)值表示沒有限制) spring.redis.pool.max-active=8 # 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制) spring.redis.pool.max-wait=-1 # 連接池中的最大空閑連接 spring.redis.pool.max-idle=8 # 連接池中的最小空閑連接 spring.redis.pool.min-idle=0 # 連接超時(shí)時(shí)間(毫秒) spring.redis.timeout=0websocket配置
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { //攔截器注入service失敗解決辦法 @Bean public MyChannelInterceptor myChannelInterceptor(){ return new MyChannelInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //添加訪問域名限制可以防止跨域socket連接 //setAllowedOrigins("http://localhost:8085") registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /*.enableSimpleBroker("/topic","/queue");*/ //假如需要第三方消息代理,比如rabitMQ,activeMq,在這里配置 registry.setApplicationDestinationPrefixes("/demo") .enableStompBrokerRelay("/topic","/queue") .setRelayHost("127.0.0.1") .setRelayPort(61613) .setClientLogin("guest") .setClientPasscode("guest") .setSystemLogin("guest") .setSystemPasscode("guest") .setSystemHeartbeatSendInterval(5000) .setSystemHeartbeatReceiveInterval(4000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor()); super.configureClientInboundChannel(registration); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { super.configureClientOutboundChannel(registration); } }
配置類繼承了消息代理配置類,意味著我們將使用消息代理rabbitmq.使用registerStompEndpoints方法注冊一個(gè)websocket終端連接。這里我們需要了解兩個(gè)東西,第一個(gè)是stomp和sockjs,sockjs是啥呢,其實(shí)它是對于websocket的封裝,因?yàn)槿绻麊渭兪褂脀ebsocket的話效率會非常低,我們需要的編碼量也會增多,而且如果瀏覽器不支持websocket,sockjs會自動降級為輪詢策略,并模擬websocket,保證客戶端和服務(wù)端可以通信。
stomp有是什么看這里
stomp是一種簡單(流)文本定向消息協(xié)議,它提供了一個(gè)可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進(jìn)行交互,也就是我們上面的RabbbitMQ,它就是一個(gè)消息代理。
我們可以通過configureMessageBroker來配置消息代理,需要注意的是我們將要部署的服務(wù)器也應(yīng)該要有RabbitMQ,因?yàn)樗且粋€(gè)中間件,安裝非常容易,這里就不說明了。這里我們配置了“/topic,/queue”兩個(gè)代理轉(zhuǎn)播策略,就是說客戶端訂閱了前綴為“/topic,/queue”頻道都會通過消息代理(RabbitMQ)來轉(zhuǎn)發(fā)。跟spring沒啥關(guān)系啦,完全解耦。
一開始接觸 stomp的時(shí)候一直有個(gè)問題困擾我,客戶端只要與服務(wù)端通過websocket建立了連接,那么他就可以訂閱任何內(nèi)容,意味著可以接受任何消息,這樣豈不是亂了套啦,于是我翻閱了大量博客文章,很多都是官方的例子并沒有解決實(shí)際問題。經(jīng)過琢磨,其實(shí)websocket是要考慮安全性的。具體在以下幾個(gè)方面
跨域websocket連接
協(xié)議升級前握手?jǐn)r截器
消息信道攔截器
對于跨域問題,我們可以通過setAllowedOrigins方法來設(shè)置可連接的域名,防止跨站連接。
對于站內(nèi)用戶是否允許連接我們可以如下配置
public class HandShkeInceptor extends HttpSessionHandshakeInterceptor { private static final SetONLINE_USERS = new HashSet<>(); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { System.out.println("握手前"+request.getURI()); //http協(xié)議轉(zhuǎn)換websoket協(xié)議進(jìn)行前,通常這個(gè)攔截器可以用來判斷用戶合法性等 //鑒別用戶 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; //這句話很重要如果getSession(true)會導(dǎo)致移動端無法握手成功 //request.getSession(true):若存在會話則返回該會話,否則新建一個(gè)會話。 //request.getSession(false):若存在會話則返回該會話,否則返回NULL //HttpSession session = servletRequest.getServletRequest().getSession(false); HttpSession session = servletRequest.getServletRequest().getSession(); UserEntity user = (UserEntity) session.getAttribute("user"); if (user != null) { //這里只使用簡單的session來存儲用戶,如果使用了springsecurity可以直接使用principal return super.beforeHandshake(request, response, wsHandler, attributes); }else { System.out.println("用戶未登錄,握手失敗!"); return false; } } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { //握手成功后,通常用來注冊用戶信息 System.out.println("握手后"); super.afterHandshake(request, response, wsHandler, ex); } }
HttpSessionHandshakeInterceptor 這個(gè)攔截器用來管理握手和握手后的事情,我們可以通過請求信息,比如token、或者session判用戶是否可以連接,這樣就能夠防范非法用戶。
那如何限制用戶只能訂閱指定內(nèi)容呢?我們接著往下看
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message> preSend(Message> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //檢測用戶訂閱內(nèi)容(防止用戶訂閱不合法頻道) if (StompCommand.SUBSCRIBE.equals(command)) { //從數(shù)據(jù)庫獲取用戶訂閱頻道進(jìn)行對比(這里為了演示直接使用set集合代替) SetsubedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //該用戶訂閱的頻道合法 return super.preSend(message, channel); } else { //該用戶訂閱的頻道不合法直接返回null前端用戶就接受不到該頻道信息。 return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //檢測用戶是否連接成功,搜集在線的用戶信息如果數(shù)據(jù)量過大我們可以選擇使用緩存數(shù)據(jù)庫比如redis, //這里由于需要頻繁的刪除和增加集合內(nèi)容,我們選擇set集合來存儲在線用戶 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map map = (Map ) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //通過websocket實(shí)時(shí)返回在線人數(shù) this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //如果用戶斷開連接,刪除用戶信息 if (StompCommand.DISCONNECT.equals(command)){ Map map = (Map ) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); } }
在stomp里面,Channel信道就是消息傳送的通道,客戶端與服務(wù)端建立了連接就相當(dāng)于建立了通道,以后的信息就是通過這個(gè)通道來傳輸。所有的消息都有消息頭,被封裝在了spring 的messag接口中,比如建立連接時(shí)候消息頭就含有CONNECT,當(dāng)然還有一些其他的信息。客戶端訂閱的時(shí)候也有訂閱頭信息SUBSCRIBE,那么我是不是可以在這個(gè)攔截器ChannelInterceptorAdapter 中攔截每個(gè)人的訂閱信息,然后與數(shù)據(jù)庫的信息作比對,最后決定這個(gè)用戶是否可以訂閱這個(gè)頻道的信息呢,對的,這是我的想法,按照這樣的思路,做單聊不是迎刃而解了嗎。
那客戶端通過websocket發(fā)送的消息如何到達(dá)訂閱者手中呢,按照rabbitmq的規(guī)則,訂閱者屬于消費(fèi)者,發(fā)送消息的一方屬于生產(chǎn)者,生產(chǎn)者通過websocket把消息發(fā)送到服務(wù)端,服務(wù)端通過轉(zhuǎn)發(fā)給消息代理(rabbitmq),消息代理負(fù)責(zé)存儲消息,管理發(fā)送規(guī)則,推送消息給訂閱者,看下面的代碼
@MessageMapping(value = "/chat") @SendTo("/topic/group") public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Mapsession){ UserEntity user = (UserEntity) session.get("user"); String username = user.getRandomName(); MsgEntity msg = new MsgEntity(); msg.setCreator(username); msg.setsTime(Calendar.getInstance()); msg.setMsgBody(message); return msg; }
@MessageMapping看起來跟springmvc方法特別像,它即可以用在類級別上也可以用在方法級別上
當(dāng)發(fā)送者往‘/chat’發(fā)送消息后,服務(wù)端接受到消息,再發(fā)送給“/topic/group”的訂閱者,@SendTo就是發(fā)送給誰,這里需要注意的有,如果我們沒有配置消息代理,只使用了enableSimpleBroker("/topic","/queue")簡單消息代理,那么就是直接發(fā)送到消息訂閱者,如果配置了消息代理,那還要通過消息代理,由它來轉(zhuǎn)發(fā)。
如果我們想在服務(wù)端隨時(shí)發(fā)送消息,而不是在客戶端發(fā)送(這樣的場景很常見,比如發(fā)送全局通知),可以使用SimpMessagingTemplate類,通過注入該bean,在合適的業(yè)務(wù)場景中發(fā)送消息。
Redis統(tǒng)計(jì)數(shù)據(jù)直播間經(jīng)常需要統(tǒng)計(jì)數(shù)據(jù),比如實(shí)時(shí)在線人數(shù),訪問量,貢獻(xiàn)排行榜,訂閱量。我選擇的方案是使用redis來計(jì)數(shù),盡管這個(gè)demo可能不會太多人訪問,但是我的目的是學(xué)習(xí)如何使用redis
先看springboot中redis的配置
@Configuration public class RedisConfig extends CachingConfigurerSupport{ /** * 生成key的策略 * * @return */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } /** * 管理緩存 * * @param redisTemplate * @return */ @SuppressWarnings("rawtypes") @Bean public CacheManager cacheManager(RedisTemplate redisTemplate) { RedisCacheManager rcm = new RedisCacheManager(redisTemplate); //設(shè)置緩存過期時(shí)間 // rcm.setDefaultExpiration(60);//秒 //設(shè)置value的過期時(shí)間 Mapmap=new HashMap(); map.put("test",60L); rcm.setExpires(map); return rcm; } /** * RedisTemplate配置 * @param factory * @return */ @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer);//如果key是String 需要配置一下StringSerializer,不然key會亂碼 /XX/XX template.afterPropertiesSet(); //template.setStringSerializer(); return template; } }
redis數(shù)據(jù)統(tǒng)計(jì)Dao的實(shí)現(xiàn)
@Repository public class StatDao { @Autowired RedisTemplate redisTemplate; public void pushOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().add("OnlineUser",userEntity); } public void popOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().remove("OnlineUser" ,userEntity); } public Set getAllUserOnline(){ return redisTemplate.opsForSet().members("OnlineUser"); } public void pushGuestHistory(Guest guest){ //最多存儲指定個(gè)數(shù)的訪客 if (redisTemplate.opsForList().size("Guest") == 200l){ redisTemplate.opsForList().rightPop("Guest"); } redisTemplate.opsForList().leftPush("Guest",guest); } public List getGuestHistory(){ return redisTemplate.opsForList().range("Guest",0,-1); } }
Dao層非常簡單,因?yàn)槲覀冎恍枰y(tǒng)計(jì)在線人數(shù)和訪客。但是在線人數(shù)是實(shí)時(shí)更新的,既然我們使用了websocket實(shí)時(shí)數(shù)據(jù)更新就非常容易了,前面我們講過,通過信道攔截器可以攔截連接,訂閱,斷開連接等等事件信息,所以我們就可以當(dāng)用戶連接時(shí)存儲在線用戶,通過websocket返回在線用戶信息。
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message> preSend(Message> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //檢測用戶訂閱內(nèi)容(防止用戶訂閱不合法頻道) if (StompCommand.SUBSCRIBE.equals(command)) { //從數(shù)據(jù)庫獲取用戶訂閱頻道進(jìn)行對比(這里為了演示直接使用set集合代替) SetsubedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //該用戶訂閱的頻道合法 return super.preSend(message, channel); } else { //該用戶訂閱的頻道不合法直接返回null前端用戶就接受不到該頻道信息。 return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //檢測用戶是否連接成功,搜集在線的用戶信息如果數(shù)據(jù)量過大我們可以選擇使用緩存數(shù)據(jù)庫比如redis, //這里由于需要頻繁的刪除和增加集合內(nèi)容,我們選擇set集合來存儲在線用戶 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map map = (Map ) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //通過websocket實(shí)時(shí)返回在線人數(shù) this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //如果用戶斷開連接,刪除用戶信息 if (StompCommand.DISCONNECT.equals(command)){ Map map = (Map ) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); } }
由于這個(gè)項(xiàng)目有移動端和電腦端,所以需要根據(jù)請求代理UserAgent來判斷客戶端屬于哪一種類型。這個(gè)工具類在源碼上有。我就不貼了。
服務(wù)器部署說了這么多即時(shí)通信,卻沒發(fā)現(xiàn)視頻直播。不要著急我們馬上進(jìn)入視頻環(huán)節(jié)。文章開頭就說明了幾種媒體流協(xié)議,這里不講解詳細(xì)的協(xié)議流程,只需要知道,我們是通過推流軟件采集視頻信息,如何采集也不是我們關(guān)注的。采集到信息后通過軟件來推送到指定的服務(wù)器,如下圖
obs推流設(shè)置
yasea手機(jī)端推流設(shè)置
紅色部分是服務(wù)器開放的獲取流接口。
Nginx-rtmp-module配置視頻服務(wù)器有很多,也支持很多媒體流協(xié)議。這里我們選擇nginx-rtmp-module來做視頻服務(wù),接下來我們需要在linux下安裝nginx,并安裝rtmp模塊。本人也是linux初學(xué)者,一步步摸索著把服務(wù)器搭建好,聽說tomcat和nginx很配哦,所以作為免費(fèi)開源的當(dāng)然首選這兩個(gè)。
接下來需要在linux安裝一下軟件和服務(wù)。
Nginx以及Nginx-rtmp-module
Tomcat
Mysql
Redis
RabbitMQ
安裝步驟我就不說了,大家搜索一下啦,這里貼一下nginx.conf文件配置
rtmp { server { listen 1935; chunk_size 4096; application video { play /yjdata/www/www/video; } application live { live on; hls on; hls_path /yjdata/www/www/live/hls/; hls_fragment 5s; } } }
上面代碼是配置rtmp模塊, play /yjdata/www/www/video 指的是配置點(diǎn)播模塊,可以直接播放/yjdata/www/www/video路徑下的視頻。hls_path制定hls分塊存放路徑,因?yàn)閔ls是通過獲取到推送的視頻流信息,分塊存儲在服務(wù)器。所以它的延時(shí)比rtmp要更高。
server { listen 80; server_name localhost; #charset koi8-r; index index.jsp index.html; root /yjdata/www/www; #access_log logs/host.access.log main; location / { proxy_pass http://127.0.0.1:8080; } location ~ .*.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ { expires 30d; root /yjdata/www/www/static/; } location /hls { types { application/vnd.apple.mpegurl m3u8; #application/x-mpegURL; video/mp2t ts; } alias /yjdata/www/www/live/hls/; expires -1; add_header Cache-Control no-cache; } location /stat { rtmp_stat all; rtmp_stat_stylesheet stat.xsl; } location /stat.xsl { root /soft/nginx/nginx-rtmp-module/; }
上面配置了location 指向/hls,別名是/yjdata/www/www/live/hls/,所以可以在前端直接通過域名+/hls/+文件名.m3u8獲取直播視頻。
關(guān)于nginx的配置還有很多,我也在學(xué)習(xí)當(dāng)中??偠灾畁ginx非常強(qiáng)大。
通過從前端=>后臺=>服務(wù)器,整個(gè)流程走下來還是需要花很多心思。但是收獲也是很多。本人將從大學(xué)出來,初出茅廬,文章錯(cuò)誤之處,盡請指正。本人郵箱979783618@qq.com
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/39572.html
摘要:安裝配置在阿里云控制臺購買并啟動之前說過,我們選擇阿里云作為云提供商。重要目錄及文件默認(rèn)的安裝目錄默認(rèn)全局配置文件默認(rèn)子配置文件目錄默認(rèn)根目錄總結(jié)本文詳細(xì)介紹了如何在阿里云上購買并啟動一個(gè)的新實(shí)例,以及如何安裝配置和測試和。 本文是鋼哥的Oracle APEX系列文章中的其中一篇,完整 Oracle APEX 系列文章如下: Oracle APEX 系列文章1:Oracle APEX...
閱讀 780·2021-11-23 09:51
閱讀 844·2021-11-23 09:51
閱讀 2514·2021-11-15 18:01
閱讀 3873·2021-10-11 11:07
閱讀 2409·2021-09-22 15:30
閱讀 1082·2021-09-22 14:59
閱讀 1564·2019-08-30 15:55
閱讀 1762·2019-08-30 15:52