欢迎光临散文网 会员登陆 & 注册

java实现Socket群聊

2022-09-11 13:51 作者:虚云幻仙  | 我要投稿

/**
* 实现群聊
* 服务端向每个客户端一对一通信,将一个客户端的消息发送给其他所有人,客户端与客户端之间没有直接联系
*/

public class GroupChatServer {
   public static String msg;
   //Sender线程和Receiver线程的缓冲区,Receiver线程收到某个客户端的消息,放入msg,唤醒Sender线程群发消息
   public static void main(String[] args) {
       try(ServerSocket ss = new ServerSocket(8888)) {
           System.out.println("群聊服务器已启动,等待客户端连接");
           while (true){
               Socket socket = ss.accept();
               System.out.println("有新的客户端连接: "+socket.getInetAddress());
               new Receiver2(socket).start();
               new Sender2(socket).start();
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }

   }
}
class Sender2 extends Thread{
   //向客户端一对一发送消息
   final Socket socket;

   public Sender2(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
           while (true){
               synchronized(""){
                   //所有收发线程同步""
                   "".wait();
                   //每次先等待,当有人说话了再唤醒,发送消息
               }
               if (socket.isClosed()||socket.isOutputShutdown())break;
               //如果Receiver线程.readLine返回null即对方Socket对象已关闭,则关闭己方Socket对象,Sender线程检测到Socket关闭退出循环结束线程
               pw.println(GroupChatServer.msg);
               //服务器对每个客户端一对一发送,将这一个消息群发给所有客户端,因为只是读取msg而不会改变它所以不需要同步
           }
       } catch (IOException | InterruptedException e) {
           throw new RuntimeException(e);
       }finally {
           if (socket != null) {
               try {
                   socket.close();
                   //在Receiver线程也会finally socket.close() close方法可以多次执行,方法内判断如果isClosed直接return,不会产生异常
               } catch (IOException e) {
                   throw new RuntimeException(e);
               }
           }
       }

   }
}
class Receiver2 extends Thread{
   //与客户端一对一接收消息
   final Socket socket;

   public Receiver2(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
           synchronized (""){
               GroupChatServer.msg = "用户"+socket.getInetAddress()+"加入群聊";
               //每当有新用户加入时在群内提示
               "".notifyAll();
           }
           while (true){
               String newMsg = br.readLine();
               if (newMsg == null){
                   System.out.println("用户"+socket.getInetAddress()+"结束通信");
                   synchronized (""){
                       GroupChatServer.msg = "用户"+socket.getInetAddress()+"退出群聊";
                       "".notifyAll();
                   }
                   break;
               }
               synchronized (""){
                   //同步""每次只能一个Receiver线程存msg
                   GroupChatServer.msg = "用户"+socket.getInetAddress()+"说: "+newMsg;
                   "".notifyAll();
                   //.notifyAll()唤醒所有"".wait()阻塞的Sender线程,每个Sender线程将msg发送给客户端后再次wait
               }
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }finally {
           if (socket!=null){
               try {
                   socket.close();
                   //Receiver线程先发现用户结束通信,唤醒Sender线程后break执行close,这时Sender还在就绪状态排队,所以Receiver线程会更早关闭socket
               } catch (IOException e) {
                   throw new RuntimeException(e);
               }
           }
       }
   }
}

class GCS2{
   //解决:当多个Receiver同时收到客户端的消息,在Sender唤醒但还没有得到CPU资源或部分线程没得到CPU资源发送消息时,下一个Receiver将msg更改,部分用户漏收消息的可能性
   static String msg;
   static Socket microphone;
   //缓存发送msg的Socket
   final static ArrayList<Socket> group = new ArrayList<>();
   //组员容器
   final static ArrayList<PrintWriter> pws = new ArrayList<>();
   //输出流容器

