package org.xbib.elasticsearch.common.io.stream;

import org.xbib.elasticsearch.common.bytes.BytesReference;
import org.xbib.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

import java.io.IOException;

/**
 * A {@link StreamOutput} that accumulates everything written to it in a
 * {@link ByteArray} obtained from a {@link BigArrays} instance.
 *
 * <p>The write position is tracked in an {@code int}, so the stream cannot hold more than
 * {@link Integer#MAX_VALUE} bytes; attempts to go beyond that are rejected with an
 * {@link IllegalArgumentException} instead of silently overflowing.
 */
public class BytesStreamOutput extends StreamOutput implements BytesStream {

    /** Allocator used to create, grow and shrink {@link #bytes}. */
    protected final BigArrays bigarrays;

    /** Backing storage; replaced whenever it is grown or shrunk. */
    protected ByteArray bytes;

    /** Current write position, which is also the number of valid bytes. */
    protected int count;

    /**
     * Create a non recycling {@link BytesStreamOutput} with 1 initial page acquired.
     */
    public BytesStreamOutput() {
        this(BigArrays.PAGE_SIZE_IN_BYTES);
    }

    /**
     * Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
     * to satisfy the capacity given by expected size.
     *
     * @param expectedSize the expected maximum size of the stream in bytes.
     */
    public BytesStreamOutput(int expectedSize) {
        this(expectedSize, BigArrays.NON_RECYCLING_INSTANCE);
    }

    /**
     * Create a {@link BytesStreamOutput} whose backing array is allocated from the given
     * {@link BigArrays}.
     *
     * @param expectedSize the initial size requested for the backing array, in bytes.
     * @param bigarrays    the allocator used for the initial allocation and all later resizes.
     */
    protected BytesStreamOutput(int expectedSize, BigArrays bigarrays) {
        this.bigarrays = bigarrays;
        this.bytes = bigarrays.newByteArray(expectedSize);
    }

    @Override
    public boolean seekPositionSupported() {
        return true;
    }

    /**
     * @return the current write position.
     */
    @Override
    public long position() throws IOException {
        return count;
    }

    /**
     * Appends a single byte at the current position and advances by one.
     *
     * @param b the byte to write.
     * @throws IllegalArgumentException if the stream would exceed {@link Integer#MAX_VALUE} bytes.
     */
    @Override
    public void writeByte(byte b) throws IOException {
        // widen before adding so that count == Integer.MAX_VALUE cannot wrap to a negative size
        ensureCapacity((long) count + 1);
        bytes.set(count, b);
        count++;
    }

    /**
     * Appends {@code length} bytes of {@code b}, starting at {@code offset}, at the current
     * position and advances by {@code length}. A {@code length} of zero is a no-op.
     *
     * @param b      the source array.
     * @param offset index of the first byte in {@code b} to copy.
     * @param length number of bytes to copy.
     * @throws IllegalArgumentException if {@code offset}/{@code length} do not describe a range
     *                                  inside {@code b}, or if the stream would exceed
     *                                  {@link Integer#MAX_VALUE} bytes.
     */
    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
        // nothing to copy
        if (length == 0) {
            return;
        }

        // illegal args: negative values, or offset and/or length exceed array size.
        // Written as a subtraction so that offset + length cannot overflow int.
        if (offset < 0 || length < 0 || offset > b.length - length) {
            throw new IllegalArgumentException("Illegal offset " + offset + "/length " + length + " for byte[] of length " + b.length);
        }

        // get enough pages for new size (computed as long to avoid int overflow)
        ensureCapacity((long) count + length);

        // bulk copy
        bytes.set(count, b, offset, length);

        // advance
        count += length;
    }

    /**
     * Rewinds the stream to position zero. If the backing array is larger than one page it is
     * resized down to {@link BigArrays#PAGE_SIZE_IN_BYTES}.
     */
    public void reset() {
        // shrink list of pages
        if (bytes.size() > BigArrays.PAGE_SIZE_IN_BYTES) {
            bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
        }

        // go back to start
        count = 0;
    }

    @Override
    public void flush() throws IOException {
        // nothing to do
    }

    /**
     * Moves the write position to {@code position}, growing the backing array if needed.
     *
     * @param position the new absolute position.
     * @throws IllegalArgumentException if {@code position} is negative or larger than
     *                                  {@link Integer#MAX_VALUE}.
     */
    @Override
    public void seek(long position) throws IOException {
        if (position > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("position " + position + " > Integer.MAX_VALUE");
        }
        if (position < 0) {
            throw new IllegalArgumentException("position " + position + " < 0");
        }

        // grow first: if growing fails, count must still describe a valid position
        ensureCapacity(position);
        count = (int) position;
    }

    /**
     * Moves the write position by {@code length} relative to the current position, growing the
     * backing array if needed.
     *
     * @param length number of bytes to move the position by.
     * @throws IllegalArgumentException if the resulting position would be negative or larger
     *                                  than {@link Integer#MAX_VALUE}.
     */
    public void skip(int length) {
        final long target = (long) count + length;
        if (target < 0 || target > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Illegal skip length " + length + " at position " + count);
        }

        // grow first, then commit the new position
        ensureCapacity(target);
        count = (int) target;
    }

    @Override
    public void close() throws IOException {
        // empty for now.
    }

    /**
     * Returns the current size of the buffer.
     *
     * @return the value of the <code>count</code> field, which is the number of valid
     *         bytes in this output stream.
     * @see java.io.ByteArrayOutputStream#count
     */
    public int size() {
        return count;
    }

    /**
     * @return an Elasticsearch {@code PagedBytesReference} built from the backing array and the
     *         current {@code count}.
     */
    @Override
    public org.elasticsearch.common.bytes.BytesReference bytes() {
        return new org.elasticsearch.common.bytes.PagedBytesReference(bigarrays, bytes, count);
    }

    /**
     * Same as {@link #bytes()} but returns this project's own {@link BytesReference} type.
     *
     * @return a {@link PagedBytesReference} built from the backing array and the current
     *         {@code count}.
     */
    public BytesReference ourBytes() {
        return new PagedBytesReference(bigarrays, bytes, count);
    }

    /**
     * Makes sure the backing array can hold at least {@code offset} bytes.
     *
     * @param offset the required minimum size, as a long so callers can pass un-overflowed sums.
     * @throws IllegalArgumentException if {@code offset} is larger than {@link Integer#MAX_VALUE}.
     */
    private void ensureCapacity(long offset) {
        if (offset > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
        }
        bytes = bigarrays.grow(bytes, offset);
    }

}