001package org.xbib.elasticsearch.http.netty.client;
002
003import org.jboss.netty.bootstrap.ClientBootstrap;
004import org.jboss.netty.channel.Channel;
005import org.jboss.netty.channel.ChannelFuture;
006import org.jboss.netty.channel.ChannelHandlerContext;
007import org.jboss.netty.channel.ChannelStateEvent;
008import org.jboss.netty.channel.ExceptionEvent;
009import org.jboss.netty.channel.MessageEvent;
010import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
011import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
012import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
013import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
014import org.jboss.netty.handler.codec.http.HttpMethod;
015import org.jboss.netty.handler.codec.http.HttpRequest;
016import org.jboss.netty.handler.codec.http.HttpResponse;
017import org.jboss.netty.handler.codec.http.HttpResponseStatus;
018import org.jboss.netty.handler.codec.http.HttpVersion;
019import org.jboss.netty.handler.codec.http.websocketx.WebSocket00FrameDecoder;
020import org.jboss.netty.handler.codec.http.websocketx.WebSocket00FrameEncoder;
021import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
022import org.jboss.netty.util.CharsetUtil;
023import org.xbib.elasticsearch.websocket.client.WebSocketActionListener;
024import org.xbib.elasticsearch.websocket.client.WebSocketClient;
025
026import java.net.InetSocketAddress;
027import java.net.URI;
028
029/**
030 * Handles socket communication for a connected WebSocket client. Not intended
031 * for end-users. Please use {@link NettyWebSocketClient} for controlling your client.
032 */
033public class NettyWebSocketClientHandler extends SimpleChannelUpstreamHandler
034        implements WebSocketClient {
035
036    private final ClientBootstrap bootstrap;
037    private final URI url;
038    private final WebSocketActionListener listener;
039    private boolean handshakeCompleted = false;
040    private Channel channel;
041
042    public NettyWebSocketClientHandler(ClientBootstrap bootstrap, URI url, WebSocketActionListener listener) {
043        this.bootstrap = bootstrap;
044        this.url = url;
045        this.listener = listener;
046    }
047
048    @Override
049    public Channel channel() {
050        return channel;
051    }
052
053    @Override
054    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
055        String path = url.getPath();
056        if (url.getQuery() != null && url.getQuery().length() > 0) {
057            path = url.getPath() + "?" + url.getQuery();
058        }
059        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
060        request.headers().add(Names.UPGRADE, Values.WEBSOCKET);
061        request.headers().add(Names.CONNECTION, Values.UPGRADE);
062        request.headers().add(Names.HOST, url.getHost());
063        request.headers().add(Names.ORIGIN, "http://" + url.getHost());
064        event.getChannel().write(request);
065        ctx.getPipeline().replace("encoder", "ws-encoder", new WebSocket00FrameEncoder());
066        this.channel = event.getChannel();
067    }
068
069    @Override
070    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
071        listener.onDisconnect(this);
072        handshakeCompleted = false;
073        channel = null;
074    }
075
076    @Override
077    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
078        if (!handshakeCompleted) {
079            HttpResponse response = (HttpResponse) event.getMessage();
080            final HttpResponseStatus status = new HttpResponseStatus(101, "Web Socket Protocol Handshake");
081            final boolean validStatus = response.getStatus().equals(status);
082            final boolean validUpgrade = response.headers().get(Names.UPGRADE).equals(Values.WEBSOCKET);
083            final boolean validConnection = response.headers().get(Names.CONNECTION).equals(Values.UPGRADE);
084            if (!validStatus || !validUpgrade || !validConnection) {
085                throw new NettyWebSocketException("Invalid handshake response");
086            }
087            handshakeCompleted = true;
088            ctx.getPipeline().replace("decoder", "ws-decoder", new WebSocket00FrameDecoder());
089            listener.onConnect(this);
090            return;
091        }
092        if (event.getMessage() instanceof HttpResponse) {
093            HttpResponse response = (HttpResponse) event.getMessage();
094            throw new NettyWebSocketException("Unexpected HttpResponse (status=" + response.getStatus() + ", content=" + response.getContent().toString(CharsetUtil.UTF_8) + ")");
095        }
096        WebSocketFrame frame = (WebSocketFrame) event.getMessage();
097        listener.onMessage(this, frame);
098    }
099
100    @Override
101    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
102        final Throwable t = e.getCause();
103        listener.onError(t);
104        e.getChannel().close();
105    }
106
107    @Override
108    public ChannelFuture connect() {
109        return bootstrap.connect(new InetSocketAddress(url.getHost(), url.getPort()));
110    }
111
112    @Override
113    public ChannelFuture disconnect() {
114        if (channel == null) {
115            return null;
116        }
117        return channel.close();
118    }
119
120    @Override
121    public ChannelFuture send(WebSocketFrame frame) {
122        if (channel == null) {
123            return null;
124        }
125        return channel.write(frame);
126    }
127
128}