001package org.xbib.elasticsearch.transport.netty;
002
003import org.elasticsearch.common.bytes.BytesReference;
004import org.elasticsearch.common.io.stream.StreamInput;
005import org.jboss.netty.buffer.ChannelBuffer;
006import org.xbib.elasticsearch.common.bytes.ChannelBufferBytesReference;
007
008import java.io.EOFException;
009import java.io.IOException;
010
011/**
012 * A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
013 */
014public class ChannelBufferStreamInput extends StreamInput {
015
016    private final ChannelBuffer buffer;
017
018    private final int startIndex;
019
020    private final int endIndex;
021
022    public ChannelBufferStreamInput(ChannelBuffer buffer) {
023        this(buffer, buffer.readableBytes());
024    }
025
026    public ChannelBufferStreamInput(ChannelBuffer buffer, int length) {
027        if (length > buffer.readableBytes()) {
028            throw new IndexOutOfBoundsException();
029        }
030        this.buffer = buffer;
031        startIndex = buffer.readerIndex();
032        endIndex = startIndex + length;
033        buffer.markReaderIndex();
034    }
035
036    @Override
037    public BytesReference readBytesReference(int length) throws IOException {
038        ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length));
039        buffer.skipBytes(length);
040        return ref;
041    }
042
043    @Override
044    public int available() throws IOException {
045        return endIndex - buffer.readerIndex();
046    }
047
048    @Override
049    public void mark(int readlimit) {
050        buffer.markReaderIndex();
051    }
052
053    @Override
054    public boolean markSupported() {
055        return true;
056    }
057
058    @Override
059    public int read() throws IOException {
060        if (available() == 0) {
061            return -1;
062        }
063        return buffer.readByte() & 0xff;
064    }
065
066    @Override
067    public int read(byte[] b, int off, int len) throws IOException {
068        if (len == 0) {
069            return 0;
070        }
071        int available = available();
072        if (available == 0) {
073            return -1;
074        }
075
076        len = Math.min(available, len);
077        buffer.readBytes(b, off, len);
078        return len;
079    }
080
081    @Override
082    public void reset() throws IOException {
083        buffer.resetReaderIndex();
084    }
085
086    @Override
087    public long skip(long n) throws IOException {
088        if (n > Integer.MAX_VALUE) {
089            return skipBytes(Integer.MAX_VALUE);
090        } else {
091            return skipBytes((int) n);
092        }
093    }
094
095    public int skipBytes(int n) throws IOException {
096        int nBytes = Math.min(available(), n);
097        buffer.skipBytes(nBytes);
098        return nBytes;
099    }
100
101
102    @Override
103    public byte readByte() throws IOException {
104        return buffer.readByte();
105    }
106
107    @Override
108    public void readBytes(byte[] b, int offset, int len) throws IOException {
109        int read = read(b, offset, len);
110        if (read < len) {
111            throw new EOFException();
112        }
113    }
114
115    @Override
116    public void close() throws IOException {
117        // nothing to do here
118    }
119}