001package org.xbib.elasticsearch.transport.netty;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.ElasticsearchIllegalArgumentException;
005import org.elasticsearch.ElasticsearchIllegalStateException;
006import org.elasticsearch.Version;
007import org.elasticsearch.cluster.node.DiscoveryNode;
008import org.elasticsearch.common.Strings;
009import org.elasticsearch.common.collect.ImmutableList;
010import org.elasticsearch.common.collect.Lists;
011import org.elasticsearch.common.component.AbstractLifecycleComponent;
012import org.elasticsearch.common.inject.Inject;
013import org.elasticsearch.common.io.stream.HandlesStreamOutput;
014import org.elasticsearch.common.io.stream.StreamOutput;
015import org.elasticsearch.common.network.NetworkService;
016import org.elasticsearch.common.network.NetworkUtils;
017import org.elasticsearch.common.settings.Settings;
018import org.elasticsearch.common.transport.BoundTransportAddress;
019import org.elasticsearch.common.transport.InetSocketTransportAddress;
020import org.elasticsearch.common.transport.PortsRange;
021import org.elasticsearch.common.transport.TransportAddress;
022import org.elasticsearch.common.unit.ByteSizeValue;
023import org.elasticsearch.common.unit.TimeValue;
024import org.elasticsearch.common.util.BigArrays;
025import org.elasticsearch.common.util.concurrent.EsExecutors;
026import org.elasticsearch.common.util.concurrent.KeyedLock;
027import org.elasticsearch.threadpool.ThreadPool;
028import org.elasticsearch.transport.BindTransportException;
029import org.elasticsearch.transport.ConnectTransportException;
030import org.elasticsearch.transport.NodeNotConnectedException;
031import org.elasticsearch.transport.Transport;
032import org.elasticsearch.transport.TransportException;
033import org.elasticsearch.transport.TransportRequest;
034import org.elasticsearch.transport.TransportRequestOptions;
035import org.elasticsearch.transport.TransportServiceAdapter;
036import org.elasticsearch.transport.support.TransportStatus;
037import org.jboss.netty.bootstrap.ClientBootstrap;
038import org.jboss.netty.bootstrap.ServerBootstrap;
039import org.jboss.netty.buffer.ChannelBuffer;
040import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
041import org.jboss.netty.channel.Channel;
042import org.jboss.netty.channel.ChannelFuture;
043import org.jboss.netty.channel.ChannelFutureListener;
044import org.jboss.netty.channel.ChannelHandlerContext;
045import org.jboss.netty.channel.ChannelPipeline;
046import org.jboss.netty.channel.ChannelPipelineFactory;
047import org.jboss.netty.channel.Channels;
048import org.jboss.netty.channel.ExceptionEvent;
049import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
050import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
051import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
052import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
053import org.jboss.netty.channel.socket.nio.NioWorkerPool;
054import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
055import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
056import org.jboss.netty.util.HashedWheelTimer;
057import org.xbib.elasticsearch.common.io.stream.BytesStreamOutput;
058import org.xbib.elasticsearch.common.netty.NettyStaticSetup;
059import org.xbib.elasticsearch.common.netty.OpenChannelsHandler;
060
061import java.io.IOException;
062import java.net.InetAddress;
063import java.net.InetSocketAddress;
064import java.net.SocketAddress;
065import java.util.ArrayList;
066import java.util.Iterator;
067import java.util.List;
068import java.util.Map;
069import java.util.Set;
070import java.util.concurrent.ConcurrentMap;
071import java.util.concurrent.CountDownLatch;
072import java.util.concurrent.Executors;
073import java.util.concurrent.TimeUnit;
074import java.util.concurrent.atomic.AtomicInteger;
075import java.util.concurrent.atomic.AtomicReference;
076import java.util.concurrent.locks.ReadWriteLock;
077import java.util.concurrent.locks.ReentrantReadWriteLock;
078
079import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING;
080import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_CLIENT;
081import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_SERVER;
082import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT;
083import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
084import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE;
085import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE;
086import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
087import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
088import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
089import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_REUSE_ADDRESS;
090import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE;
091import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
092import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
093import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
094import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
095
096
/**
 * There are 4 types of connections per node: low/med/high/ping. Low is for batch oriented APIs (like recovery or
 * bulk) with a high payload that would cause regular requests (like search or single index) to take
 * longer. Med is for the typical search / single doc index. And High is for things like cluster state. Ping is reserved for
 * sending out ping requests to other nodes.
 */
