package org.xbib.elasticsearch.common.io.stream;

import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.UTF8StreamWriter;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.monitor.jvm.JvmInfo;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A cache of reusable stream output {@link Entry} instances, each pairing a
 * {@link BytesStreamOutput} with a {@link HandlesStreamOutput} that writes
 * through it. The cached queue is held behind a soft reference so it can be
 * reclaimed under memory pressure; the per-entry size limit and the entry
 * count limit are derived from the maximum heap size.
 */
public class CachedStreamOutput {

    private static Entry newEntry() {
        BytesStreamOutput bytes = new BytesStreamOutput();
        HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
        return new Entry(bytes, handles);
    }

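    /**
     * A reusable pair of streams: a {@link BytesStreamOutput} holding the raw
     * buffer and a {@link HandlesStreamOutput} that writes through it.
     */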
    public static class Entry {
        private final BytesStreamOutput bytes;
        private final HandlesStreamOutput handles;

        Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) {
            this.bytes = bytes;
            this.handles = handles;
        }

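        /**
         * Prepares the entry for reuse: clears the byte buffer, points the
         * handles stream back at the plain (uncompressed) buffer, and clears
         * any previously recorded handles.
         */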
        public void reset() {
            bytes.reset();
            handles.setOut(bytes);
            handles.clear();
        }

        public BytesStreamOutput bytes() {
            return bytes;
        }

        public StreamOutput handles() throws IOException {
            return handles;
        }

        public StreamOutput bytes(Compressor compressor) throws IOException {
            return compressor.streamOutput(bytes);
        }

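        /**
         * Returns the handles stream re-pointed at a compressed view of the
         * byte buffer, so writes are compressed before reaching the buffer.
         */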
        public StreamOutput handles(Compressor compressor) throws IOException {
            StreamOutput compressed = compressor.streamOutput(bytes);
            handles.clear();
            handles.setOut(compressed);
            return handles;
        }
    }

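    /**
     * Holder for a single {@link SoftReference}, letting the garbage collector
     * reclaim the cached queue when memory gets tight.
     */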
    static class SoftWrapper<T> {
        private SoftReference<T> ref;

        public SoftWrapper() {
        }

        public void set(T ref) {
            this.ref = new SoftReference<T>(ref);
        }

        public T get() {
            return ref == null ? null : ref.get();
        }

        public void clear() {
            ref = null;
        }
    }

    private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>();
    private static final AtomicInteger counter = new AtomicInteger();
    public static int BYTES_LIMIT = 1 * 1024 * 1024; // don't cache entries that are bigger than that...
    public static int COUNT_LIMIT = 100; // number of concurrent entries cached

    static {
        // guess the maximum size per entry and the maximum number of entries based on the heap size
        long maxHeap = JvmInfo.jvmInfo().mem().heapMax().bytes();
        if (maxHeap < ByteSizeValue.parseBytesSizeValue("500mb").bytes()) {
            BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("500kb").bytes();
            COUNT_LIMIT = 10;
        } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("1gb").bytes()) {
            BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("1mb").bytes();
            COUNT_LIMIT = 20;
        } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("4gb").bytes()) {
            BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("2mb").bytes();
            COUNT_LIMIT = 50;
        } else if (maxHeap < ByteSizeValue.parseBytesSizeValue("10gb").bytes()) {
            BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("5mb").bytes();
            COUNT_LIMIT = 50;
        } else {
            BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("10mb").bytes();
            COUNT_LIMIT = 100;
        }
    }

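    /**
     * Drops the cached queue; subsequent pops allocate fresh entries.
     */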
    public static void clear() {
        cache.clear();
    }

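    /**
     * Returns a reset entry from the cache, or a freshly allocated one if the
     * cache is empty or has been reclaimed. A typical usage sketch (the
     * {@code request} variable is illustrative only):
     *
     * <pre>
     * CachedStreamOutput.Entry entry = CachedStreamOutput.popEntry();
     * try {
     *     StreamOutput out = entry.handles();
     *     request.writeTo(out);
     *     out.flush();
     *     // read the result from entry.bytes() ...
     * } finally {
     *     CachedStreamOutput.pushEntry(entry);
     * }
     * </pre>
     */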
    public static Entry popEntry() {
        Queue<Entry> ref = cache.get();
        if (ref == null) {
            return newEntry();
        }
        Entry entry = ref.poll();
        if (entry == null) {
            return newEntry();
        }
        counter.decrementAndGet();
        entry.reset();
        return entry;
    }

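    /**
     * Resets the entry and offers it back to the cache. Entries failing the
     * {@code BYTES_LIMIT} size check, or arriving when {@code COUNT_LIMIT}
     * entries are already cached, are dropped for the garbage collector.
     */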
    public static void pushEntry(Entry entry) {
        entry.reset();
        if (entry.bytes().bytes().length() > BYTES_LIMIT) {
            return;
        }
        Queue<Entry> ref = cache.get();
        if (ref == null) {
            ref = ConcurrentCollections.newQueue();
            counter.set(0);
            cache.set(ref);
        }
        if (counter.incrementAndGet() > COUNT_LIMIT) {
            counter.decrementAndGet();
        } else {
            ref.add(entry);
        }
    }

    private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<SoftReference<UTF8StreamWriter>>();

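    /**
     * Returns a per-thread, soft-referenced {@link UTF8StreamWriter}, already
     * reset and ready to be attached to an output stream.
     */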
    public static UTF8StreamWriter utf8StreamWriter() {
        SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get();
        UTF8StreamWriter writer = (ref == null) ? null : ref.get();
        if (writer == null) {
            writer = new UTF8StreamWriter(1024 * 4);
            utf8StreamWriter.set(new SoftReference<UTF8StreamWriter>(writer));
        }
        writer.reset();
        return writer;
    }
}