基于Java信令服务器WebRTC一对多直播



package org.kurento.tutorial.one2manycall;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.kurento.client.*;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
public class MyDemo extends TextWebSocketHandler {
// 序列化工具
private static final Gson gson = new GsonBuilder().create();
// 线程安全的hashMap
private final ConcurrentHashMap<String, UserSession> viewers = new ConcurrentHashMap<>();
// 注入与kms通信客户端
@Autowired
private KurentoClient kurento;
// 媒体管线(管道)用于连接两个媒体元素之间的流
private MediaPipeline pipeline;
// 主播端session会话
private UserSession presenterUserSession;
/**
* 处理客户端的数据包
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 序列化消息到json对象
JsonObject jsonMessage = gson.fromJson(message.getPayload(),JsonObject.class);
switch (jsonMessage.get("id").getAsString()) {
case "presenter":
// 处理主播端发来的SdpOffer
presenter(session, jsonMessage);
break;
case "viewer":
// 处理观众端发来的SdpOffer
viewer(session, jsonMessage);
break;
case "onIceCandidate":
// 处理客户端发来的候选人信息
onIceCandidate(session,jsonMessage);
break;
default:
break;
}
}
/**
* 客户端关闭连接,释放媒体管道
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 关闭客户端会话
}
/**
* 处理主播端发来的SdpOffer
* @param session
* @param jsonMessage
*/
private synchronized void presenter(final WebSocketSession session, JsonObject jsonMessage) {
// 如果当前没有主播在直播
if (presenterUserSession == null) {
// 实例化一个用户会话对象
presenterUserSession = new UserSession(session);
// 创建一个流媒体管道
pipeline = kurento.createMediaPipeline();
// 通过流媒体管道实例化一个流媒体端点元素
WebRtcEndpoint presenterWebRtc = new WebRtcEndpoint.Builder(pipeline).build();
// 在主播端会话设置流媒体端点元素成员属性
presenterUserSession.setWebRtcEndpoint(presenterWebRtc);
/**
* 注册IceCandidate监听器 用于监听生成的候选人信息 并发送给主播客户端
* 实现基于IceCandidate监听器事件监听器
*/
presenterWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
/**
* IceCandidate事件处理器
*
* @param event IceCandidateFoundEvent事件对象
*/
@Override
public void onEvent(IceCandidateFoundEvent event) {
JsonObject response = new JsonObject();
response.addProperty("id", "iceCandidate");
// 从事件对象拿到事件的IceCandidate信息
response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
// 拿到候选信息之后发送给主播端的客户端
try {
synchronized (session) {
session.sendMessage(new TextMessage(response.toString()));
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 开始处理客户端传过来的SdpOffer
String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();
// 处理SdpOffer并从KMS服务端返回Sdp应答信息
String sdpAnswer = presenterWebRtc.processOffer(sdpOffer);
// 发送sdpAnswer应答信息到主播端的客户端处理
JsonObject response = new JsonObject();
response.addProperty("id", "presenterResponse");
response.addProperty("response", "accepted");
response.addProperty("sdpAnswer", sdpAnswer);
// 同步线程发送sdpAnswer应答
synchronized (session) {
try {
presenterUserSession.sendMessage(response);
} catch (IOException e) {
e.printStackTrace();
}
}
// 开始收集KMS的presenterWebRtc候选信息
presenterWebRtc.gatherCandidates();
} else {
// 释放媒体管道
}
}
public void onIceCandidate(WebSocketSession session,JsonObject jsonMessage) {
// 拿到客户端发来候选信息
JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject();
// 声明一个UserSession对象
UserSession user = null;
// 如果主播端会话存在
if (presenterUserSession != null) {
// 判断当前的session是不是主播端session
if (presenterUserSession.getSession() == session) {
// 设置当前要处理的会话对象是主播端session
user = presenterUserSession;
} else {
// 不是主播端就是观众端会话了
user = viewers.get(session.getId());
}
}
if (user != null) {
/**
* 为这个会话在kms添加候选信息以便进行流媒体传输通信
* 封装候选信息到一个对象
*/
IceCandidate cand = new IceCandidate(
candidate.get("candidate").getAsString(),
candidate.get("sdpMid").getAsString(),
candidate.get("sdpMLineIndex").getAsInt());
// 开始会user添加候选人信息
user.addCandidate(cand);
}
}
/**
* 处理观众端发来的SdpOffer
* @param session
* @param jsonMessage
*/
public synchronized void viewer(final WebSocketSession session,JsonObject jsonMessage) {
// 实例化一个新的观众对象
UserSession viewer = new UserSession(session);
// 把当前的会话添加到观众会话容器当中
viewers.put(session.getId(), viewer);
// 用当前的流媒体管道实例化一个新的观众媒体元素
WebRtcEndpoint nextWebRtc = new WebRtcEndpoint.Builder(pipeline).build();
// 添加候选监听器
nextWebRtc.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
@Override
public void onEvent(IceCandidateFoundEvent event) {
JsonObject response = new JsonObject();
response.addProperty("id","iceCandidate");
response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
try{
synchronized(session){
session.sendMessage(new TextMessage(response.toString()));
}
}catch (IOException e){
e.printStackTrace();
}
}
});
viewer.setWebRtcEndpoint(nextWebRtc);
// 从主播端媒体元素的src端连接到当前观众端元素的sink端进行输,必须要媒体管道
presenterUserSession.getWebRtcEndpoint().connect(nextWebRtc);
// 处理sdpOffer提供
String sdpOffer = jsonMessage.getAsJsonPrimitive("sdpOffer").getAsString();
String sdpAnswer = nextWebRtc.processOffer(sdpOffer);
// 发送kms端的媒体元素的sdpAnswer应答
JsonObject response = new JsonObject();
response.addProperty("id", "viewerResponse");
response.addProperty("response", "accepted");
response.addProperty("sdpAnswer", sdpAnswer);
// 同步发送消息
synchronized (session) {
try {
viewer.sendMessage(response);
} catch (IOException e) {
e.printStackTrace();
}
}
// 开始收集候选信息
nextWebRtc.gatherCandidates();
}
}
var ws = new WebSocket('wss://' + location.host + '/call'); // 升级为WebSocket通信
var video; // 视频组件对象
var webRtcPeer; // 点对点通信对象
window.onload = function() {
video = document.getElementById('video');
$('#startLive').attr('onclick', 'startLive()');
$('#checkLive').attr('onclick', 'checkLive()');
}
// 窗口关闭之前,断开WebSocket会话连接
window.onbeforeunload = function() {
ws.close();
}
/**
* 开始实例化WebRtcPeer对象,设置候选收集监听回调函数,生成SDP提供,指定生成SDP回调函数
*/
function startLive() {
var options = {
localVideo : video,
onicecandidate : onIceCandidate
};
webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options, function (error) {
if (error) {
return console.log('实例化主播端webRtcPeer对象失败: ' + error);
}
// 生成主播端的SDP提供
webRtcPeer.generateOffer(onAnchorOfferSdp);
});
}
/**
* 异步监听
* 回调函数,得到主播端的SDPOffer并发送给后端
*/
function onAnchorOfferSdp(error, offerSdp) {
if (error) {
return console.log('监听生成OfferSdp失败');
}
console.log('监听生成OfferSdp失败');
var message = {
id : 'presenter', // presenter sendAnchorOfferSdp
sdpOffer: offerSdp
};
// 发送主播端的OfferSdp
sendMessage(message);
}
/**
* 处理服务端发送过来的sdpAnswer处理给主播端
* @param message
*/
function anchorProcessAnswer(message) {
webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
if(error)
return console.log("处理服务端发送过来的sdpAnswer处理给主播端错误: "+error);
});
}
/**
* 异步监听
* 公共回调函数
* 收集候选通信数据,并发送给后端
* @param candidate
*/
function onIceCandidate(candidate) {
var message = {
id : 'onIceCandidate',
candidate : candidate
};
// 发送主播端或观众端的候选数据到服务端
sendMessage(message);
}
/**
* 公共处理服务端发送过来的候选人数据
* @param message
*/
function serverSendIceCandidate(message) {
webRtcPeer.addIceCandidate(message.candidate, function (error) {
if(error)
return console.log('添加服务端的候选人失败: ' + error);
});
}
/**
* 发送WebSocket请求到服务端
*/
function sendMessage(message) {
// js对象序列化到字符串
var jsonMessage = JSON.stringify(message);
console.log("发送WebSocket请求到服务端:" + jsonMessage);
//调用ws对象send接口
ws.send(jsonMessage);
}
/**
* 观众端查看直播
*/
function checkLive() {
var options = {
remoteVideo : video,
onicecandidate: onIceCandidate //与主播共用同一个回调函数
};
webRtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options, function (error) {
if (error) {
return console.log('实例化观众端webRtcPeer失败 ' + error);
}
// 生成观众端的SDP提供,需要指定回调函数
this.generateOffer(onAudienceOfferSdp);
});
}
/**
* 异步监听生成观众端的OfferSdp
* @param error
* @param offerSdp
*/
function onAudienceOfferSdp(error, offerSdp) {
if (error) {
return console.log('监听生成OfferSdp失败');
}
var message = {
id : 'viewer', // viewer sendAudienceOfferSdp
sdpOffer: offerSdp
};
sendMessage(message);
}
/**
* 处理服务端提供的sdp用于观众端
* @param message
*/
function audienceProcessAnswer(message) {
webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
if(error)
return console.log("处理服务端提供的sdp用于观众端错误: "+error);
});
}
// 监听WebSocket消息
ws.onmessage = function (message) {
// 字符串序列化到JSON对象
var parsedMessage = JSON.parse(message.data);
console.log('接收到消息:' + message.data);
switch (parsedMessage.id) {
case 'presenterResponse': // presenterResponse anchorProcessAnswer
anchorProcessAnswer(parsedMessage);
break;
case 'viewerResponse': // viewerResponse audienceProcessAnswer
audienceProcessAnswer(parsedMessage);
break;
case 'iceCandidate': // iceCandidate serverSendIceCandidate
serverSendIceCandidate(parsedMessage)
break;
default:
console.log('无法处理消息: ', parsedMessage);
}
};
