001package org.xbib.elasticsearch.transport.netty; 002 003import org.elasticsearch.common.bytes.BytesReference; 004import org.elasticsearch.common.io.stream.StreamInput; 005import org.jboss.netty.buffer.ChannelBuffer; 006import org.xbib.elasticsearch.common.bytes.ChannelBufferBytesReference; 007 008import java.io.EOFException; 009import java.io.IOException; 010 011/** 012 * A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}. 013 */ 014public class ChannelBufferStreamInput extends StreamInput { 015 016 private final ChannelBuffer buffer; 017 018 private final int startIndex; 019 020 private final int endIndex; 021 022 public ChannelBufferStreamInput(ChannelBuffer buffer) { 023 this(buffer, buffer.readableBytes()); 024 } 025 026 public ChannelBufferStreamInput(ChannelBuffer buffer, int length) { 027 if (length > buffer.readableBytes()) { 028 throw new IndexOutOfBoundsException(); 029 } 030 this.buffer = buffer; 031 startIndex = buffer.readerIndex(); 032 endIndex = startIndex + length; 033 buffer.markReaderIndex(); 034 } 035 036 @Override 037 public BytesReference readBytesReference(int length) throws IOException { 038 ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length)); 039 buffer.skipBytes(length); 040 return ref; 041 } 042 043 @Override 044 public int available() throws IOException { 045 return endIndex - buffer.readerIndex(); 046 } 047 048 @Override 049 public void mark(int readlimit) { 050 buffer.markReaderIndex(); 051 } 052 053 @Override 054 public boolean markSupported() { 055 return true; 056 } 057 058 @Override 059 public int read() throws IOException { 060 if (available() == 0) { 061 return -1; 062 } 063 return buffer.readByte() & 0xff; 064 } 065 066 @Override 067 public int read(byte[] b, int off, int len) throws IOException { 068 if (len == 0) { 069 return 0; 070 } 071 int available = available(); 072 if (available == 0) { 073 return -1; 074 } 075 076 len = Math.min(available, len); 077 buffer.readBytes(b, off, len); 078 return len; 079 } 080 081 @Override 082 public void reset() throws IOException { 083 buffer.resetReaderIndex(); 084 } 085 086 @Override 087 public long skip(long n) throws IOException { 088 if (n > Integer.MAX_VALUE) { 089 return skipBytes(Integer.MAX_VALUE); 090 } else { 091 return skipBytes((int) n); 092 } 093 } 094 095 public int skipBytes(int n) throws IOException { 096 int nBytes = Math.min(available(), n); 097 buffer.skipBytes(nBytes); 098 return nBytes; 099 } 100 101 102 @Override 103 public byte readByte() throws IOException { 104 return buffer.readByte(); 105 } 106 107 @Override 108 public void readBytes(byte[] b, int offset, int len) throws IOException { 109 int read = read(b, offset, len); 110 if (read < len) { 111 throw new EOFException(); 112 } 113 } 114 115 @Override 116 public void close() throws IOException { 117 // nothing to do here 118 } 119}