欢迎光临散文网 会员登陆 & 注册

基于Java信令服务器WebRTC一对多直播

2020-10-30 01:36 作者:开源开发者  | 我要投稿




package org.kurento.tutorial.one2manycall;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.kurento.client.*;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

public class MyDemo extends TextWebSocketHandler {


   // 序列化工具
   private static final Gson gson = new GsonBuilder().create();

   // 线程安全的hashMap
   private final ConcurrentHashMap<String, UserSession> viewers = new ConcurrentHashMap<>();

   // 注入与kms通信客户端
   @Autowired
   private KurentoClient kurento;

   // 媒体管线(管道)用于连接两个媒体元素之间的流
   private MediaPipeline pipeline;

   // 主播端session会话
   private UserSession presenterUserSession;

   /**
    * 处理客户端的数据包
    * @param session
    * @param message
    * @throws Exception
    */
   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {

       // 序列化消息到json对象
       JsonObject jsonMessage = gson.fromJson(message.getPayload(),JsonObject.class);

       switch (jsonMessage.get("id").getAsString()) {
           case "presenter":

               // 处理主播端发来的SdpOffer
               presenter(session, jsonMessage);
               break;
           case "viewer":

               // 处理观众端发来的SdpOffer
               viewer(session, jsonMessage);

               break;
           case "onIceCandidate":

               // 处理客户端发来的候选人信息
               onIceCandidate(session,jsonMessage);

               break;
           default:
               break;
       }

   }



   /**
    * 客户端关闭连接,释放媒体管道
    * @param session
    * @param status
    * @throws Exception
    */
   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {

       // 关闭客户端会话

   }

   /**
    * 处理主播端发来的SdpOffer
    * @param session
    * @param jsonMessage
    */
   private synchronized void presenter(final WebSocketSession session, JsonObject jsonMessage) {

       // 如果当前没有主播在直播
       if (presenterUserSession == null) {

           // 实例化一个用户会话对象
           presenterUserSession = new UserSession(session);

           // 创建一个流媒体管道
           pipeline = kurento.createMediaPipeline();

           // 通过流媒体管道实例化一个流媒体端点元素
           WebRtcEndpoint presenterWebRtc = new WebRtcEndpoint.Builder(pipeline).build();
           // 在主播端会话设置流媒体端点元素成员属性
           presenterUserSession.setWebRtcEndpoint(presenterWebRtc);


           /**
            * 注册IceCandidate监听器 用于监听生成的候选人信息 并发送给主播客户端
            * 实现基于IceCandidate监听器事件监听器
            */
           presenterWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {

               /**
                * IceCandidate事件处理器
                *
                * @param event IceCandidateFoundEvent事件对象
                */
               @Override
               public void onEvent(IceCandidateFoundEvent event) {

                   JsonObject response = new JsonObject();
                   response.addProperty("id", "iceCandidate");

                   // 从事件对象拿到事件的IceCandidate信息
                   response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));

                   // 拿到候选信息之后发送给主播端的客户端
                   try {
                       synchronized (session) {
                           session.sendMessage(new TextMessage(response.toString()));
                       }
                   } catch (IOException e) {
                       e.printStackTrace();
                   }

               }
           });

           // 开始处理客户端传过来的SdpOffer
           String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();

           // 处理SdpOffer并从KMS服务端返回Sdp应答信息
           String sdpAnswer = presenterWebRtc.processOffer(sdpOffer);

           // 发送sdpAnswer应答信息到主播端的客户端处理
           JsonObject response = new JsonObject();
           response.addProperty("id", "presenterResponse");
           response.addProperty("response", "accepted");
           response.addProperty("sdpAnswer", sdpAnswer);


           // 同步线程发送sdpAnswer应答
           synchronized (session) {
               try {
                   presenterUserSession.sendMessage(response);
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }

           // 开始收集KMS的presenterWebRtc候选信息
           presenterWebRtc.gatherCandidates();

       } else {
           // 释放媒体管道

       }

   }


   public void onIceCandidate(WebSocketSession session,JsonObject jsonMessage) {

       // 拿到客户端发来候选信息
       JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject();

       // 声明一个UserSession对象
       UserSession user = null;

       // 如果主播端会话存在
       if (presenterUserSession != null) {
           // 判断当前的session是不是主播端session
           if (presenterUserSession.getSession() == session) {

               // 设置当前要处理的会话对象是主播端session
               user = presenterUserSession;

           } else {
               // 不是主播端就是观众端会话了
               user = viewers.get(session.getId());
           }
       }

       if (user != null) {

           /**
            * 为这个会话在kms添加候选信息以便进行流媒体传输通信
            * 封装候选信息到一个对象
            */

           IceCandidate cand = new IceCandidate(
                   candidate.get("candidate").getAsString(),
                   candidate.get("sdpMid").getAsString(),
                   candidate.get("sdpMLineIndex").getAsInt());
           // 开始会user添加候选人信息
           user.addCandidate(cand);
       }


   }

   /**
    * 处理观众端发来的SdpOffer
    * @param session
    * @param jsonMessage
    */
   public synchronized void viewer(final WebSocketSession session,JsonObject jsonMessage) {

       // 实例化一个新的观众对象
       UserSession viewer = new UserSession(session);

       // 把当前的会话添加到观众会话容器当中
       viewers.put(session.getId(), viewer);

       // 用当前的流媒体管道实例化一个新的观众媒体元素
       WebRtcEndpoint nextWebRtc = new WebRtcEndpoint.Builder(pipeline).build();

       // 添加候选监听器
       nextWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
           @Override
           public void onEvent(IceCandidateFoundEvent event) {
               JsonObject response = new JsonObject();
               response.addProperty("id","iceCandidate");
               response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
               try{
                   synchronized(session){
                       session.sendMessage(new TextMessage(response.toString()));
                   }
               }catch (IOException e){
                   e.printStackTrace();
               }
           }
       });

       viewer.setWebRtcEndpoint(nextWebRtc);

       // 从主播端媒体元素的src端连接到当前观众端元素的sink端进行输,必须要媒体管道
       presenterUserSession.getWebRtcEndpoint().connect(nextWebRtc);

       // 处理sdpOffer提供
       String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();
       String sdpAnswer = nextWebRtc.processOffer(sdpOffer);

       // 发送kms端的媒体元素的sdpAnswer应答
       JsonObject response = new JsonObject();
       response.addProperty("id", "viewerResponse");
       response.addProperty("response", "accepted");
       response.addProperty("sdpAnswer", sdpAnswer);


       // 同步发送消息
       synchronized (session) {
           try {
               viewer.sendMessage(response);
           } catch (IOException e) {
               e.printStackTrace();
           }
       }

       // 开始收集候选信息
       nextWebRtc.gatherCandidates();

   }

}


