001package org.xbib.elasticsearch.websocket;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.common.component.AbstractLifecycleComponent;
005import org.elasticsearch.common.inject.Inject;
006import org.elasticsearch.common.logging.ESLogger;
007import org.elasticsearch.common.logging.ESLoggerFactory;
008import org.elasticsearch.common.settings.Settings;
009import org.elasticsearch.common.xcontent.XContentFactory;
010import org.elasticsearch.common.xcontent.XContentParser;
011import org.jboss.netty.channel.Channel;
012import org.jboss.netty.channel.ChannelHandlerContext;
013import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
014import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
015import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
016import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
017import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
018import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
019import org.xbib.elasticsearch.http.netty.NettyInteractiveChannel;
020import org.xbib.elasticsearch.http.netty.NettyInteractiveRequest;
021
022import java.nio.charset.Charset;
023import java.util.HashMap;
024import java.util.Map;
025
026/**
027 * The InteractiveController controls the presence of websocket connections
028 * and the flow of websocket frames for interactive use.
029 */
030public class InteractiveController extends AbstractLifecycleComponent<InteractiveController> {
031
032    private final ESLogger logger = ESLoggerFactory.getLogger(InteractiveController.class.getSimpleName());
033
034    private final HashMap<String, InteractiveHandler> handlers = new HashMap();
035
036    @Inject
037    public InteractiveController(Settings settings) {
038        super(settings);
039    }
040
041    @Override
042    protected void doStart() throws ElasticsearchException {
043    }
044
045    @Override
046    protected void doStop() throws ElasticsearchException {
047    }
048
049    @Override
050    protected void doClose() throws ElasticsearchException {
051    }
052
053    public void registerHandler(String type, InteractiveHandler handler) {
054        handlers.put(type, handler);
055    }
056
057    public void presence(Presence presence, String topic, Channel channel) {
058        if (logger.isDebugEnabled()) {
059            logger.debug("presence: " + presence.name()
060                    + " topic =" + topic
061                    + " channel =" + channel);
062        }
063    }
064
065    public void frame(WebSocketServerHandshaker handshaker, WebSocketFrame frame, ChannelHandlerContext context) {
066        Channel channel = context.getChannel();
067        if (frame instanceof TextWebSocketFrame) {
068            text((TextWebSocketFrame) frame, channel);
069        } else if (handshaker != null && frame instanceof CloseWebSocketFrame) {
070            handshaker.close(context.getChannel(), (CloseWebSocketFrame) frame);
071            presence(Presence.DISCONNECTED, null, channel);
072        } else if (frame instanceof PingWebSocketFrame) {
073            channel.write(new PongWebSocketFrame(frame.getBinaryData()));
074        }
075    }
076
077    private void text(TextWebSocketFrame frame, Channel channel) {
078        Map<String, Object> map = parse(frame.getBinaryData().toString(Charset.forName("UTF-8")));
079        if (map == null) {
080            error("invalid request", channel);
081            return;
082        }
083        String type = (String) map.get("type");
084        if (type == null) {
085            error("no type found", channel);
086            return;
087        }
088        if (!handlers.containsKey(type)) {
089            error("missing handler for type: " + type, channel);
090            return;
091        }
092        Map<String, Object> data = (Map<String, Object>) map.get("data");
093        handlers.get(type).handleRequest(new NettyInteractiveRequest(data),
094                new NettyInteractiveChannel(channel));
095    }
096
097    private Map<String, Object> parse(String source) {
098        Map<String, Object> map = null;
099        XContentParser parser = null;
100        try {
101            parser = XContentFactory.xContent(source).createParser(source);
102            map = parser.map();
103        } catch (Exception e) {
104            logger.error("unable to parse: {}", source);
105        } finally {
106            if (parser != null) {
107                parser.close();
108            }
109        }
110        return map;
111    }
112
113    private void error(String message, Channel channel) {
114        String text = "{\"ok\":false,\"error\":\"" + message + "\"}";
115        TextWebSocketFrame frame = new TextWebSocketFrame(text);
116        channel.write(frame);
117    }
118}