103public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
104
    // Runs once when this class is loaded, before any instance of the transport is created.
    // NOTE(review): presumably applies process-wide Netty configuration - see NettyStaticSetup to confirm.
    static {
        NettyStaticSetup.setup();
    }
108
    // resolves the configured bind and publish hosts into addresses when the server side starts
    private final NetworkService networkService;

    // version of this node; sendRequest serializes with the smaller of this and the target node's version
    final Version version;

    // number of workers handed to the NIO channel factories ("worker_count")
    final int workerCount;

    // number of bosses handed to the NIO client channel factory ("boss_count")
    final int bossCount;

    // true: the server side uses the blocking (OIO) channel factory instead of NIO
    final boolean blockingServer;

    // true: the client side uses the blocking (OIO) channel factory instead of NIO
    final boolean blockingClient;

    // port or port range the server tries to bind to, e.g. "9300-9400"
    final String port;

    // host to bind the server channel to; may be null when not configured
    final String bindHost;

    // host used to build the published address; may be null when not configured
    final String publishHost;

    // when true, sendRequest flags the request options for compression
    final boolean compress;

    // connect timeout set on the client bootstrap; connect calls wait 1.5 times this value
    final TimeValue connectTimeout;

    // socket options below are applied to the client bootstrap and to accepted ("child.") server channels;
    // each one is skipped when it is null

    final Boolean tcpNoDelay;

    final Boolean tcpKeepAlive;

    final Boolean reuseAddress;

    // only applied when non-null and greater than zero bytes
    final ByteSizeValue tcpSendBufferSize;

    // only applied when non-null and greater than zero bytes
    final ByteSizeValue tcpReceiveBufferSize;

    // fixed predictor when the configured min and max are equal, adaptive otherwise
    final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;

    // number of channels opened to each node, per channel type
    final int connectionsPerNodeRecovery;
    final int connectionsPerNodeBulk;
    final int connectionsPerNodeReg;
    final int connectionsPerNodeState;
    final int connectionsPerNodePing;

    // optional limit passed to the frame decoder; null leaves the decoder's default
    final ByteSizeValue maxCumulationBufferCapacity;

    final BigArrays bigArrays;

    // optional limit passed to the frame decoder; -1 leaves the decoder's default
    final int maxCompositeBufferComponents;

    private final ThreadPool threadPool;

    // tracks the channels accepted by the server; null until the server side is started
    private volatile OpenChannelsHandler serverOpenChannels;

    private volatile ClientBootstrap clientBootstrap;

    private volatile ServerBootstrap serverBootstrap;

    // connected node to the channels opened to it
    final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();

    // the bound server channel; null until the server side is started
    private volatile Channel serverChannel;

    private volatile TransportServiceAdapter transportServiceAdapter;

    // bound and publish address; only set when the server side is started
    private volatile BoundTransportAddress boundAddress;

    // serializes connect attempts per node id
    private final KeyedLock<String> connectionLock = new KeyedLock<String>();

    // this lock is here to make sure we close this transport and disconnect all the client nodes
    // connections while no connect operation is going on... (this might help with 100% CPU when stopping the transport?)
    private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
