001package org.xbib.elasticsearch.http.netty;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.common.component.AbstractLifecycleComponent;
005import org.elasticsearch.common.inject.Inject;
006import org.elasticsearch.common.network.NetworkService;
007import org.elasticsearch.common.network.NetworkUtils;
008import org.elasticsearch.common.settings.Settings;
009import org.elasticsearch.common.transport.BoundTransportAddress;
010import org.elasticsearch.common.transport.InetSocketTransportAddress;
011import org.elasticsearch.common.transport.NetworkExceptionHelper;
012import org.elasticsearch.common.transport.PortsRange;
013import org.elasticsearch.common.unit.ByteSizeUnit;
014import org.elasticsearch.common.unit.ByteSizeValue;
015import org.elasticsearch.common.util.BigArrays;
016import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
017import org.elasticsearch.common.xcontent.XContentBuilder;
018import org.elasticsearch.transport.BindTransportException;
019import org.jboss.netty.bootstrap.ServerBootstrap;
020import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
021import org.jboss.netty.channel.Channel;
022import org.jboss.netty.channel.ChannelHandlerContext;
023import org.jboss.netty.channel.ExceptionEvent;
024import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
025import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
026import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
027import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
028import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
029import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
030import org.jboss.netty.handler.timeout.ReadTimeoutException;
031import org.xbib.elasticsearch.websocket.Presence;
032import org.xbib.elasticsearch.websocket.client.WebSocketActionListener;
033import org.xbib.elasticsearch.websocket.client.WebSocketClient;
034import org.xbib.elasticsearch.common.netty.OpenChannelsHandler;
035import org.xbib.elasticsearch.http.BindHttpException;
036import org.xbib.elasticsearch.http.HttpChannel;
037import org.xbib.elasticsearch.http.HttpInfo;
038import org.xbib.elasticsearch.http.HttpRequest;
039import org.xbib.elasticsearch.http.HttpServerAdapter;
040import org.xbib.elasticsearch.http.HttpServerTransport;
041import org.xbib.elasticsearch.http.HttpStats;
042import org.xbib.elasticsearch.http.WebSocketServerAdapter;
043import org.xbib.elasticsearch.http.netty.client.NettyWebSocketClientFactory;
044
045import java.io.IOException;
046import java.net.InetAddress;
047import java.net.InetSocketAddress;
048import java.net.URI;
049import java.util.Map;
050import java.util.concurrent.Executors;
051import java.util.concurrent.atomic.AtomicReference;
052
053import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_RECEIVE_BUFFER_SIZE;
054import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_DEFAULT_SEND_BUFFER_SIZE;
055import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_KEEP_ALIVE;
056import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_NO_DELAY;
057import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE;
058import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_REUSE_ADDRESS;
059import static org.elasticsearch.common.network.NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE;
060import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
061import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
062
063/**
064 * WebSocket server transport. Based on HttpServerTransport.
065 * Extended for channel lookup and message forwarding.
066 */
067public class NettyWebSocketServerTransport
068        extends AbstractLifecycleComponent<HttpServerTransport>
069        implements HttpServerTransport {
070
071    private final NetworkService networkService;
072
073    final BigArrays bigArrays;
074
075    final ByteSizeValue maxContentLength;
076
077    final ByteSizeValue maxInitialLineLength;
078
079    final ByteSizeValue maxHeaderSize;
080
081    final ByteSizeValue maxChunkSize;
082
083    final boolean compression;
084
085    final int compressionLevel;
086
087    final boolean resetCookies;
088
089    private final int workerCount;
090
091    private final String port;
092
093    private final String bindHost;
094
095    private final String publishHost;
096
097    private final Boolean onsiteonly;
098
099    private final Boolean tcpNoDelay;
100
101    private final Boolean tcpKeepAlive;
102
103    private final Boolean reuseAddress;
104
105    private final ByteSizeValue tcpSendBufferSize;
106
107    private final ByteSizeValue tcpReceiveBufferSize;
108
109    private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
110
111    final ByteSizeValue maxCumulationBufferCapacity;
112
113    final int maxCompositeBufferComponents;
114
115    private volatile ServerBootstrap serverBootstrap;
116
117    private volatile BoundTransportAddress boundAddress;
118
119    private volatile Channel serverChannel;
120
121    protected OpenChannelsHandler serverOpenChannels;
122
123    private volatile HttpServerAdapter httpServerAdapter;
124
125    private volatile WebSocketServerAdapter webSocketServerAdapter;
126
127    private Map<String, WebSocketClient> nodeChannels = ConcurrentCollections.newConcurrentMap();
128
129    private final static NettyWebSocketClientFactory clientFactory = new NettyWebSocketClientFactory();
130
131    @Inject
132    public NettyWebSocketServerTransport(Settings settings, NetworkService networkService,
133                                         BigArrays bigArrays) {
134        super(settings);
135        this.networkService = networkService;
136        this.bigArrays = bigArrays;
137
138        if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
139            System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
140        }
141
142        String hostname = "localhost";
143        try {
144            // doing a reverse lookup on the server's IP address using the naming service (DNS) configured in the OS
145            hostname = InetAddress.getLocalHost().getHostName();
146            logger.info("detected host name by reverse IP lookup: {}", hostname);
147        } catch (Exception e) {
148            logger.warn("unable to look up this machine's host name, assuming localhost. Check your DNS for correct setup");
149        }
150
151        ByteSizeValue maxContentLength = componentSettings.getAsBytesSize("max_content_length", settings.getAsBytesSize("websocket.max_content_length", new ByteSizeValue(100, ByteSizeUnit.MB)));
152        this.maxChunkSize = componentSettings.getAsBytesSize("max_chunk_size", settings.getAsBytesSize("websocket.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB)));
153        this.maxHeaderSize = componentSettings.getAsBytesSize("max_header_size", settings.getAsBytesSize("websocket.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB)));
154        this.maxInitialLineLength = componentSettings.getAsBytesSize("max_initial_line_length", settings.getAsBytesSize("websocket.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB)));
155        // don't reset cookies by default, since I don't think we really need to
156        // note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookies
157        this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("websocket.reset_cookies", false));
158        this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
159        this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
160        this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2);
161        this.port = componentSettings.get("port", settings.get("websocket.port", "9400-9500"));
162        this.bindHost = componentSettings.get("bind_host", settings.get("websocket.bind_host", settings.get("websocket.host", hostname)));
163        this.publishHost = componentSettings.get("publish_host", settings.get("websocket.publish_host", settings.get("websocket.host", hostname)));
164        this.onsiteonly = componentSettings.getAsBoolean("onsiteonly", settings.getAsBoolean("websocket.onsiteonly", true));
165        this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
166        this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
167        this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
168        this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
169        this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
170
171        long defaultReceiverPredictor = 512 * 1024;
172        // skip JVM info
173        ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
174        ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
175        if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
176            receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
177        } else {
178            receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
179        }
180
181        this.compression = settings.getAsBoolean("websocket.compression", false);
182        this.compressionLevel = settings.getAsInt("websocket.compression_level", 6);
183
184        // validate max content length
185        if (maxContentLength.bytes() > Integer.MAX_VALUE) {
186            logger.warn("maxContentLength[" + maxContentLength + "] set to high value, resetting it to [100mb]");
187            maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB);
188        }
189        this.maxContentLength = maxContentLength;
190
191        logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]",
192                maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength);
193    }
194
195    public Settings settings() {
196        return this.settings;
197    }
198
199    @Override
200    public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
201        this.httpServerAdapter = httpServerAdapter;
202    }
203
204    @Override
205    public void webSocketServerAdapter(WebSocketServerAdapter webSocketServerAdapter) {
206        this.webSocketServerAdapter = webSocketServerAdapter;
207    }
208
209
210    @Override
211    protected void doStart() throws ElasticsearchException {
212        this.serverOpenChannels = new OpenChannelsHandler(logger);
213
214        /* we do not support oio for websocket - it wouldn't work either */
215        serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
216                Executors.newCachedThreadPool(daemonThreadFactory(settings, "websocket_server_boss")),
217                Executors.newCachedThreadPool(daemonThreadFactory(settings, "websocket_server_worker")),
218                workerCount));
219
220        serverBootstrap.setPipelineFactory(new NettyWebSocketServerPipelineFactory(this));
221
222        if (tcpNoDelay != null) {
223            serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
224        }
225        if (tcpKeepAlive != null) {
226            serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
227        }
228        if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
229            serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
230        }
231        if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
232            serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
233        }
234        serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
235        serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
236        if (reuseAddress != null) {
237            serverBootstrap.setOption("reuseAddress", reuseAddress);
238            serverBootstrap.setOption("child.reuseAddress", reuseAddress);
239        }
240
241        // Bind and start to accept incoming connections.
242        InetAddress hostAddressX;
243        try {
244            hostAddressX = networkService.resolveBindHostAddress(bindHost);
245        } catch (IOException e) {
246            throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
247        }
248        final InetAddress hostAddress = hostAddressX;
249
250        // Fail if host address is a public IP but only on-site networking is allowed.
251        if (onsiteonly) {
252            if (hostAddress == null || (!hostAddress.isLoopbackAddress()
253                    && !hostAddress.isLinkLocalAddress()
254                    && !hostAddress.isSiteLocalAddress())) {
255                throw new ElasticsearchException("Bind host " + bindHost
256                        + (hostAddress != null ? "(address " + hostAddress + ") " : "")
257                        + "is not on-site and not permitted by default. Check 'websocket.onsiteonly' setting in configuration.");
258            }
259        }
260
261        PortsRange portsRange = new PortsRange(port);
262        final AtomicReference<Exception> lastException = new AtomicReference();
263        boolean success = portsRange.iterate(new PortsRange.PortCallback() {
264            @Override
265            public boolean onPortNumber(int portNumber) {
266                try {
267                    serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
268                } catch (Exception e) {
269                    lastException.set(e);
270                    return false;
271                }
272                return true;
273            }
274        });
275        if (!success) {
276            throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
277        }
278        InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
279        InetSocketAddress publishAddress;
280        try {
281            publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
282        } catch (Exception e) {
283            throw new BindTransportException("Failed to resolve publish address", e);
284        }
285        this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
286    }
287
288    @Override
289    protected void doStop() throws ElasticsearchException {
290        if (serverChannel != null) {
291            serverChannel.close().awaitUninterruptibly();
292            serverChannel = null;
293        }
294        if (serverOpenChannels != null) {
295            serverOpenChannels.close();
296            serverOpenChannels = null;
297        }
298        if (serverBootstrap != null) {
299            serverBootstrap.releaseExternalResources();
300            serverBootstrap = null;
301        }
302    }
303
304    @Override
305    protected void doClose() throws ElasticsearchException {
306    }
307
308    @Override
309    public BoundTransportAddress boundAddress() {
310        return this.boundAddress;
311    }
312
313    @Override
314    public HttpInfo info() {
315        return new HttpInfo(boundAddress(), maxContentLength.bytes());
316    }
317
318
319    @Override
320    public HttpStats stats() {
321        OpenChannelsHandler channels = serverOpenChannels;
322        return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
323    }
324
325    /**
326     * Dispatch reqeuest to HTTP adapter.
327     *
328     * @param request
329     * @param channel
330     */
331    void dispatchRequest(HttpRequest request, HttpChannel channel) {
332        httpServerAdapter.dispatchRequest(request, channel);
333    }
334
335    /**
336     * A channel appeared or disappeared.
337     *
338     * @param presence
339     * @param topic
340     * @param channel
341     */
342    void presence(Presence presence, String topic, Channel channel) throws IOException {
343        webSocketServerAdapter.presence(presence, topic, channel);
344    }
345
346    /**
347     * Send a websocket frame.
348     *
349     * @param handshaker
350     * @param frame
351     * @param context
352     */
353    void frame(WebSocketServerHandshaker handshaker, WebSocketFrame frame, ChannelHandlerContext context) {
354        webSocketServerAdapter.frame(handshaker, frame, context);
355    }
356
357    /**
358     * Returns a channel if it is in the server open channel table, given by ID.
359     *
360     * @param id the channel ID
361     * @return the channel if open or null if not present.
362     */
363    @Override
364    public Channel channel(Integer id) {
365        return serverOpenChannels.channel(id);
366    }
367
368    /**
369     * Forward a message to another node with websockets for delivery.
370     *
371     * @param websocketNodeAddress the websocket address of the other node, e.g.
372     *                             "/10.0.0.1:9400"
373     * @param channelId            the channel ID on the other node for delivering the
374     *                             message
375     * @param builder              the builder for the message
376     */
377    @Override
378    public void forward(final String websocketNodeAddress, final Integer channelId, final XContentBuilder builder) {
379        try {
380            // build "forward" text frame
381            XContentBuilder forwardBuilder = jsonBuilder();
382            forwardBuilder.startObject()
383                    .field("channel", channelId)
384                    .rawField("message", builder.bytes())
385                    .endObject();
386            final TextWebSocketFrame frame = new NettyInteractiveResponse("forward", forwardBuilder).response();
387            // use a websocket client pool
388            WebSocketClient client = nodeChannels.get(websocketNodeAddress);
389            if (client == null) {
390                final URI uri = new URI("ws:/" + websocketNodeAddress + "/websocket");
391                client = clientFactory.newClient(uri, new WebSocketActionListener() {
392                    @Override
393                    public void onConnect(WebSocketClient client) {
394                        nodeChannels.put(websocketNodeAddress, client);
395                        client.send(frame);
396                    }
397
398                    @Override
399                    public void onDisconnect(WebSocketClient client) {
400                        logger.warn("node disconnected: {}", uri);
401                        nodeChannels.remove(websocketNodeAddress);
402                    }
403
404                    @Override
405                    public void onMessage(WebSocketClient client, WebSocketFrame frame) {
406                        logger.info("unexpected response {}", frame);
407                    }
408
409                    @Override
410                    public void onError(Throwable t) {
411                        logger.error(t.getMessage(), t);
412                        nodeChannels.remove(websocketNodeAddress);
413                    }
414                });
415                client.connect();
416            } else {
417                client.send(frame);
418            }
419        } catch (Exception e) {
420            logger.error(e.getMessage(), e);
421        }
422    }
423
424    void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
425        if (e.getCause() instanceof ReadTimeoutException) {
426            if (logger.isTraceEnabled()) {
427                logger.trace("Connection timeout [{}]", ctx.getChannel().getRemoteAddress());
428            }
429            ctx.getChannel().close();
430        } else {
431            if (!lifecycle.started()) {
432                // ignore
433                return;
434            }
435            if (!NetworkExceptionHelper.isCloseConnectionException(e.getCause())) {
436                logger.warn("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel());
437                ctx.getChannel().close();
438            } else {
439                logger.debug("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel());
440                ctx.getChannel().close();
441            }
442        }
443    }
444
445}