001package org.xbib.elasticsearch.http.netty; 002 003import org.elasticsearch.ElasticsearchException; 004import org.elasticsearch.common.component.AbstractLifecycleComponent; 005import org.elasticsearch.common.inject.Inject; 006import org.elasticsearch.common.network.NetworkService; 007import org.elasticsearch.common.network.NetworkUtils; 008import org.elasticsearch.common.settings.Settings; 009import org.elasticsearch.common.transport.BoundTransportAddress; 010import org.elasticsearch.common.transport.InetSocketTransportAddress; 011import org.elasticsearch.common.transport.NetworkExceptionHelper; 012import org.elasticsearch.common.transport.PortsRange; 013import org.elasticsearch.common.unit.ByteSizeUnit; 014import org.elasticsearch.common.unit.ByteSizeValue; 015import org.elasticsearch.common.util.BigArrays; 016import org.elasticsearch.common.util.concurrent.ConcurrentCollections; 017import org.elasticsearch.common.xcontent.XContentBuilder; 018import org.elasticsearch.transport.BindTransportException; 019import org.jboss.netty.bootstrap.ServerBootstrap; 020import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; 021import org.jboss.netty.channel.Channel; 022import org.jboss.netty.channel.ChannelHandlerContext; 023import org.jboss.netty.channel.ExceptionEvent; 024import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; 025import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory; 026import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 027import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; 028import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame; 029import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; 030import org.jboss.netty.handler.timeout.ReadTimeoutException; 031import org.xbib.elasticsearch.websocket.Presence; 032import org.xbib.elasticsearch.websocket.client.WebSocketActionListener; 033import org.xbib.elasticsearch.websocket.client.WebSocketClient; 034import org.xbib.elasticsearch.common.netty.OpenChannelsHandler; 035import org.xbib.elasticsearch.http.BindHttpException; 036import org.xbib.elasticsearch.http.HttpChannel; 037import org.xbib.elasticsearch.http.HttpInfo; 038import org.xbib.elasticsearch.http.HttpRequest; 039import org.xbib.elasticsearch.http.HttpServerAdapter; 040import org.xbib.elasticsearch.http.HttpServerTransport; 041import org.xbib.elasticsearch.http.HttpStats; 042import org.xbib.elasticsearch.http.WebSocketServerAdapter; 043import org.xbib.elasticsearch.http.netty.client.NettyWebSocketClientFactory; 044 045import java.io.IOException; 046import java.net.InetAddress; 047import java.net.InetSocketAddress; 048import java.net.URI; 049import java.util.Map; 050import java.util.concurrent.Executors; 051import java.util.concurrent.atomic.AtomicReference; 052 053import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE; 054import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE; 055import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE; 056import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY; 057import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE; 058import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_REUSE_ADDRESS; 059import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE; 060import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; 061import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 062 063/** 064 * WebSocket server transport. Based on HttpServerTransport. 065 * Extended for channel lookup and message forwarding. 066 */ 067public class NettyWebSocketServerTransport 068 extends AbstractLifecycleComponent<HttpServerTransport> 069 implements HttpServerTransport { 070 071 private final NetworkService networkService; 072 073 final BigArrays bigArrays; 074 075 final ByteSizeValue maxContentLength; 076 077 final ByteSizeValue maxInitialLineLength; 078 079 final ByteSizeValue maxHeaderSize; 080 081 final ByteSizeValue maxChunkSize; 082 083 final boolean compression; 084 085 final int compressionLevel; 086 087 final boolean resetCookies; 088 089 private final int workerCount; 090 091 private final String port; 092 093 private final String bindHost; 094 095 private final String publishHost; 096 097 private final Boolean onsiteonly; 098 099 private final Boolean tcpNoDelay; 100 101 private final Boolean tcpKeepAlive; 102 103 private final Boolean reuseAddress; 104 105 private final ByteSizeValue tcpSendBufferSize; 106 107 private final ByteSizeValue tcpReceiveBufferSize; 108 109 private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; 110 111 final ByteSizeValue maxCumulationBufferCapacity; 112 113 final int maxCompositeBufferComponents; 114 115 private volatile ServerBootstrap serverBootstrap; 116 117 private volatile BoundTransportAddress boundAddress; 118 119 private volatile Channel serverChannel; 120 121 protected OpenChannelsHandler serverOpenChannels; 122 123 private volatile HttpServerAdapter httpServerAdapter; 124 125 private volatile WebSocketServerAdapter webSocketServerAdapter; 126 127 private Map<String, WebSocketClient> nodeChannels = ConcurrentCollections.newConcurrentMap(); 128 129 private final static NettyWebSocketClientFactory clientFactory = new NettyWebSocketClientFactory(); 130 131 @Inject 132 public NettyWebSocketServerTransport(Settings settings, NetworkService networkService, 133 BigArrays bigArrays) { 134 super(settings); 135 this.networkService = networkService; 136 this.bigArrays = bigArrays; 137 138 if (settings.getAsBoolean("netty.epollBugWorkaround", false)) { 139 System.setProperty("org.jboss.netty.epollBugWorkaround", "true"); 140 } 141 142 String hostname = "localhost"; 143 try { 144 // doing a reverse lookup on the server's IP address using the naming service (DNS) configured in the OS 145 hostname = InetAddress.getLocalHost().getHostName(); 146 logger.info("detected host name by reverse IP lookup: {}", hostname); 147 } catch (Exception e) { 148 logger.warn("unable to look up this machine's host name, assuming localhost. Check your DNS for correct setup"); 149 } 150 151 ByteSizeValue maxContentLength = componentSettings.getAsBytesSize("max_content_length", settings.getAsBytesSize("websocket.max_content_length", new ByteSizeValue(100, ByteSizeUnit.MB))); 152 this.maxChunkSize = componentSettings.getAsBytesSize("max_chunk_size", settings.getAsBytesSize("websocket.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB))); 153 this.maxHeaderSize = componentSettings.getAsBytesSize("max_header_size", settings.getAsBytesSize("websocket.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB))); 154 this.maxInitialLineLength = componentSettings.getAsBytesSize("max_initial_line_length", settings.getAsBytesSize("websocket.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB))); 155 // don't reset cookies by default, since I don't think we really need to 156 // note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookies 157 this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("websocket.reset_cookies", false)); 158 this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); 159 this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); 160 this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2); 161 this.port = componentSettings.get("port", settings.get("websocket.port", "9400-9500")); 162 this.bindHost = componentSettings.get("bind_host", settings.get("websocket.bind_host", settings.get("websocket.host", hostname))); 163 this.publishHost = componentSettings.get("publish_host", settings.get("websocket.publish_host", settings.get("websocket.host", hostname))); 164 this.onsiteonly = componentSettings.getAsBoolean("onsiteonly", settings.getAsBoolean("websocket.onsiteonly", true)); 165 this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true)); 166 this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true)); 167 this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); 168 this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); 169 this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); 170 171 long defaultReceiverPredictor = 512 * 1024; 172 // skip JVM info 173 ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor))); 174 ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor))); 175 if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) { 176 receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes()); 177 } else { 178 receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); 179 } 180 181 this.compression = settings.getAsBoolean("websocket.compression", false); 182 this.compressionLevel = settings.getAsInt("websocket.compression_level", 6); 183 184 // validate max content length 185 if (maxContentLength.bytes() > Integer.MAX_VALUE) { 186 logger.warn("maxContentLength[" + maxContentLength + "] set to high value, resetting it to [100mb]"); 187 maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB); 188 } 189 this.maxContentLength = maxContentLength; 190 191 logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]", 192 maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength); 193 } 194 195 public Settings settings() { 196 return this.settings; 197 } 198 199 @Override 200 public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { 201 this.httpServerAdapter = httpServerAdapter; 202 } 203 204 @Override 205 public void webSocketServerAdapter(WebSocketServerAdapter webSocketServerAdapter) { 206 this.webSocketServerAdapter = webSocketServerAdapter; 207 } 208 209 210 @Override 211 protected void doStart() throws ElasticsearchException { 212 this.serverOpenChannels = new OpenChannelsHandler(logger); 213 214 /* we do not support oio for websocket - it wouldn't work either */ 215 serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( 216 Executors.newCachedThreadPool(daemonThreadFactory(settings, "websocket_server_boss")), 217 Executors.newCachedThreadPool(daemonThreadFactory(settings, "websocket_server_worker")), 218 workerCount)); 219 220 serverBootstrap.setPipelineFactory(new NettyWebSocketServerPipelineFactory(this)); 221 222 if (tcpNoDelay != null) { 223 serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); 224 } 225 if (tcpKeepAlive != null) { 226 serverBootstrap.setOption("child.keepAlive", tcpKeepAlive); 227 } 228 if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) { 229 serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes()); 230 } 231 if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) { 232 serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes()); 233 } 234 serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); 235 serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); 236 if (reuseAddress != null) { 237 serverBootstrap.setOption("reuseAddress", reuseAddress); 238 serverBootstrap.setOption("child.reuseAddress", reuseAddress); 239 } 240 241 // Bind and start to accept incoming connections. 242 InetAddress hostAddressX; 243 try { 244 hostAddressX = networkService.resolveBindHostAddress(bindHost); 245 } catch (IOException e) { 246 throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e); 247 } 248 final InetAddress hostAddress = hostAddressX; 249 250 // Fail if host address is a public IP but only on-site networking is allowed. 251 if (onsiteonly) { 252 if (hostAddress == null || (!hostAddress.isLoopbackAddress() 253 && !hostAddress.isLinkLocalAddress() 254 && !hostAddress.isSiteLocalAddress())) { 255 throw new ElasticsearchException("Bind host " + bindHost 256 + (hostAddress != null ? "(address " + hostAddress + ") " : "") 257 + "is not on-site and not permitted by default. Check 'websocket.onsiteonly' setting in configuration."); 258 } 259 } 260 261 PortsRange portsRange = new PortsRange(port); 262 final AtomicReference<Exception> lastException = new AtomicReference(); 263 boolean success = portsRange.iterate(new PortsRange.PortCallback() { 264 @Override 265 public boolean onPortNumber(int portNumber) { 266 try { 267 serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); 268 } catch (Exception e) { 269 lastException.set(e); 270 return false; 271 } 272 return true; 273 } 274 }); 275 if (!success) { 276 throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get()); 277 } 278 InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); 279 InetSocketAddress publishAddress; 280 try { 281 publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort()); 282 } catch (Exception e) { 283 throw new BindTransportException("Failed to resolve publish address", e); 284 } 285 this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); 286 } 287 288 @Override 289 protected void doStop() throws ElasticsearchException { 290 if (serverChannel != null) { 291 serverChannel.close().awaitUninterruptibly(); 292 serverChannel = null; 293 } 294 if (serverOpenChannels != null) { 295 serverOpenChannels.close(); 296 serverOpenChannels = null; 297 } 298 if (serverBootstrap != null) { 299 serverBootstrap.releaseExternalResources(); 300 serverBootstrap = null; 301 } 302 } 303 304 @Override 305 protected void doClose() throws ElasticsearchException { 306 } 307 308 @Override 309 public BoundTransportAddress boundAddress() { 310 return this.boundAddress; 311 } 312 313 @Override 314 public HttpInfo info() { 315 return new HttpInfo(boundAddress(), maxContentLength.bytes()); 316 } 317 318 319 @Override 320 public HttpStats stats() { 321 OpenChannelsHandler channels = serverOpenChannels; 322 return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels()); 323 } 324 325 /** 326 * Dispatch reqeuest to HTTP adapter. 327 * 328 * @param request 329 * @param channel 330 */ 331 void dispatchRequest(HttpRequest request, HttpChannel channel) { 332 httpServerAdapter.dispatchRequest(request, channel); 333 } 334 335 /** 336 * A channel appeared or disappeared. 337 * 338 * @param presence 339 * @param topic 340 * @param channel 341 */ 342 void presence(Presence presence, String topic, Channel channel) throws IOException { 343 webSocketServerAdapter.presence(presence, topic, channel); 344 } 345 346 /** 347 * Send a websocket frame. 348 * 349 * @param handshaker 350 * @param frame 351 * @param context 352 */ 353 void frame(WebSocketServerHandshaker handshaker, WebSocketFrame frame, ChannelHandlerContext context) { 354 webSocketServerAdapter.frame(handshaker, frame, context); 355 } 356 357 /** 358 * Returns a channel if it is in the server open channel table, given by ID. 359 * 360 * @param id the channel ID 361 * @return the channel if open or null if not present. 362 */ 363 @Override 364 public Channel channel(Integer id) { 365 return serverOpenChannels.channel(id); 366 } 367 368 /** 369 * Forward a message to another node with websockets for delivery. 370 * 371 * @param websocketNodeAddress the websocket address of the other node, e.g. 372 * "/10.0.0.1:9400" 373 * @param channelId the channel ID on the other node for delivering the 374 * message 375 * @param builder the builder for the message 376 */ 377 @Override 378 public void forward(final String websocketNodeAddress, final Integer channelId, final XContentBuilder builder) { 379 try { 380 // build "forward" text frame 381 XContentBuilder forwardBuilder = jsonBuilder(); 382 forwardBuilder.startObject() 383 .field("channel", channelId) 384 .rawField("message", builder.bytes()) 385 .endObject(); 386 final TextWebSocketFrame frame = new NettyInteractiveResponse("forward", forwardBuilder).response(); 387 // use a websocket client pool 388 WebSocketClient client = nodeChannels.get(websocketNodeAddress); 389 if (client == null) { 390 final URI uri = new URI("ws:/" + websocketNodeAddress + "/websocket"); 391 client = clientFactory.newClient(uri, new WebSocketActionListener() { 392 @Override 393 public void onConnect(WebSocketClient client) { 394 nodeChannels.put(websocketNodeAddress, client); 395 client.send(frame); 396 } 397 398 @Override 399 public void onDisconnect(WebSocketClient client) { 400 logger.warn("node disconnected: {}", uri); 401 nodeChannels.remove(websocketNodeAddress); 402 } 403 404 @Override 405 public void onMessage(WebSocketClient client, WebSocketFrame frame) { 406 logger.info("unexpected response {}", frame); 407 } 408 409 @Override 410 public void onError(Throwable t) { 411 logger.error(t.getMessage(), t); 412 nodeChannels.remove(websocketNodeAddress); 413 } 414 }); 415 client.connect(); 416 } else { 417 client.send(frame); 418 } 419 } catch (Exception e) { 420 logger.error(e.getMessage(), e); 421 } 422 } 423 424 void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 425 if (e.getCause() instanceof ReadTimeoutException) { 426 if (logger.isTraceEnabled()) { 427 logger.trace("Connection timeout [{}]", ctx.getChannel().getRemoteAddress()); 428 } 429 ctx.getChannel().close(); 430 } else { 431 if (!lifecycle.started()) { 432 // ignore 433 return; 434 } 435 if (!NetworkExceptionHelper.isCloseConnectionException(e.getCause())) { 436 logger.warn("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel()); 437 ctx.getChannel().close(); 438 } else { 439 logger.debug("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel()); 440 ctx.getChannel().close(); 441 } 442 } 443 } 444 445}