   public static void add(Socket socket){
       group.add(socket);
       try {
           pws.add(new PrintWriter(socket.getOutputStream(),true));
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }

   public static void main(String[] args) {
       try(ServerSocket ss = new ServerSocket(8888)) {
           new Sender3().start();
           //用一个Sender线程处理所有输出流,每次将msg发送给所有人之后再让Receiver传新的msg
           while (true){
               Socket socket = ss.accept();
               add(socket);
               new Receiver3(socket).start();
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }
}
class Receiver3 extends Thread{
   final Socket socket;

   public Receiver3(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
           push("用户"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊");
           //push()方法用于更新msg,单独写方法体便于复用
           while (true){
               String nMsg = br.readLine();
               if (nMsg == null) {
                   push("用户"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊");
                   break;
               }
               push("用户"+socket.getInetAddress()+":"+socket.getPort()+"说: "+ nMsg);
               //客户端发送给服务端的信息已经被缓存在Socket中了,每次.readLine()是读取本地缓存中的内容,也就是说即便Receiver线程因为排队更新msg而阻塞,也不会漏掉客户端的某段msg
           }

       } catch (IOException | InterruptedException e) {
           throw new RuntimeException(e);
       } finally {
           if (socket != null) {
               try {
                   socket.close();
               } catch (IOException e) {
                   throw new RuntimeException(e);
               }
           }
       }
   }

   private void push(String msg) throws InterruptedException {
       synchronized ("r"){
           if (GCS2.msg!=null){
               "r".wait();
               //当Sender没有取走msg时.wait
           }
           GCS2.msg = msg;
           GCS2.microphone = socket;
       }
       synchronized ("s"){
           "s".notify();
           //唤醒Sender线程,Receiver线程在"r".wait()阻塞,Sender线程在"s".wait()阻塞,如果是同一个对象锁的话使用notify()无法保证唤醒的是Sender
       }
   }

}
class Sender3 extends Thread{
   @Override
   public void run() {
       while (true){
           synchronized ("s"){
               if (GCS2.msg==null) {
                   try {
                       "s".wait();
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }
               }
           }
           checkConnection();
           //将已经关闭的Socket从group移除并关闭PW输出流
           for (int i = 0;i<GCS2.group.size();i++){
               if (GCS2.group.get(i)!=GCS2.microphone){
                   GCS2.pws.get(i).println(GCS2.msg);
               }
           }
           synchronized ("r"){
               GCS2.msg=null;
               GCS2.microphone = null;
               "r".notify();
               //唤醒一个因为"r".wait()阻塞的Receiver线程更新信息
           }


       }
   }

   private void checkConnection(){
       ArrayList<Integer> list = new ArrayList<>();
       for (int i = GCS2.group.size()-1;i>=0;i--){
           if (GCS2.group.get(i).isClosed()||GCS2.group.get(i).isOutputShutdown()){
               list.add(i);
           }
       }
       for (int i :
               list) {
           GCS2.group.remove(i);
           GCS2.pws.get(i).close();
           GCS2.pws.remove(i);
           //从后往前删,每次删除不会影响下一个要删的元素的索引
       }
   }
}

class GCS3{
   final static String[] history = new String[99];
   //使用数组缓存历史记录
   static int cursor;
   public static void push(String msg){
       synchronized (history){
           //存的时候串行
           history[cursor++]=msg;
           if (cursor==99){
               cursor=0;
           }
       }
       synchronized ("s"){
           "s".notifyAll();
       }
   }

   public static void main(String[] args) {
       try(ServerSocket ss = new ServerSocket(8888)) {
           System.out.println("服务器启动");
           while (true){
               Socket socket = ss.accept();
               new Receiver4(socket).start();
               new Sender4(socket).start();
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }

}

class Sender4 extends Thread{
   final Socket socket;
   int index;

   public Sender4(Socket socket) {
       this.socket = socket;
       this.index = GCS3.cursor;
   }

   @Override
   public void run() {
       try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
           while (true){
               synchronized ("s"){
                   if (index==GCS3.cursor){
                       "s".wait();
                   }
               }
               //取的时候并行
               if (socket.isClosed()||socket.isOutputShutdown())break;
               for (int c = GCS3.cursor;index!=c;index=(index+1)%99){
                   pw.println(GCS3.history[index]);
               }

           }
       } catch (IOException | InterruptedException e) {
           throw new RuntimeException(e);
       }
   }
}
class Receiver4 extends Thread{
   final Socket socket;

   public Receiver4(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
           GCS3.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊");
           while (true){
               String msg = br.readLine();
               if (msg == null) {
                   GCS3.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊");
                   break;
               }
               GCS3.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"说: "+msg);

           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }

       try {
           socket.close();
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }
}

class GCS5 {
   static int total;
   //当前通信的连接总数
   static int count;
   //对一段msg完成发送的Sender数
   static String msg = "加入群聊";
   static Socket microphone;

   public static void totalUp(){
       synchronized("total"){
           total++;
       }
   }
   public static void totalDown(){
       synchronized ("total"){
           total--;
           //修改同一数据时需要串行
       }
   }
   public static void countUp(){
       synchronized ("count"){
           count++;
       }
   }
   public static void push(String s,Socket socket){
       if (GCS5.count<GCS5.total) {
           synchronized ("r") {
               try {
                   "r".wait();
                   //每次放入新的消息前先等待,等着前一句全部发送完毕后被唤醒
                   boolean b;
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
       }
       synchronized ("count"){
           count=0;
           msg = s;
           microphone = socket;
       }
       synchronized ("s"){
           "s".notifyAll();
       }
   }

   public static void main(String[] args) {
       try(ServerSocket ss = new ServerSocket(8888)) {
           System.out.println("已启动");
           while (true){
               Socket socket = ss.accept();
               System.out.println("新的连接");
               new Receiver5(socket).start();
               new Sender5(socket).start();
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }


}

class Receiver5 extends Thread{
   final Socket socket;

   public Receiver5(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
           GCS5.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"加入群聊",socket);
           while (true){
               String msg = br.readLine();
               if (msg == null) {
                   GCS5.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"退出群聊",socket);
                   break;
               }
               GCS5.push("用户"+socket.getInetAddress()+":"+socket.getPort()+"说: "+msg,socket);
           }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }finally {
           if (socket != null) {
               try {
                   socket.close();
               } catch (IOException e) {
                   throw new RuntimeException(e);
               }
           }
       }
       GCS5.totalDown();
   }


}
class Sender5 extends Thread{
   final Socket socket;

   public Sender5(Socket socket) {
       this.socket = socket;
   }

   @Override
   public void run() {
       try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
           pw.println("加入群聊");
           GCS5.totalUp();
           //Sender线程运行后再total++,使创建到运行这期间count不会卡在total-1
           while (!socket.isClosed()){
               GCS5.countUp();
               if (socket!=GCS5.microphone){
                   pw.println(GCS5.msg);
               }
               //先count++,如果Receiver线程正在更新msg,更新后count增加的1是新的msg的发送量
               if (GCS5.count>=GCS5.total){
                   synchronized("r"){
                       "r".notify();
                       //当Sender都发送完之后唤醒一个.wait()的Receiver。但如果这时没有.wait()的Receiver,所有Sender全部.wait(),再收到新的msg时Receiver需要自己判断count>=total不执行.wait()
                   }
               }
               synchronized ("s"){
                   "s".wait();
               }
           }
       } catch (IOException | InterruptedException e) {
           throw new RuntimeException(e);
       }
   }
}

java实现Socket群聊的评论 (共 条)

分享到微博请遵守国家法律