package org.xbib.elasticsearch.transport.netty;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.xbib.elasticsearch.common.io.stream.BytesStreamOutput;
import org.xbib.elasticsearch.common.netty.NettyStaticSetup;
import org.xbib.elasticsearch.common.netty.OpenChannelsHandler;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_CLIENT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_BLOCKING_SERVER;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_REUSE_ADDRESS;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;


/**
 * Netty based {@link Transport} implementation.
 * <p>
 * There are five groups of connections per node: recovery, bulk, reg, state and ping. Recovery and bulk
 * are for batch oriented APIs with a high payload that would otherwise cause regular requests (like search
 * or single index) to take longer. Reg is for the typical search / single doc index. State is for things
 * like cluster state. Ping is reserved for sending out ping requests to other nodes.
 */
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {

    static {
        NettyStaticSetup.setup();
    }

    private final NetworkService networkService;

    final Version version;

    final int workerCount;

    final int bossCount;

    final boolean blockingServer;

    final boolean blockingClient;

    final String port;

    final String bindHost;

    final String publishHost;

    final boolean compress;

    final TimeValue connectTimeout;

    final Boolean tcpNoDelay;

    final Boolean tcpKeepAlive;

    final Boolean reuseAddress;

    final ByteSizeValue tcpSendBufferSize;

    final ByteSizeValue tcpReceiveBufferSize;

    final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;

    final int connectionsPerNodeRecovery;
    final int connectionsPerNodeBulk;
    final int connectionsPerNodeReg;
    final int connectionsPerNodeState;
    final int connectionsPerNodePing;

    final ByteSizeValue maxCumulationBufferCapacity;

    final BigArrays bigArrays;

    final int maxCompositeBufferComponents;

    private final ThreadPool threadPool;

    private volatile OpenChannelsHandler serverOpenChannels;

    private volatile ClientBootstrap clientBootstrap;

    private volatile ServerBootstrap serverBootstrap;

    // node to the channels opened to it
    final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();

    private volatile Channel serverChannel;

    private volatile TransportServiceAdapter transportServiceAdapter;

    private volatile BoundTransportAddress boundAddress;

    // serializes connect / disconnect operations per node id
    private final KeyedLock<String> connectionLock = new KeyedLock<String>();

    // this lock is here to make sure we close this transport and disconnect all the client nodes
    // connections while no connect operation is going on... (this might help with 100% CPU when stopping the transport?)
    // connectToNode takes the read side, doStop takes the write side.
    private final ReadWriteLock globalLock = new ReentrantReadWriteLock();

    /**
     * Reads all transport related settings; no sockets are opened before {@link #doStart()} runs.
     *
     * @param settings       node settings
     * @param threadPool     thread pool, used to run the shutdown sequence off the calling thread
     * @param networkService used to resolve the bind and publish host addresses
     * @param bigArrays      kept for use by collaborating classes of this package
     * @param version        version of the local node, used when negotiating the wire version
     * @throws ElasticsearchIllegalArgumentException if the reg, ping or state connection count is set to 0
     */
    @Inject
    public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
        super(settings);
        this.threadPool = threadPool;
        this.networkService = networkService;
        this.bigArrays = bigArrays;
        this.version = version;

        if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
            System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
        }

        this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
        this.bossCount = componentSettings.getAsInt("boss_count", 1);
        this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
        this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
        this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
        this.bindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
        this.publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
        this.compress = settings.getAsBoolean("transport.tcp.compress", false);
        this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", settings.getAsTime(TCP_CONNECT_TIMEOUT, TCP_DEFAULT_CONNECT_TIMEOUT)));
        this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
        this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
        this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
        this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
        this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
        this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2));
        this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3));
        this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6));
        // note: the component level key is "high" while the global key is "state"
        this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1));
        this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));

        // we want to have at least 1 for reg/state/ping
        if (this.connectionsPerNodeReg == 0) {
            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.reg] to 0");
        }
        if (this.connectionsPerNodePing == 0) {
            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.ping] to 0");
        }
        if (this.connectionsPerNodeState == 0) {
            throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.state] to 0");
        }

        this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
        this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);

        long defaultReceiverPredictor = 512 * 1024;

        // skip JVM info

        // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
        ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
        ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
        if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
            receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
        } else {
            // arguments are (minimum, initial, maximum): start at the configured minimum
            receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
        }

        logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
                workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
    }

    /**
     * @return the settings this transport was created with
     */
    public Settings settings() {
        return this.settings;
    }

    @Override
    public void transportServiceAdapter(TransportServiceAdapter service) {
        this.transportServiceAdapter = service;
    }

    TransportServiceAdapter transportServiceAdapter() {
        return transportServiceAdapter;
    }

    ThreadPool threadPool() {
        return threadPool;
    }

    /**
     * Creates the client bootstrap and, unless {@code network.server} is set to {@code false}, the server
     * bootstrap, then binds the server channel to the first free port of the configured port range.
     *
     * @throws BindTransportException if the bind host or publish host can not be resolved, or no port
     *                                of the range could be bound
     */
    @Override
    protected void doStart() throws ElasticsearchException {
        if (blockingClient) {
            clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker"))));
        } else {
            clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_boss")),
                    bossCount,
                    new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount),
                    new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
        }
        ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("size", newSizeHeaderFrameDecoder());
                pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
                return pipeline;
            }
        };
        clientBootstrap.setPipelineFactory(clientPipelineFactory);
        clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
        if (tcpNoDelay != null) {
            clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
        }
        if (tcpKeepAlive != null) {
            clientBootstrap.setOption("keepAlive", tcpKeepAlive);
        }
        if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
            clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
        }
        if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
            clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
        }
        clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
        if (reuseAddress != null) {
            clientBootstrap.setOption("reuseAddress", reuseAddress);
        }

        // client only mode: no server side is set up
        if (!settings.getAsBoolean("network.server", true)) {
            return;
        }

        serverOpenChannels = new OpenChannelsHandler(logger);
        if (blockingServer) {
            serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker"))));
        } else {
            serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
                    workerCount));
        }
        ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("openChannels", serverOpenChannels);
                pipeline.addLast("size", newSizeHeaderFrameDecoder());
                pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
                return pipeline;
            }
        };
        serverBootstrap.setPipelineFactory(serverPipelineFactory);
        if (tcpNoDelay != null) {
            serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
        }
        if (tcpKeepAlive != null) {
            serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
        }
        if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
            serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
        }
        if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
            serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
        }
        serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
        serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
        if (reuseAddress != null) {
            serverBootstrap.setOption("reuseAddress", reuseAddress);
            serverBootstrap.setOption("child.reuseAddress", reuseAddress);
        }

        // Bind and start to accept incoming connections.
        InetAddress hostAddressX;
        try {
            hostAddressX = networkService.resolveBindHostAddress(bindHost);
        } catch (IOException e) {
            throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
        }
        final InetAddress hostAddress = hostAddressX;

        PortsRange portsRange = new PortsRange(port);
        final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
        boolean success = portsRange.iterate(new PortsRange.PortCallback() {
            @Override
            public boolean onPortNumber(int portNumber) {
                try {
                    serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
                } catch (Exception e) {
                    // remember the failure and let the range move on to the next port
                    lastException.set(e);
                    return false;
                }
                return true;
            }
        });
        if (!success) {
            throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
        }

        logger.debug("Bound to address [{}]", serverChannel.getLocalAddress());

        InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
        InetSocketAddress publishAddress;
        try {
            publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
        } catch (Exception e) {
            throw new BindTransportException("Failed to resolve publish address", e);
        }
        this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
    }

    /**
     * Creates the frame decoder shared by the client and the server pipeline and applies the optional
     * cumulation buffer limits to it.
     */
    private SizeHeaderFrameDecoder newSizeHeaderFrameDecoder() {
        SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
        if (maxCumulationBufferCapacity != null) {
            // the decoder takes an int, so clamp larger configured values
            if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
                sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
            } else {
                sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
            }
        }
        if (maxCompositeBufferComponents != -1) {
            sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
        }
        return sizeHeader;
    }

    /**
     * Closes all node connections, the server channel and both bootstraps. The work is done on a generic
     * thread pool thread while holding the write side of the global lock, so that no connect operation
     * runs concurrently; the calling thread waits up to 30 seconds for it to finish.
     */
    @Override
    protected void doStop() throws ElasticsearchException {
        final CountDownLatch latch = new CountDownLatch(1);
        // make sure we run it on another thread than a possible IO handler thread
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    globalLock.writeLock().lock();
                    try {
                        closeConnectedNodes();

                        if (serverChannel != null) {
                            try {
                                serverChannel.close().awaitUninterruptibly();
                            } finally {
                                serverChannel = null;
                            }
                        }

                        if (serverOpenChannels != null) {
                            serverOpenChannels.close();
                            serverOpenChannels = null;
                        }

                        if (serverBootstrap != null) {
                            serverBootstrap.releaseExternalResources();
                            serverBootstrap = null;
                        }

                        // second pass, for anything registered while the server side was shutting down
                        closeConnectedNodes();

                        if (clientBootstrap != null) {
                            clientBootstrap.releaseExternalResources();
                            clientBootstrap = null;
                        }
                    } finally {
                        globalLock.writeLock().unlock();
                    }
                } finally {
                    latch.countDown();
                }
            }
        });

        try {
            latch.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // do not swallow the interrupt, let the caller observe it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes every entry of {@link #connectedNodes} and closes its channels.
     */
    private void closeConnectedNodes() {
        for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
            NodeChannels nodeChannels = it.next();
            it.remove();
            nodeChannels.close();
        }
    }

    @Override
    protected void doClose() throws ElasticsearchException {
    }

    /**
     * Parses an address string into transport addresses. Supported forms are {@code host[ports]} where
     * {@code ports} is a comma delimited list of ports or port ranges, {@code host:port}, and a plain
     * {@code host}, which is expanded with the port range configured for this transport.
     *
     * @param address the address string to parse
     * @return one transport address per resolved host / port combination
     * @throws Exception if the string can not be parsed
     */
    @Override
    public TransportAddress[] addressesFromString(String address) throws Exception {
        int index = address.indexOf('[');
        if (index != -1) {
            String host = address.substring(0, index);
            Set<String> ports = Strings.commaDelimitedListToSet(address.substring(index + 1, address.indexOf(']')));
            List<TransportAddress> addresses = Lists.newArrayList();
            for (String port : ports) {
                int[] iPorts = new PortsRange(port).ports();
                for (int iPort : iPorts) {
                    addresses.add(new InetSocketTransportAddress(host, iPort));
                }
            }
            return addresses.toArray(new TransportAddress[addresses.size()]);
        } else {
            index = address.lastIndexOf(':');
            if (index == -1) {
                // no port given, use the port range of this transport
                List<TransportAddress> addresses = Lists.newArrayList();
                int[] iPorts = new PortsRange(this.port).ports();
                for (int iPort : iPorts) {
                    addresses.add(new InetSocketTransportAddress(address, iPort));
                }
                return addresses.toArray(new TransportAddress[addresses.size()]);
            } else {
                String host = address.substring(0, index);
                int port = Integer.parseInt(address.substring(index + 1));
                return new TransportAddress[]{new InetSocketTransportAddress(host, port)};
            }
        }
    }

    @Override
    public boolean addressSupported(Class<? extends TransportAddress> address) {
        return InetSocketTransportAddress.class.equals(address);
    }

    @Override
    public BoundTransportAddress boundAddress() {
        return this.boundAddress;
    }

    /**
     * Handles an exception raised on the netty layer: close-connection exceptions disconnect every node
     * that owns the channel, connect exceptions are only traced, everything else is logged as a warning.
     * Events arriving while the transport is not started are ignored.
     */
    void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        if (!lifecycle.started()) {
            // ignore
            return;
        }
        if (isCloseConnectionException(e.getCause())) {
            // disconnect the node
            Channel channel = ctx.getChannel();
            for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
                if (entry.getValue().hasChannel(channel)) {
                    disconnectFromNode(entry.getKey());
                }
            }
        } else if (isConnectException(e.getCause())) {
            if (logger.isTraceEnabled()) {
                logger.trace("(Ignoring) Exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
            }
        } else {
            logger.warn("Exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
        }
    }

    TransportAddress wrapAddress(SocketAddress socketAddress) {
        return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
    }

    @Override
    public long serverOpen() {
        // read the volatile field once, doStop may null it concurrently
        OpenChannelsHandler channels = serverOpenChannels;
        return channels == null ? 0 : channels.numberOfOpenChannels();
    }

    /**
     * Serializes the request behind a reserved header and writes it to a channel of the given node.
     *
     * @throws NodeNotConnectedException if there is no connection to the node
     * @throws IOException               if the request can not be serialized
     */
    @Override
    public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        Channel targetChannel = nodeChannel(node, options);

        // NOTE(review): the flag is set on the options, but no compression is applied to the stream in this method
        if (compress) {
            options.withCompress(true);
        }

        byte status = 0;
        status = TransportStatus.setRequest(status);

        BytesStreamOutput bStream = new BytesStreamOutput();
        // leave room for the header, it is written once the payload is complete
        bStream.skip(NettyHeader.HEADER_SIZE);
        StreamOutput stream = bStream;
        stream = new HandlesStreamOutput(stream);

        // we pick the smallest of the 2, to support both backward and forward compatibility
        // note, this is the only place we need to do this, since from here on, we use the serialized version
        // as the version to use also when the node receiving this request will send the response with
        Version version = Version.smallest(this.version, node.version());

        stream.setVersion(version);
        stream.writeString(action);

        ChannelBuffer buffer;
        request.writeTo(stream);
        stream.close();
        buffer = bStream.ourBytes().toChannelBuffer();
        NettyHeader.writeHeader(buffer, requestId, status, version);
        targetChannel.write(buffer);
    }

    @Override
    public boolean nodeConnected(DiscoveryNode node) {
        return connectedNodes.containsKey(node);
    }

    @Override
    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
        connectToNode(node, true);
    }

    @Override
    public void connectToNode(DiscoveryNode node) {
        connectToNode(node, false);
    }

    /**
     * Connects to the given node unless a connection already exists.
     *
     * @param node  the node to connect to
     * @param light if {@code true} a single channel is opened and shared by all connection types,
     *              otherwise the configured number of channels per type is opened
     * @throws ElasticsearchIllegalStateException if the transport is not started
     * @throws ConnectTransportException          if the node is {@code null} or connecting fails
     */
    public void connectToNode(DiscoveryNode node, boolean light) {
        if (!lifecycle.started()) {
            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
        }
        if (node == null) {
            throw new ConnectTransportException(null, "can't connect to a null node");
        }
        globalLock.readLock().lock();
        try {
            // check again, the transport may have been stopped while waiting for the lock
            if (!lifecycle.started()) {
                throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
            }
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null) {
                return;
            }
            connectionLock.acquire(node.id());
            try {
                if (!lifecycle.started()) {
                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
                }
                try {
                    if (light) {
                        nodeChannels = connectToChannelsLight(node);
                    } else {
                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                        try {
                            connectToChannels(nodeChannels, node);
                        } catch (Exception e) {
                            nodeChannels.close();
                            throw e;
                        }
                    }

                    NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
                    if (existing != null) {
                        // we are already connected to a node, close this ones
                        nodeChannels.close();
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("connected to node [{}]", node);
                        }
                        transportServiceAdapter.raiseNodeConnected(node);
                    }

                } catch (ConnectTransportException e) {
                    throw e;
                } catch (Exception e) {
                    throw new ConnectTransportException(node, "General node connection failure", e);
                }
            } finally {
                connectionLock.release(node.id());
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

    /**
     * Opens a single channel to the node and uses it for every connection type.
     *
     * @throws ConnectTransportException if the connection is not established within 1.5 times the
     *                                   connect timeout; the pending attempt is cancelled and closed
     */
    private NodeChannels connectToChannelsLight(DiscoveryNode node) {
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        ChannelFuture connect = clientBootstrap.connect(address);
        connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
        if (!connect.isSuccess()) {
            // do not leave a half open channel behind, same as connectToChannels does
            cancelAndClose(connect);
            throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
        }
        Channel[] channels = new Channel[1];
        channels[0] = connect.getChannel();
        channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
        return new NodeChannels(channels, channels, channels, channels, channels);
    }

    /**
     * Opens all channels of the given (still empty) node channels. All connection attempts are started
     * first and awaited afterwards, in the order recovery, bulk, reg, state, ping. If a type is configured
     * with no channels, recovery falls back to bulk or reg, and bulk falls back to reg.
     *
     * @throws ConnectTransportException if any connection fails; all attempts are cancelled and closed
     */
    private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        ChannelFuture[] connectRecovery = startConnecting(address, nodeChannels.recovery.length);
        ChannelFuture[] connectBulk = startConnecting(address, nodeChannels.bulk.length);
        ChannelFuture[] connectReg = startConnecting(address, nodeChannels.reg.length);
        ChannelFuture[] connectState = startConnecting(address, nodeChannels.state.length);
        ChannelFuture[] connectPing = startConnecting(address, nodeChannels.ping.length);

        try {
            awaitConnected(node, connectRecovery, nodeChannels.recovery);
            awaitConnected(node, connectBulk, nodeChannels.bulk);
            awaitConnected(node, connectReg, nodeChannels.reg);
            awaitConnected(node, connectState, nodeChannels.state);
            awaitConnected(node, connectPing, nodeChannels.ping);

            if (nodeChannels.recovery.length == 0) {
                if (nodeChannels.bulk.length > 0) {
                    nodeChannels.recovery = nodeChannels.bulk;
                } else {
                    nodeChannels.recovery = nodeChannels.reg;
                }
            }
            if (nodeChannels.bulk.length == 0) {
                nodeChannels.bulk = nodeChannels.reg;
            }
        } catch (RuntimeException e) {
            // clean the futures
            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
                cancelAndClose(future);
            }
            throw e;
        }
    }

    /**
     * Starts {@code count} connection attempts to the address without waiting for them.
     */
    private ChannelFuture[] startConnecting(InetSocketAddress address, int count) {
        ChannelFuture[] futures = new ChannelFuture[count];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = clientBootstrap.connect(address);
        }
        return futures;
    }

    /**
     * Waits for each future, stores its channel in the matching slot and registers a close listener.
     */
    private void awaitConnected(DiscoveryNode node, ChannelFuture[] futures, Channel[] channels) {
        for (int i = 0; i < futures.length; i++) {
            futures[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!futures[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", futures[i].getCause());
            }
            channels[i] = futures[i].getChannel();
            channels[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
    }

    /**
     * Cancels a connection attempt and closes its channel if it is open, ignoring close failures.
     */
    private void cancelAndClose(ChannelFuture future) {
        future.cancel();
        if (future.getChannel() != null && future.getChannel().isOpen()) {
            try {
                future.getChannel().close();
            } catch (Exception e1) {
                // ignore
            }
        }
    }

    @Override
    public void disconnectFromNode(DiscoveryNode node) {
        NodeChannels nodeChannels = connectedNodes.remove(node);
        if (nodeChannels != null) {
            connectionLock.acquire(node.id());
            try {
                try {
                    nodeChannels.close();
                } finally {
                    logger.debug("disconnected from [{}]", node);
                    transportServiceAdapter.raiseNodeDisconnected(node);
                }
            } finally {
                connectionLock.release(node.id());
            }
        }
    }

    /**
     * Disconnects from a node if a channel is found as part of that nodes channels.
     */
    private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
        for (DiscoveryNode node : connectedNodes.keySet()) {
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
                connectionLock.acquire(node.id());
                // the lock is released on every path, including the safety check branch
                try {
                    if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check
                        assert !connectedNodes.containsKey(node);
                    } else {
                        connectedNodes.remove(node);
                        try {
                            nodeChannels.close();
                        } finally {
                            logger.debug("disconnected from [{}] on channel failure", failure, node);
                            transportServiceAdapter.raiseNodeDisconnected(node);
                        }
                    }
                } finally {
                    connectionLock.release(node.id());
                }
            }
        }
    }

    /**
     * Picks the channel to use for the connection type of the given request options.
     *
     * @throws NodeNotConnectedException if there is no connection to the node
     */
    private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
        NodeChannels nodeChannels = connectedNodes.get(node);
        if (nodeChannels == null) {
            throw new NodeNotConnectedException(node, "Node not connected");
        }
        return nodeChannels.channel(options.type());
    }

    /**
     * Disconnects from the node as soon as one of its channels is closed.
     */
    private class ChannelCloseListener implements ChannelFutureListener {

        private final DiscoveryNode node;

        private ChannelCloseListener(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            disconnectFromNode(node);
        }
    }

    public static class NodeChannels {

        private Channel[] recovery;
        private final AtomicInteger recoveryCounter = new AtomicInteger();
        private Channel[] bulk;
        private final AtomicInteger bulkCounter = new AtomicInteger();
        private Channel[] reg;
        private final AtomicInteger regCounter = new AtomicInteger();
        private Channel[] state;
        private final AtomicInteger stateCounter = new AtomicInteger();
        private Channel[] ping;
        private final AtomicInteger pingCounter = new
AtomicInteger(); 828 829 public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) { 830 this.recovery = recovery; 831 this.bulk = bulk; 832 this.reg = reg; 833 this.state = state; 834 this.ping = ping; 835 } 836 837 public boolean hasChannel(Channel channel) { 838 return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping); 839 } 840 841 private boolean hasChannel(Channel channel, Channel[] channels) { 842 for (Channel channel1 : channels) { 843 if (channel.equals(channel1)) { 844 return true; 845 } 846 } 847 return false; 848 } 849 850 public Channel channel(TransportRequestOptions.Type type) { 851 if (type == TransportRequestOptions.Type.REG) { 852 return reg[Math.abs(regCounter.incrementAndGet()) % reg.length]; 853 } else if (type == TransportRequestOptions.Type.STATE) { 854 return state[Math.abs(stateCounter.incrementAndGet()) % state.length]; 855 } else if (type == TransportRequestOptions.Type.PING) { 856 return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length]; 857 } else if (type == TransportRequestOptions.Type.BULK) { 858 return bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length]; 859 } else if (type == TransportRequestOptions.Type.RECOVERY) { 860 return recovery[Math.abs(recoveryCounter.incrementAndGet()) % recovery.length]; 861 } else { 862 throw new ElasticsearchIllegalArgumentException("no type channel for [" + type + "]"); 863 } 864 } 865 866 public synchronized void close() { 867 List<ChannelFuture> futures = new ArrayList<ChannelFuture>(); 868 closeChannelsAndWait(recovery, futures); 869 closeChannelsAndWait(bulk, futures); 870 closeChannelsAndWait(reg, futures); 871 closeChannelsAndWait(state, futures); 872 closeChannelsAndWait(ping, futures); 873 for (ChannelFuture future : futures) { 874 future.awaitUninterruptibly(); 875 } 876 } 877 878 private void closeChannelsAndWait(Channel[] channels, 
List<ChannelFuture> futures) { 879 for (Channel channel : channels) { 880 try { 881 if (channel != null && channel.isOpen()) { 882 futures.add(channel.close()); 883 } 884 } catch (Exception e) { 885 //ignore 886 } 887 } 888 } 889 } 890 891}