001package org.xbib.elasticsearch.http.netty.client; 002 003import org.jboss.netty.bootstrap.ClientBootstrap; 004import org.jboss.netty.channel.Channel; 005import org.jboss.netty.channel.ChannelFuture; 006import org.jboss.netty.channel.ChannelHandlerContext; 007import org.jboss.netty.channel.ChannelStateEvent; 008import org.jboss.netty.channel.ExceptionEvent; 009import org.jboss.netty.channel.MessageEvent; 010import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 011import org.jboss.netty.handler.codec.http.DefaultHttpRequest; 012import org.jboss.netty.handler.codec.http.HttpHeaders.Names; 013import org.jboss.netty.handler.codec.http.HttpHeaders.Values; 014import org.jboss.netty.handler.codec.http.HttpMethod; 015import org.jboss.netty.handler.codec.http.HttpRequest; 016import org.jboss.netty.handler.codec.http.HttpResponse; 017import org.jboss.netty.handler.codec.http.HttpResponseStatus; 018import org.jboss.netty.handler.codec.http.HttpVersion; 019import org.jboss.netty.handler.codec.http.websocketx.WebSocket00FrameDecoder; 020import org.jboss.netty.handler.codec.http.websocketx.WebSocket00FrameEncoder; 021import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame; 022import org.jboss.netty.util.CharsetUtil; 023import org.xbib.elasticsearch.websocket.client.WebSocketActionListener; 024import org.xbib.elasticsearch.websocket.client.WebSocketClient; 025 026import java.net.InetSocketAddress; 027import java.net.URI; 028 029/** 030 * Handles socket communication for a connected WebSocket client. Not intended 031 * for end-users. Please use {@link NettyWebSocketClient} for controlling your client. 032 */ 033public class NettyWebSocketClientHandler extends SimpleChannelUpstreamHandler 034 implements WebSocketClient { 035 036 private final ClientBootstrap bootstrap; 037 private final URI url; 038 private final WebSocketActionListener listener; 039 private boolean handshakeCompleted = false; 040 private Channel channel; 041 042 public NettyWebSocketClientHandler(ClientBootstrap bootstrap, URI url, WebSocketActionListener listener) { 043 this.bootstrap = bootstrap; 044 this.url = url; 045 this.listener = listener; 046 } 047 048 @Override 049 public Channel channel() { 050 return channel; 051 } 052 053 @Override 054 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception { 055 String path = url.getPath(); 056 if (url.getQuery() != null && url.getQuery().length() > 0) { 057 path = url.getPath() + "?" + url.getQuery(); 058 } 059 HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); 060 request.headers().add(Names.UPGRADE, Values.WEBSOCKET); 061 request.headers().add(Names.CONNECTION, Values.UPGRADE); 062 request.headers().add(Names.HOST, url.getHost()); 063 request.headers().add(Names.ORIGIN, "http://" + url.getHost()); 064 event.getChannel().write(request); 065 ctx.getPipeline().replace("encoder", "ws-encoder", new WebSocket00FrameEncoder()); 066 this.channel = event.getChannel(); 067 } 068 069 @Override 070 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception { 071 listener.onDisconnect(this); 072 handshakeCompleted = false; 073 channel = null; 074 } 075 076 @Override 077 public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { 078 if (!handshakeCompleted) { 079 HttpResponse response = (HttpResponse) event.getMessage(); 080 final HttpResponseStatus status = new HttpResponseStatus(101, "Web Socket Protocol Handshake"); 081 final boolean validStatus = response.getStatus().equals(status); 082 final boolean validUpgrade = response.headers().get(Names.UPGRADE).equals(Values.WEBSOCKET); 083 final boolean validConnection = response.headers().get(Names.CONNECTION).equals(Values.UPGRADE); 084 if (!validStatus || !validUpgrade || !validConnection) { 085 throw new NettyWebSocketException("Invalid handshake response"); 086 } 087 handshakeCompleted = true; 088 ctx.getPipeline().replace("decoder", "ws-decoder", new WebSocket00FrameDecoder()); 089 listener.onConnect(this); 090 return; 091 } 092 if (event.getMessage() instanceof HttpResponse) { 093 HttpResponse response = (HttpResponse) event.getMessage(); 094 throw new NettyWebSocketException("Unexpected HttpResponse (status=" + response.getStatus() + ", content=" + response.getContent().toString(CharsetUtil.UTF_8) + ")"); 095 } 096 WebSocketFrame frame = (WebSocketFrame) event.getMessage(); 097 listener.onMessage(this, frame); 098 } 099 100 @Override 101 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 102 final Throwable t = e.getCause(); 103 listener.onError(t); 104 e.getChannel().close(); 105 } 106 107 @Override 108 public ChannelFuture connect() { 109 return bootstrap.connect(new InetSocketAddress(url.getHost(), url.getPort())); 110 } 111 112 @Override 113 public ChannelFuture disconnect() { 114 if (channel == null) { 115 return null; 116 } 117 return channel.close(); 118 } 119 120 @Override 121 public ChannelFuture send(WebSocketFrame frame) { 122 if (channel == null) { 123 return null; 124 } 125 return channel.write(frame); 126 } 127 128}