近来在学习Java NIO 络开发知识,写了一个基于Java NIO的多人在线聊天工具MyChat练练手。源码公开在Coding上:

编写一个基于Java NIO的多人在线聊天工具,需要以下几方面的知识:客户端服务器模型,Java NIO中的Selector,Channel,ByteBuffer,Collections以及序列化和反序列化的知识。下面来对照源码逐一剖析MyChat的源码构成:

一.服务器

为了便于实时分析服务器在线人数和聊天室列表,需要在服务器端提供一个交互接口,也就是获取System.in的输入,执行相应的操作,如下所示:

System.out.println(“===输入选择项===”);

System.out.println(“1.获取用户列表;2.获取聊天室列表;3.获取指定聊天室成员;4.关闭服务器”);boolean isExit=false;

Scanner scanner=new Scanner(System.in);

由于主线程需要运行交互界面,这样一来执行与客户端交互任务的代码就要放在另外一个线程中了:

ChatServer server=newChatServer();

Thread serverThread=new Thread(server,”聊天服务器”);

serverThread.setDaemon(true);//后台进程

serverThread.start();

接下来将分别介绍服务器端的实现类ChatServer的关键成员:

private Selector mSelector=null;//用于注册所有连接到服务器的SocketChannel对象

//保存所有用户的Map

private Map mUsers=Collections.synchronizedMap(new HashMap());//保存所有聊天室的Map

private Map mRooms=Collections.synchronizedMap(new HashMap());//聊天室

第一个成员变量mSelector是一个Selector对象,用于管理所有连接到服务器的Channel,为了管理多个通道的读写,要将不同的通道注册到一个Selector对象上。每个通道分配有一个SelectionKey。然后程序可以询问这个Selector对象,哪些通道已经准备就绪可以无阻塞的完成你希望完成的操作,可以请求Selector对象返回相应的键集合。通过调用Selector类的唯一构造函数:静态工厂方法open()来创建新的选择器,并通过register()方法注册通道。

mSelector=Selector.open();

ServerSocketChannel server=ServerSocketChannel.open();

InetSocketAddress isa=newInetSocketAddress(mHost, mPort);

server.bind(isa);//绑定指定端口

server.configureBlocking(false);

server.register(mSelector, SelectionKey.OP_ACCEPT);

System.out.println(“服务器在”+mPort+”端口启动成功”);

注册成功后,就可以通过Selector的select()方法查询已经就绪的通道。

while(mSelector.select()>0)

