001package org.xbib.elasticsearch.action.websocket.bulk;
002
003import org.elasticsearch.ElasticsearchIllegalStateException;
004import org.elasticsearch.action.ActionListener;
005import org.elasticsearch.action.ActionRequest;
006import org.elasticsearch.action.bulk.BulkItemResponse;
007import org.elasticsearch.action.bulk.BulkRequest;
008import org.elasticsearch.action.bulk.BulkResponse;
009import org.elasticsearch.action.delete.DeleteRequest;
010import org.elasticsearch.action.index.IndexRequest;
011import org.elasticsearch.client.Client;
012import org.elasticsearch.common.Nullable;
013import org.elasticsearch.common.settings.Settings;
014import org.elasticsearch.common.unit.TimeValue;
015import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
016import org.elasticsearch.common.util.concurrent.EsExecutors;
017import org.elasticsearch.common.xcontent.XContentBuilder;
018import org.elasticsearch.common.xcontent.XContentBuilderString;
019import org.xbib.elasticsearch.websocket.BaseInteractiveHandler;
020import org.xbib.elasticsearch.websocket.InteractiveChannel;
021import org.xbib.elasticsearch.websocket.InteractiveRequest;
022
023import java.io.IOException;
024import java.util.Queue;
025import java.util.concurrent.Executors;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.Semaphore;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031
032import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
033
034/**
035 * The bulk handler is derived from the BulkProcessor, but
036 * offers explicit flushing and can receive requests from multiple threads
037 * into a global action queue.
038 * It supports websocket bulk actions and can write back response to an
039 * interactive channel.
040 * The bulk volume is not controlled.
041 * The default concurrency is 32, the number of actions in a bulk is 100.
042 */
043public class BulkHandler extends BaseInteractiveHandler {
044
045    private final BulkHandler.Listener listener;
046
047    private final int concurrentRequests;
048
049    private final int bulkActions;
050
051    private final TimeValue flushInterval;
052
053    private final Semaphore semaphore;
054
055    private final ScheduledThreadPoolExecutor scheduler;
056
057    private final ScheduledFuture scheduledFuture;
058
059    private final AtomicLong executionIdGen = new AtomicLong();
060
061    private final static Queue<ActionRequest> bulk = ConcurrentCollections.newQueue();
062
063    private volatile boolean closed = false;
064
065    @Override
066    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
067        // will be overriden by bulk action
068    }
069
070    /**
071     * A listener for the execution.
072     */
073    public static interface Listener {
074
075        /**
076         * Callback before the bulk is executed.
077         */
078        void beforeBulk(long executionId, BulkRequest request);
079
080        /**
081         * Callback after a successful execution of bulk request.
082         */
083        void afterBulk(long executionId, BulkRequest request, BulkResponse response);
084
085        /**
086         * Callback after a failed execution of bulk request.
087         */
088        void afterBulk(long executionId, BulkRequest request, Throwable failure);
089    }
090
091    /**
092     * A listener adapter
093     */
094    class ListenerAdapter implements BulkHandler.Listener {
095
096        @Override
097        public void beforeBulk(long executionId, BulkRequest request) {
098        }
099
100        @Override
101        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
102        }
103
104        @Override
105        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
106        }
107    }
108
109    /**
110     * A builder used to create a build an instance of a bulk processor.
111     */
112    public static class Builder {
113
114        private Settings settings;
115        private Client client;
116        private BulkHandler.Listener listener;
117        private int concurrentRequests = 1;
118        private int bulkActions = 100;
119        private TimeValue flushInterval = null;
120
121        /**
122         * Creates a builder of bulk processor with the client to use and the
123         * listener that will be used to be notified on the completion of bulk
124         * requests.
125         */
126        public Builder(Client client, BulkHandler.Listener listener) {
127            this.client = client;
128            this.listener = listener;
129        }
130
131        /**
132         * Sets the number of concurrent requests allowed to be executed. A
133         * value of 0 means that only a single request will be allowed to be
134         * executed. A value of 1 means 1 concurrent request is allowed to be
135         * executed while accumulating new bulk requests. Defaults to
136         * <tt>1</tt>.
137         */
138        public BulkHandler.Builder setConcurrentRequests(int concurrentRequests) {
139            this.concurrentRequests = concurrentRequests;
140            return this;
141        }
142
143        /**
144         * Sets when to flush a new bulk request based on the number of actions
145         * currently added. Defaults to <tt>1000</tt>. Can be set to <tt>-1</tt>
146         * to disable it.
147         */
148        public BulkHandler.Builder setBulkActions(int bulkActions) {
149            this.bulkActions = bulkActions;
150            return this;
151        }
152
153        /**
154         * Sets a flush interval flushing *any* bulk actions pending if the
155         * interval passes. Defaults to not set.
156         * <p/>
157         * Note, {@link #setBulkActions(int)} can
158         * be set to <tt>-1</tt> with the flush interval set allowing for
159         * complete async processing of bulk actions.
160         */
161        public BulkHandler.Builder setFlushInterval(TimeValue flushInterval) {
162            this.flushInterval = flushInterval;
163            return this;
164        }
165
166        /**
167         * Builds a new bulk processor.
168         */
169        public BulkHandler build() {
170            return new BulkHandler(settings, client, listener, concurrentRequests, bulkActions, flushInterval);
171        }
172    }
173
174    public static BulkHandler.Builder builder(Client client, BulkHandler.Listener listener) {
175        return new BulkHandler.Builder(client, listener);
176    }
177
178    public BulkHandler(Settings settings, Client client) {
179        super(settings, client);
180        this.listener = new ListenerAdapter();
181        this.concurrentRequests = 32;
182        this.bulkActions = 100;
183        this.semaphore = new Semaphore(concurrentRequests);
184        this.flushInterval = null;
185        this.scheduler = null;
186        this.scheduledFuture = null;
187    }
188
189    BulkHandler(Settings settings, Client client, BulkHandler.Listener listener, int concurrentRequests, int bulkActions, @Nullable TimeValue flushInterval) {
190        super(settings, client);
191        this.listener = listener;
192        this.concurrentRequests = concurrentRequests;
193        this.bulkActions = bulkActions;
194        this.semaphore = new Semaphore(concurrentRequests);
195        this.flushInterval = flushInterval;
196        if (flushInterval != null) {
197            this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), "websocket_bulk_processor"));
198            this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
199            this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
200            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkHandler.Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
201        } else {
202            this.scheduler = null;
203            this.scheduledFuture = null;
204        }
205    }
206
207    public BulkHandler.Listener getListener() {
208        return listener;
209    }
210
211    /**
212     * Flushes open bulk actions
213     */
214    public synchronized void flush() {
215        if (closed) {
216            return;
217        }
218        if (bulk.size() > 0) {
219            execute(null);
220        }
221    }
222
223    /**
224     * Flushes open bulk actions
225     */
226    public synchronized void flush(InteractiveChannel channel) {
227        if (closed) {
228            return;
229        }
230        if (bulk.size() > 0) {
231            execute(channel);
232        }
233    }
234
235    /**
236     * Closes the processor. If flushing by time is enabled, then its shutdown.
237     * Any remaining bulk actions are flushed.
238     */
239    public synchronized void close() {
240        if (closed) {
241            return;
242        }
243        closed = true;
244        if (this.scheduledFuture != null) {
245            this.scheduledFuture.cancel(false);
246            this.scheduler.shutdown();
247        }
248        if (bulk.size() > 0) {
249            execute(null);
250        }
251    }
252
253    /**
254     * Adds an {@link IndexRequest} to the list of actions to execute. Follows
255     * the same behavior of {@link IndexRequest} (for example, if no id is
256     * provided, one will be generated, or usage of the create flag).
257     */
258    public BulkHandler add(IndexRequest request) {
259        return add((ActionRequest) request);
260    }
261
262    /**
263     * Adds an {@link DeleteRequest} to the list of actions to execute.
264     */
265    public BulkHandler add(DeleteRequest request) {
266        return add((ActionRequest) request);
267    }
268
269    public BulkHandler add(ActionRequest request) {
270        internalAdd(request);
271        return this;
272    }
273
274    private synchronized void internalAdd(ActionRequest request) {
275        bulk.add(request);
276        executeIfNeeded();
277    }
278
279    private void executeIfNeeded() {
280        if (closed) {
281            throw new ElasticsearchIllegalStateException("bulk process already closed");
282        }
283        if (!isOverTheLimit()) {
284            return;
285        }
286        execute(null);
287    }
288
289    /**
290     * Adds an {@link IndexRequest} to the list of actions to execute. Follows
291     * the same behavior of {@link IndexRequest} (for example, if no id is
292     * provided, one will be generated, or usage of the create flag).
293     */
294    public BulkHandler add(IndexRequest request, InteractiveChannel channel) {
295        return add((ActionRequest) request, channel);
296    }
297
298    /**
299     * Adds an {@link DeleteRequest} to the list of actions to execute.
300     */
301    public BulkHandler add(DeleteRequest request, InteractiveChannel channel) {
302        return add((ActionRequest) request, channel);
303    }
304
305    public BulkHandler add(ActionRequest request, InteractiveChannel channel) {
306        internalAdd(request, channel);
307        return this;
308    }
309
310    private synchronized void internalAdd(ActionRequest request, InteractiveChannel channel) {
311        bulk.add(request);
312        executeIfNeeded(channel);
313    }
314
315    private void executeIfNeeded(InteractiveChannel channel) {
316        if (closed) {
317            throw new ElasticsearchIllegalStateException("bulk process already closed");
318        }
319        if (!isOverTheLimit()) {
320            return;
321        }
322        execute(channel);
323    }
324
325    // (currently) needs to be executed under a lock
326    private void execute(final InteractiveChannel channel) {
327        final BulkRequest bulkRequest = new BulkRequest().add(bulk);
328        bulk.clear();
329
330        final long executionId = executionIdGen.incrementAndGet();
331
332        if (concurrentRequests == 0) {
333            // execute in a blocking fashion...
334            try {
335                listener.beforeBulk(executionId, bulkRequest);
336                listener.afterBulk(executionId, bulkRequest, client.bulk(bulkRequest).actionGet());
337            } catch (Exception e) {
338                listener.afterBulk(executionId, bulkRequest, e);
339            }
340        } else {
341            try {
342                semaphore.acquire();
343            } catch (InterruptedException e) {
344                listener.afterBulk(executionId, bulkRequest, e);
345                return;
346            }
347            listener.beforeBulk(executionId, bulkRequest);
348            client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
349                @Override
350                public void onResponse(BulkResponse response) {
351                    try {
352                        listener.afterBulk(executionId, bulkRequest, response);
353                        if (channel != null) {
354                            channel.sendResponse("bulkresponse", buildResponse(response));
355                        }
356                    } catch (IOException e) {
357                        logger.error("error while sending bulk response", e);
358                    } finally {
359                        semaphore.release();
360                    }
361                }
362
363                @Override
364                public void onFailure(Throwable t) {
365                    try {
366                        listener.afterBulk(executionId, bulkRequest, t);
367                        if (channel != null) {
368                            channel.sendResponse("bulkresponse", t);
369                        }
370                    } catch (IOException e) {
371                        logger.error("error while sending bulk response", e);
372                    } finally {
373                        semaphore.release();
374                    }
375                }
376            });
377        }
378    }
379
380    private boolean isOverTheLimit() {
381        if (bulkActions != -1 && bulk.size() > bulkActions) {
382            return true;
383        }
384        return false;
385    }
386
387    class Flush implements Runnable {
388
389        @Override
390        public void run() {
391            synchronized (BulkHandler.this) {
392                if (closed) {
393                    return;
394                }
395                if (bulk.size() > 0) {
396                    execute(null);
397                }
398            }
399        }
400    }
401
402    /**
403     * Taken from the REST bulk action.
404     *
405     * @param response the bulk response
406     * @return a content builder with the response
407     * @throws IOException
408     */
409    private XContentBuilder buildResponse(BulkResponse response) throws IOException {
410        XContentBuilder builder = jsonBuilder();
411        builder.startObject();
412        builder.field(Fields.TOOK, response.getTookInMillis());
413        builder.startArray(Fields.ITEMS);
414        for (BulkItemResponse itemResponse : response) {
415            builder.startObject();
416            builder.startObject(itemResponse.getOpType());
417            builder.field(Fields._INDEX, itemResponse.getIndex());
418            builder.field(Fields._TYPE, itemResponse.getType());
419            builder.field(Fields._ID, itemResponse.getId());
420            long version = itemResponse.getVersion();
421            if (version != -1) {
422                builder.field(Fields._VERSION, itemResponse.getVersion());
423            }
424            if (itemResponse.isFailed()) {
425                builder.field(Fields.ERROR, itemResponse.getFailureMessage());
426            } else {
427                builder.field(Fields.OK, true);
428            }
429            builder.endObject();
430            builder.endObject();
431        }
432        builder.endArray();
433        builder.endObject();
434        return builder;
435    }
436
437    static final class Fields {
438
439        static final XContentBuilderString ITEMS = new XContentBuilderString("items");
440        static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
441        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
442        static final XContentBuilderString _ID = new XContentBuilderString("_id");
443        static final XContentBuilderString ERROR = new XContentBuilderString("error");
444        static final XContentBuilderString OK = new XContentBuilderString("ok");
445        static final XContentBuilderString TOOK = new XContentBuilderString("took");
446        static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
447        static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
448    }
449}