001package org.xbib.elasticsearch.transport.netty; 002 003import org.elasticsearch.Version; 004import org.elasticsearch.common.component.Lifecycle; 005import org.elasticsearch.common.io.ThrowableObjectInputStream; 006import org.elasticsearch.common.io.stream.CachedStreamInput; 007import org.elasticsearch.common.io.stream.StreamInput; 008import org.elasticsearch.common.logging.ESLogger; 009import org.elasticsearch.threadpool.ThreadPool; 010import org.elasticsearch.transport.ActionNotFoundTransportException; 011import org.elasticsearch.transport.RemoteTransportException; 012import org.elasticsearch.transport.ResponseHandlerFailureTransportException; 013import org.elasticsearch.transport.TransportRequest; 014import org.elasticsearch.transport.TransportRequestHandler; 015import org.elasticsearch.transport.TransportResponse; 016import org.elasticsearch.transport.TransportResponseHandler; 017import org.elasticsearch.transport.TransportSerializationException; 018import org.elasticsearch.transport.TransportServiceAdapter; 019import org.elasticsearch.transport.support.TransportStatus; 020import org.jboss.netty.buffer.ChannelBuffer; 021import org.jboss.netty.channel.Channel; 022import org.jboss.netty.channel.ChannelHandlerContext; 023import org.jboss.netty.channel.MessageEvent; 024import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 025 026import java.io.IOException; 027 028/** 029 * A handler (must be the last one!) that does size based frame decoding and forwards the actual message 030 * to the relevant action. 031 */ 032public class MessageChannelHandler extends SimpleChannelUpstreamHandler { 033 034 private final ESLogger logger; 035 036 private final ThreadPool threadPool; 037 038 private final TransportServiceAdapter transportServiceAdapter; 039 040 private final NettyTransport transport; 041 042 public MessageChannelHandler(NettyTransport transport, ESLogger logger) { 043 this.threadPool = transport.threadPool(); 044 this.transportServiceAdapter = transport.transportServiceAdapter(); 045 this.transport = transport; 046 this.logger = logger; 047 } 048 049 @Override 050 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 051 Object m = e.getMessage(); 052 if (!(m instanceof ChannelBuffer)) { 053 ctx.sendUpstream(e); 054 return; 055 } 056 ChannelBuffer buffer = (ChannelBuffer) m; 057 int size = buffer.getInt(buffer.readerIndex() - 4); 058 transportServiceAdapter.received(size + 4); 059 060 int markedReaderIndex = buffer.readerIndex(); 061 int expectedIndexReader = markedReaderIndex + size; 062 063 // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh 064 // buffer, or in the cumlation buffer, which is cleaned each time 065 StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size); 066 067 long requestId = buffer.readLong(); 068 byte status = buffer.readByte(); 069 Version version = Version.fromId(buffer.readInt()); 070 071 // !!! compression handling removed !!! 072 StreamInput wrappedStream = CachedStreamInput.cachedHandles(streamIn); 073 074 if (TransportStatus.isRequest(status)) { 075 String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version); 076 if (buffer.readerIndex() != expectedIndexReader) { 077 if (buffer.readerIndex() < expectedIndexReader) { 078 logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); 079 } else { 080 logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action); 081 } 082 buffer.readerIndex(expectedIndexReader); 083 } 084 } else { 085 TransportResponseHandler handler = transportServiceAdapter.remove(requestId); 086 // ignore if its null, the adapter logs it 087 if (handler != null) { 088 if (TransportStatus.isError(status)) { 089 handlerResponseError(wrappedStream, handler); 090 } else { 091 handleResponse(wrappedStream, handler); 092 } 093 } else { 094 // if its null, skip those bytes 095 buffer.readerIndex(markedReaderIndex + size); 096 } 097 if (buffer.readerIndex() != expectedIndexReader) { 098 if (buffer.readerIndex() < expectedIndexReader) { 099 logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status)); 100 } else { 101 logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status)); 102 } 103 buffer.readerIndex(expectedIndexReader); 104 } 105 } 106 wrappedStream.close(); 107 } 108 109 private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { 110 final TransportResponse response = handler.newInstance(); 111 try { 112 response.readFrom(buffer); 113 } catch (Throwable e) { 114 handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); 115 return; 116 } 117 try { 118 if (handler.executor().equals(ThreadPool.Names.SAME)) { 119 handler.handleResponse(response); 120 } else { 121 threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response)); 122 } 123 } catch (Throwable e) { 124 handleException(handler, new ResponseHandlerFailureTransportException(e)); 125 } 126 } 127 128 private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) { 129 Throwable error; 130 try { 131 ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, transport.settings().getClassLoader()); 132 error = (Throwable) ois.readObject(); 133 } catch (Exception e) { 134 error = new TransportSerializationException("Failed to deserialize exception response from stream", e); 135 } 136 handleException(handler, error); 137 } 138 139 private void handleException(final TransportResponseHandler handler, Throwable error) { 140 if (!(error instanceof RemoteTransportException)) { 141 error = new RemoteTransportException(error.getMessage(), error); 142 } 143 final RemoteTransportException rtx = (RemoteTransportException) error; 144 if (handler.executor().equals(ThreadPool.Names.SAME)) { 145 handler.handleException(rtx); 146 } else { 147 threadPool.executor(handler.executor()).execute(new Runnable() { 148 @Override 149 public void run() { 150 try { 151 handler.handleException(rtx); 152 } catch (Exception e) { 153 logger.error("Failed to handle exception response", e); 154 } 155 } 156 }); 157 } 158 } 159 160 private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { 161 final String action = buffer.readString(); 162 163 final NettyTransportChannel transportChannel = 164 new NettyTransportChannel(transport, action, channel, requestId, version); 165 try { 166 final TransportRequestHandler handler = transportServiceAdapter.handler(action); 167 if (handler == null) { 168 throw new ActionNotFoundTransportException(action); 169 } 170 final TransportRequest request = handler.newInstance(); 171 request.readFrom(buffer); 172 if (handler.executor().equals(ThreadPool.Names.SAME)) { 173 //noinspection unchecked 174 handler.messageReceived(request, transportChannel); 175 } else { 176 threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); 177 } 178 } catch (Throwable e) { 179 try { 180 transportChannel.sendResponse(e); 181 } catch (IOException e1) { 182 logger.warn("Failed to send error message back to client for action [" + action + "]", e); 183 logger.warn("Actual Exception", e1); 184 } 185 } 186 return action; 187 } 188 189 190 class ResponseHandler implements Runnable { 191 192 private final TransportResponseHandler handler; 193 private final TransportResponse response; 194 195 public ResponseHandler(TransportResponseHandler handler, TransportResponse response) { 196 this.handler = handler; 197 this.response = response; 198 } 199 200 @SuppressWarnings({"unchecked"}) 201 @Override 202 public void run() { 203 try { 204 handler.handleResponse(response); 205 } catch (Exception e) { 206 handleException(handler, new ResponseHandlerFailureTransportException(e)); 207 } 208 } 209 } 210 211 class RequestHandler implements Runnable { 212 private final TransportRequestHandler handler; 213 private final TransportRequest request; 214 private final NettyTransportChannel transportChannel; 215 private final String action; 216 217 public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { 218 this.handler = handler; 219 this.request = request; 220 this.transportChannel = transportChannel; 221 this.action = action; 222 } 223 224 @SuppressWarnings({"unchecked"}) 225 @Override 226 public void run() { 227 try { 228 handler.messageReceived(request, transportChannel); 229 } catch (Throwable e) { 230 if (transport.lifecycleState() == Lifecycle.State.STARTED) { 231 // we can only send a response transport is started.... 232 try { 233 transportChannel.sendResponse(e); 234 } catch (IOException e1) { 235 logger.warn("Failed to send error message back to client for action [" + action + "]", e1); 236 logger.warn("Actual Exception", e); 237 } 238 } 239 } 240 } 241 } 242}