package org.xbib.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

/**
 * A page based bytes reference, internally holding the bytes in a paged
 * data structure.
 * <p>
 * An instance is a view of {@code length} bytes starting at {@code offset}
 * inside a {@link ByteArray}. All public indices are relative to the view,
 * i.e. index {@code 0} maps to {@code offset} in the backing array.
 */
public class PagedBytesReference implements BytesReference {

    private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
    private static final int NIO_GATHERING_LIMIT = 524288;

    private final BigArrays bigarrays;
    protected final ByteArray bytearray;
    private final int offset;
    private final int length;
    // lazily computed hash code; 0 means "not computed yet"
    private int hash = 0;

    /**
     * Creates a reference covering the first {@code length} bytes of {@code bytearray}.
     *
     * @param bigarrays the {@link BigArrays} instance, passed on to slices
     * @param bytearray the backing paged byte array
     * @param length    number of bytes covered by this reference
     */
    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
        this(bigarrays, bytearray, 0, length);
    }

    /**
     * Creates a reference covering {@code length} bytes of {@code bytearray}, starting at {@code from}.
     *
     * @param bigarrays the {@link BigArrays} instance, passed on to slices
     * @param bytearray the backing paged byte array
     * @param from      start offset into {@code bytearray}
     * @param length    number of bytes covered by this reference
     */
    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
        this.bigarrays = bigarrays;
        this.bytearray = bytearray;
        this.offset = from;
        this.length = length;
    }

    /**
     * Returns the byte at {@code index}, relative to the start of this reference.
     */
    @Override
    public byte get(int index) {
        return bytearray.get(offset + index);
    }

    /**
     * Returns the number of bytes covered by this reference.
     */
    @Override
    public int length() {
        return length;
    }

    /**
     * Returns a view on a sub-range of this reference that shares the same backing array.
     *
     * @param from   start of the slice, relative to this reference
     * @param length number of bytes in the slice
     * @throws ElasticsearchIllegalArgumentException if the requested range is negative
     *                                               or does not fit into this reference
     */
    @Override
    public BytesReference slice(int from, int length) {
        // "from > length() - length" is the overflow-safe form of "from + length > length()"
        if (from < 0 || length < 0 || from > length() - length) {
            throw new ElasticsearchIllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]");
        }

        return new PagedBytesReference(bigarrays, bytearray, offset + from, length);
    }

    /**
     * Returns a {@link StreamInput} that reads the bytes of this reference from its start.
     */
    @Override
    public StreamInput streamInput() {
        return new PagedBytesReferenceStreamInput(bytearray, offset, length);
    }

    /**
     * Writes all bytes of this reference to {@code os}, in chunks of at most one page.
     *
     * @param os the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void writeTo(OutputStream os) throws IOException {
        // nothing to do
        if (length == 0) {
            return;
        }

        BytesRef ref = new BytesRef();
        int written = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            os.write(ref.bytes, ref.offset, fragmentSize);
            written += fragmentSize;
        }

        // handle remainder of pages + trailing fragment
        while (written < length) {
            int remaining = length - written;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + written, bulkSize, ref);
            os.write(ref.bytes, ref.offset, bulkSize);
            written += bulkSize;
        }
    }

    /**
     * Writes the bytes of this reference to {@code channel}, wrapping each page
     * fragment in its own {@link ByteBuffer} and issuing a gathering write.
     * <p>
     * NOTE(review): each {@code channel.write(...)} is called exactly once and its
     * return value is ignored; on a channel that performs partial writes not all
     * bytes may be written - confirm that callers only pass blocking channels.
     *
     * @param channel the channel to write to
     * @throws IOException if the channel fails
     */
    @Override
    public void writeTo(GatheringByteChannel channel) throws IOException {
        // nothing to do
        if (length == 0) {
            return;
        }

        ByteBuffer[] buffers;
        ByteBuffer currentBuffer = null;
        BytesRef ref = new BytesRef();
        int pos = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
            pos += fragmentSize;
        }

        // we only have a single page
        if (pos == length && currentBuffer != null) {
            channel.write(currentBuffer);
            return;
        }

        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

        buffers = new ByteBuffer[numBuffers];
        int bufferSlot = 0;

        if (currentBuffer != null) {
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
        }

        // handle remainder of pages + trailing fragment
        while (pos < length) {
            int remaining = length - pos;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + pos, bulkSize, ref);
            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
            pos += bulkSize;
        }

        // this would indicate that our numBuffer calculation is off by one.
        assert (numBuffers == bufferSlot);

        // finally write all buffers
        channel.write(buffers);
    }

    /**
     * Returns the bytes of this reference as an array of exactly {@link #length()} bytes
     * whose content starts at index {@code 0}.
     * <p>
     * The returned array is not guaranteed to be a private copy: when the backing
     * array hands out an array that already matches exactly, it is returned as is.
     */
    @Override
    public byte[] toBytes() {
        if (length == 0) {
            return BytesRef.EMPTY_BYTES;
        }

        BytesRef ref = new BytesRef();
        bytearray.get(offset, length, ref);

        // undo the single-page optimization by ByteArray.get(), otherwise
        // a materialized stream will contain trailing garbage/zeros
        byte[] result = ref.bytes;
        if (result.length != length || ref.offset != 0) {
            result = Arrays.copyOfRange(result, ref.offset, ref.offset + length);
        }

        return result;
    }

    /**
     * Returns the bytes of this reference wrapped in a {@link BytesArray}, built
     * from whatever {@link BytesRef} the backing array returns for this range.
     */
    @Override
    public BytesArray toBytesArray() {
        BytesRef ref = new BytesRef();
        bytearray.get(offset, length, ref);
        return new BytesArray(ref);
    }

    /**
     * Returns a {@link BytesArray} holding a copy of the bytes of this reference.
     */
    @Override
    public BytesArray copyBytesArray() {
        BytesRef ref = new BytesRef();
        boolean copied = bytearray.get(offset, length, ref);

        if (copied) {
            // BigArray has materialized for us, no need to do it again
            return new BytesArray(ref.bytes, ref.offset, ref.length);
        } else {
            // here we need to copy the bytes even when shared
            byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
            return new BytesArray(copy);
        }
    }

    /**
     * Returns the bytes of this reference as a Netty {@link ChannelBuffer}, wrapping
     * each page fragment and combining them into a composite buffer when more than
     * one fragment is needed.
     */
    @Override
    public ChannelBuffer toChannelBuffer() {
        // nothing to do
        if (length == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }

        ChannelBuffer[] buffers;
        ChannelBuffer currentBuffer = null;
        BytesRef ref = new BytesRef();
        int pos = 0;

        // are we a slice?
        if (offset != 0) {
            // remaining size of page fragment at offset
            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
            bytearray.get(offset, fragmentSize, ref);
            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, fragmentSize);
            pos += fragmentSize;
        }

        // no need to create a composite buffer for a single page
        if (pos == length && currentBuffer != null) {
            return currentBuffer;
        }

        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

        buffers = new ChannelBuffer[numBuffers];
        int bufferSlot = 0;

        if (currentBuffer != null) {
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
        }

        // handle remainder of pages + trailing fragment
        while (pos < length) {
            int remaining = length - pos;
            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
            bytearray.get(offset + pos, bulkSize, ref);
            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, bulkSize);
            buffers[bufferSlot] = currentBuffer;
            bufferSlot++;
            pos += bulkSize;
        }

        // this would indicate that our numBuffer calculation is off by one.
        assert (numBuffers == bufferSlot);

        // we can use gathering writes from the ChannelBuffers, but only if they are
        // moderately small to prevent OOMs due to DirectBuffer allocations.
        return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers);
    }

    /**
     * Returns {@code true} when {@code offset + length} does not exceed the page size,
     * which is the condition under which {@link #array()} is made available.
     */
    @Override
    public boolean hasArray() {
        return (offset + length <= PAGE_SIZE);
    }

    /**
     * Returns the array backing this reference; the content starts at {@link #arrayOffset()}.
     *
     * @throws IllegalStateException if {@link #hasArray()} is {@code false}
     */
    @Override
    public byte[] array() {
        if (hasArray()) {
            if (length == 0) {
                return BytesRef.EMPTY_BYTES;
            }

            BytesRef ref = new BytesRef();
            bytearray.get(offset, length, ref);
            return ref.bytes;
        }

        throw new IllegalStateException("array not available");
    }

    /**
     * Returns the offset of the first byte of this reference inside {@link #array()}.
     *
     * @throws IllegalStateException if {@link #hasArray()} is {@code false}
     */
    @Override
    public int arrayOffset() {
        if (hasArray()) {
            BytesRef ref = new BytesRef();
            bytearray.get(offset, length, ref);
            return ref.offset;
        }

        throw new IllegalStateException("array not available");
    }

    /**
     * Decodes the bytes of this reference as UTF-8 and returns the resulting string.
     */
    @Override
    public String toUtf8() {
        if (length() == 0) {
            return "";
        }

        byte[] bytes = toBytes();
        final CharsRef ref = new CharsRef(length);
        // toBytes() returns an array whose content starts at index 0, so the slice
        // offset into the paged array must NOT be applied again here
        UnicodeUtil.UTF8toUTF16(bytes, 0, length, ref);
        return ref.toString();
    }

    /**
     * Returns a {@link BytesRef} for the range of this reference as filled in by the
     * backing array.
     */
    @Override
    public BytesRef toBytesRef() {
        BytesRef bref = new BytesRef();
        // if length <= pagesize this will dereference the page, or materialize the byte[]
        bytearray.get(offset, length, bref);
        return bref;
    }

    /**
     * Returns a {@link BytesRef} over the array produced by {@link #toBytes()}.
     */
    @Override
    public BytesRef copyBytesRef() {
        byte[] bytes = toBytes();
        // the array from toBytes() starts at index 0 regardless of the slice offset
        return new BytesRef(bytes, 0, length);
    }

    /**
     * Computes (and caches) a hash over the bytes of this reference using the
     * {@code 31 * h + b} scheme, starting from {@code 1}.
     */
    @Override
    public int hashCode() {
        if (hash == 0) {
            // TODO: delegate to BigArrays via:
            // hash = bigarrays.hashCode(bytearray);
            // and for slices:
            // hash = bigarrays.hashCode(bytearray, offset, length);
            int tmphash = 1;
            for (int i = 0; i < length; i++) {
                tmphash = 31 * tmphash + bytearray.get(offset + i);
            }
            hash = tmphash;
        }
        return hash;
    }

    /**
     * Compares this reference byte by byte with another {@link PagedBytesReference};
     * any other argument is handed to {@code BytesReference.Helper.bytesEqual}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (!(obj instanceof PagedBytesReference)) {
            return BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
        }

        PagedBytesReference other = (PagedBytesReference) obj;
        if (length != other.length) {
            return false;
        }

        // TODO: delegate to BigArrays via:
        // return bigarrays.equals(bytearray, other.bytearray);
        // and for slices:
        // return bigarrays.equals(bytearray, start, other.bytearray, otherstart, len);
        ByteArray otherArray = other.bytearray;
        int otherOffset = other.offset;
        for (int i = 0; i < length; i++) {
            if (bytearray.get(offset + i) != otherArray.get(otherOffset + i)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Computes how many buffers are needed to wrap {@code numBytes} page-aligned bytes,
     * on top of {@code initialCount} buffers that are already accounted for.
     *
     * @param initialCount number of buffers already required (leading fragment)
     * @param numBytes     number of remaining bytes to wrap
     * @return the total number of buffers required
     */
    private int countRequiredBuffers(int initialCount, int numBytes) {
        int numBuffers = initialCount;
        // an "estimate" of how many pages remain - rounded down
        int pages = numBytes / PAGE_SIZE;
        // a remaining fragment < pagesize needs at least one buffer
        numBuffers += (pages == 0) ? 1 : pages;
        // a remainder that is not a multiple of pagesize also needs an extra buffer
        numBuffers += (pages > 0 && numBytes % PAGE_SIZE > 0) ? 1 : 0;
        return numBuffers;
    }

    /**
     * A {@link StreamInput} reading {@code length} bytes from a {@link ByteArray},
     * starting at {@code offset}. The stream position {@code pos} is relative to
     * {@code offset} and runs from {@code 0} to {@code length}.
     */
    private static class PagedBytesReferenceStreamInput extends StreamInput {

        private final ByteArray bytearray;
        private final BytesRef ref;
        private final int offset;
        private final int length;
        private int pos;

        /**
         * @param bytearray the backing paged byte array
         * @param offset    start offset into {@code bytearray}
         * @param length    number of readable bytes
         * @throws IndexOutOfBoundsException if the range exceeds the size of {@code bytearray}
         */
        public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
            this.bytearray = bytearray;
            this.ref = new BytesRef();
            this.offset = offset;
            this.length = length;
            this.pos = 0;

            if (offset + length > bytearray.size()) {
                throw new IndexOutOfBoundsException("offset+length >= bytearray.size()");
            }
        }

        /**
         * Reads the next byte.
         *
         * @throws EOFException if the end of the stream has been reached
         */
        @Override
        public byte readByte() throws IOException {
            if (pos >= length) {
                throw new EOFException();
            }

            return bytearray.get(offset + pos++);
        }

        /**
         * Reads exactly {@code len} bytes into {@code b}, starting at {@code bOffset}.
         *
         * @throws IndexOutOfBoundsException if fewer than {@code len} bytes remain
         */
        @Override
        public void readBytes(byte[] b, int bOffset, int len) throws IOException {
            // compare against what is actually left in the stream; the slice offset is
            // irrelevant here and the current position must be taken into account
            if (len > length - pos) {
                throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at pos " + pos);
            }

            read(b, bOffset, len);
        }

        /**
         * Reads the next byte as an unsigned value in the range {@code 0..255},
         * or returns {@code -1} at the end of the stream.
         */
        @Override
        public int read() throws IOException {
            // mask to an unsigned value, otherwise a 0xFF byte would be reported as -1 (EOF)
            return (pos < length) ? (bytearray.get(offset + pos++) & 0xFF) : -1;
        }

        /**
         * Reads up to {@code len} bytes into {@code b}, starting at {@code bOffset}.
         *
         * @return the number of bytes read, {@code 0} if {@code len} is {@code 0},
         *         or {@code -1} if the end of the stream has been reached
         */
        @Override
        public int read(final byte[] b, final int bOffset, final int len) throws IOException {
            if (len == 0) {
                return 0;
            }

            // pos is relative to the start of the stream, so EOF is at pos == length
            if (pos >= length) {
                return -1;
            }

            final int numBytesToCopy = Math.min(len, length - pos); // copy the full length or the remaining part

            // current offset into the underlying ByteArray
            long byteArrayOffset = offset + pos;

            // bytes already copied
            int copiedBytes = 0;

            while (copiedBytes < numBytesToCopy) {
                long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
                int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
                boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
                assert (copied == false); // we should never ever get back a materialized byte[]
                System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
                copiedBytes += bulkSize; // count how much we copied
                byteArrayOffset += bulkSize; // advance ByteArray index
            }

            pos += copiedBytes; // finally advance our stream position
            return copiedBytes;
        }

        /**
         * Rewinds the stream to its first byte.
         */
        @Override
        public void reset() throws IOException {
            pos = 0;
        }

        /**
         * Does nothing.
         */
        @Override
        public void close() throws IOException {
            // do nothing
        }

    }
}
429 int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment 430 boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment 431 assert (copied == false); // we should never ever get back a materialized byte[] 432 System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents 433 copiedBytes += bulkSize; // count how much we copied 434 byteArrayOffset += bulkSize; // advance ByteArray index 435 } 436 437 pos += copiedBytes; // finally advance our stream position 438 return copiedBytes; 439 } 440 441 @Override 442 public void reset() throws IOException { 443 pos = 0; 444 } 445 446 @Override 447 public void close() throws IOException { 448 // do nothing 449 } 450 451 } 452}