001package org.xbib.elasticsearch.http.netty; 002 003import org.jboss.netty.channel.ChannelHandler; 004import org.jboss.netty.channel.ChannelHandlerContext; 005import org.jboss.netty.channel.ExceptionEvent; 006import org.jboss.netty.channel.MessageEvent; 007import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 008import org.jboss.netty.handler.codec.http.HttpHeaders; 009import org.jboss.netty.handler.codec.http.HttpRequest; 010import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; 011import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; 012import org.xbib.elasticsearch.websocket.Presence; 013 014/** 015 * Handles HTTP request and upgrades HTTP to WebSocket if appropriate. 016 */ 017@ChannelHandler.Sharable 018public class NettyHttpRequestHandler extends SimpleChannelUpstreamHandler { 019 020 protected final NettyWebSocketServerTransport serverTransport; 021 022 protected WebSocketServerHandshaker handshaker; 023 024 private static String getWebSocketLocation(HttpRequest req) { 025 return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH; 026 } 027 028 private static final String WEBSOCKET_PATH = "/websocket"; 029 030 public NettyHttpRequestHandler(NettyWebSocketServerTransport serverTransport) { 031 this.serverTransport = serverTransport; 032 } 033 034 @Override 035 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 036 HttpRequest request = (HttpRequest) e.getMessage(); 037 if (request.getUri().startsWith(WEBSOCKET_PATH)) { 038 // Websocket handshake 039 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, false); 040 handshaker = wsFactory.newHandshaker(request); 041 if (handshaker == null) { 042 wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel()); 043 } else { 044 handshaker.handshake(ctx.getChannel(), request).addListener(WebSocketServerHandshaker.HANDSHAKE_LISTENER); 045 } 046 // extract topic from request URI 047 String topic = request.getUri(); 048 if (topic.length() > WEBSOCKET_PATH.length() + 1) { 049 topic = topic.substring(WEBSOCKET_PATH.length() + 1); 050 serverTransport.presence(Presence.CONNECTED, topic, e.getChannel()); 051 } 052 } else { 053 NettyHttpRequest nettyHttpRequest =new NettyHttpRequest(request); 054 serverTransport.dispatchRequest(nettyHttpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), nettyHttpRequest)); 055 super.messageReceived(ctx, e); 056 } 057 } 058 059 @Override 060 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 061 serverTransport.exceptionCaught(ctx, e); 062 e.getChannel().close(); 063 } 064 065}