これは何? †
- Integrating WebSockets and JMS with CDI Events in Java EE 7 を読んで作った。最期に著者の実装サンプルへのリンクを発見、ほぼ同じだけどチョット違うものになった。
- Web Socket と EJB を連携させる
- Web Socket に閉じたアプリはほぼないでしょう
- Web Socket Endpoint から EJB を呼び出したいよね
- EJB (業務ロジック) を起点に Web Socket で PUSH 通知したいよね
- というサンプル
- 処理の流れ
- ブラウザから Message が Web Socket Endpoint に送られる
- Web Socket Endpoint から Session Bean を呼び出す。Session Bean から JMS(Queue) に Message を投入
- Message Driven Bean は JMS(Queue) を監視している。JMS(Queue) に Message が投入されると、Message Driven Bean が CDI のイベントを発行
- Web Socket Endpoint は CDI のイベントを監視している。CDI イベントが発行されると、Web Socket Endpoint が ブラウザにメッセージを PUSH 通知
- これそのままでは意味ないけど、別の Request&Reply 型の処理を契機に Web Socket で PUSH 通知をしたいときに、間に JMS を挟む当方式が応用できる
- myQueue は Glassfish JMS で作ったやつ
WebSocketEndpoint? †
- @ServerEndpoint?("/websocket") がついたクラスが WebSocket? の Endpoint になる。アノテーションの value がサービスのパス
- イベント処理
@OnOpen? | WebSocket? 接続処理 |
@OnMessage? | WebSocket? メッセージ到達処理 |
@OnClose? | WebSocket? 切断処理 |
@OnError? | エラー処理 |
- イベント処理の引数で与えられる javax.websocket.Session で接続を管理
Session#getUserPrincipal?() | リモートユーザを取得 |
Session#getAsyncRemote?().sendText(String) | 非同期でメッセージをPUSH |
Session#getBasicRemote?().sendText(String) | 同期でメッセージをPUSH |
- 直接 JMS を使えないので、@EJB で SessionBean? を Inject して中継させる
- onJMSMessage(@Observes @WSJMSMessage Message msg) は、CDI イベントのハンドラ
- 引数に @javax.enterprise.event.Observes を指定すると CDI のイベントハンドラになる
- このままだと、Message 型の情報を含むイベントはすべて受信してしまうので、自分で作った @WSJMSMessage アノテーションで絞り込む
- (イベント発行側 (Message Driven Bean) では @WSJMSMessage アノテーション付きで CDI イベントを発行する)
package com.mycompany.websocketexam;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.ejb.EJB;
import javax.enterprise.event.Observes;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ServerEndpoint("/websocket")
public class WebSocketEndpoint implements Serializable {
private static final Logger logger
= LoggerFactory.getLogger(WebSocketEndpoint.class);
@EJB
private QueueSenderSessionBean senderBean;
private static final Set<Session> sessions
= Collections.synchronizedSet(new HashSet<Session>());
@OnOpen
public void onOpen(final Session session) {
logger.info("Session Open {}", session.getId());
session.getAsyncRemote().sendText("Session Opened");
sessions.add(session);
}
@OnMessage
public void onMessage(final String message, final Session client) {
logger.info("Call QueueSenderSessionBean");
senderBean.sendMessage(message);
}
@OnClose
public void onClose(final Session session) {
logger.info("Session Close {}", session.getId());
sessions.remove(session);
}
@OnError
public void onError(Throwable t) {
logger.error("Error", t);
}
public void onJMSMessage(@Observes @WSJMSMessage Message msg) {
logger.info("Receive CDI Event {}", msg);
sessions.forEach(s -> {
try {
s.getBasicRemote().sendText(msg.getBody(String.class));
} catch (IOException | JMSException ex) {
logger.error("Error", ex);
}
});
}
}
package com.mycompany.websocketexam;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface WSJMSMessage {
}
QueueSenderSessionBean? †
- WebSocketEndpoint? から呼び出されて jms/myQueue にメッセージを投入しているだけ
package com.mycompany.websocketexam;
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.jms.JMSContext;
import javax.jms.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@LocalBean
@Stateless
public class QueueSenderSessionBean {
private static final Logger logger
= LoggerFactory.getLogger(QueueSenderSessionBean.class);
@Resource(mappedName = "jms/myQueue")
private Queue myQueue;
@Inject
private JMSContext jmsContext;
public void sendMessage(String message) {
logger.info("Enqueu Message to JMS");
jmsContext.createProducer().send(myQueue, message);
}
}
WebSocketMDB †
- jms/myQueue を監視し、メッセージが到達したら CDI イベントを発行している
package com.mycompany.websocketexam;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@MessageDriven(mappedName = "jms/myQueue")
public class WebSocketMDB implements MessageListener {
private static final Logger logger
= LoggerFactory.getLogger(WebSocketMDB.class);
@Inject
@WSJMSMessage
private Event<Message> jmsEvent;
@Override
public void onMessage(Message msg) {
logger.info("Fire CDI Event");
jmsEvent.fire(msg);
}
}
index.html(WebSocket?クライアント) †
<!DOCTYPE html>
<html>
<head>
<title>Start Page</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
</head>
<body>
<h1>WebSocketテスト</h1>
<div id="log">
</div>
<input id="msg" type="text"/>
<button id="send">送信</button>
<script src="https://code.jquery.com/jquery-2.1.4.min.js"></script>
<script type="text/javascript">
var socket = new WebSocket('ws://' + document.location.host + document.location.pathname +'websocket');
socket.onmessage = function(message){
$('#log').append(message.data + "<br/>");
};
$('#send').click(function(){
var text = $('#msg').val();
socket.send(text);
$('#msg').val('');
});
</script>
</body>
</html>
実行イメージ †
情報: 2015-09-10 01:33:34,165 [http-listener-1(2)] INFO c.m.w.WebSocketEndpoint Session Open 89907c41-c5d1-4834-bc91-6b51f81b061c
情報: 2015-09-10 01:33:42,203 [http-listener-1(3)] INFO c.m.w.WebSocketEndpoint Call QueueSenderSessionBean
情報: 2015-09-10 01:33:42,283 [http-listener-1(3)] INFO c.m.w.QueueSenderSessionBean Enqueu Message to JMS
情報: 2015-09-10 01:33:42,487 [p: thread-pool-1; w: 5] INFO c.m.w.WebSocketMDB Fire CDI Event
情報: 2015-09-10 01:33:42,488 [p: thread-pool-1; w: 5] INFO c.m.w.WebSocketEndpoint Receive CDI Event com.sun.messaging.jms.ra.DirectTextPacket@4a68d6
情報: 2015-09-10 01:33:46,425 [http-listener-1(4)] INFO c.m.w.WebSocketEndpoint Call QueueSenderSessionBean
情報: 2015-09-10 01:33:46,426 [http-listener-1(4)] INFO c.m.w.QueueSenderSessionBean Enqueu Message to JMS
情報: 2015-09-10 01:33:46,433 [p: thread-pool-1; w: 1] INFO c.m.w.WebSocketMDB Fire CDI Event
情報: 2015-09-10 01:33:46,433 [p: thread-pool-1; w: 1] INFO c.m.w.WebSocketEndpoint Receive CDI Event com.sun.messaging.jms.ra.DirectTextPacket@170bae7e
情報: 2015-09-10 01:34:12,804 [http-listener-1(5)] INFO c.m.w.WebSocketEndpoint Session Close 89907c41-c5d1-4834-bc91-6b51f81b061c
Java#Glassfish