001package org.xbib.elasticsearch.http.netty;
002
003import org.jboss.netty.channel.ChannelHandler;
004import org.jboss.netty.channel.ChannelHandlerContext;
005import org.jboss.netty.channel.ExceptionEvent;
006import org.jboss.netty.channel.MessageEvent;
007import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
008import org.jboss.netty.handler.codec.http.HttpHeaders;
009import org.jboss.netty.handler.codec.http.HttpRequest;
010import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
011import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
012import org.xbib.elasticsearch.websocket.Presence;
013
014/**
015 * Handles HTTP request and upgrades HTTP to WebSocket if appropriate.
016 */
017@ChannelHandler.Sharable
018public class NettyHttpRequestHandler extends SimpleChannelUpstreamHandler {
019
020    protected final NettyWebSocketServerTransport serverTransport;
021
022    protected WebSocketServerHandshaker handshaker;
023
024    private static String getWebSocketLocation(HttpRequest req) {
025        return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
026    }
027
028    private static final String WEBSOCKET_PATH = "/websocket";
029
030    public NettyHttpRequestHandler(NettyWebSocketServerTransport serverTransport) {
031        this.serverTransport = serverTransport;
032    }
033
034    @Override
035    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
036        HttpRequest request = (HttpRequest) e.getMessage();
037        if (request.getUri().startsWith(WEBSOCKET_PATH)) {
038            // Websocket handshake
039            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, false);
040            handshaker = wsFactory.newHandshaker(request);
041            if (handshaker == null) {
042                wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel());
043            } else {
044                handshaker.handshake(ctx.getChannel(), request).addListener(WebSocketServerHandshaker.HANDSHAKE_LISTENER);
045            }
046            // extract topic from request URI
047            String topic = request.getUri();
048            if (topic.length() > WEBSOCKET_PATH.length() + 1) {
049                topic = topic.substring(WEBSOCKET_PATH.length() + 1);
050                serverTransport.presence(Presence.CONNECTED, topic, e.getChannel());
051            }
052        } else {
053            NettyHttpRequest nettyHttpRequest =new NettyHttpRequest(request);
054            serverTransport.dispatchRequest(nettyHttpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), nettyHttpRequest));
055            super.messageReceived(ctx, e);
056        }
057    }
058
059    @Override
060    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
061        serverTransport.exceptionCaught(ctx, e);
062        e.getChannel().close();
063    }
064
065}