177
178    @Inject
179    public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
180        super(settings);
181        this.threadPool = threadPool;
182        this.networkService = networkService;
183        this.bigArrays = bigArrays;
184        this.version = version;
185
186        if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
187            System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
188        }
189
190        this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
191        this.bossCount = componentSettings.getAsInt("boss_count", 1);
192        this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
193        this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
194        this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
195        this.bindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
196        this.publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
197        this.compress = settings.getAsBoolean("transport.tcp.compress", false);
198        this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", settings.getAsTime(TCP_CONNECT_TIMEOUT, TCP_DEFAULT_CONNECT_TIMEOUT)));
199        this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
200        this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
201        this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
202        this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
203        this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
204        this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2));
205        this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3));
206        this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6));
207        this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1));
208        this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));
209
210        // we want to have at least 1 for reg/state/ping
211        if (this.connectionsPerNodeReg == 0) {
212            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.reg] to 0");
213        }
214        if (this.connectionsPerNodePing == 0) {
215            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.ping] to 0");
216        }
217        if (this.connectionsPerNodeState == 0) {
218            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.state] to 0");
219        }
220
221        this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
222        this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
223
224        long defaultReceiverPredictor = 512 * 1024;
225
226        // skip JVM info
227
228        // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
229        ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
230        ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
231        if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
232            receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
233        } else {
234            receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
235        }
236
237        logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
238                workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
239    }
240
241    public Settings settings() {
242        return this.settings;
243    }
244
245    @Override
246    public void transportServiceAdapter(TransportServiceAdapter service) {
247        this.transportServiceAdapter = service;
248    }
249
250    TransportServiceAdapter transportServiceAdapter() {
251        return transportServiceAdapter;
252    }
253
254    ThreadPool threadPool() {
255        return threadPool;
256    }
257
258    @Override
259    protected void doStart() throws ElasticsearchException {
260        if (blockingClient) {
261            clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker"))));
262        } else {
263            clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
264                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_boss")),
265                    bossCount,
266                    new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount),
267                    new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
268        }
269        ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
270            @Override
271            public ChannelPipeline getPipeline() throws Exception {
272                ChannelPipeline pipeline = Channels.pipeline();
273                SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
274                if (maxCumulationBufferCapacity != null) {
275                    if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
276                        sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
277                    } else {
278                        sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
279                    }
280                }
281                if (maxCompositeBufferComponents != -1) {
282                    sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
283                }
284                pipeline.addLast("size", sizeHeader);
285                pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
286                return pipeline;
287            }
288        };
289        clientBootstrap.setPipelineFactory(clientPipelineFactory);
290        clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
291        if (tcpNoDelay != null) {
292            clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
293        }
294        if (tcpKeepAlive != null) {
295            clientBootstrap.setOption("keepAlive", tcpKeepAlive);
296        }
297        if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
298            clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
299        }
300        if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
301            clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
302        }
303        clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
304        if (reuseAddress != null) {
305            clientBootstrap.setOption("reuseAddress", reuseAddress);
306        }
307
308        if (!settings.getAsBoolean("network.server", true)) {
309            return;
310        }
311
312        serverOpenChannels = new OpenChannelsHandler(logger);
313        if (blockingServer) {
314            serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
315                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
316                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker"))));
317        } else {
318            serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
319                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
320                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
321                    workerCount));
322        }
323        ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() {
324            @Override
325            public ChannelPipeline getPipeline() throws Exception {
326                ChannelPipeline pipeline = Channels.pipeline();
327                pipeline.addLast("openChannels", serverOpenChannels);
328                SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
329                if (maxCumulationBufferCapacity != null) {
330                    if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
331                        sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
332                    } else {
333                        sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
334                    }
335                }
336                if (maxCompositeBufferComponents != -1) {
337                    sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
338                }
339                pipeline.addLast("size", sizeHeader);
340                pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
341                return pipeline;
342            }
343        };
344        serverBootstrap.setPipelineFactory(serverPipelineFactory);
345        if (tcpNoDelay != null) {
346            serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
347        }
348        if (tcpKeepAlive != null) {
349            serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
350        }
351        if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
352            serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
353        }
354        if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
355            serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
356        }
357        serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
358        serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
359        if (reuseAddress != null) {
360            serverBootstrap.setOption("reuseAddress", reuseAddress);
361            serverBootstrap.setOption("child.reuseAddress", reuseAddress);
362        }
363
364        // Bind and start to accept incoming connections.
365        InetAddress hostAddressX;
366        try {
367            hostAddressX = networkService.resolveBindHostAddress(bindHost);
368        } catch (IOException e) {
369            throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
370        }
371        final InetAddress hostAddress = hostAddressX;
372
373        PortsRange portsRange = new PortsRange(port);
374        final AtomicReference<Exception> lastException = new AtomicReference();
375        boolean success = portsRange.iterate(new PortsRange.PortCallback() {
376            @Override
377            public boolean onPortNumber(int portNumber) {
378                try {
379                    serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
380                } catch (Exception e) {
381                    lastException.set(e);
382                    return false;
383                }
384                return true;
385            }
386        });
387        if (!success) {
388            throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
389        }
390
391        logger.debug("Bound to address [{}]", serverChannel.getLocalAddress());
392
393        InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
394        InetSocketAddress publishAddress;
395        try {
396            publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
397        } catch (Exception e) {
398            throw new BindTransportException("Failed to resolve publish address", e);
399        }
400        this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
401    }
402
403    @Override
404    protected void doStop() throws ElasticsearchException {
405        final CountDownLatch latch = new CountDownLatch(1);
406        // make sure we run it on another thread than a possible IO handler thread
407        threadPool.generic().execute(new Runnable() {
408            @Override
409            public void run() {
410                try {
411                    for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
412                        NodeChannels nodeChannels = it.next();
413                        it.remove();
414                        nodeChannels.close();
415                    }
416
417                    if (serverChannel != null) {
418                        try {
419                            serverChannel.close().awaitUninterruptibly();
420                        } finally {
421                            serverChannel = null;
422                        }
423                    }
424
425                    if (serverOpenChannels != null) {
426                        serverOpenChannels.close();
427                        serverOpenChannels = null;
428                    }
429
430                    if (serverBootstrap != null) {
431                        serverBootstrap.releaseExternalResources();
432                        serverBootstrap = null;
433                    }
434
435                    for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
436                        NodeChannels nodeChannels = it.next();
437                        it.remove();
438                        nodeChannels.close();
439                    }
440
441                    if (clientBootstrap != null) {
442                        clientBootstrap.releaseExternalResources();
443                        clientBootstrap = null;
444                    }
445                } finally {
446                    latch.countDown();
447                }
448            }
449        });
450
451        try {
452            latch.await(30, TimeUnit.SECONDS);
453        } catch (InterruptedException e) {
454            // ignore
455        }
456    }
457
    /**
     * Nothing to do on close: bootstraps and channels are already released in {@link #doStop()}.
     */
    @Override
    protected void doClose() throws ElasticsearchException {
    }