var ws = new WebSocket('wss://' + location.host + '/call');  // 升级为WebSocket通信
var video; // 视频组件对象
var webRtcPeer; // 点对点通信对象

window.onload = function() {
   video = document.getElementById('video');

   $('#startLive').attr('onclick', 'startLive()');
   $('#checkLive').attr('onclick', 'checkLive()');
}

// 窗口关闭之前,断开WebSocket会话连接
window.onbeforeunload = function() {
   ws.close();
}

/**
* 开始实例化WebRtcPeer对象,设置候选收集监听回调函数,生成SDP提供,指定生成SDP回调函数
*/
function startLive() {

   var options = {
       localVideo : video,
       onicecandidate : onIceCandidate
   };

   webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options, function (error) {

       if (error) {
           return console.log('实例化主播端webRtcPeer对象失败: ' + error);
       }

       // 生成主播端的SDP提供
       webRtcPeer.generateOffer(onAnchorOfferSdp);



   });


}



/**
* 异步监听
* 回调函数,得到主播端的SDPOffer并发送给后端
*/
function onAnchorOfferSdp(error, offerSdp) {
   if (error) {
       return console.log('监听生成OfferSdp失败');
   }
   console.log('监听生成OfferSdp失败');
   var message = {
       id : 'presenter', // presenter sendAnchorOfferSdp
       sdpOffer: offerSdp
   };

   // 发送主播端的OfferSdp
   sendMessage(message);

}

/**
* 处理服务端发送过来的sdpAnswer处理给主播端
* @param message
*/
function anchorProcessAnswer(message) {
   webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
       if(error)
           return console.log("处理服务端发送过来的sdpAnswer处理给主播端错误: "+error);
   });
}


/**
* 异步监听
* 公共回调函数
* 收集候选通信数据,并发送给后端
* @param candidate
*/
function onIceCandidate(candidate) {

   var message = {
       id : 'onIceCandidate',
       candidate : candidate
   };

   // 发送主播端或观众端的候选数据到服务端
   sendMessage(message);

}

/**
* 公共处理服务端发送过来的候选人数据
* @param message
*/
function serverSendIceCandidate(message) {
   webRtcPeer.addIceCandidate(message.candidate, function (error) {
       if(error)
           return console.log('添加服务端的候选人失败: ' + error);
   });


}


/**
* 发送WebSocket请求到服务端
*/
function sendMessage(message) {
   // js对象序列化到字符串
   var jsonMessage = JSON.stringify(message);
   console.log("发送WebSocket请求到服务端:" + jsonMessage);

   //调用ws对象send接口
   ws.send(jsonMessage);

}


/**
* 观众端查看直播
*/
function checkLive() {

   var options = {
       remoteVideo : video,
       onicecandidate: onIceCandidate //与主播共用同一个回调函数
   };

   webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options, function (error) {
       if (error) {
           return console.log('实例化观众端webRtcPeer失败 ' + error);
       }
       // 生成观众端的SDP提供,需要指定回调函数
       this.generateOffer(onAudienceOfferSdp);


   });

}

/**
* 异步监听生成观众端的OfferSdp
* @param error
* @param offerSdp
*/
function onAudienceOfferSdp(error, offerSdp) {
   if (error) {
       return console.log('监听生成OfferSdp失败');
   }

   var message = {
       id : 'viewer', // viewer sendAudienceOfferSdp
       sdpOffer: offerSdp
   };

   sendMessage(message);

}


/**
* 处理服务端提供的sdp用于观众端
* @param message
*/
function audienceProcessAnswer(message) {
   webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
       if(error)
           return console.log("处理服务端提供的sdp用于观众端错误: "+error);
   });
}


// 监听WebSocket消息

ws.onmessage = function (message) {

   // 字符串序列化到JSON对象
   var parsedMessage = JSON.parse(message.data);

   console.log('接收到消息:' + message.data);

   switch (parsedMessage.id) {
       case 'presenterResponse': // presenterResponse anchorProcessAnswer
           anchorProcessAnswer(parsedMessage);
           break;
       case 'viewerResponse': // viewerResponse audienceProcessAnswer
           audienceProcessAnswer(parsedMessage);
           break;
       case 'iceCandidate': // iceCandidate serverSendIceCandidate
           serverSendIceCandidate(parsedMessage)
           break;
       default:
           console.log('无法处理消息: ', parsedMessage);
   }

};



基于Java信令服务器WebRTC一对多直播的评论 (共 条)

分享到微博请遵守国家法律