001package org.xbib.elasticsearch.http.netty.client; 002 003import org.jboss.netty.bootstrap.ClientBootstrap; 004import org.jboss.netty.channel.ChannelPipeline; 005import org.jboss.netty.channel.ChannelPipelineFactory; 006import org.jboss.netty.channel.Channels; 007import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 008import org.jboss.netty.handler.codec.http.HttpRequestEncoder; 009import org.jboss.netty.handler.codec.http.HttpResponseDecoder; 010import org.xbib.elasticsearch.websocket.client.WebSocketActionListener; 011import org.xbib.elasticsearch.websocket.client.WebSocketClient; 012import org.xbib.elasticsearch.websocket.client.WebSocketClientBulkRequest; 013import org.xbib.elasticsearch.websocket.client.WebSocketClientFactory; 014import org.xbib.elasticsearch.websocket.client.WebSocketClientRequest; 015import org.xbib.elasticsearch.http.netty.NettyInteractiveRequest; 016 017import java.net.URI; 018import java.util.concurrent.Executors; 019 020/** 021 * A factory for creating Websocket clients. The entry point for creating and 022 * connecting a client. Can and should be used to create multiple instances. 023 * <p/> 024 * Extended for Websocket client request methods. 025 */ 026public class NettyWebSocketClientFactory implements WebSocketClientFactory { 027 028 private NioClientSocketChannelFactory socketChannelFactory = new NioClientSocketChannelFactory( 029 Executors.newCachedThreadPool(), 030 Executors.newCachedThreadPool()); 031 032 /** 033 * Create a new WebSocket client 034 * 035 * @param url URL to connect to. 036 * @param listener Callback interface to receive events 037 * @return A WebSocket client. Call {@link NettyWebSocketClient#connect()} to 038 * connect. 039 */ 040 @Override 041 public WebSocketClient newClient(final URI url, final WebSocketActionListener listener) { 042 ClientBootstrap bootstrap = new ClientBootstrap(socketChannelFactory); 043 044 String protocol = url.getScheme(); 045 if (!protocol.equals("ws") && !protocol.equals("wss")) { 046 throw new IllegalArgumentException("unsupported protocol: " + protocol); 047 } 048 049 final NettyWebSocketClientHandler clientHandler = new NettyWebSocketClientHandler(bootstrap, url, listener); 050 051 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 052 @Override 053 public ChannelPipeline getPipeline() throws Exception { 054 ChannelPipeline pipeline = Channels.pipeline(); 055 pipeline.addLast("decoder", new HttpResponseDecoder()); 056 pipeline.addLast("encoder", new HttpRequestEncoder()); 057 pipeline.addLast("ws-handler", clientHandler); 058 return pipeline; 059 } 060 }); 061 062 return clientHandler; 063 } 064 065 @Override 066 public void shutdown() { 067 socketChannelFactory.releaseExternalResources(); 068 } 069 070 @Override 071 public WebSocketClientRequest newRequest() { 072 return new NettyInteractiveRequest(); 073 } 074 075 @Override 076 public WebSocketClientBulkRequest indexRequest() { 077 return new NettyWebSocketBulkRequest("index"); 078 } 079 080 @Override 081 public WebSocketClientBulkRequest deleteRequest() { 082 return new NettyWebSocketBulkRequest("delete"); 083 } 084 085 @Override 086 public WebSocketClientRequest flushRequest() { 087 return new NettyInteractiveRequest().type("flush"); 088 } 089}