{

Iterator iterator=mSelector.selectedKeys().iterator();while(iterator.hasNext())

{

SelectionKey sk=iterator.next();

iterator.remove();

select()方法用于查询注册到Selector上的待处理的就绪Channel,是一个阻塞方法,直到至少有一个注册的Channel准备好之后就可以进行处理。SelectionKey对象相当于通道的指针,可以保存通道的连接状态。Selector对象的selectedKeys()方法可以返回所有注册Channel的SelectionKey。接下来可以通过isAccetable(),isReadable(),isWritable()等方法测试该键能进行的操作。

ServerSocketChannel类只有一个目的:接受入站连接。通过注册到Selector对象来获取入站连接通知,如下所示:

if(sk.isAcceptable())

{

SocketChannel sc=server.accept();//开始接收客户端连接

sc.configureBlocking(false);

sc.register(mSelector, SelectionKey.OP_READ);

sk.interestOps(SelectionKey.OP_ACCEPT);

}

接下来可以通过sk.isReadable()进入处理客户端数据的代码块:

if(sk.isReadable())//有数据

{

SocketChannel sc=(SocketChannel)sk.channel();

ByteBuffer buffer=ByteBuffer.allocate(1024);

ByteArrayOutputStream boStream=newByteArrayOutputStream();try{while(sc.read(buffer)>0)//TODO:性能问题

{

buffer.flip();

boStream.write(Arrays.copyOfRange(buffer.array(),0, buffer.limit()));

}byte[] frame=boStream.toByteArray();

boStream.close();

为了能进一步讲明白为什么需要上面这种方式读取客户端信息,这里先插入讲解一下服务器和客户端交互的信使类Message。为了提升扩展性,我定义了一个Serializable类Message,用于服务器和客户端之间进行交互(如登录,返回结果,创建聊天室等)。Message类的定义如下:

1 class Message implementsSerializable2 {3 private static final long serialVersionUID = 1L;4 private Map fields=new HashMap();//TODO:泛型支持,任意消息类型,包括文本,图片,语音,视频,文件等

5 privateCommands command;6 publicMessage(Commands command)7 {8 this.command=command;9 }10 publicCommands getCommand()11 {12 return this.command;13 }14 publicMessage set(FieldType key,String value)15 {16 if(key!=null&&value!=null)17 {18 fields.put(key,value);19 }20 return this;21 }22 publicString get(FieldType key)23 {24 returnfields.get(key);25 }26

27 public byte[] toBytes()28 {29 return SerializeHelper.serialize(this);30 }31

32 publicByteBuffer wrap()33 {34 byte[] frame=toBytes();35 returnByteBuffer.wrap(frame);36 }37 }

其中有两个关键的成员:一个Map型的用于保存数据的field成员和一个枚举类型的用于表明命令类型的command成员。其中Command枚举定义如下:

enumCommands{

LOG_IN,

LOG_OUT,

QUERY_USERS,

QUERY_ALL_CHAT_ROOMS,

QUERY_MY_CHAT_ROOMS,

QUERY_ROOM_MEMBERS,

HEART_BEAT,

MSG_P2P,//个人对个人的消息

MSG_P2R,//聊天室消息

CREATE_CHAT_ROOM,

JOIN_CHAT_ROOM,

LEAVE_CHAT_ROOM,

SET_USER_NAME;

};

另外,为了指名携带数据的类型,定义了一个FieldType枚举,如下:

enumFieldType{

USER_ID,

USER_NAME,

PASS_WD,

PEER_ID,//单聊对象的ID

ROOM_ID,//聊天室ID

USER_LIST,//用户列表

ROOM_LIST_ALL,//所有房间列表

ROOM_LIST_ME,//我的聊天室列表

ROOM_MEMBERS,//用户列表

MSG_TXT,

RESPONSE_STATUS,

ENCODING;

};

这样一来,服务器和客户端就可以通过这种可序列化的Message相互通信了。具体就是客户端将要发送给服务器的数据封装在Message对象中后,通过SocketChanne发送到服务器,服务器收到数据后通过反序列化获取原始的Message对象,并根据command成员来判断接收到的是什么类型的Message,如登录,点对点消息等。

Message msg=(Message)SerializeHelper.deSerialize(frame);if(msg!=null)

{

String userId=msg.get(FieldType.USER_ID);switch(msg.getCommand()) {caseLOG_IN:

{

System.out.println(“用户”+userId+”请求登录…”);

Message message=newMessage(Commands.LOG_IN);//TODO:检查用户名密码,暂时没有注册功能,就只检测用户名是否重复

if(!mUsers.containsKey(userId))

{

message.set(FieldType.RESPONSE_STATUS,”成功”);

System.out.println(“用户”+userId+”登录成功”);

UserEntity user=newUserEntity(userId,sc);

mUsers.put(userId,user);

}else{

message.set(FieldType.RESPONSE_STATUS,”该帐 已经登录”);

}//发送登录结果

sendRawMessage(sc, message);break;

}

这里出现的mUsers对象,就是我要介绍的服务器端第二个重要的成员变量,mUsers是一个用Collections.synchronizedSet封装的支持多线程访问的HashSet,用于保存[用户ID->用户对象]的映射。所谓用户对象就是另外定义的一个用于保存用户基本信息的类,其中包含了用户的id,passwd,对应的SocketChannel和所加入的聊天室集合。如下所示:

classUserEntity{privateString mUserId;privateString mPassWd;privateSocketChannel mSocketChannel;private Set mJoinedRooms=Collections.synchronizedSet(new HashSet());

服务器端还有一个重要的成员变量,用于保存服务器端所有聊天室的集合,也是一个用Collections.synchronizedSet封装的HashSet,用于保存[聊天室ID->聊天室对象]的映射。聊天室对象是专门定义的一个保存聊天室基本信息的类,其中包含了聊天室id,聊天室成员集合。如下所示:

final classChatRoom {private String mRoomId=null;private Set mUsers=Collections.synchronizedSet(new HashSet());

到此,服务器端的代码基本剖析完毕,接下来我们看看客户端的代码。

二.客户端

客户端的代码相对服务器来说要简单许多,一个典型的NIO客户端程序连接服务器流程如下所示:

mSelector=Selector.open();

InetSocketAddress remote=newInetSocketAddress(host, port);

mSocketChannel=SocketChannel.open(remote);

mSocketChannel.configureBlocking(false);

mSocketChannel.register(mSelector, SelectionKey.OP_READ);

其中注册Selector的接口几乎与服务器一致,除了传递给register方法的第二个参数不同。注册完通道后就可以向服务器发送登录请求了:

Message message=newMessage(Commands.LOG_IN);

message.set(FieldType.USER_ID, userid);

message.set(FieldType.PASS_WD, passwd);

sendRawMessage(message);

其中sendRawMessage是一个私有方法,用于将Message序列化后使用ByteBuffer通过SocketChannel发送到服务器端,具体代码如下:

private voidsendRawMessage(Message message)

{if(mSocketChannel!=null&&message!=null)

{try{

mSocketChannel.write(message.wrap());

}catch(Exception e) {

e.printStackTrace();

}

}

}

我为Message类设计了一个wrap()方法可以将Message序列化后的byte[]包装成ByteBuffer返回,从而可以直接作为SocketChannel.write()方法的参数。具体代码可以参考文章开头的Git仓库。

与服务器一样,客户端需要接收用户输入,从而也将与服务器交互的部分放在单独的线程运行。我将这个线程类放在ChatClient类的内部作为嵌套类,这样可以直接访问外部类的成员变量,为线程之间通信提供便利。

三.实例分析

介绍完服务器和客户端的设计之后,下面以创建聊天室为例详细介绍客户端和服务器端的通信流程。

当客户端登录到服务器中后,服务器会保存客户端的用户ID以及对应的SocketChannel信息,客户端通过一条创建聊天室的Message向服务器申请创建聊天室:

Message message=newMessage(Commands.CREATE_CHAT_ROOM);

message.set(FieldType.USER_ID,mUserId );

message.set(FieldType.ROOM_ID, roomId);

sendRawMessage(message);

如上所示,该Message的命令字是Commands.CREATE_CHAT_ROOM,包含了两个域,分别是创建者的ID和待创建的房间ID(这里为了设计简便,将ID和名称等同为一个概念,实际中ID应该是一个唯一的整型量,名称是聊天室的名字,可以重复)。服务器端通过反序列化Message,并提取对应的命令字进入对应的处理逻辑:

caseCREATE_CHAT_ROOM:

{

System.out.println(“用户”+userId+”请求创建聊天室”);

String roomId=msg.get(FieldType.ROOM_ID);

Message message=newMessage(Commands.CREATE_CHAT_ROOM);if(!StringHelper.isNullOrTrimEmpty(roomId))

{if(!mRooms.containsKey(roomId))

{

ChatRoom room=newChatRoom(roomId);

room.addUser(userId);

mRooms.put(roomId, room);

UserEntity user=mUsers.get(userId);if(user!=null)

user.joinRoom(roomId);

message.set(FieldType.RESPONSE_STATUS,”成功”);

}else{

message.set(FieldType.RESPONSE_STATUS,”创建失败,已存在同名聊天室”);

}

}else//返回错误消息

{

message.set(FieldType.RESPONSE_STATUS,”创建失败,聊天室名称不能为空”);

}

sendRawMessage(sc, message);break;

}

我们来仔细分析下上面的代码。首先从Message中提取到了userId和roomId,然后判断服务器端mRooms集合是否已经存在同名聊天室,如果不存在,则创建一个新的聊天室:ChatRoom room=new Chat(roomId)。并将创建者本人加入到聊天室用户列表中:room.addUser(userId)。同时,为了方便查找用户加入的所有聊天室,还将该聊天室的ID通过UserEntity的joinRoom()方法保存到了UserEntity的聊天室集合中,最后将表示正确结果的Message发送给请求客户端;反之如果已经存在同名聊天室,则将包含错误信息的Message发送给客户端。而客户端负责与服务器端交互的线程则通过反序列化Message获取操作结果,并显示给用户。

为了更加直观地展示MyChat的工作流程,将终端运行的结果整了几张截图附在下面:

客户端1:

服务器端:

3137e5a6bd312991bcbe63ee52ff729a.png

文章知识点与官方知识档案匹配,可进一步学习相关知识Java技能树NIONIO概述92476 人正在系统学习中 相关资源:软件测试群软件测试群软件测试群软件测试群_测试群-其它文档类…

声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2021年1月15日
下一篇 2021年1月15日

相关推荐