001package org.xbib.elasticsearch.http.netty.client;
002
003import org.jboss.netty.bootstrap.ClientBootstrap;
004import org.jboss.netty.channel.ChannelPipeline;
005import org.jboss.netty.channel.ChannelPipelineFactory;
006import org.jboss.netty.channel.Channels;
007import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
008import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
009import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
010import org.xbib.elasticsearch.websocket.client.WebSocketActionListener;
011import org.xbib.elasticsearch.websocket.client.WebSocketClient;
012import org.xbib.elasticsearch.websocket.client.WebSocketClientBulkRequest;
013import org.xbib.elasticsearch.websocket.client.WebSocketClientFactory;
014import org.xbib.elasticsearch.websocket.client.WebSocketClientRequest;
015import org.xbib.elasticsearch.http.netty.NettyInteractiveRequest;
016
017import java.net.URI;
018import java.util.concurrent.Executors;
019
020/**
021 * A factory for creating Websocket clients. The entry point for creating and
022 * connecting a client. Can and should be used to create multiple instances.
023 * <p/>
024 * Extended for Websocket client request methods.
025 */
026public class NettyWebSocketClientFactory implements WebSocketClientFactory {
027
028    private NioClientSocketChannelFactory socketChannelFactory = new NioClientSocketChannelFactory(
029            Executors.newCachedThreadPool(),
030            Executors.newCachedThreadPool());
031
032    /**
033     * Create a new WebSocket client
034     *
035     * @param url      URL to connect to.
036     * @param listener Callback interface to receive events
037     * @return A WebSocket client. Call {@link NettyWebSocketClient#connect()} to
038     * connect.
039     */
040    @Override
041    public WebSocketClient newClient(final URI url, final WebSocketActionListener listener) {
042        ClientBootstrap bootstrap = new ClientBootstrap(socketChannelFactory);
043
044        String protocol = url.getScheme();
045        if (!protocol.equals("ws") && !protocol.equals("wss")) {
046            throw new IllegalArgumentException("unsupported protocol: " + protocol);
047        }
048
049        final NettyWebSocketClientHandler clientHandler = new NettyWebSocketClientHandler(bootstrap, url, listener);
050
051        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
052            @Override
053            public ChannelPipeline getPipeline() throws Exception {
054                ChannelPipeline pipeline = Channels.pipeline();
055                pipeline.addLast("decoder", new HttpResponseDecoder());
056                pipeline.addLast("encoder", new HttpRequestEncoder());
057                pipeline.addLast("ws-handler", clientHandler);
058                return pipeline;
059            }
060        });
061
062        return clientHandler;
063    }
064
065    @Override
066    public void shutdown() {
067        socketChannelFactory.releaseExternalResources();
068    }
069
070    @Override
071    public WebSocketClientRequest newRequest() {
072        return new NettyInteractiveRequest();
073    }
074
075    @Override
076    public WebSocketClientBulkRequest indexRequest() {
077        return new NettyWebSocketBulkRequest("index");
078    }
079
080    @Override
081    public WebSocketClientBulkRequest deleteRequest() {
082        return new NettyWebSocketBulkRequest("delete");
083    }
084
085    @Override
086    public WebSocketClientRequest flushRequest() {
087        return new NettyInteractiveRequest().type("flush");
088    }
089}