001package org.xbib.elasticsearch.transport.netty;
002
003import org.elasticsearch.Version;
004import org.elasticsearch.common.component.Lifecycle;
005import org.elasticsearch.common.io.ThrowableObjectInputStream;
006import org.elasticsearch.common.io.stream.CachedStreamInput;
007import org.elasticsearch.common.io.stream.StreamInput;
008import org.elasticsearch.common.logging.ESLogger;
009import org.elasticsearch.threadpool.ThreadPool;
010import org.elasticsearch.transport.ActionNotFoundTransportException;
011import org.elasticsearch.transport.RemoteTransportException;
012import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
013import org.elasticsearch.transport.TransportRequest;
014import org.elasticsearch.transport.TransportRequestHandler;
015import org.elasticsearch.transport.TransportResponse;
016import org.elasticsearch.transport.TransportResponseHandler;
017import org.elasticsearch.transport.TransportSerializationException;
018import org.elasticsearch.transport.TransportServiceAdapter;
019import org.elasticsearch.transport.support.TransportStatus;
020import org.jboss.netty.buffer.ChannelBuffer;
021import org.jboss.netty.channel.Channel;
022import org.jboss.netty.channel.ChannelHandlerContext;
023import org.jboss.netty.channel.MessageEvent;
024import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
025
026import java.io.IOException;
027
028/**
029 * A handler (must be the last one!) that does size based frame decoding and forwards the actual message
030 * to the relevant action.
031 */
032public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
033
034    private final ESLogger logger;
035
036    private final ThreadPool threadPool;
037
038    private final TransportServiceAdapter transportServiceAdapter;
039
040    private final NettyTransport transport;
041
042    public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
043        this.threadPool = transport.threadPool();
044        this.transportServiceAdapter = transport.transportServiceAdapter();
045        this.transport = transport;
046        this.logger = logger;
047    }
048
049    @Override
050    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
051        Object m = e.getMessage();
052        if (!(m instanceof ChannelBuffer)) {
053            ctx.sendUpstream(e);
054            return;
055        }
056        ChannelBuffer buffer = (ChannelBuffer) m;
057        int size = buffer.getInt(buffer.readerIndex() - 4);
058        transportServiceAdapter.received(size + 4);
059
060        int markedReaderIndex = buffer.readerIndex();
061        int expectedIndexReader = markedReaderIndex + size;
062
063        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
064        // buffer, or in the cumlation buffer, which is cleaned each time
065        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
066
067        long requestId = buffer.readLong();
068        byte status = buffer.readByte();
069        Version version = Version.fromId(buffer.readInt());
070
071        // !!! compression handling removed !!!
072        StreamInput wrappedStream = CachedStreamInput.cachedHandles(streamIn);
073
074        if (TransportStatus.isRequest(status)) {
075            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
076            if (buffer.readerIndex() != expectedIndexReader) {
077                if (buffer.readerIndex() < expectedIndexReader) {
078                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
079                } else {
080                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
081                }
082                buffer.readerIndex(expectedIndexReader);
083            }
084        } else {
085            TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
086            // ignore if its null, the adapter logs it
087            if (handler != null) {
088                if (TransportStatus.isError(status)) {
089                    handlerResponseError(wrappedStream, handler);
090                } else {
091                    handleResponse(wrappedStream, handler);
092                }
093            } else {
094                // if its null, skip those bytes
095                buffer.readerIndex(markedReaderIndex + size);
096            }
097            if (buffer.readerIndex() != expectedIndexReader) {
098                if (buffer.readerIndex() < expectedIndexReader) {
099                    logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
100                } else {
101                    logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
102                }
103                buffer.readerIndex(expectedIndexReader);
104            }
105        }
106        wrappedStream.close();
107    }
108
109    private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
110        final TransportResponse response = handler.newInstance();
111        try {
112            response.readFrom(buffer);
113        } catch (Throwable e) {
114            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
115            return;
116        }
117        try {
118            if (handler.executor().equals(ThreadPool.Names.SAME)) {
119                handler.handleResponse(response);
120            } else {
121                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
122            }
123        } catch (Throwable e) {
124            handleException(handler, new ResponseHandlerFailureTransportException(e));
125        }
126    }
127
128    private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
129        Throwable error;
130        try {
131            ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, transport.settings().getClassLoader());
132            error = (Throwable) ois.readObject();
133        } catch (Exception e) {
134            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
135        }
136        handleException(handler, error);
137    }
138
139    private void handleException(final TransportResponseHandler handler, Throwable error) {
140        if (!(error instanceof RemoteTransportException)) {
141            error = new RemoteTransportException(error.getMessage(), error);
142        }
143        final RemoteTransportException rtx = (RemoteTransportException) error;
144        if (handler.executor().equals(ThreadPool.Names.SAME)) {
145            handler.handleException(rtx);
146        } else {
147            threadPool.executor(handler.executor()).execute(new Runnable() {
148                @Override
149                public void run() {
150                    try {
151                        handler.handleException(rtx);
152                    } catch (Exception e) {
153                        logger.error("Failed to handle exception response", e);
154                    }
155                }
156            });
157        }
158    }
159
160    private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
161        final String action = buffer.readString();
162
163        final NettyTransportChannel transportChannel =
164                new NettyTransportChannel(transport, action, channel, requestId, version);
165        try {
166            final TransportRequestHandler handler = transportServiceAdapter.handler(action);
167            if (handler == null) {
168                throw new ActionNotFoundTransportException(action);
169            }
170            final TransportRequest request = handler.newInstance();
171            request.readFrom(buffer);
172            if (handler.executor().equals(ThreadPool.Names.SAME)) {
173                //noinspection unchecked
174                handler.messageReceived(request, transportChannel);
175            } else {
176                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
177            }
178        } catch (Throwable e) {
179            try {
180                transportChannel.sendResponse(e);
181            } catch (IOException e1) {
182                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
183                logger.warn("Actual Exception", e1);
184            }
185        }
186        return action;
187    }
188
189
190    class ResponseHandler implements Runnable {
191
192        private final TransportResponseHandler handler;
193        private final TransportResponse response;
194
195        public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
196            this.handler = handler;
197            this.response = response;
198        }
199
200        @SuppressWarnings({"unchecked"})
201        @Override
202        public void run() {
203            try {
204                handler.handleResponse(response);
205            } catch (Exception e) {
206                handleException(handler, new ResponseHandlerFailureTransportException(e));
207            }
208        }
209    }
210
211    class RequestHandler implements Runnable {
212        private final TransportRequestHandler handler;
213        private final TransportRequest request;
214        private final NettyTransportChannel transportChannel;
215        private final String action;
216
217        public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
218            this.handler = handler;
219            this.request = request;
220            this.transportChannel = transportChannel;
221            this.action = action;
222        }
223
224        @SuppressWarnings({"unchecked"})
225        @Override
226        public void run() {
227            try {
228                handler.messageReceived(request, transportChannel);
229            } catch (Throwable e) {
230                if (transport.lifecycleState() == Lifecycle.State.STARTED) {
231                    // we can only send a response transport is started....
232                    try {
233                        transportChannel.sendResponse(e);
234                    } catch (IOException e1) {
235                        logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
236                        logger.warn("Actual Exception", e);
237                    }
238                }
239            }
240        }
241    }
242}