001package org.xbib.elasticsearch.common.netty;
002
003import org.elasticsearch.common.logging.ESLogger;
004import org.elasticsearch.common.metrics.CounterMetric;
005import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
006import org.jboss.netty.channel.Channel;
007import org.jboss.netty.channel.ChannelEvent;
008import org.jboss.netty.channel.ChannelFuture;
009import org.jboss.netty.channel.ChannelFutureListener;
010import org.jboss.netty.channel.ChannelHandler;
011import org.jboss.netty.channel.ChannelHandlerContext;
012import org.jboss.netty.channel.ChannelState;
013import org.jboss.netty.channel.ChannelStateEvent;
014import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
015
016import java.util.Map;
017
018/**
019 *
020 */
021@ChannelHandler.Sharable
022public class OpenChannelsHandler extends SimpleChannelUpstreamHandler {
023
024    final Map<Integer, Channel> openChannels = ConcurrentCollections.newConcurrentMap();
025
026    final CounterMetric openChannelsMetric = new CounterMetric();
027
028    final CounterMetric totalChannelsMetric = new CounterMetric();
029
030    final ESLogger logger;
031
032    public OpenChannelsHandler(ESLogger logger) {
033        this.logger = logger;
034    }
035
036    final ChannelFutureListener remover = new ChannelFutureListener() {
037        @Override
038        public void operationComplete(ChannelFuture future) throws Exception {
039            Channel channel = openChannels.remove(future.getChannel().getId());
040            if (channel != null) {
041                openChannelsMetric.dec();
042            }
043            if (logger.isTraceEnabled()) {
044                logger.trace("channel closed: {}", future.getChannel());
045            }
046        }
047    };
048
049    @Override
050    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
051        if (e instanceof ChannelStateEvent) {
052            ChannelStateEvent evt = (ChannelStateEvent) e;
053            // OPEN is also sent to when closing channel, but with FALSE on it to indicate it closes
054            if (evt.getState() == ChannelState.OPEN && Boolean.TRUE.equals(evt.getValue())) {
055                if (logger.isTraceEnabled()) {
056                    logger.trace("channel opened: {}", ctx.getChannel());
057                }
058                Channel channel = openChannels.put(ctx.getChannel().getId(), ctx.getChannel());
059                if (channel == null) {
060                    openChannelsMetric.inc();
061                    totalChannelsMetric.inc();
062                    ctx.getChannel().getCloseFuture().addListener(remover);
063                }
064            }
065        }
066        ctx.sendUpstream(e);
067    }
068
069    public Channel channel(Integer id) {
070        return openChannels.get(id);
071    }
072
073    public long numberOfOpenChannels() {
074        return openChannelsMetric.count();
075    }
076
077    public long totalChannels() {
078        return totalChannelsMetric.count();
079    }
080
081    public void close() {
082        for (Channel channel : openChannels.values()) {
083            channel.close().awaitUninterruptibly();
084        }
085    }
086}