使用Selector的原因
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现得越来越好,所以多线程的开销随着时间的推移变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于这种设计的讨论应该放在另一篇文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。
下面是单线程使用一个Selector处理3个channel的示例图:
范例
说明一下:实际上,对数据的处理应该放在一个专门的线程中执行,但为了使代码更简洁、更容易理解,这里并没有这么做。
BaseSocketChannelSelector.java
package com.demo.channel;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
/**
 * Owns one {@link Selector} and runs the loop that hands every ready key to
 * {@link #doOnSelectionKey(SelectionKey)}.
 *
 * <p>Subclasses register their channels with {@link #registerChannel(SelectableChannel, int)}
 * and stop the loop by calling {@link #closeSelector()}, either from another thread or from
 * inside the key handler itself.
 */
public abstract class BaseSocketChannelSelector {

    private final Selector selector;

    /**
     * Opens the selector used by this instance.
     *
     * @throws IOException if the selector cannot be opened
     */
    protected BaseSocketChannelSelector() throws IOException {
        selector = Selector.open();
    }

    /**
     * Switches {@code channel} to non-blocking mode and registers it with the selector.
     * An {@link IOException} is printed and otherwise ignored.
     *
     * @param channel the channel to register
     * @param ops     the interest set, a combination of {@code SelectionKey.OP_*} constants
     */
    protected void registerChannel(SelectableChannel channel, int ops) {
        try {
            channel.configureBlocking(false);
            channel.register(selector, ops);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the selection loop on the calling thread until the selector is closed, then prints
     * {@code "selector is closed"} and returns.
     */
    public void select() {
        while (selector.isOpen()) {
            try {
                if (selector.select() == 0) {
                    continue;
                }
                // Work on a snapshot and empty the live set first, so a handler that closes
                // the selector cannot invalidate an iterator over the selected-key set.
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                SelectionKey[] readyKeys = selectedKeys.toArray(new SelectionKey[0]);
                selectedKeys.clear();
                for (SelectionKey key : readyKeys) {
                    if (!selector.isOpen()) {
                        // Closed while dispatching: the remaining keys are stale.
                        break;
                    }
                    if (key.isValid()) {
                        doOnSelectionKey(key);
                    }
                }
            } catch (java.nio.channels.ClosedSelectorException e) {
                // The selector was closed between the isOpen() check and the selector call;
                // this is a normal shutdown, not an error.
                break;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("selector is closed");
    }

    /**
     * Handles one ready key. Called on the thread that runs {@link #select()}.
     *
     * @param key a key that was valid when it was picked up for dispatch
     */
    protected abstract void doOnSelectionKey(SelectionKey key);

    /**
     * Closes the selector, which makes {@link #select()} return. An {@link IOException} is
     * printed and otherwise ignored.
     */
    protected void closeSelector() {
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
TcpClientChannelSelector.java
package com.demo.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Non-blocking TCP client that connects to {@code 127.0.0.1:5555} and prints whatever the
 * server sends back.
 *
 * <p>{@link #isConnected()} and {@link #write(String)} are meant to be called from a thread
 * other than the one running the selection loop, so the connection flag is {@code volatile}.
 */
public class TcpClientChannelSelector extends BaseSocketChannelSelector {

    private static final String HOSTNAME = "127.0.0.1";
    private static final int PORT = 5555;
    private static final int BUFF_SIZE = 255;

    private final SocketChannel socketChannel;
    private final ByteBuffer readBuffer;
    /** Written by the selector thread (or the constructor), polled by other threads. */
    private volatile boolean isConnected;

    /**
     * Opens the channel, starts the non-blocking connect and registers the channel.
     *
     * @throws IOException if the selector or the channel cannot be opened, or the connect
     *                     attempt fails immediately
     */
    public TcpClientChannelSelector() throws IOException {
        super();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        readBuffer = ByteBuffer.allocate(BUFF_SIZE);
        // A non-blocking connect may complete right away; in that case no OP_CONNECT
        // event will ever be delivered, so the flag has to be set here.
        if (socketChannel.connect(new InetSocketAddress(HOSTNAME, PORT))) {
            System.out.println("client connect server succ");
            isConnected = true;
            registerChannel(socketChannel, SelectionKey.OP_READ);
        } else {
            isConnected = false;
            registerChannel(socketChannel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        }
    }

    /**
     * @return {@code true} once the connection is established and until {@link #close()}
     */
    public boolean isConnected() {
        return isConnected;
    }

    /**
     * Completes a pending connect or reads incoming data, depending on which operation the
     * key reports as ready.
     *
     * @param key the ready key of this client's channel
     */
    @Override
    public void doOnSelectionKey(SelectionKey key) {
        if (key.isConnectable()) {
            completeConnect(key);
        } else if (key.isReadable()) {
            readFromServer();
        }
    }

    /** Finishes the pending connect and narrows the interest set to reads. */
    private void completeConnect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.isConnectionPending()) {
            return;
        }
        try {
            if (channel.finishConnect()) {
                // Keeping OP_CONNECT registered on a connected socket can make select()
                // return immediately over and over; only reads matter from now on.
                key.interestOps(SelectionKey.OP_READ);
                System.out.println("client connect server succ");
                isConnected = true;
            }
        } catch (IOException e) {
            e.printStackTrace();
            // The connect attempt failed; release the selector and the channel.
            close();
        }
    }

    /** Reads one chunk from the server and prints it; closes the client on EOF or error. */
    private void readFromServer() {
        try {
            int len = socketChannel.read(readBuffer);
            if (len == -1) {
                System.out.println("client read : len=-1");
                close();
                return;
            }
            readBuffer.flip();
            byte[] buffer = new byte[len];
            readBuffer.get(buffer);
            readBuffer.clear();
            System.out.println("client read : len=" + len + ", str=" + new String(buffer));
        } catch (IOException e) {
            e.printStackTrace();
            // A broken connection would otherwise stay registered and keep failing.
            readBuffer.clear();
            close();
        }
    }

    /**
     * Sends {@code str} to the server, encoded with the platform default charset.
     *
     * @param str the text to send; may be longer than the read buffer size
     */
    public void write(String str) {
        byte[] buffer = str.getBytes();
        ByteBuffer out = ByteBuffer.wrap(buffer);
        try {
            System.out.println("client write : len=" + buffer.length + ", str=" + str);
            // A non-blocking write may accept only part of the data, so keep going until
            // everything is handed to the socket. (Real code would wait for OP_WRITE.)
            while (out.hasRemaining()) {
                socketChannel.write(out);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Marks the client as disconnected and closes the selector and the channel. */
    public void close() {
        System.out.println("close client");
        isConnected = false;
        super.closeSelector();
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
TcpServerChannelSelector.java
package com.demo.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
/**
 * Non-blocking TCP server bound to {@code 127.0.0.1:5555}. Every chunk received from any
 * client is sent back to all connected clients.
 *
 * <p>The client list is a copy-on-write list because {@link #close()} may run on a different
 * thread than the selection loop that accepts clients and writes to them.
 */
public class TcpServerChannelSelector extends BaseSocketChannelSelector {

    private static final String HOSTNAME = "127.0.0.1";
    private static final int PORT = 5555;
    private static final int BUFF_SIZE = 255;

    private final ServerSocketChannel serverSocketChannel;
    private final java.util.concurrent.CopyOnWriteArrayList<SocketChannel> socketChannelList;
    private final ByteBuffer readBuffer;

    /**
     * Opens and binds the listening channel and registers it for accept events.
     *
     * @throws IOException if the selector or channel cannot be opened or the bind fails
     */
    public TcpServerChannelSelector() throws IOException {
        super();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(HOSTNAME, PORT));
        socketChannelList = new java.util.concurrent.CopyOnWriteArrayList<SocketChannel>();
        readBuffer = ByteBuffer.allocate(BUFF_SIZE);
        registerChannel(serverSocketChannel, SelectionKey.OP_ACCEPT);
    }

    /**
     * Accepts a new client or reads from an existing one, depending on the ready operation.
     *
     * @param key the ready key of the listening channel or of a client channel
     */
    @Override
    protected void doOnSelectionKey(SelectionKey key) {
        if (key.isAcceptable()) {
            acceptClient((ServerSocketChannel) key.channel());
        } else if (key.isReadable()) {
            readFromClient((SocketChannel) key.channel());
        }
    }

    /** Accepts one pending connection and registers it for reads. */
    private void acceptClient(ServerSocketChannel listener) {
        try {
            SocketChannel socketChannel = listener.accept();
            if (socketChannel == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            socketChannelList.add(socketChannel);
            System.out.println("server find client : " + socketChannel.getRemoteAddress());
            registerChannel(socketChannel, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Reads one chunk from a client and echoes it; drops the client on EOF or error. */
    private void readFromClient(SocketChannel socketChannel) {
        try {
            int len = socketChannel.read(readBuffer);
            if (len == -1) {
                System.out.println("server read : len=-1");
                dropClient(socketChannel);
                return;
            }
            readBuffer.flip();
            byte[] buffer = new byte[len];
            readBuffer.get(buffer, 0, len);
            readBuffer.clear();
            String str = new String(buffer);
            System.out.println("server read : len=" + len + ", str=" + str);
            write(str);
        } catch (IOException e) {
            e.printStackTrace();
            // A broken connection would otherwise stay in the list and the selector.
            readBuffer.clear();
            dropClient(socketChannel);
        }
    }

    /** Closes a client channel and always removes it from the list, even if close fails. */
    private void dropClient(SocketChannel socketChannel) {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        socketChannelList.remove(socketChannel);
    }

    /**
     * Sends {@code str} to every connected client, encoded with the platform default charset.
     * A failure on one client is printed and does not stop delivery to the others.
     *
     * @param str the text to send; may be longer than the read buffer size
     */
    public void write(String str) {
        byte[] buffer = str.getBytes();
        for (SocketChannel channel : socketChannelList) {
            // Each client gets its own view of the bytes, positioned at the start.
            ByteBuffer out = ByteBuffer.wrap(buffer);
            try {
                System.out.println("server write : len=" + buffer.length + ", str=" + str);
                // A non-blocking write may accept only part of the data.
                while (out.hasRemaining()) {
                    channel.write(out);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the selector, every client channel and finally the listening channel. */
    public void close() {
        System.out.println("close server");
        super.closeSelector();
        for (SocketChannel channel : socketChannelList) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        socketChannelList.clear();
        try {
            serverSocketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
SelectDemo.java
package com.demo.channel;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo entry point: starts the server and the client selection loops on a two-thread pool,
 * sends five timestamped messages from the client, then shuts everything down.
 */
public class SelectDemo {

    private static final int N_THREADS = 2;
    private static final int MESSAGE_COUNT = 5;
    private static final long POLL_INTERVAL_MS = 500L;
    private static final long CONNECT_TIMEOUT_MS = 10000L;
    private static final long DRAIN_DELAY_MS = 1000L;

    private static TcpServerChannelSelector serverChannelSelector;
    private static TcpClientChannelSelector clientChannelSelector;

    /**
     * Runs the demo.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS);
        try {
            serverChannelSelector = new TcpServerChannelSelector();
            clientChannelSelector = new TcpClientChannelSelector();
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    serverChannelSelector.select();
                }
            });
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    clientChannelSelector.select();
                }
            });

            if (!awaitConnected()) {
                System.out.println("client did not connect, giving up");
                return;
            }
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String str = "(" + i + ")" + new Date();
                clientChannelSelector.write(str);
            }
            // Give the echo a moment to come back before tearing the connection down.
            sleep(DRAIN_DELAY_MS);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Runs on every path, so a half-constructed setup is released as well.
            if (serverChannelSelector != null) {
                serverChannelSelector.close();
            }
            if (clientChannelSelector != null) {
                clientChannelSelector.close();
            }
            // Without this the idle pool threads keep the JVM alive after main returns.
            executorService.shutdown();
        }
    }

    /**
     * Polls the client until it reports a connection.
     *
     * @return {@code false} if the timeout elapsed or the thread was interrupted first
     */
    private static boolean awaitConnected() {
        long deadline = System.currentTimeMillis() + CONNECT_TIMEOUT_MS;
        while (!clientChannelSelector.isConnected()) {
            if (System.currentTimeMillis() >= deadline || !sleep(POLL_INTERVAL_MS)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if interrupted; the interrupt flag is restored in that case
     */
    private static boolean sleep(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
执行结果
client connect server succ
server find client : /127.0.0.1:59651
client write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(1)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(2)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server read : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016
server write : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
client read : len=155, str=(0)Mon Dec 05 19:12:46 CST 2016(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016(3)Mon Dec 05 19:12:46 CST 2016(4)Mon Dec 05 19:12:46 CST 2016
close server
selector is closed
close client
Exception in thread "pool-1-thread-2" java.nio.channels.ClosedSelectorException
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:83)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
	at com.demo.channel.BaseSocketChannelSelector.select(BaseSocketChannelSelector.java:40)
	at com.demo.channel.SelectDemo$2.run(SelectDemo.java:29)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
参考
http://ifeve.com/selectors/
http://blog.csdn.net/windsunmoon/article/details/45373457