package org.xbib.elasticsearch.websocket;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.xbib.elasticsearch.http.netty.NettyInteractiveChannel;
import org.xbib.elasticsearch.http.netty.NettyInteractiveRequest;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/**
 * The InteractiveController controls the presence of websocket connections
 * and the flow of websocket frames for interactive use.
 *
 * <p>Text frames are expected to carry a document with a string {@code type}
 * entry and an optional {@code data} object; the frame is dispatched to the
 * {@link InteractiveHandler} registered for that type. Any problem with the
 * request is answered on the same channel with a frame of the form
 * {@code {"ok":false,"error":"..."}}.
 *
 * <p>The handler registry is a plain {@link HashMap} and this class performs
 * no synchronization around it.
 */
public class InteractiveController extends AbstractLifecycleComponent<InteractiveController> {

    /** Charset used to decode the payload of incoming text frames. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Pre-built response used only when the regular error response cannot be serialized. */
    private static final String FALLBACK_ERROR = "{\"ok\":false,\"error\":\"internal error\"}";

    private final ESLogger logger = ESLoggerFactory.getLogger(InteractiveController.class.getSimpleName());

    /** Registered handlers, keyed by the request {@code type} they serve. */
    private final Map<String, InteractiveHandler> handlers = new HashMap<String, InteractiveHandler>();

    /**
     * Creates the controller.
     *
     * @param settings the settings handed to the lifecycle component base class
     */
    @Inject
    public InteractiveController(Settings settings) {
        super(settings);
    }

    /** No start-up work is required. */
    @Override
    protected void doStart() throws ElasticsearchException {
    }

    /** No stop work is required. */
    @Override
    protected void doStop() throws ElasticsearchException {
    }

    /** No resources are held, so there is nothing to close. */
    @Override
    protected void doClose() throws ElasticsearchException {
    }

    /**
     * Registers the handler for a request type. A handler registered earlier
     * for the same type is replaced.
     *
     * @param type    value of the {@code type} entry this handler serves
     * @param handler handler invoked for matching text frames
     */
    public void registerHandler(String type, InteractiveHandler handler) {
        handlers.put(type, handler);
    }

    /**
     * Reports a presence change. The event is only written to the debug log.
     *
     * @param presence the presence state
     * @param topic    the topic concerned, may be {@code null}
     * @param channel  the channel the event belongs to
     */
    public void presence(Presence presence, String topic, Channel channel) {
        if (logger.isDebugEnabled()) {
            logger.debug("presence: " + presence.name()
                    + " topic =" + topic
                    + " channel =" + channel);
        }
    }

    /**
     * Processes one incoming websocket frame.
     *
     * <ul>
     * <li>text frames are parsed and dispatched to the registered handler,</li>
     * <li>close frames complete the closing handshake and report
     * {@link Presence#DISCONNECTED},</li>
     * <li>ping frames are answered with a pong carrying the same payload.</li>
     * </ul>
     * All other frame types are ignored.
     *
     * @param handshaker the handshaker of the connection; when {@code null},
     *                   close frames are ignored
     * @param frame      the received frame
     * @param context    the channel handler context the frame arrived on
     */
    public void frame(WebSocketServerHandshaker handshaker, WebSocketFrame frame, ChannelHandlerContext context) {
        Channel channel = context.getChannel();
        if (frame instanceof TextWebSocketFrame) {
            text((TextWebSocketFrame) frame, channel);
        } else if (handshaker != null && frame instanceof CloseWebSocketFrame) {
            handshaker.close(channel, (CloseWebSocketFrame) frame);
            presence(Presence.DISCONNECTED, null, channel);
        } else if (frame instanceof PingWebSocketFrame) {
            channel.write(new PongWebSocketFrame(frame.getBinaryData()));
        }
    }

    /**
     * Decodes a text frame, validates the request and passes it to the
     * handler registered for its type. Every validation failure is reported
     * back on the channel instead of being thrown, because the content is
     * supplied by the remote peer.
     *
     * @param frame   the text frame to process
     * @param channel the channel used for the handler response or the error reply
     */
    private void text(TextWebSocketFrame frame, Channel channel) {
        Map<String, Object> map = parse(frame.getBinaryData().toString(UTF8));
        if (map == null) {
            error("invalid request", channel);
            return;
        }
        Object type = map.get("type");
        if (type == null) {
            error("no type found", channel);
            return;
        }
        // the peer controls the document, so "type" may be a number, list, ...
        if (!(type instanceof String)) {
            error("invalid type", channel);
            return;
        }
        InteractiveHandler handler = handlers.get(type);
        if (handler == null) {
            error("missing handler for type: " + type, channel);
            return;
        }
        Object data = map.get("data");
        // an absent "data" entry is passed on as null; anything but an object is rejected
        if (data != null && !(data instanceof Map)) {
            error("invalid data", channel);
            return;
        }
        // safe: the instanceof check above guarantees a Map, keys of a parsed object are strings
        @SuppressWarnings("unchecked")
        Map<String, Object> payload = (Map<String, Object>) data;
        handler.handleRequest(new NettyInteractiveRequest(payload),
                new NettyInteractiveChannel(channel));
    }

    /**
     * Parses the given source into a map.
     *
     * @param source the text to parse
     * @return the parsed map, or {@code null} if the source could not be parsed
     */
    private Map<String, Object> parse(String source) {
        Map<String, Object> map = null;
        XContentParser parser = null;
        try {
            parser = XContentFactory.xContent(source).createParser(source);
            map = parser.map();
        } catch (Exception e) {
            // keep the cause in the log, the caller only learns that parsing failed
            logger.error("unable to parse: {}", e, source);
        } finally {
            if (parser != null) {
                parser.close();
            }
        }
        return map;
    }

    /**
     * Sends an error response of the form {@code {"ok":false,"error":message}}
     * on the channel. The response is produced by a JSON builder so that
     * quotes, backslashes and control characters in the message (which may
     * contain text received from the peer) are escaped.
     *
     * @param message the error description
     * @param channel the channel to write the response to
     */
    private void error(String message, Channel channel) {
        String text;
        try {
            text = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("ok", false)
                    .field("error", message)
                    .endObject()
                    .string();
        } catch (IOException e) {
            logger.error("unable to build error response for: {}", e, message);
            text = FALLBACK_ERROR;
        }
        channel.write(new TextWebSocketFrame(text));
    }
}