serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
    // process the selected keys
}
DAEMON WAKEUP! ACCEPT java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] Connected:/127.0.0.1:55029 ACCEPT java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Connected:/127.0.0.1:55032 Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 81 A9 E3 81 86 E3 82 88 EF BC 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 81 AF E3 83 80 E3 83 A1 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E6 98 8E E6 ......... : .... 0010: 97 A5 E3 81 AF E3 81 A9 E3 81 86 E3 82 88 EF BC ................ 0020: 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E6 98 8E E6 ......... : .... 0010: 97 A5 E3 81 AF E3 83 80 E3 83 A1 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 E6 98 8E E5 ......... : .... 0010: BE 8C E6 97 A5 E3 81 AF E3 81 A9 E3 81 86 E3 82 ................ 
0020: 88 EF BC 9F 0A SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55032 0000: E3 81 AF E3 81 AA E3 81 93 20 3A 20 E4 BB 8A E6 ......... : .... 0010: 97 A5 E3 82 82 E6 98 8E E6 97 A5 E3 82 82 E6 98 ................ 0020: 8E E5 BE 8C E6 97 A5 E3 82 82 E6 98 8E E6 98 8E ................ 0030: E5 BE 8C E6 97 A5 E3 82 82 E3 83 80 E3 83 A1 0A ................ SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] Message From:/127.0.0.1:55029 0000: E3 81 9F E3 82 8D E3 81 86 20 3A 20 4F 72 7A 0A ......... : Orz. SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55029] SEND:java.nio.channels.SocketChannel[connected local=/127.0.0.1:3456 remote=/127.0.0.1:55032] LOGOUT :/127.0.0.1:55029 A CLIENT REMOVED FROM THE CLIENT LIST CHANNEL CLOSE SUCCESS LOGOUT :/127.0.0.1:55032 A CLIENT REMOVED FROM THE CLIENT LIST CHANNEL CLOSE SUCCESS
SelectServer | チャットサーバ。ClientStub から送られたメッセージを、接続されている全ての ClientStub に送り返す |
ClientUI | チャットクライアントのUI。ほぼ、Visual Editor で作られたモノそのまま |
ClientStub? | チャットクライアントの通信部分 |
package com.snail.messenger.server;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Single-threaded chat server built on a {@link Selector}.
 *
 * <p>Every chunk of bytes received from one client is queued for all connected
 * clients (the sender included) and written out once the destination channel
 * reports that it is writable.
 *
 * <p>All state is touched only from the thread that executes {@link #run()}.
 */
public class SelectServer extends Thread {
    private static final int PORT = 3456;
    private static final int BUF_SIZE = 1024;

    /** Channels of all currently connected clients. */
    private List<SocketChannel> channelList = new LinkedList<SocketChannel>();

    /** Bytes queued for each client that have not been written to its channel yet. */
    private Map<SocketChannel, ByteArrayOutputStream> bufferMap =
            new HashMap<SocketChannel, ByteArrayOutputStream>();

    private Selector selector = null;

    public static void main(String[] args) {
        new SelectServer().start();
    }

    /**
     * Opens the listening socket and dispatches selector events until the
     * thread is interrupted or the selector fails.
     */
    @Override
    public void run() {
        ServerSocketChannel serverChannel = null;
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(PORT));
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("DAEMON WAKEUP!");

            // select() may legitimately return 0 (e.g. after a wakeup); that must
            // not shut the server down, so loop until the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
                while (keyIt.hasNext()) {
                    SelectionKey key = keyIt.next();
                    keyIt.remove();
                    dispatch(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("DAEMON SHUTDOWN!");
            // Either resource may still be null if opening it failed.
            try {
                if (serverChannel != null) {
                    serverChannel.close();
                }
            } catch (IOException ignored) {
                // nothing useful can be done while shutting down
            }
            try {
                if (selector != null) {
                    selector.close();
                }
            } catch (IOException ignored) {
                // nothing useful can be done while shutting down
            }
        }
    }

    /** Routes one selected key to the matching handler. */
    private void dispatch(SelectionKey key) {
        // A key can have been cancelled while an earlier key of the same
        // selection round was handled (its channel was logged out).
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            doAccept((ServerSocketChannel) key.channel());
            return;
        }
        SocketChannel channel = (SocketChannel) key.channel();
        if (key.isReadable()) {
            doRead(channel);
        }
        // doRead() may have closed the channel, which cancels the key.
        if (key.isValid() && key.isWritable()) {
            doWrite(channel);
        }
    }

    /**
     * Accepts a pending connection and starts watching it for incoming data.
     *
     * @param daemonChannel the listening channel that reported OP_ACCEPT
     */
    private void doAccept(ServerSocketChannel daemonChannel) {
        SocketChannel channel = null;
        try {
            channel = daemonChannel.accept();
            if (channel == null) {
                // Non-blocking accept: the connection is no longer pending.
                return;
            }
            System.out.println("ACCEPT " + channel);
            channel.configureBlocking(false);
            // Do NOT watch OP_WRITE permanently: a socket is writable almost all
            // the time, so select() would return immediately and burn 100% CPU.
            // OP_WRITE is enabled only while data is queued for the channel.
            channel.register(selector, SelectionKey.OP_READ);
            channelList.add(channel);
            System.out.println("Connected:" + channel.socket().getRemoteSocketAddress());
        } catch (IOException e) {
            e.printStackTrace();
            if (channel != null) {
                // Do not leak a half-initialised connection.
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // already failing; nothing more to do
                }
            }
        }
    }

    /**
     * Reads the bytes available on {@code channel} and queues them for every
     * connected client. The sender is logged out on end-of-stream or on a read
     * error.
     *
     * @param channel the channel that reported OP_READ
     */
    private void doRead(SocketChannel channel) {
        byte[] bytes;
        try {
            ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
            int size = channel.read(buf);
            if (size < 0) {
                // End of stream: the peer closed the connection. Without this the
                // channel stays readable forever and select() spins.
                logout(channel);
                return;
            }
            if (size == 0) {
                return;
            }
            buf.flip();
            bytes = new byte[buf.remaining()];
            buf.get(bytes);
        } catch (IOException e) {
            // The socket was disconnected.
            logout(channel);
            return;
        }

        System.out.println("Message From:" + channel.socket().getRemoteSocketAddress());
        System.out.println(hexDump(bytes));

        // Iterate over a snapshot because logout() removes entries from channelList.
        for (SocketChannel client : new ArrayList<SocketChannel>(channelList)) {
            System.out.println("SEND:" + client);
            // Do not write() to the channel here; queue the bytes and write them
            // once the destination channel becomes writable.
            ByteArrayOutputStream bout = bufferMap.get(client);
            if (bout == null) {
                bout = new ByteArrayOutputStream();
                bufferMap.put(client, bout);
            }
            bout.write(bytes, 0, bytes.length);

            SelectionKey clientKey = client.keyFor(selector);
            if (clientKey == null || !clientKey.isValid()) {
                // The recipient is gone; drop it instead of blaming the sender.
                logout(client);
                continue;
            }
            // Keep OP_READ so the recipient's own messages and its disconnect are
            // still noticed while output is pending.
            clientKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Writes as much queued data as the channel accepts and keeps the rest for
     * the next OP_WRITE event. The channel is logged out when writing fails.
     *
     * @param channel the channel that reported OP_WRITE
     */
    private void doWrite(SocketChannel channel) {
        try {
            ByteArrayOutputStream bout = bufferMap.get(channel);
            if (bout == null) {
                // Nothing queued: stop watching OP_WRITE so select() does not spin.
                channel.register(selector, SelectionKey.OP_READ);
                return;
            }
            System.out.println("WRITE CHANNEL");
            ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray());
            int size = channel.write(bbuf);
            System.out.println("SEND " + size + "/" + bbuf.limit());
            if (bbuf.hasRemaining()) {
                // Not everything could be sent; put the remainder back.
                ByteArrayOutputStream rest = new ByteArrayOutputStream();
                rest.write(bbuf.array(), bbuf.position(), bbuf.remaining());
                bufferMap.put(channel, rest);
                // Keep waiting for writability; also watch OP_READ so that a
                // disconnect of the destination is detected.
                channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            } else {
                // Everything was sent; discard the queue and stop watching OP_WRITE.
                bufferMap.remove(channel);
                channel.register(selector, SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // A channel that cannot be written to is dead; remove it so it is not
            // selected (and failed on) again and again.
            logout(channel);
        }
    }

    /**
     * Forgets everything about a client and closes its channel. Closing the
     * channel also cancels its selection key.
     *
     * @param channel the client channel to drop
     */
    private void logout(SocketChannel channel) {
        System.out.println("LOGOUT :" + channel.socket().getRemoteSocketAddress());
        // Release bookkeeping first so that it is cleaned up even if close() fails.
        bufferMap.remove(channel);
        if (channelList.remove(channel)) {
            System.out.println("A CLIENT REMOVED FROM THE CLIENT LIST");
        } else {
            System.out.println("FAILED TO REMOVE A CLIENT FROM THE CLIENT LIST");
        }
        try {
            channel.close();
        } catch (IOException e) {
            System.out.println("CHANNEL CLOSE FAILED");
            e.printStackTrace();
            return;
        }
        System.out.println("CHANNEL CLOSE SUCCESS");
    }

    /**
     * Formats bytes as a hex dump: 16 bytes per line, each line prefixed with its
     * offset and followed by the printable-ASCII rendering of the bytes.
     * Replaces the JDK-internal {@code sun.misc.HexDumpEncoder}.
     */
    private static String hexDump(byte[] bytes) {
        StringBuilder out = new StringBuilder();
        for (int offset = 0; offset < bytes.length; offset += 16) {
            int end = Math.min(offset + 16, bytes.length);
            StringBuilder ascii = new StringBuilder();
            out.append(String.format("%04X: ", offset));
            for (int i = offset; i < offset + 16; i++) {
                if (i < end) {
                    int b = bytes[i] & 0xFF;
                    out.append(String.format("%02X ", b));
                    ascii.append(b >= 0x20 && b < 0x7F ? (char) b : '.');
                } else {
                    out.append("   "); // pad a short last line so the ASCII column lines up
                }
            }
            out.append(' ').append(ascii).append('\n');
        }
        return out.toString();
    }
}
// Open the selector and the non-blocking listening socket, then watch it for
// incoming connections.
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

// Each pass handles every key selected by select(); a key triggers exactly one
// handler per pass (accept, else read, else write).
while (selector.select() > 0) {
    for (Iterator<SelectionKey> ready = selector.selectedKeys().iterator(); ready.hasNext();) {
        SelectionKey readyKey = ready.next();
        // Selected keys are not cleared automatically; remove each one once taken.
        ready.remove();

        if (readyKey.isAcceptable()) {
            doAccept((ServerSocketChannel) readyKey.channel());
            continue;
        }
        if (readyKey.isReadable()) {
            doRead((SocketChannel) readyKey.channel());
            continue;
        }
        if (readyKey.isWritable()) {
            doWrite((SocketChannel) readyKey.channel());
        }
    }
}
// Open a selector and register the channel with it for OP_ACCEPT events.
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
OP_ACCEPT | 16 | 監視対象の Channel に誰かが接続にきた |
OP_CONNECT | 8 | 監視対象の Channel が何かに接続した |
OP_READ | 4 | 監視対象の Channel が読み込み可能状態になった |
OP_WRITE | 1 | 監視対象の Channel が書き込み可能状態になった |
while (selector.select() > 0) { ... }が、事実上
while (true) { ... }になるため
|C| +-------------SELECT SERVER------------+ |C| |L| | +----------+ +----------+ | |L| |I|--(データ)--+> | src | | dest | | |I| |E| read | Channel |-->[Buf]-->| Channel | | |E| |N| | | | | |-+-(データ)-->|N| |T| | |(OP_READ) | |(OP_WRITE)| write |T| | | | +----------+ +----------+ | | | |1| +--------------------------------------+ |2|
private void doAccept(ServerSocketChannel daemonChannel) { try { SocketChannel channel = daemonChannel.accept(); System.out.println("ACCEPT " + channel); channel.configureBlocking(false); // ×× OP_WRITE を監視対象にすると CPU利用率が100%になる ×× // 書き込むメッセージがあるときだけ、そのチャンネルの OP_WRITE // を監視する。 // channel.register(selector, // SelectionKey.OP_READ + SelectionKey.OP_WRITE); channel.register(selector, SelectionKey.OP_READ); channelList.add(channel); String remoteAddr = channel.socket().getRemoteSocketAddress() .toString(); System.out.println("Connected:" + remoteAddr); } catch (IOException e) { e.printStackTrace(); } }
private void doRead(SocketChannel channel) { try { String remoteAddr = channel.socket().getRemoteSocketAddress().toString(); ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); if (channel.read(buf) > 0) { buf.flip(); byte[] bytes = new byte[buf.limit()]; buf.get(bytes); System.out.println("Message From:" + remoteAddr); HexDumpEncoder hex = new HexDumpEncoder(); System.out.println(hex.encode(bytes)); for (SocketChannel client : channelList) { System.out.println("SEND:" + client); // ×× ここで Channel に write() しちゃダメ ×× // client.write(ByteBuffer.wrap(bytes)); // ※ ここでは、Buffer に貯めておいて、Channel が writable // ※ になったら、書き出す。 ByteArrayOutputStream bout = bufferMap.get(client); if( bout == null ){ bout = new ByteArrayOutputStream(); bufferMap.put(client,bout); } bout.write(bytes); // 宛先チャンネルが Writable になるのを監視する client.register(selector, SelectionKey.OP_WRITE ); } } } catch (Exception e) { // Socketが切断された logout(channel); } }
private void doWrite(SocketChannel channel) { ByteArrayOutputStream bout = bufferMap.get(channel); if (bout != null) { System.out.println("WRITE CHANNEL"); try { ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray()); int size = channel.write(bbuf); System.out.println("SEND " + size + "/" + bbuf.limit()); if (bbuf.hasRemaining()) { // bbufをすべてを送信しきれなかったので、残りをbufferMapに書き戻す ByteArrayOutputStream rest = new ByteArrayOutputStream(); rest.write(bbuf.array(), bbuf.position(), bbuf.remaining()); bufferMap.put(channel, rest); // 宛先チャンネルが Writable になるのを監視し続ける。 // 宛先チャンネルが切断されたことを検知するために Readable の監視も行う channel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE); } else { // bbufをすべて送信し終わったので、bufferMap今回送信分を削除する bufferMap.remove(channel); // 宛先チャンネルが Writable になるのを監視するのをやめる channel.register(selector, SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); } } }
// Allocate a buffer with a capacity of 10 bytes.
ByteBuffer buf = ByteBuffer.allocate(10);
position = 0; capacity = 10; limit = 10;

     buf.position                            buf.limit / buf.capacity
     ↓                                       ↓
     0   1   2   3   4   5   6   7   8   9
buf [  ][  ][  ][  ][  ][  ][  ][  ][  ][  ]
// Read bytes from the channel into buf, starting at buf's current position.
channel.read(buf);
position = 5; capacity = 10; limit = 10;

                         buf.position        buf.limit / buf.capacity
                         ↓                   ↓
     0   1   2   3   4   5   6   7   8   9
buf [41][42][43][44][45][  ][  ][  ][  ][  ]
// Switch from filling to draining: limit = current position, position = 0.
buf.flip();
position = 0; capacity = 10; limit = 5;

     buf.position        buf.limit           buf.capacity
     ↓                   ↓                   ↓
     0   1   2   3   4   5   6   7   8   9
buf [41][42][43][44][45][  ][  ][  ][  ][  ]
// Copy the bytes between position and limit into a byte[].
// limit() and remaining() are methods, not fields; remaining() is
// limit - position, i.e. exactly the number of bytes get() will copy.
byte[] bArray = new byte[buf.remaining()];
buf.get(bArray);
// Reset position to 0 and limit to capacity so the buffer can be filled again.
buf.clear();
position = 0; capacity = 10; limit = 10;

     buf.position                            buf.limit / buf.capacity
     ↓                                       ↓
     0   1   2   3   4   5   6   7   8   9
buf [  ][  ][  ][  ][  ][  ][  ][  ][  ][  ]

※ clear() は position と limit を初期化するだけで、バッファの内容自体は消去しない。
InputStream?( Socket#getInputStream?() )と、OutputStream?( Socket#getOutputStream?() )を 別スレッドから同時に使いたいので、Channelは使わない。
package com.snail.messenger.client;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * Network side of the chat client.
 *
 * <p>Connects to the chat server when constructed. Once started, the thread
 * reads UTF-8 text from the server and passes it to {@link #onMessage(String)};
 * {@link #write(String)} sends text to the server and may be called from
 * another thread.
 */
public abstract class ClientStub extends Thread {
    private static final String HOST = "localhost";
    private static final int PORT = 3456;
    private static final String ENCODING = "UTF-8";

    /** Connection to the server; stays {@code null} when connecting failed. */
    private Socket socket = null;

    /**
     * Connects to the server. A connection failure is reported on stderr and
     * leaves the stub unconnected: {@link #run()} then ends immediately and
     * {@link #write(String)} throws an {@link IOException}.
     */
    public ClientStub() {
        try {
            socket = new Socket(HOST, PORT);
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * Reads text from the server until the connection ends, then closes the
     * socket.
     */
    @Override
    public void run() {
        if (socket != null) {
            try {
                // Decode through a Reader: it carries an incomplete multi-byte
                // UTF-8 sequence over to the next read, whereas decoding each
                // raw byte chunk on its own corrupts characters that straddle
                // two reads.
                Reader reader = new InputStreamReader(socket.getInputStream(), ENCODING);
                char[] buf = new char[1024];
                int size;
                while ((size = reader.read(buf)) > 0) {
                    onMessage(new String(buf, 0, size));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // the connection is finished either way
                }
            }
        }
        System.out.println("READ THREAD SHUTDOWN");
    }

    /**
     * Sends {@code str} to the server, encoded as UTF-8.
     *
     * @param str the text to send
     * @throws IOException if the stub is not connected or sending fails
     */
    public void write(String str) throws IOException {
        if (socket == null) {
            throw new IOException("Not connected to " + HOST + ":" + PORT);
        }
        socket.getOutputStream().write(str.getBytes(ENCODING));
    }

    /**
     * Called on the reader thread for each piece of text received from the
     * server.
     *
     * @param str the received text
     */
    protected abstract void onMessage(String str);
}
基本的に、Visual Editor で画面を作って、Write ボタンを押したら ClientStub?#write() を呼び出し、 ClientStub?#onMessage() が呼び出されたら TextArea? にメッセージを追記するようにしただけ
package com.snail.messenger.client; import java.awt.Rectangle; import java.io.IOException; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JTextArea; import javax.swing.JTextField; import javax.swing.SwingUtilities; /** * @author kagyuu * */ public class ClientUI extends JFrame { private static final long serialVersionUID = 1L; private JPanel jContentPane = null; private JTextArea jTextArea = null; private JTextField jTextField = null; private JButton jButton = null; private ClientStub clientStub = null; // @jve:decl-index=0: private String userName = "TinyChat"; /** * This method initializes jContentPane * * @return javax.swing.JPanel */ private JPanel getJContentPane() { if (jContentPane == null) { try { jContentPane = new JPanel(); jContentPane.setLayout(null); // Generated jContentPane.add(getJTextArea(), null); // Generated jContentPane.add(getJTextField(), null); // Generated jContentPane.add(getJButton(), null); // Generated } catch (java.lang.Throwable e) { // TODO: Something } } return jContentPane; } /** * This method initializes jTextArea * * @return javax.swing.JTextArea */ private JTextArea getJTextArea() { if (jTextArea == null) { try { jTextArea = new JTextArea(); jTextArea.setBounds(new Rectangle(11, 9, 331, 195)); // Generated // jTextArea.setEditable(false); } catch (java.lang.Throwable e) { // TODO: Something } } return jTextArea; } /** * This method initializes jTextField * * @return javax.swing.JTextField */ private JTextField getJTextField() { if (jTextField == null) { try { jTextField = new JTextField(); jTextField.setBounds(new Rectangle(9, 216, 260, 27)); // Generated } catch (java.lang.Throwable e) { // TODO: Something } } return jTextField; } /** * This method initializes jButton * * @return javax.swing.JButton */ private JButton getJButton() { if (jButton == null) { try { jButton = new JButton(); jButton.setBounds(new Rectangle(274, 218, 69, 23)); // Generated jButton.setText("Write"); 
// Generated jButton.addActionListener(new java.awt.event.ActionListener() { public void actionPerformed(java.awt.event.ActionEvent e) { System.out.println("actionPerformed()"); try { clientStub.write(userName + " : " + getJTextField().getText()); } catch (IOException e1) { e1.printStackTrace(); } } }); } catch (java.lang.Throwable e) { // TODO: Something } } return jButton; } /** * @param args */ public static void main(final String[] args) { SwingUtilities.invokeLater(new Runnable() { public void run() { ClientUI thisClass = new ClientUI(); thisClass.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); thisClass.setVisible(true); thisClass.setTitle(args[0]); thisClass.userName = args[0]; } }); } /** * This is the default constructor */ public ClientUI() { super(); initialize(); } /** * This method initializes this * * @return void */ private void initialize() { this.setSize(370, 292); this.setContentPane(getJContentPane()); // Generated this.setResizable(false); this.setTitle(this.userName); this.clientStub = new ClientStub() { @Override protected void onMessage(String str) { System.out.println(getJTextArea().getText()); getJTextArea().append(str+"\n"); } }; this.clientStub.start(); } }