001package org.xbib.elasticsearch.common.io.stream; 002 003import org.elasticsearch.common.compress.Compressor; 004import org.elasticsearch.common.io.UTF8StreamWriter; 005import org.elasticsearch.common.io.stream.HandlesStreamOutput; 006import org.elasticsearch.common.io.stream.StreamOutput; 007import org.elasticsearch.common.unit.ByteSizeValue; 008import org.elasticsearch.common.util.concurrent.ConcurrentCollections; 009import org.elasticsearch.monitor.jvm.JvmInfo; 010 011import java.io.IOException; 012import java.lang.ref.SoftReference; 013import java.util.Queue; 014import java.util.concurrent.atomic.AtomicInteger; 015 016/** 017 * 018 */ 019public class CachedStreamOutput { 020 021 private static Entry newEntry() { 022 BytesStreamOutput bytes = new BytesStreamOutput(); 023 HandlesStreamOutput handles = new HandlesStreamOutput(bytes); 024 return new Entry(bytes, handles); 025 } 026 027 public static class Entry { 028 private final BytesStreamOutput bytes; 029 private final HandlesStreamOutput handles; 030 031 Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) { 032 this.bytes = bytes; 033 this.handles = handles; 034 } 035 036 public void reset() { 037 bytes.reset(); 038 handles.setOut(bytes); 039 handles.clear(); 040 } 041 042 public BytesStreamOutput bytes() { 043 return bytes; 044 } 045 046 public StreamOutput handles() throws IOException { 047 return handles; 048 } 049 050 public StreamOutput bytes(Compressor compressor) throws IOException { 051 return compressor.streamOutput(bytes); 052 } 053 054 public StreamOutput handles(Compressor compressor) throws IOException { 055 StreamOutput compressed = compressor.streamOutput(bytes); 056 handles.clear(); 057 handles.setOut(compressed); 058 return handles; 059 } 060 } 061 062 static class SoftWrapper<T> { 063 private SoftReference<T> ref; 064 065 public SoftWrapper() { 066 } 067 068 public void set(T ref) { 069 this.ref = new SoftReference<T>(ref); 070 } 071 072 public T get() { 073 return ref == null ? null : ref.get(); 074 } 075 076 public void clear() { 077 ref = null; 078 } 079 } 080 081 private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>(); 082 private static final AtomicInteger counter = new AtomicInteger(); 083 public static int BYTES_LIMIT = 1 * 1024 * 1024; // don't cache entries that are bigger than that... 084 public static int COUNT_LIMIT = 100; // number of concurrent entries cached 085 086 static { 087 // guess the maximum size per entry and the maximum number of entries based on the heap size 088 long maxHeap = JvmInfo.jvmInfo().mem().heapMax().bytes(); 089 if (maxHeap < ByteSizeValue.parseBytesSizeValue("500mb").bytes()) { 090 BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("500kb").bytes(); 091 COUNT_LIMIT = 10; 092 } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("1gb").bytes()) { 093 BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("1mb").bytes(); 094 COUNT_LIMIT = 20; 095 } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("4gb").bytes()) { 096 BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("2mb").bytes(); 097 COUNT_LIMIT = 50; 098 } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("10gb").bytes()) { 099 BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("5mb").bytes(); 100 COUNT_LIMIT = 50; 101 } else { 102 BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("10mb").bytes(); 103 COUNT_LIMIT = 100; 104 } 105 } 106 107 public static void clear() { 108 cache.clear(); 109 } 110 111 public static Entry popEntry() { 112 Queue<Entry> ref = cache.get(); 113 if (ref == null) { 114 return newEntry(); 115 } 116 Entry entry = ref.poll(); 117 if (entry == null) { 118 return newEntry(); 119 } 120 counter.decrementAndGet(); 121 entry.reset(); 122 return entry; 123 } 124 125 public static void pushEntry(Entry entry) { 126 entry.reset(); 127 if (entry.bytes().bytes().length() > BYTES_LIMIT) { 128 return; 129 } 130 Queue<Entry> ref = cache.get(); 131 if (ref == null) { 132 ref = ConcurrentCollections.newQueue(); 133 counter.set(0); 134 cache.set(ref); 135 } 136 if (counter.incrementAndGet() > COUNT_LIMIT) { 137 counter.decrementAndGet(); 138 } else { 139 ref.add(entry); 140 } 141 } 142 143 private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<SoftReference<UTF8StreamWriter>>(); 144 145 public static UTF8StreamWriter utf8StreamWriter() { 146 SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get(); 147 UTF8StreamWriter writer = (ref == null) ? null : ref.get(); 148 if (writer == null) { 149 writer = new UTF8StreamWriter(1024 * 4); 150 utf8StreamWriter.set(new SoftReference<UTF8StreamWriter>(writer)); 151 } 152 writer.reset(); 153 return writer; 154 } 155}