はじめに

練習課題(チャットシステム)

クラス構造

classDiagram.png
SelectSever?チャットサーバ。ClientStub?から送られたメッセージを接続されている全てのClientStub?に送り返す
ClientUIチャットクライアントのUI。ほぼ、Visual Editor で作られたモノそのまま
ClientStub?チャットクライアントの通信部分

SelectServer?.java

package com.snail.messenger.server;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import sun.misc.HexDumpEncoder;

public class SelectServer extends Thread {
  private static int PORT = 3456;
  private static int BUF_SIZE = 1024;

  private List<SocketChannel> channelList = new LinkedList<SocketChannel>();

  private Map<SocketChannel, ByteArrayOutputStream> bufferMap = new HashMap<SocketChannel, ByteArrayOutputStream>();

  private Selector selector = null;

  public static void main(String[] args) {
    new SelectServer().start();
  }

  @Override
  public void run() {
    ServerSocketChannel serverChannel = null;

    try {
      selector = Selector.open();
      serverChannel = ServerSocketChannel.open();
      serverChannel.configureBlocking(false);
      serverChannel.socket().bind(new InetSocketAddress(PORT));
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);

      System.out.println("DAEMON WAKEUP!");

      while (selector.select() > 0) {
        Iterator<SelectionKey> keyIt = selector.selectedKeys()
            .iterator();

        while (keyIt.hasNext()) {
          SelectionKey key = keyIt.next();
          keyIt.remove();

          if (key.isAcceptable()) {
            doAccept((ServerSocketChannel) key.channel());
          } else if (key.isReadable()) {
            doRead((SocketChannel) key.channel());
          } else if (key.isWritable()) {
            doWrite((SocketChannel) key.channel());
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
        System.out.println("DAEMON SHUTDOWN!");
        serverChannel.close();
      } catch (IOException ignoreEx) {
        ignoreEx = null;
      }
    }
  }

  private void doAccept(ServerSocketChannel daemonChannel) {
    try {
      SocketChannel channel = daemonChannel.accept();
      System.out.println("ACCEPT " + channel);
      channel.configureBlocking(false);

      // ×× OP_WRITE を監視対象にすると CPU利用率が100%になる ××
      // 書き込むメッセージがあるときだけ、そのチャンネルの OP_WRITE
      // を監視する。
      // channel.register(selector,
      // SelectionKey.OP_READ + SelectionKey.OP_WRITE);

      channel.register(selector, SelectionKey.OP_READ);

      channelList.add(channel);

      String remoteAddr = channel.socket().getRemoteSocketAddress()
          .toString();
      System.out.println("Connected:" + remoteAddr);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void doRead(SocketChannel channel) {
    try {
      String remoteAddr = channel.socket().getRemoteSocketAddress()
          .toString();

      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

      if (channel.read(buf) > 0) {
        buf.flip();

        byte[] bytes = new byte[buf.limit()];
        buf.get(bytes);

        System.out.println("Message From:" + remoteAddr);

        HexDumpEncoder hex = new HexDumpEncoder();
        System.out.println(hex.encode(bytes));

        for (SocketChannel client : channelList) {
          System.out.println("SEND:" + client);

          // ×× ここで Channel に write() しちゃダメ ××
          // client.write(ByteBuffer.wrap(bytes));

          // ※ ここでは、Buffer に貯めておいて、Channel が writable
          // ※ になったら、書き出す。
          ByteArrayOutputStream bout = bufferMap.get(client);
          if (bout == null) {
            bout = new ByteArrayOutputStream();
            bufferMap.put(client, bout);
          }
          bout.write(bytes);

          // 宛先チャンネルが Writable になるのを監視する
          client.register(selector, SelectionKey.OP_WRITE);
        }
      }
    } catch (Exception e) {
      // Socketが切断された
      logout(channel);
    }
  }

  private void doWrite(SocketChannel channel) {
    ByteArrayOutputStream bout = bufferMap.get(channel);
    if (bout != null) {
      System.out.println("WRITE CHANNEL");
      try {
        ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray());
        int size = channel.write(bbuf);

        System.out.println("SEND " + size + "/" + bbuf.limit());

        if (bbuf.hasRemaining()) {
          // bbufをすべてを送信しきれなかったので、残りをbufferMapに書き戻す
          ByteArrayOutputStream rest = new ByteArrayOutputStream();
          rest.write(bbuf.array(), bbuf.position(), bbuf.remaining());
          bufferMap.put(channel, rest);
          // 宛先チャンネルが Writable になるのを監視し続ける。
          // 宛先チャンネルが切断されたことを検知するために Readable の監視も行う
          channel.register(selector, SelectionKey.OP_READ
              + SelectionKey.OP_WRITE);
        } else {
          // bbufをすべて送信し終わったので、bufferMap今回送信分を削除する
          bufferMap.remove(channel);
          // 宛先チャンネルが Writable になるのを監視するのをやめる
          channel.register(selector, SelectionKey.OP_READ);
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  private void logout(SocketChannel channel) {
    String remoteAddr = channel.socket().getRemoteSocketAddress()
        .toString();
    System.out.println("LOGOUT :" + remoteAddr);

    try {
      channel.finishConnect();
      channel.close();

      if (channelList.remove(channel)) {
        System.out.println("A CLIENT REMOVED FROM THE CLIENT LIST");
      } else {
        System.out
            .println("FAILED TO REMOVE A CLIENT FROM THE CLIENT LIST");
      }
    } catch (Exception ignoreEx) {
      System.out.println("CHANNEL CLOSE FAILED");
      ignoreEx.printStackTrace();
      ignoreEx = null;

      return;
    }

    System.out.println("CHANNEL CLOSE SUCCESS");
  }
}

Selectorの使い方

selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(PORT));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

while (selector.select() > 0) {
  Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
  
  while (keyIt.hasNext()) {
    SelectionKey key = keyIt.next();
    keyIt.remove();
    
    if (key.isAcceptable()) {
      doAccept((ServerSocketChannel) key.channel());
    } else if (key.isReadable()) {
      doRead((SocketChannel) key.channel());
    } else if (key.isWritable()) {
      doWrite((SocketChannel) key.channel());
    }
  }
}

ServerSocket?の状態監視

Socketの状態監視

doAccept(ServerSocketChannel? daemonChannel)

private void doAccept(ServerSocketChannel daemonChannel) {
  try {
    SocketChannel channel = daemonChannel.accept();
    System.out.println("ACCEPT " + channel);
    channel.configureBlocking(false);
    
    // ×× OP_WRITE を監視対象にすると CPU利用率が100%になる ××
    // 書き込むメッセージがあるときだけ、そのチャンネルの OP_WRITE
    // を監視する。
    // channel.register(selector,
    //   SelectionKey.OP_READ + SelectionKey.OP_WRITE);
    
    channel.register(selector, SelectionKey.OP_READ);
    channelList.add(channel);

    String remoteAddr = channel.socket().getRemoteSocketAddress()
                                .toString();
    System.out.println("Connected:" + remoteAddr);
  } catch (IOException e) {
    e.printStackTrace();
  }
}

doRead(SocketChannel? channel)

private void doRead(SocketChannel channel) {
  try {
    String remoteAddr = channel.socket().getRemoteSocketAddress().toString();

    ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

    if (channel.read(buf) > 0) {
      buf.flip();

      byte[] bytes = new byte[buf.limit()];
      buf.get(bytes);

      System.out.println("Message From:" + remoteAddr);

      HexDumpEncoder hex = new HexDumpEncoder();
      System.out.println(hex.encode(bytes));

      for (SocketChannel client : channelList) {
        System.out.println("SEND:" + client);
        
        // ×× ここで Channel に write() しちゃダメ ××
        // client.write(ByteBuffer.wrap(bytes));
        
        // ※ ここでは、Buffer に貯めておいて、Channel が writable
        // ※ になったら、書き出す。
        ByteArrayOutputStream bout = bufferMap.get(client);
        if( bout == null ){
          bout = new ByteArrayOutputStream();
          bufferMap.put(client,bout);
        }
        bout.write(bytes);
        
        // 宛先チャンネルが Writable になるのを監視する
        client.register(selector, SelectionKey.OP_WRITE );
      }
    }
  } catch (Exception e) {
    // Socketが切断された
    logout(channel);
  }
}

doWrite(SocketChannel? channel)

 private void doWrite(SocketChannel channel) {
   ByteArrayOutputStream bout = bufferMap.get(channel);
   if (bout != null) {
     System.out.println("WRITE CHANNEL");
     try {
       ByteBuffer bbuf = ByteBuffer.wrap(bout.toByteArray());
       int size = channel.write(bbuf);

       System.out.println("SEND " + size + "/" + bbuf.limit());

       if (bbuf.hasRemaining()) {
         // bbufをすべてを送信しきれなかったので、残りをbufferMapに書き戻す
         ByteArrayOutputStream rest = new ByteArrayOutputStream();
         rest.write(bbuf.array(), bbuf.position(), bbuf.remaining());
         bufferMap.put(channel, rest);
         // 宛先チャンネルが Writable になるのを監視し続ける。
         // 宛先チャンネルが切断されたことを検知するために Readable の監視も行う
         channel.register(selector, SelectionKey.OP_READ
             + SelectionKey.OP_WRITE);
       } else {
         // bbufをすべて送信し終わったので、bufferMap今回送信分を削除する
         bufferMap.remove(channel);
         // 宛先チャンネルが Writable になるのを監視するのをやめる
         channel.register(selector, SelectionKey.OP_READ);
       }
     } catch (IOException e) {
       e.printStackTrace();
     }
   }
 }

ByteBuffer?について

  1. 初期状態
    ByteBuffer buf = ByteBuffer.allocate(10);
     
    position = 0;
    capacity = 9;
    limit = 9;
    
                                         buf.capacity
     buf.position                        buf.limit
         ↓                                  ↓
         0   1   2   3   4   5   6   7   8   9
    buf [  ][  ][  ][  ][  ][  ][  ][  ][  ][  ]
  2. Channel からのデータの読み込み (ByteBuffer?への書き込み)
    channel.read(buf);
     
    とりあえず 4 byte 読み込んだとすると
     
    position = 4;
    capacity = 9;
    limit = 9;
                                          buf.capacity
                       buf.position       buf.limit
                         ↓                  ↓
          0   1   2   3   4   5   6   7   8   9
    buf [41][42][43][44][45][  ][  ][  ][  ][  ]
  3. byte[] への転記準備
    buf.flip();
     
    position = 0;
    capacity = 9;
    limit = 4;
                                          buf.capacity
      buf.position    buf.limit
         ↓              ↓                   ↓
          0   1   2   3   4   5   6   7   8   9
    buf [41][42][43][44][45][  ][  ][  ][  ][  ]
  4. byte[] への転記
    byte[] bArray = new byte[buf.limit];
    buf.get(bArray) // position〜limit を byte[] に書き出す。
  5. clear
    buf.clear();
     
    せっかくなので clear() して、使い回すべきでしょう。
     
    position = 0;
    capacity = 9;
    limit = 9;
    
                                         buf.capacity
     buf.position                        buf.limit
         ↓                                  ↓
         0   1   2   3   4   5   6   7   8   9
    buf [  ][  ][  ][  ][  ][  ][  ][  ][  ][  ]

ClientStub?.java

InputStream?( Socket#getInputStream?() )と、OutputStream?( Socket#getOutputStream?() )を 別スレッドから同時に使いたいので、Channelは使わない。

package com.snail.messenger.client;
 
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
 
 
public abstract class ClientStub extends Thread {
  private static String HOST = "localhost";
  private static int PORT = 3456;
  private Socket socket = null;
 
  public ClientStub() {
    try {
      socket = new Socket(HOST, PORT);
    } catch (UnknownHostException e1) {
      e1.printStackTrace();
    } catch (IOException e1) {
      e1.printStackTrace();
    }
  }
 
  @Override
  public void run() {
    try {
      byte[] buf = new byte[1024];
      int size;
 
      while ((size = socket.getInputStream().read(buf)) > 0) {
        onMessage(new String(buf, 0, size, "UTF-8"));
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
 
    System.out.println("READ THREAD SHUTDOWN");
  }
 
  public void write(String str) throws IOException {
    socket.getOutputStream().write(str.getBytes("UTF-8"));
  }
 
  protected abstract void onMessage(String str);
}

ClientUI.java

基本的に、Visual Editor で画面を作って、Write ボタンを押したら ClientStub?#write() を呼び出し、 ClientStub?#onMessage() が呼び出されたら TextArea? にメッセージを追記するようにしただけ

package com.snail.messenger.client;
 
import java.awt.Rectangle;
import java.io.IOException;
 
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.SwingUtilities;
 
/**
 * @author kagyuu
 * 
 */
public class ClientUI extends JFrame {
 
  private static final long serialVersionUID = 1L;
  private JPanel jContentPane = null;
  private JTextArea jTextArea = null;
  private JTextField jTextField = null;
  private JButton jButton = null;
  private ClientStub clientStub = null; // @jve:decl-index=0:
  private String userName = "TinyChat";
 
  /**
   * This method initializes jContentPane
   * 
   * @return javax.swing.JPanel
   */
  private JPanel getJContentPane() {
    if (jContentPane == null) {
      try {
        jContentPane = new JPanel();
        jContentPane.setLayout(null); // Generated
        jContentPane.add(getJTextArea(), null); // Generated
        jContentPane.add(getJTextField(), null); // Generated
        jContentPane.add(getJButton(), null); // Generated
      } catch (java.lang.Throwable e) {
        // TODO: Something
      }
    }
    return jContentPane;
  }
 
  /**
   * This method initializes jTextArea
   * 
   * @return javax.swing.JTextArea
   */
  private JTextArea getJTextArea() {
    if (jTextArea == null) {
      try {
        jTextArea = new JTextArea();
        jTextArea.setBounds(new Rectangle(11, 9, 331, 195)); // Generated
        // jTextArea.setEditable(false);
      } catch (java.lang.Throwable e) {
        // TODO: Something
      }
    }
    return jTextArea;
  }
 
  /**
   * This method initializes jTextField
   * 
   * @return javax.swing.JTextField
   */
  private JTextField getJTextField() {
    if (jTextField == null) {
      try {
        jTextField = new JTextField();
        jTextField.setBounds(new Rectangle(9, 216, 260, 27)); // Generated
      } catch (java.lang.Throwable e) {
        // TODO: Something
      }
    }
    return jTextField;
  }
 
  /**
   * This method initializes jButton
   * 
   * @return javax.swing.JButton
   */
  private JButton getJButton() {
    if (jButton == null) {
      try {
        jButton = new JButton();
        jButton.setBounds(new Rectangle(274, 218, 69, 23)); // Generated
        jButton.setText("Write"); // Generated
        jButton.addActionListener(new java.awt.event.ActionListener() {
          public void actionPerformed(java.awt.event.ActionEvent e) {
            System.out.println("actionPerformed()");
            try {
              clientStub.write(userName + " : " + getJTextField().getText());
            } catch (IOException e1) {
              e1.printStackTrace();
            }
          }
        });
      } catch (java.lang.Throwable e) {
        // TODO: Something
      }
    }
    return jButton;
  }
 
  /**
   * @param args
   */
  public static void main(final String[] args) {
    SwingUtilities.invokeLater(new Runnable() {
      public void run() {
        ClientUI thisClass = new ClientUI();
        thisClass.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        thisClass.setVisible(true);
        thisClass.setTitle(args[0]);
        thisClass.userName = args[0];
      }
    });
  }
 
  /**
   * This is the default constructor
   */
  public ClientUI() {
    super();
    initialize();
  }
 
  /**
   * This method initializes this
   * 
   * @return void
   */
  private void initialize() {
    this.setSize(370, 292);
    this.setContentPane(getJContentPane()); // Generated
    this.setResizable(false);
    this.setTitle(this.userName);
 
    this.clientStub = new ClientStub() {
      @Override
      protected void onMessage(String str) {
        System.out.println(getJTextArea().getText());
        getJTextArea().append(str+"\n");
      }
    };
 
    this.clientStub.start();
  }
}

Java#JavaSE


Java#JavaSE


添付ファイル: filehanako.png 2718件 [詳細] fileclassDiagram.png 2878件 [詳細] filetaro.png 2701件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS   sitemap
Last-modified: 2008-10-13 (月) 00:00:41 (5867d)
Short-URL: http://at-sushi.com/pukiwiki/index.php?cmd=s&k=5b24a2d8b1
ISBN10
ISBN13
9784061426061