package org.xbib.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

/**
 * A page-based bytes reference, internally holding the bytes in a paged
 * data structure.
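 * <p>
 * A minimal usage sketch (assuming a {@code BigArrays} instance is available;
 * the names below are illustrative only):
 * <pre>
 * int numBytes = 1024;
 * ByteArray bytes = bigArrays.newByteArray(numBytes);
 * bytes.set(0, (byte) 42);                                  // fill the pages as needed
 * BytesReference ref = new PagedBytesReference(bigArrays, bytes, numBytes);
 * BytesReference slice = ref.slice(0, 16);                  // zero-copy view over the same pages
 * </pre>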
 */
public class PagedBytesReference implements BytesReference {

    private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
    private static final int NIO_GATHERING_LIMIT = 524288;

    private final BigArrays bigarrays;
    protected final ByteArray bytearray;
    private final int offset;
    private final int length;
    private int hash = 0;

    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
        this(bigarrays, bytearray, 0, length);
    }

    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
        this.bigarrays = bigarrays;
        this.bytearray = bytearray;
        this.offset = from;
        this.length = length;
    }

    @Override
    public byte get(int index) {
        return bytearray.get(offset + index);
    }

    @Override
    public int length() {
        return length;
    }

    @Override
    public BytesReference slice(int from, int length) {
        if (from < 0 || (from + length) > length()) {
            throw new ElasticsearchIllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]");
        }

        return new PagedBytesReference(bigarrays, bytearray, offset + from, length);
    }

    @Override
    public StreamInput streamInput() {
        return new PagedBytesReferenceStreamInput(bytearray, offset, length);
    }

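    /**
     * Writes the referenced bytes to the stream page by page: first the
     * fragment up to the next page boundary when this reference starts
     * mid-page, then full pages, then the trailing fragment.
     */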
    @Override
    public void writeTo(OutputStream os) throws IOException {
        // nothing to do
        if (length == 0) {
            return;
        }

        BytesRef ref = new BytesRef();
        int written = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            os.write(ref.bytes, ref.offset, fragmentSize);
            written += fragmentSize;
        }

        // handle remainder of pages + trailing fragment
        while (written < length) {
            int remaining = length - written;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + written, bulkSize, ref);
            os.write(ref.bytes, ref.offset, bulkSize);
            written += bulkSize;
        }
    }

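    /**
     * Writes the referenced bytes with a single gathering write: every page
     * (plus any leading fragment of a slice) is wrapped in its own ByteBuffer
     * and the whole buffer array is handed to the channel at once.
     */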
    @Override
    public void writeTo(GatheringByteChannel channel) throws IOException {
        // nothing to do
        if (length == 0) {
            return;
        }

        ByteBuffer[] buffers;
        ByteBuffer currentBuffer = null;
        BytesRef ref = new BytesRef();
        int pos = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
            pos += fragmentSize;
        }

        // we only have a single page
        if (pos == length && currentBuffer != null) {
            channel.write(currentBuffer);
            return;
        }

        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

        buffers = new ByteBuffer[numBuffers];
        int bufferSlot = 0;

        if (currentBuffer != null) {
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
        }

        // handle remainder of pages + trailing fragment
        while (pos < length) {
            int remaining = length - pos;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + pos, bulkSize, ref);
            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
            pos += bulkSize;
        }

        // this would indicate that our numBuffer calculation is off by one.
        assert (numBuffers == bufferSlot);

        // finally write all buffers
        channel.write(buffers);
    }

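    /**
     * Materializes the referenced bytes into a single byte[] of exactly
     * {@code length} bytes, copying whenever the data does not already sit
     * in one full backing page starting at index 0.
     */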
    @Override
    public byte[] toBytes() {
        if (length == 0) {
            return BytesRef.EMPTY_BYTES;
        }

        BytesRef ref = new BytesRef();
        bytearray.get(offset, length, ref);

        // undo the single-page optimization by ByteArray.get(), otherwise
        // a materialized stream would contain trailing garbage/zeros
        byte[] result = ref.bytes;
        if (result.length != length || ref.offset != 0) {
            result = Arrays.copyOfRange(result, ref.offset, ref.offset + length);
        }

        return result;
    }

    @Override
    public BytesArray toBytesArray() {
        BytesRef ref = new BytesRef();
        bytearray.get(offset, length, ref);
        return new BytesArray(ref);
    }

    @Override
    public BytesArray copyBytesArray() {
        BytesRef ref = new BytesRef();
        boolean copied = bytearray.get(offset, length, ref);

        if (copied) {
            // BigArray has materialized the bytes for us, no need to do it again
            return new BytesArray(ref.bytes, ref.offset, ref.length);
        } else {
            // the ref still points into shared pages, so copy explicitly
            byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
            return new BytesArray(copy);
        }
    }

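    /**
     * Wraps the backing pages in a Netty ChannelBuffer without copying,
     * using a composite buffer for multi-page content. Gathering writes are
     * only enabled up to NIO_GATHERING_LIMIT bytes to avoid large direct
     * buffer allocations.
     */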
    @Override
    public ChannelBuffer toChannelBuffer() {
        // nothing to do
        if (length == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }

        ChannelBuffer[] buffers;
        ChannelBuffer currentBuffer = null;
        BytesRef ref = new BytesRef();
        int pos = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, fragmentSize);
            pos += fragmentSize;
        }

        // no need to create a composite buffer for a single page
        if (pos == length && currentBuffer != null) {
            return currentBuffer;
        }

        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

        buffers = new ChannelBuffer[numBuffers];
        int bufferSlot = 0;

        if (currentBuffer != null) {
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
        }

        // handle remainder of pages + trailing fragment
        while (pos < length) {
            int remaining = length - pos;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + pos, bulkSize, ref);
            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, bulkSize);
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
            pos += bulkSize;
        }

        // this would indicate that our numBuffer calculation is off by one.
        assert (numBuffers == bufferSlot);

        // we can use gathering writes from the ChannelBuffers, but only if they are
        // moderately small to prevent OOMs due to DirectBuffer allocations.
        return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers);
    }

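    /**
     * A single contiguous backing array is only available when the whole
     * reference fits into the first page of the underlying ByteArray.
     */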
    @Override
    public boolean hasArray() {
        return (offset + length <= PAGE_SIZE);
    }

    @Override
    public byte[] array() {
        if (hasArray()) {
            if (length == 0) {
                return BytesRef.EMPTY_BYTES;
            }

            BytesRef ref = new BytesRef();
            bytearray.get(offset, length, ref);
            return ref.bytes;
        }

        throw new IllegalStateException("array not available");
    }

    @Override
    public int arrayOffset() {
        if (hasArray()) {
            BytesRef ref = new BytesRef();
            bytearray.get(offset, length, ref);
            return ref.offset;
        }

        throw new IllegalStateException("array not available");
    }

    @Override
    public String toUtf8() {
        if (length() == 0) {
            return "";
        }

        // toBytes() already applies the slice offset, so decode from position 0
        byte[] bytes = toBytes();
        final CharsRef ref = new CharsRef(length);
        UnicodeUtil.UTF8toUTF16(bytes, 0, length, ref);
        return ref.toString();
    }

    @Override
    public BytesRef toBytesRef() {
        BytesRef bref = new BytesRef();
        // if length <= pagesize this will dereference the page, or materialize the byte[]
        bytearray.get(offset, length, bref);
        return bref;
    }

    @Override
    public BytesRef copyBytesRef() {
        // toBytes() already applies the slice offset and trims to length
        byte[] bytes = toBytes();
        return new BytesRef(bytes, 0, length);
    }

    @Override
    public int hashCode() {
        if (hash == 0) {
            // TODO: delegate to BigArrays via:
            // hash = bigarrays.hashCode(bytearray);
            // and for slices:
            // hash = bigarrays.hashCode(bytearray, offset, length);
            int tmphash = 1;
            for (int i = 0; i < length; i++) {
                tmphash = 31 * tmphash + bytearray.get(offset + i);
            }
            hash = tmphash;
        }
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (!(obj instanceof PagedBytesReference)) {
            // fall back to a byte-wise comparison; non-BytesReference objects can never be equal
            return (obj instanceof BytesReference) && BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
        }

        PagedBytesReference other = (PagedBytesReference) obj;
        if (length != other.length) {
            return false;
        }

        // TODO: delegate to BigArrays via:
        // return bigarrays.equals(bytearray, other.bytearray);
        // and for slices:
        // return bigarrays.equals(bytearray, start, other.bytearray, otherstart, len);
        ByteArray otherArray = other.bytearray;
        int otherOffset = other.offset;
        for (int i = 0; i < length; i++) {
            if (bytearray.get(offset + i) != otherArray.get(otherOffset + i)) {
                return false;
            }
        }
        return true;
    }

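    /**
     * Computes how many page-sized buffers are needed to cover numBytes, in
     * addition to initialCount buffers already reserved for a leading
     * fragment. For example, one and a half pages of remaining bytes need one
     * full-page buffer plus one buffer for the trailing fragment.
     */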
    private int countRequiredBuffers(int initialCount, int numBytes) {
        int numBuffers = initialCount;
        // an "estimate" of how many pages remain - rounded down
        int pages = numBytes / PAGE_SIZE;
        // a remaining fragment < pagesize needs at least one buffer
        numBuffers += (pages == 0) ? 1 : pages;
        // a remainder that is not a multiple of pagesize also needs an extra buffer
        numBuffers += (pages > 0 && numBytes % PAGE_SIZE > 0) ? 1 : 0;
        return numBuffers;
    }

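    /**
     * A StreamInput view over a range of a ByteArray. Reads are served
     * directly from the backing pages without materializing the whole range;
     * reset() rewinds to the start of the range.
     */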
    private static class PagedBytesReferenceStreamInput extends StreamInput {

        private final ByteArray bytearray;
        private final BytesRef ref;
        private final int offset;
        private final int length;
        private int pos;

        public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
            this.bytearray = bytearray;
            this.ref = new BytesRef();
            this.offset = offset;
            this.length = length;
            this.pos = 0;

            if (offset + length > bytearray.size()) {
                throw new IndexOutOfBoundsException("offset+length > bytearray.size()");
            }
        }

        @Override
        public byte readByte() throws IOException {
            if (pos >= length) {
                throw new EOFException();
            }

            return bytearray.get(offset + pos++);
        }

        @Override
        public void readBytes(byte[] b, int bOffset, int len) throws IOException {
            if (len > length - pos) {
                throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at pos " + pos);
            }

            read(b, bOffset, len);
        }

        @Override
        public int read() throws IOException {
            // mask to honor the InputStream contract of returning an unsigned byte (0-255)
            return (pos < length) ? (bytearray.get(offset + pos++) & 0xFF) : -1;
        }

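        /**
         * Bulk read that copies one page fragment per iteration, never
         * crossing a page boundary, so the underlying ByteArray can always
         * hand back a direct reference instead of a materialized copy.
         */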
        @Override
        public int read(final byte[] b, final int bOffset, final int len) throws IOException {
            if (len == 0) {
                return 0;
            }

            if (pos >= length) {
                return -1;
            }

            final int numBytesToCopy = Math.min(len, length - pos); // copy the full length or the remaining part

            // current offset into the underlying ByteArray
            long byteArrayOffset = offset + pos;

            // bytes already copied
            int copiedBytes = 0;

            while (copiedBytes < numBytesToCopy) {
                long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
                int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
                boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
                assert (copied == false); // we should never ever get back a materialized byte[]
                System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
                copiedBytes += bulkSize; // count how much we copied
                byteArrayOffset += bulkSize; // advance ByteArray index
            }

            pos += copiedBytes; // finally advance our stream position
            return copiedBytes;
        }

        @Override
        public void reset() throws IOException {
            pos = 0;
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }

    }
}