package org.xbib.elasticsearch.common.netty;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import java.util.Map;

/**
 * Upstream handler that keeps a registry of channels, keyed by channel id.
 *
 * <p>A channel is registered when an {@code OPEN} state event carrying {@code TRUE} passes
 * through {@link #handleUpstream(ChannelHandlerContext, ChannelEvent)} and is unregistered by
 * a listener attached to the channel's close future. Two counters are maintained alongside
 * the registry: the number of currently registered channels and the total number of channels
 * ever registered.
 */
@ChannelHandler.Sharable
public class OpenChannelsHandler extends SimpleChannelUpstreamHandler {

    /** Registered channels, keyed by channel id. */
    final Map<Integer, Channel> openChannels = ConcurrentCollections.newConcurrentMap();

    /** Incremented on registration, decremented when a registered channel is removed. */
    final CounterMetric openChannelsMetric = new CounterMetric();

    /** Incremented on every first-time registration; never decremented here. */
    final CounterMetric totalChannelsMetric = new CounterMetric();

    /** Logger used for trace output on channel open and close. */
    final ESLogger logger;

    /**
     * Listener attached to the close future of every registered channel. When the future
     * completes it drops the channel from the registry, decrementing the open counter only
     * if the channel was actually still registered.
     */
    final ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Channel closed = future.getChannel();
            if (openChannels.remove(closed.getId()) != null) {
                openChannelsMetric.dec();
            }
            if (logger.isTraceEnabled()) {
                logger.trace("channel closed: {}", closed);
            }
        }
    };

    /**
     * Creates a handler that writes its trace messages to the given logger.
     *
     * @param logger the logger to use
     */
    public OpenChannelsHandler(ESLogger logger) {
        this.logger = logger;
    }

    /**
     * Registers the context's channel if the event signals that it has been opened, then
     * forwards the event upstream.
     *
     * @param ctx the handler context the event arrived on
     * @param e   the upstream event
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (signalsOpened(e)) {
            register(ctx.getChannel());
        }
        ctx.sendUpstream(e);
    }

    /**
     * Tells whether the event is an {@code OPEN} state event whose value is {@code TRUE}.
     * OPEN is also sent when a channel is closing, but with FALSE as its value, so the value
     * has to be checked as well.
     */
    private static boolean signalsOpened(ChannelEvent event) {
        if (!(event instanceof ChannelStateEvent)) {
            return false;
        }
        ChannelStateEvent stateEvent = (ChannelStateEvent) event;
        return stateEvent.getState() == ChannelState.OPEN
                && Boolean.TRUE.equals(stateEvent.getValue());
    }

    /**
     * Puts the channel into the registry under its id. The counters are bumped and the
     * close listener attached only when no channel was previously stored under that id.
     */
    private void register(Channel opened) {
        if (logger.isTraceEnabled()) {
            logger.trace("channel opened: {}", opened);
        }
        Channel previous = openChannels.put(opened.getId(), opened);
        if (previous != null) {
            // An entry already existed for this id: it was counted when first registered.
            return;
        }
        openChannelsMetric.inc();
        totalChannelsMetric.inc();
        opened.getCloseFuture().addListener(remover);
    }

    /**
     * Looks up a registered channel by id.
     *
     * @param id the channel id
     * @return the channel stored under {@code id}, or {@code null} if there is none
     */
    public Channel channel(Integer id) {
        return openChannels.get(id);
    }

    /**
     * @return the current value of the open-channels counter
     */
    public long numberOfOpenChannels() {
        return openChannelsMetric.count();
    }

    /**
     * @return the current value of the total-channels counter
     */
    public long totalChannels() {
        return totalChannelsMetric.count();
    }

    /**
     * Closes every registered channel, one after another, waiting uninterruptibly for each
     * close operation to finish before moving on to the next channel.
     */
    public void close() {
        for (Channel registered : openChannels.values()) {
            ChannelFuture closing = registered.close();
            closing.awaitUninterruptibly();
        }
    }
}