一、思路简介
, F* Z- E7 c( H, ~% P3 c 分为客户端和服务器两个类,所有的客户端将聊的内容发送给服务器,服务器接受后,将每一条内容发送给每一个客户端,客户端再显示在终端上。
6 Z! Z* ~, ?3 f二、客户端设计
( }* }4 O* [; Q" Q P" H; `' J5 G 客户端包含2个线程,1个用来接受服务器的信息,再显示,1个用来接收键盘的输入,发送给服务器。/ f/ ^1 }& C" _# F% T) v( C( y
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class WeChatClient { //WeChat的客户端类
private Socket client;
private String name;
private InputStream in;
private OutputStream out;
private MassageSenter massageSenter;
private MassageGeter massageGeter;
class MassageGeter extends Thread{ //一个子线程类,用于客户端接收消息
MassageGeter() throws IOException{
in = client.getInputStream();
}
@Override
public void run() {
int len;
byte[] bytes = new byte[1024];
try {
while ((len = in.read(bytes)) != -1) { //此函数是阻塞的
System.out.println(new String(bytes,0,len, StandardCharsets.UTF_8));
}
}catch (IOException e){
System.out.println(e.toString());
}
System.out.println("Connection interruption");
}
}
class MassageSenter extends Thread{ //一个子线程类,用于发送消息给服务器
MassageSenter() throws IOException{
out = client.getOutputStream();
}
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
try {
while (scanner.hasNextLine()) { //此函数为阻塞的函数
String massage = scanner.nextLine();
out.write((name + " : " + massage).getBytes(StandardCharsets.UTF_8));
if(massage.equals("//exit"))
break;
}
}catch (IOException e){
e.printStackTrace();
}
}
}
WeChatClient(String name, String host, int port) throws IOException {//初始化,实例化发送和接收2个线程
this.name = name;
client = new Socket(host,port);
massageGeter = new MassageGeter();
massageSenter = new MassageSenter();
}
void login() throws IOException{//登录时,先发送名字给服务器,在接收到服务器的正确回应之后,启动线程
out.write(name.getBytes(StandardCharsets.UTF_8));
byte[] bytes = new byte[1024];
int len;
len = in.read(bytes);
String answer = new String(bytes,0,len, StandardCharsets.UTF_8);
if(answer.equals("logined!")) {
System.out.println("Welcome to WeChat! "+name);
massageSenter.start();
massageGeter.start();
try {
massageSenter.join();//join()的作用是等线程结束之后再继续执行主线程(main)
massageGeter.join();
}catch (InterruptedException e){
System.err.println(e.toString());
}
}else{
System.out.println("Server Wrong");
}
client.close();
}
public static void main(String[] args) throws IOException{//程序入口
String host = "127.0.0.1";
WeChatClient client = new WeChatClient("Uzi",host,7777);
client.login();
}
} 三、服务器设计
& j) |8 c/ s( e3 V! x 服务器包含3个线程类,端口监听线程,客户端接收信息线程,发送信息线程。服务器类还包含并维护着一个已经连接的用户列表,和一个待发送信息列表。
: e! D6 ~+ V% J7 |: ]- R% M6 e2 K: b 服务器有一个负责监听端口的线程,此线程在接收到客户端的连接请求后,将连接的客户端添加进用户列表;并为每一个连接的客户端实例化一个接受信息的线程类,从各个客户端接收员信息,并存入待发送信息列表。发送信息线程查看列表是否为空,若不为空,则将里面的信息发送给用户列表的每一个用户。3 @& A9 u) U! j# H3 f( s2 Z
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
public class WeChatServer {
private ServerSocket server;
private ArrayList<User> users;//用户列表
private ArrayList<String> massages;//待发送消息队列
private Listener listener;
private MassageSenter massageSenter;
class User{ //用户类,包含用户的登录id和一个输出流
String name;
OutputStream out;
User(String name,OutputStream out){
this.name = name;
this.out = out;
}
@Override
public String toString() {
return name;
}
}
private static String GetMassage(InputStream in) throws IOException{//从一个输入流接收一个字符串
int len;
byte[] bytes = new byte[1024];
len = in.read(bytes);
return new String(bytes,0,len,StandardCharsets.UTF_8);
}
private void UserList(){ //列出当前在线用户,调试用
for(User user : users)
System.out.println(user);
}
class Listener extends Thread{ //监听线程类,负则监听是否有客户端连接
@Override
public void run() {
try {
while (true) {
Socket socket = server.accept();//此函数是阻塞的
InputStream in = socket.getInputStream();
String name = GetMassage(in);//获取接入用户的name
System.out.println(name +" has connected");
massages.add(name+" has joined just now!!");//向聊天室报告用户连入的信息
OutputStream out = socket.getOutputStream();
out.write("logined!".getBytes(StandardCharsets.UTF_8));//发送成功建立连接的反馈
User user = new User(name,out);
users.add(user);//添加至在线用户列表
MassageListener listener = new MassageListener(user,in);//创建用于接收此用户信息的线程
listener.start();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
class MassageListener extends Thread{ //接收线程类,用于从一个客户端接收信息,并加入待发送列表
private User user;
private InputStream in;
MassageListener(User user,InputStream in){
this.user = user;
this.in = in;
}
@Override
public void run() {
try {
while (true){
String massage = GetMassage(in);
System.out.println("GET MASSAGE "+massage);
if(massage.contains("//exit")){ // "/exit" 是退出指令
break;
}
massages.add(massage);
}//用户退出有两种形式,输入 “//exit” 或者直接关闭程序
in.close();
user.out.close();
}catch (IOException e){//此异常是处理客户端异常关闭,即GetMassage(in)调用会抛出异常,因为in出入流已经自动关闭
e.printStackTrace();
}finally {
System.out.println(user.name+" has exited!!");
massages.add(user.name+" has exited!!");
users.remove(user);//必须将已经断开连接的用户从用户列表中移除,否则会在发送信息时产生异常
System.out.println("Now the users has");
UserList();
}
}
}
private synchronized void SentToAll(String massage)throws IOException{//将信息发送给每一个用户,加入synchronized修饰,保证在发送时,用户列表不会被其他线程更改
if(users.isEmpty())
return;
for(User user : users){
user.out.write(massage.getBytes(StandardCharsets.UTF_8));
}
}
class MassageSenter extends Thread{//消息发送线程
@Override
public void run() {
while(true){
try{
sleep(1);//此线程中没有阻塞的函数,加入沉睡语句防止线程过多抢占资源
}catch (InterruptedException e){
e.printStackTrace();
}
if(!massages.isEmpty()){
String massage = massages.get(0);
massages.remove(0);
try {
SentToAll(massage);
}catch (IOException e){
e.printStackTrace();
}
}
}
}
}
WeChatServer(int port) throws IOException { //初始化
server = new ServerSocket(port);
users = new ArrayList<>();
massages = new ArrayList<>();
listener = new Listener();
massageSenter = new MassageSenter();
}
private void start(){ //线程启动
listener.start();
massageSenter.start();
}
public static void main(String[] args) throws IOException{
WeChatServer server = new WeChatServer(7777);
server.start();
}
} 四、总结0 q1 V9 X* |; U- Z
之所以需要多线程编程,是因为有的函数是阻塞的,例如:
3 _7 @4 W- V0 u. t# y# V1 _% swhile ((len = in.read(bytes)) != -1) { //此函数是阻塞的
System.out.println(new String(bytes,0,len, StandardCharsets.UTF_8));
} 这些阻塞的函数是需要等待其他的程序,例如scanner.hasNextLine()需要等待程序员的输入才会返回值,in.read需要等待流的另一端传输数据,使用多线程就可以在这些函数处于阻塞状态时,去运行其他的线程。所以,多线程编程的关键便是那些阻塞的函数。 |