serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { // 処理 }
DAEMON WAKEUP! ACCEPT java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] Connected:/127.0.0.1:55029 ACCEPT java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Connected:/127.0.0.1:55032 Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 81 A9 E3 81 86 E3 82 88 EF BC 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 81 AF E3 83 80 E3 83 A1 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E6 98 8E E6 ......... : .... 0010: 97 A5 E3 81 AF E3 81 A9 E3 81 86 E3 82 88 EF BC ................ 0020: 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E6 98 8E E6 ......... : .... 0010: 97 A5 E3 81 AF E3 83 80 E3 83 A1 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E6 98 8E E5 ......... : .... 0010: BE 8C E6 97 A5 E3 81 AF E3 81 A9 E3 81 86 E3 82 ................ 0020: 88 EF BC 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 82 82 E6 98 8E E6 97 A5 E3 82 82 E6 98 ................ 0020: 8E E5 BE 8C E6 97 A5 E3 82 82 E6 98 8E E6 98 8E ................ 0030: E5 BE 8C E6 97 A5 E3 82 82 E3 83 80 E3 83 A1 0A ................ SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 4F 72 7A 0A ......... : Orz. SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] LOGOUT :/127.0.0.1:55029 A CLIENT REMOVED FROM THE CLIENT LIST CHANNEL CLOSE SUCCESS LOGOUT :/127.0.0.1:55032 A CLIENT REMOVED FROM THE CLIENT LIST CHANNEL CLOSE SUCCESS
SelectSever? | チャットサーバ。ClientStub?から送られたメッセージを接続されている全てのClientStub?に送り返す |
ClientUI | チャットクライアントのUI。ほぼ、Visual Editor で作られたモノそのまま |
ClientStub? | チャットクライアントの通信部分 |
package com.snail.messenger.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import sun.misc.HexDumpEncoder;
public class SelectServer extends Thread {
private static int PORT = 3456;
private static int BUF_SIZE = 1024;
private List<SocketChannel> channelList = new LinkedList<SocketChannel>();
private Map<SocketChannel, ByteArrayOutputStream> bufferMap = new HashMap<SocketChannel, ByteArrayOutputStream>();
private Selector selector = null;
public static void main(String[] args) {
new SelectServer().start();
}
@Override
public void run() {
ServerSocketChannel serverChannel = null;
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("DAEMON WAKEUP!");
while (selector.select() > 0) {
Iterator<SelectionKey> keyIt = selector.selectedKeys()
.iterator();
while (keyIt.hasNext()) {
SelectionKey key = keyIt.next();
keyIt.remove();
if (key.isAcceptable()) {
doAccept((ServerSocketChannel) key.channel());
} else if (key.isReadable()) {
doRead((SocketChannel) key.channel());
} else if (key.isWritable()) {
doWrite((SocketChannel) key.channel());
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
System.out.println("DAEMON SHUTDOWN!");
serverChannel.close();
} catch (IOException ignoreEx) {
ignoreEx = null;
}
}
}
private void doAccept(ServerSocketChannel daemonChannel) {
try {
SocketChannel channel = daemonChannel.accept();
System.out.println("ACCEPT " + channel);
channel.configureBlocking(false);
// ×× OP_WRITE を監視対象にすると CPU利用率が100%になる ××
// 書き込むメッセージがあるときだけ、そのチャンネルの OP_WRITE
// を監視する。
// channel.register(selector,
// SelectionKey.OP_READ + SelectionKey.OP_WRITE);
channel.register(selector, SelectionKey.OP_READ);
channelList.add(channel);
String remoteAddr = channel.socket().getRemoteSocketAddress()
.toString();
System.out.println("Connected:" + remoteAddr);
} catch (IOException e) {
e.printStackTrace();
}
}
private void doRead(SocketChannel channel) {
try {
String remoteAddr = channel.socket().getRemoteSocketAddress()
.toString();
ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
if (channel.read(buf) > 0) {
buf.flip();
byte[] bytes = new byte[buf.limit()];
buf.get(bytes);
System.out.println("Message From:" + remoteAddr);
HexDumpEncoder hex = new HexDumpEncoder();
System.out.println(hex.encode(bytes));
for (SocketChannel client : channelList) {
System.out.println("SEND:" + client);
// ×× ここで Channel に write() しちゃダメ ××
// client.write(ByteBuffer.wrap(bytes));
// ※ ここでは、Buffer に貯めておいて、Channel が writable
// ※ になったら、書き出す。
ByteArrayOutputStream bout = bufferMap.get(client);
if (bout == null) {
bout = new ByteArrayOutputStream();
bufferMap.put(client, bout);
}
bout.write(bytes);
// 宛先チャンネルが Writable になるのを監視する
client.register(selector, SelectionKey.OP_WRITE);
}
}
} catch (Exception e) {
// Socketが切断された
logout(channel);
}
}
private void doWrite(SocketChannel channel) {
ByteArrayOutputStream bout = bufferMap.get(channel);
if (bout != null) {
System.out.println("WRITE CHANNEL");
try {
ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray());
int size = channel.write(bbuf);
System.out.println("SEND " + size + "/" + bbuf.limit());
if (bbuf.hasRemaining()) {
// bbufをすべてを送信しきれなかったので、残りをbufferMapに書き戻す
ByteArrayOutputStream rest = new ByteArrayOutputStream();
rest.write(bbuf.array(), bbuf.position(), bbuf.remaining());
bufferMap.put(channel, rest);
// 宛先チャンネルが Writable になるのを監視し続ける。
// 宛先チャンネルが切断されたことを検知するために Readable の監視も行う
channel.register(selector, SelectionKey.OP_READ
+ SelectionKey.OP_WRITE);
} else {
// bbufをすべて送信し終わったので、bufferMap今回送信分を削除する
bufferMap.remove(channel);
// 宛先チャンネルが Writable になるのを監視するのをやめる
channel.register(selector, SelectionKey.OP_READ);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void logout(SocketChannel channel) {
String remoteAddr = channel.socket().getRemoteSocketAddress()
.toString();
System.out.println("LOGOUT :" + remoteAddr);
try {
channel.finishConnect();
channel.close();
if (channelList.remove(channel)) {
System.out.println("A CLIENT REMOVED FROM THE CLIENT LIST");
} else {
System.out
.println("FAILED TO REMOVE A CLIENT FROM THE CLIENT LIST");
}
} catch (Exception ignoreEx) {
System.out.println("CHANNEL CLOSE FAILED");
ignoreEx.printStackTrace();
ignoreEx = null;
return;
}
System.out.println("CHANNEL CLOSE SUCCESS");
}
}
selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(PORT)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator(); while (keyIt.hasNext()) { SelectionKey key = keyIt.next(); keyIt.remove(); if (key.isAcceptable()) { doAccept((ServerSocketChannel) key.channel()); } else if (key.isReadable()) { doRead((SocketChannel) key.channel()); } else if (key.isWritable()) { doWrite((SocketChannel) key.channel()); } } }
Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT);
OP_ACCEPT | 16 | 監視対象の Channel に誰かが接続にきた |
OP_CONNECT | 8 | 監視対象の Channel が何かに接続した |
OP_READ | 4 | 監視対象の Channel が読み込み可能状態になった |
OP_WRITE | 1 | 監視対象の Channel が書き込み可能状態になった |
while (selector.select() > 0) { ... }が、事実上
while (true) { ... }になるため
|C| +-------------SELECT SERVER------------+ |C| |L| | +----------+ +----------+ | |L| |I|--(データ)--+> | src | | dest | | |I| |E| read | Channel |-->[Buf]-->| Channel | | |E| |N| | | | | |-+-(データ)-->|N| |T| | |(OP_READ) | |(OP_WRITE)| write |T| | | | +----------+ +----------+ | | | |1| +--------------------------------------+ |2|
private void doAccept(ServerSocketChannel daemonChannel) { try { SocketChannel channel = daemonChannel.accept(); System.out.println("ACCEPT " + channel); channel.configureBlocking(false); // ×× OP_WRITE を監視対象にすると CPU利用率が100%になる ×× // 書き込むメッセージがあるときだけ、そのチャンネルの OP_WRITE // を監視する。 // channel.register(selector, // SelectionKey.OP_READ + SelectionKey.OP_WRITE); channel.register(selector, SelectionKey.OP_READ); channelList.add(channel); String remoteAddr = channel.socket().getRemoteSocketAddress() .toString(); System.out.println("Connected:" + remoteAddr); } catch (IOException e) { e.printStackTrace(); } }
private void doRead(SocketChannel channel) { try { String remoteAddr = channel.socket().getRemoteSocketAddress().toString(); ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); if (channel.read(buf) > 0) { buf.flip(); byte[] bytes = new byte[buf.limit()]; buf.get(bytes); System.out.println("Message From:" + remoteAddr); HexDumpEncoder hex = new HexDumpEncoder(); System.out.println(hex.encode(bytes)); for (SocketChannel client : channelList) { System.out.println("SEND:" + client); // ×× ここで Channel に write() しちゃダメ ×× // client.write(ByteBuffer.wrap(bytes)); // ※ ここでは、Buffer に貯めておいて、Channel が writable // ※ になったら、書き出す。 ByteArrayOutputStream bout = bufferMap.get(client); if( bout == null ){ bout = new ByteArrayOutputStream(); bufferMap.put(client,bout); } bout.write(bytes); // 宛先チャンネルが Writable になるのを監視する client.register(selector, SelectionKey.OP_WRITE ); } } } catch (Exception e) { // Socketが切断された logout(channel); } }
private void doWrite(SocketChannel channel) { ByteArrayOutputStream bout = bufferMap.get(channel); if (bout != null) { System.out.println("WRITE CHANNEL"); try { ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray()); int size = channel.write(bbuf); System.out.println("SEND " + size + "/" + bbuf.limit()); if (bbuf.hasRemaining()) { // bbufをすべてを送信しきれなかったので、残りをbufferMapに書き戻す ByteArrayOutputStream rest = new ByteArrayOutputStream(); rest.write(bbuf.array(), bbuf.position(), bbuf.remaining()); bufferMap.put(channel, rest); // 宛先チャンネルが Writable になるのを監視し続ける。 // 宛先チャンネルが切断されたことを検知するために Readable の監視も行う channel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE); } else { // bbufをすべて送信し終わったので、bufferMap今回送信分を削除する bufferMap.remove(channel); // 宛先チャンネルが Writable になるのを監視するのをやめる channel.register(selector, SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); } } }
ByteBuffer buf = ByteBuffer.allocate(10);
position = 0; capacity = 9; limit = 9; buf.capacity buf.position buf.limit ↓ ↓ 0 1 2 3 4 5 6 7 8 9 buf [ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
channel.read(buf);
position = 4; capacity = 9; limit = 9; buf.capacity buf.position buf.limit ↓ ↓ 0 1 2 3 4 5 6 7 8 9 buf [41][42][43][44][45][ ][ ][ ][ ][ ]
buf.flip();
position = 0; capacity = 9; limit = 4; buf.capacity buf.position buf.limit ↓ ↓ ↓ 0 1 2 3 4 5 6 7 8 9 buf [41][42][43][44][45][ ][ ][ ][ ][ ]
byte[] bArray = new byte[buf.limit]; buf.get(bArray) // position〜limit を byte[] に書き出す。
buf.clear();
position = 0; capacity = 9; limit = 9; buf.capacity buf.position buf.limit ↓ ↓ 0 1 2 3 4 5 6 7 8 9 buf [ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
InputStream?( Socket#getInputStream?() )と、OutputStream?( Socket#getOutputStream?() )を 別スレッドから同時に使いたいので、Channelは使わない。
package com.snail.messenger.client;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
public abstract class ClientStub extends Thread {
private static String HOST = "localhost";
private static int PORT = 3456;
private Socket socket = null;
public ClientStub() {
try {
socket = new Socket(HOST, PORT);
} catch (UnknownHostException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
@Override
public void run() {
try {
byte[] buf = new byte[1024];
int size;
while ((size = socket.getInputStream().read(buf)) > 0) {
onMessage(new String(buf, 0, size, "UTF-8"));
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("READ THREAD SHUTDOWN");
}
public void write(String str) throws IOException {
socket.getOutputStream().write(str.getBytes("UTF-8"));
}
protected abstract void onMessage(String str);
}
基本的に、Visual Editor で画面を作って、Write ボタンを押したら ClientStub?#write() を呼び出し、 ClientStub?#onMessage() が呼び出されたら TextArea? にメッセージを追記するようにしただけ
package com.snail.messenger.client;
import java.awt.Rectangle;
import java.io.IOException;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.SwingUtilities;
/**
* @author kagyuu
*
*/
public class ClientUI extends JFrame {
private static final long serialVersionUID = 1L;
private JPanel jContentPane = null;
private JTextArea jTextArea = null;
private JTextField jTextField = null;
private JButton jButton = null;
private ClientStub clientStub = null; // @jve:decl-index=0:
private String userName = "TinyChat";
/**
* This method initializes jContentPane
*
* @return javax.swing.JPanel
*/
private JPanel getJContentPane() {
if (jContentPane == null) {
try {
jContentPane = new JPanel();
jContentPane.setLayout(null); // Generated
jContentPane.add(getJTextArea(), null); // Generated
jContentPane.add(getJTextField(), null); // Generated
jContentPane.add(getJButton(), null); // Generated
} catch (java.lang.Throwable e) {
// TODO: Something
}
}
return jContentPane;
}
/**
* This method initializes jTextArea
*
* @return javax.swing.JTextArea
*/
private JTextArea getJTextArea() {
if (jTextArea == null) {
try {
jTextArea = new JTextArea();
jTextArea.setBounds(new Rectangle(11, 9, 331, 195)); // Generated
// jTextArea.setEditable(false);
} catch (java.lang.Throwable e) {
// TODO: Something
}
}
return jTextArea;
}
/**
* This method initializes jTextField
*
* @return javax.swing.JTextField
*/
private JTextField getJTextField() {
if (jTextField == null) {
try {
jTextField = new JTextField();
jTextField.setBounds(new Rectangle(9, 216, 260, 27)); // Generated
} catch (java.lang.Throwable e) {
// TODO: Something
}
}
return jTextField;
}
/**
* This method initializes jButton
*
* @return javax.swing.JButton
*/
private JButton getJButton() {
if (jButton == null) {
try {
jButton = new JButton();
jButton.setBounds(new Rectangle(274, 218, 69, 23)); // Generated
jButton.setText("Write"); // Generated
jButton.addActionListener(new java.awt.event.ActionListener() {
public void actionPerformed(java.awt.event.ActionEvent e) {
System.out.println("actionPerformed()");
try {
clientStub.write(userName + " : " + getJTextField().getText());
} catch (IOException e1) {
e1.printStackTrace();
}
}
});
} catch (java.lang.Throwable e) {
// TODO: Something
}
}
return jButton;
}
/**
* @param args
*/
public static void main(final String[] args) {
SwingUtilities.invokeLater(new Runnable() {
public void run() {
ClientUI thisClass = new ClientUI();
thisClass.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
thisClass.setVisible(true);
thisClass.setTitle(args[0]);
thisClass.userName = args[0];
}
});
}
/**
* This is the default constructor
*/
public ClientUI() {
super();
initialize();
}
/**
* This method initializes this
*
* @return void
*/
private void initialize() {
this.setSize(370, 292);
this.setContentPane(getJContentPane()); // Generated
this.setResizable(false);
this.setTitle(this.userName);
this.clientStub = new ClientStub() {
@Override
protected void onMessage(String str) {
System.out.println(getJTextArea().getText());
getJTextArea().append(str+"\n");
}
};
this.clientStub.start();
}
}