[java]代码库
WebSocket要求服务器与客户端之间要保持连接状态,当然当客户端或服务器端关闭了连接对方是有办法检测到的,这个无需我们关心;可以通过发送心跳保持2者之间连接的有效性。从另外一个方面来看,如果是客户端和服务器之间有交互的应用,客户端长时间没有发送业务数据到服务器则说明用户可能在短时间内不会再使用客户端了,这时服务器端可以将这些闲置的连接关闭以达到减少资源的开销。
为了达到这个目的,首先我们需要知道的是客户端是否已经闲置超过了指定的时限,那我们怎样知道呢。最初考虑的方案是这样的,每次客户端发送业务数据到服务器端时,我们就调用System.currentTimeMillis 函数获取到当前系统的毫秒数并在连接对应的Client对象中记录下这个数量preReceiveTimestamp。在系统中创建一个线程遍历每个客户端连接,取出preReceiveTimestamp 和系统当前的毫秒数System.currentTimeMillis 进行比较,如果 System.currentTimeMillis - preReceiveTimestamp 之差超过了设置的超时限制 则说明客户端已经闲置超时 可以关闭了。
但是System.currentTimeMillis 函数的调用需要从 应用线程 切换 到系统内核线程 然后再切换到应用线程中,他们之间的切换如果太过频繁对系统的性能还是有一定的影响。有人测试过在一般的PC服务器上1000w次System.currentTimeMillis调用大概需要12秒左右,平均每次1.3毫秒。这对于大并发量的服务器来说性能上的影响不得不考虑。
为了避免频繁的调用System.currentTimeMillis,而又要达到检查客户端闲置是否超过时限,在开源WebSocket服务器CshBBrain中进行了优化。优化后的方案为当客户端有发送数据到服务器端时,将连接对应的Client对象中的数据接收标识设置为true;在系统中创建一个线程,线程遍历每个客户端连接,检测客户端Client对象中的数据接收标识是否为true,如果不为true则说明客户端闲置超时可以关闭了,如果数据接收标识为true则说明客户端没有闲置超时,并将客户端Client对象中的数据接收标识设置为false,线程没隔系统设置的超时时限对客户端进行是否闲置超时检查一次。这样就避免了频繁的调用System.currentTimeMillis函数,将影响降低到最低。
客户端闲置检查线程核心代码,截取的MasterServer类中对应代码:
private void startClientMonitor(){
while(noStopRequested){
try {
if(this.timeOut > 0){// 超时阀值
Iterator<Integer> it = clients.keySet().iterator();
while(it.hasNext()){
Integer key = it.next();
Client client = clients.get(key);
if(!client.isReadDataFlag()){// 超时没有收到数据
client.close();// 关闭连接
clients.remove(key);// 从映射表中删除连接
}else{
client.setReadDataFlag(false);// 将读取数据标识设置为false
}
}
this.clientMonitor.sleep(this.timeOut * 60 * 1000);// 隔指定时限检测一次
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
服务接收到客户端发送的业务数据处理将数据读取标识设置为true,截取的Client类中对应的代码:
private boolean readRequest(ByteBuffer byteBuffer){
SocketChannel socketChannel = (SocketChannel) this.key.channel();
boolean readSuccess = false;// 是否读取到数据标识
boolean returnValue = false;// 返回值
try{
int dataLength = 0;
do{// 读取客户端请求的数据并解码
dataLength = socketChannel.read(byteBuffer);
if(dataLength > 0){
byteBuffer.flip();
this.decoderHandler.process(byteBuffer,this);// 解码处理
byteBuffer.clear();
readSuccess = true;
this.readDataFlag = true;// 将读取数据标识设置为真
this.preBlank = false;// 上次不为空读
}else{
if(this.preBlank){
++this.readCount;
}
this.preBlank = true;
break;
}
}while(dataLength > 0);
if(this.readCount >= zoreFetchCount){// 空读超过指定次数,关闭链接,返回
log.info("the max count read: " + this.readCount);
this.close();
returnValue = false;
return returnValue;
}
}catch(IOException e){
e.printStackTrace();
this.close();
returnValue = false;
return returnValue;
}
if(readSuccess){// 如果读取到数据
if(requestWithFile.isReadFile()){// 是否读取文件
if(requestWithFile.readFinish()){// 是否读取完毕文件
//if(requestWithFile.getFileReceiver().finishWrite()){// 是否读取完毕文件
returnValue = true;
if(MasterServer.keepConnect){//长连接
this.registeRead();
}
}else{// 没有读取完毕文件,注册通道,继续读取
if(MasterServer.keepConnect){//长连接
this.registeRead();
}else{
try{
this.inputMonitorWorker.registeRead(key);
}catch(Exception e){
e.printStackTrace();
this.close();
returnValue = false;
this.requestWithFile.getFileReceiver().close();// 关闭文件
return returnValue;
}
this.inRead.compareAndSet(true, false);
}
returnValue = false;// 将文件内容读取完后再进行处理
}
}else{// 普通请求,没有上传文件
returnValue = true;
if(MasterServer.keepConnect){//长连接
this.registeRead();
}
}
}else{
returnValue = false;
if(MasterServer.keepConnect){//长连接
this.registeRead();
}
}
if(returnValue){// 读取完毕放入处理队列
HashMap<String, String> requestData = requestWithFile.getRequestData();
if(requestData != null){
this.getBizObjects().add(requestData);
}
}
return returnValue;
}
//源代码片段来自云代码http://yuncode.net