001package org.xbib.elasticsearch.transport.netty; 002 003import org.elasticsearch.Version; 004import org.elasticsearch.common.compress.CompressorFactory; 005import org.elasticsearch.common.io.stream.HandlesStreamOutput; 006import org.elasticsearch.common.io.stream.StreamOutput; 007import org.elasticsearch.common.io.ThrowableObjectOutputStream; 008import org.elasticsearch.common.lease.Releasables; 009import org.elasticsearch.transport.NotSerializableTransportException; 010import org.elasticsearch.transport.RemoteTransportException; 011import org.elasticsearch.transport.TransportChannel; 012import org.elasticsearch.transport.TransportResponse; 013import org.elasticsearch.transport.TransportResponseOptions; 014import org.elasticsearch.transport.support.TransportStatus; 015import org.jboss.netty.buffer.ChannelBuffer; 016import org.jboss.netty.channel.Channel; 017import org.jboss.netty.channel.ChannelFuture; 018import org.xbib.elasticsearch.common.bytes.ReleasableBytesReference; 019import org.xbib.elasticsearch.common.io.stream.BytesStreamOutput; 020import org.xbib.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; 021import org.xbib.elasticsearch.common.netty.ReleaseChannelFutureListener; 022 023import java.io.IOException; 024import java.io.NotSerializableException; 025 026/** 027 * 028 */ 029public class NettyTransportChannel implements TransportChannel { 030 031 private final NettyTransport transport; 032 private final Version version; 033 private final String action; 034 private final Channel channel; 035 private final long requestId; 036 037 public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) { 038 this.version = version; 039 this.transport = transport; 040 this.action = action; 041 this.channel = channel; 042 this.requestId = requestId; 043 } 044 045 @Override 046 public String action() { 047 return this.action; 048 } 049 050 @Override 051 public void sendResponse(TransportResponse response) throws IOException { 052 sendResponse(response, TransportResponseOptions.EMPTY); 053 } 054 055 @Override 056 public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { 057 if (transport.compress) { 058 options.withCompress(true); 059 } 060 byte status = 0; 061 status = TransportStatus.setResponse(status); 062 063 ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(transport.bigArrays); 064 boolean addedReleaseListener = false; 065 try { 066 bStream.skip(NettyHeader.HEADER_SIZE); 067 StreamOutput stream = bStream; 068 if (options.compress()) { 069 status = TransportStatus.setCompress(status); 070 stream = CompressorFactory.defaultCompressor().streamOutput(stream); 071 } 072 stream = new HandlesStreamOutput(stream); 073 stream.setVersion(version); 074 response.writeTo(stream); 075 stream.close(); 076 077 ReleasableBytesReference bytes = bStream.ourBytes(); 078 ChannelBuffer buffer = bytes.toChannelBuffer(); 079 NettyHeader.writeHeader(buffer, requestId, status, version); 080 ChannelFuture future = channel.write(buffer); 081 ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); 082 future.addListener(listener); 083 addedReleaseListener = true; 084 } finally { 085 if (!addedReleaseListener) { 086 Releasables.close(bStream.ourBytes()); 087 } 088 } 089 } 090 091 @Override 092 public void sendResponse(Throwable error) throws IOException { 093 BytesStreamOutput stream = new BytesStreamOutput(); 094 try { 095 stream.skip(NettyHeader.HEADER_SIZE); 096 RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); 097 ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); 098 too.writeObject(tx); 099 too.close(); 100 } catch (NotSerializableException e) { 101 stream.reset(); 102 stream.skip(org.elasticsearch.transport.netty.NettyHeader.HEADER_SIZE); 103 RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); 104 ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); 105 too.writeObject(tx); 106 too.close(); 107 } 108 109 byte status = 0; 110 status = TransportStatus.setResponse(status); 111 status = TransportStatus.setError(status); 112 113 ChannelBuffer buffer = stream.ourBytes().toChannelBuffer(); 114 NettyHeader.writeHeader(buffer, requestId, status, version); 115 channel.write(buffer); 116 } 117 118}