461
462    @Override
463    public TransportAddress[] addressesFromString(String address) throws Exception {
464        int index = address.indexOf('[');
465        if (index != -1) {
466            String host = address.substring(0, index);
467            Set<String> ports = Strings.commaDelimitedListToSet(address.substring(index + 1, address.indexOf(']')));
468            List<TransportAddress> addresses = Lists.newArrayList();
469            for (String port : ports) {
470                int[] iPorts = new PortsRange(port).ports();
471                for (int iPort : iPorts) {
472                    addresses.add(new InetSocketTransportAddress(host, iPort));
473                }
474            }
475            return addresses.toArray(new TransportAddress[addresses.size()]);
476        } else {
477            index = address.lastIndexOf(':');
478            if (index == -1) {
479                List<TransportAddress> addresses = Lists.newArrayList();
480                int[] iPorts = new PortsRange(this.port).ports();
481                for (int iPort : iPorts) {
482                    addresses.add(new InetSocketTransportAddress(address, iPort));
483                }
484                return addresses.toArray(new TransportAddress[addresses.size()]);
485            } else {
486                String host = address.substring(0, index);
487                int port = Integer.parseInt(address.substring(index + 1));
488                return new TransportAddress[]{new InetSocketTransportAddress(host, port)};
489            }
490        }
491    }
492
493    @Override
494    public boolean addressSupported(Class<? extends TransportAddress> address) {
495        return InetSocketTransportAddress.class.equals(address);
496    }
497
498    @Override
499    public BoundTransportAddress boundAddress() {
500        return this.boundAddress;
501    }
502
503    void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
504        if (!lifecycle.started()) {
505            // ignore
506        }
507        if (isCloseConnectionException(e.getCause())) {
508            // disconnect the node
509            Channel channel = ctx.getChannel();
510            for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
511                if (entry.getValue().hasChannel(channel)) {
512                    disconnectFromNode(entry.getKey());
513                }
514            }
515        } else if (isConnectException(e.getCause())) {
516            if (logger.isTraceEnabled()) {
517                logger.trace("(Ignoring) Exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
518            }
519        } else {
520            logger.warn("Exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
521        }
522    }
523
524    TransportAddress wrapAddress(SocketAddress socketAddress) {
525        return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
526    }
527
528    @Override
529    public long serverOpen() {
530        OpenChannelsHandler channels = serverOpenChannels;
531        return channels == null ? 0 : channels.numberOfOpenChannels();
532    }
533
534    @Override
535    public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
536        Channel targetChannel = nodeChannel(node, options);
537
538        if (compress) {
539            options.withCompress(true);
540        }
541
542        byte status = 0;
543        status = TransportStatus.setRequest(status);
544
545        BytesStreamOutput bStream = new BytesStreamOutput();
546        bStream.skip(NettyHeader.HEADER_SIZE);
547        StreamOutput stream = bStream;
548        stream = new HandlesStreamOutput(stream);
549
550        // we pick the smallest of the 2, to support both backward and forward compatibility
551        // note, this is the only place we need to do this, since from here on, we use the serialized version
552        // as the version to use also when the node receiving this request will send the response with
553        Version version = Version.smallest(this.version, node.version());
554
555        stream.setVersion(version);
556        stream.writeString(action);
557
558        ChannelBuffer buffer;
559        request.writeTo(stream);
560        stream.close();
561        buffer = bStream.ourBytes().toChannelBuffer();
562        NettyHeader.writeHeader(buffer, requestId, status, version);
563        targetChannel.write(buffer);
564    }
565
566    @Override
567    public boolean nodeConnected(DiscoveryNode node) {
568        return connectedNodes.containsKey(node);
569    }
570
571    @Override
572    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
573        connectToNode(node, true);
574    }
575
576    @Override
577    public void connectToNode(DiscoveryNode node) {
578        connectToNode(node, false);
579    }
580
581    public void connectToNode(DiscoveryNode node, boolean light) {
582        if (!lifecycle.started()) {
583            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
584        }
585        if (node == null) {
586            throw new ConnectTransportException(null, "can't connect to a null node");
587        }
588        globalLock.readLock().lock();
589        try {
590            if (!lifecycle.started()) {
591                throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
592            }
593            NodeChannels nodeChannels = connectedNodes.get(node);
594            if (nodeChannels != null) {
595                return;
596            }
597            connectionLock.acquire(node.id());
598            try {
599                if (!lifecycle.started()) {
600                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
601                }
602                try {
603
604
605                    if (light) {
606                        nodeChannels = connectToChannelsLight(node);
607                    } else {
608                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
609                        try {
610                            connectToChannels(nodeChannels, node);
611                        } catch (Exception e) {
612                            nodeChannels.close();
613                            throw e;
614                        }
615                    }
616
617                    NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
618                    if (existing != null) {
619                        // we are already connected to a node, close this ones
620                        nodeChannels.close();
621                    } else {
622                        if (logger.isDebugEnabled()) {
623                            logger.debug("connected to node [{}]", node);
624                        }
625                        transportServiceAdapter.raiseNodeConnected(node);
626                    }
627
628                } catch (ConnectTransportException e) {
629                    throw e;
630                } catch (Exception e) {
631                    throw new ConnectTransportException(node, "General node connection failure", e);
632                }
633            } finally {
634                connectionLock.release(node.id());
635            }
636        } finally {
637            globalLock.readLock().unlock();
638        }
639    }
640
641    private NodeChannels connectToChannelsLight(DiscoveryNode node) {
642        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
643        ChannelFuture connect = clientBootstrap.connect(address);
644        connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
645        if (!connect.isSuccess()) {
646            throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
647        }
648        Channel[] channels = new Channel[1];
649        channels[0] = connect.getChannel();
650        channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
651        return new NodeChannels(channels, channels, channels, channels, channels);
652    }
653
654    private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
655        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
656        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
657        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
658        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
659        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
660        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
661        for (int i = 0; i < connectRecovery.length; i++) {
662            connectRecovery[i] = clientBootstrap.connect(address);
663        }
664        for (int i = 0; i < connectBulk.length; i++) {
665            connectBulk[i] = clientBootstrap.connect(address);
666        }
667        for (int i = 0; i < connectReg.length; i++) {
668            connectReg[i] = clientBootstrap.connect(address);
669        }
670        for (int i = 0; i < connectState.length; i++) {
671            connectState[i] = clientBootstrap.connect(address);
672        }
673        for (int i = 0; i < connectPing.length; i++) {
674            connectPing[i] = clientBootstrap.connect(address);
675        }
676
677        try {
678            for (int i = 0; i < connectRecovery.length; i++) {
679                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
680                if (!connectRecovery[i].isSuccess()) {
681                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
682                }
683                nodeChannels.recovery[i] = connectRecovery[i].getChannel();
684                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
685            }
686
687            for (int i = 0; i < connectBulk.length; i++) {
688                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
689                if (!connectBulk[i].isSuccess()) {
690                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
691                }
692                nodeChannels.bulk[i] = connectBulk[i].getChannel();
693                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
694            }
695
696            for (int i = 0; i < connectReg.length; i++) {
697                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
698                if (!connectReg[i].isSuccess()) {
699                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
700                }
701                nodeChannels.reg[i] = connectReg[i].getChannel();
702                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
703            }
704
705            for (int i = 0; i < connectState.length; i++) {
706                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
707                if (!connectState[i].isSuccess()) {
708                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
709                }
710                nodeChannels.state[i] = connectState[i].getChannel();
711                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
712            }
713
714            for (int i = 0; i < connectPing.length; i++) {
715                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
716                if (!connectPing[i].isSuccess()) {
717                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
718                }
719                nodeChannels.ping[i] = connectPing[i].getChannel();
720                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
721            }
722
723            if (nodeChannels.recovery.length == 0) {
724                if (nodeChannels.bulk.length > 0) {
725                    nodeChannels.recovery = nodeChannels.bulk;
726                } else {
727                    nodeChannels.recovery = nodeChannels.reg;
728                }
729            }
730            if (nodeChannels.bulk.length == 0) {
731                nodeChannels.bulk = nodeChannels.reg;
732            }
733        } catch (RuntimeException e) {
734            // clean the futures
735            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
736                future.cancel();
737                if (future.getChannel() != null && future.getChannel().isOpen()) {
738                    try {
739                        future.getChannel().close();
740                    } catch (Exception e1) {
741                        // ignore
742                    }
743                }
744            }
745            throw e;
746        }
747    }
748
749    @Override
750    public void disconnectFromNode(DiscoveryNode node) {
751        NodeChannels nodeChannels = connectedNodes.remove(node);
752        if (nodeChannels != null) {
753            connectionLock.acquire(node.id());
754            try {
755                try {
756                    nodeChannels.close();
757                } finally {
758                    logger.debug("disconnected from [{}]", node);
759                    transportServiceAdapter.raiseNodeDisconnected(node);
760                }
761            } finally {
762                connectionLock.release(node.id());
763            }
764        }
765    }
766
767    /**
768     * Disconnects from a node if a channel is found as part of that nodes channels.
769     */
770    private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
771        for (DiscoveryNode node : connectedNodes.keySet()) {
772            NodeChannels nodeChannels = connectedNodes.get(node);
773            if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
774                connectionLock.acquire(node.id());
775                if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
776                    assert !connectedNodes.containsKey(node);
777                } else {
778                    try {
779                        connectedNodes.remove(node);
780                        try {
781                            nodeChannels.close();
782                        } finally {
783                            logger.debug("disconnected from [{}] on channel failure", failure, node);
784                            transportServiceAdapter.raiseNodeDisconnected(node);
785                        }
786                    } finally {
787                        connectionLock.release(node.id());
788                    }
789                }
790            }
791        }
792    }
793
794    private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
795        NodeChannels nodeChannels = connectedNodes.get(node);
796        if (nodeChannels == null) {
797            throw new NodeNotConnectedException(node, "Node not connected");
798        }
799        return nodeChannels.channel(options.type());
800    }
801
802    private class ChannelCloseListener implements ChannelFutureListener {
803
804        private final DiscoveryNode node;
805
806        private ChannelCloseListener(DiscoveryNode node) {
807            this.node = node;
808        }
809
810        @Override
811        public void operationComplete(ChannelFuture future) throws Exception {
812            disconnectFromNode(node);
813        }
814    }
815
816    public static class NodeChannels {
817
818        private Channel[] recovery;
819        private final AtomicInteger recoveryCounter = new AtomicInteger();
820        private Channel[] bulk;
821        private final AtomicInteger bulkCounter = new AtomicInteger();
822        private Channel[] reg;
823        private final AtomicInteger regCounter = new AtomicInteger();
824        private Channel[] state;
825        private final AtomicInteger stateCounter = new AtomicInteger();
826        private Channel[] ping;
827        private final AtomicInteger pingCounter = new AtomicInteger();
828
829        public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) {
830            this.recovery = recovery;
831            this.bulk = bulk;
832            this.reg = reg;
833            this.state = state;
834            this.ping = ping;
835        }
836
837        public boolean hasChannel(Channel channel) {
838            return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping);
839        }
840
841        private boolean hasChannel(Channel channel, Channel[] channels) {
842            for (Channel channel1 : channels) {
843                if (channel.equals(channel1)) {
844                    return true;
845                }
846            }
847            return false;
848        }
849
850        public Channel channel(TransportRequestOptions.Type type) {
851            if (type == TransportRequestOptions.Type.REG) {
852                return reg[Math.abs(regCounter.incrementAndGet()) % reg.length];
853            } else if (type == TransportRequestOptions.Type.STATE) {
854                return state[Math.abs(stateCounter.incrementAndGet()) % state.length];
855            } else if (type == TransportRequestOptions.Type.PING) {
856                return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length];
857            } else if (type == TransportRequestOptions.Type.BULK) {
858                return bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length];
859            } else if (type == TransportRequestOptions.Type.RECOVERY) {
860                return recovery[Math.abs(recoveryCounter.incrementAndGet()) % recovery.length];
861            } else {
862                throw new ElasticsearchIllegalArgumentException("no type channel for [" + type + "]");
863            }
864        }
865
866        public synchronized void close() {
867            List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
868            closeChannelsAndWait(recovery, futures);
869            closeChannelsAndWait(bulk, futures);
870            closeChannelsAndWait(reg, futures);
871            closeChannelsAndWait(state, futures);
872            closeChannelsAndWait(ping, futures);
873            for (ChannelFuture future : futures) {
874                future.awaitUninterruptibly();
875            }
876        }
877
878        private void closeChannelsAndWait(Channel[] channels, List<ChannelFuture> futures) {
879            for (Channel channel : channels) {
880                try {
881                    if (channel != null && channel.isOpen()) {
882                        futures.add(channel.close());
883                    }
884                } catch (Exception e) {
885                    //ignore
886                }
887            }
888        }
889    }
890
891}