001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.client.transport;
017
018import org.elasticsearch.ElasticsearchIllegalStateException;
019import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
020import org.elasticsearch.action.bulk.BulkItemResponse;
021import org.elasticsearch.action.bulk.BulkProcessor;
022import org.elasticsearch.action.bulk.BulkRequest;
023import org.elasticsearch.action.bulk.BulkResponse;
024import org.elasticsearch.action.delete.DeleteRequest;
025import org.elasticsearch.action.index.IndexRequest;
026import org.elasticsearch.client.Client;
027import org.elasticsearch.common.collect.ImmutableSet;
028import org.elasticsearch.common.logging.ESLogger;
029import org.elasticsearch.common.logging.ESLoggerFactory;
030import org.elasticsearch.common.settings.ImmutableSettings;
031import org.elasticsearch.common.settings.Settings;
032import org.elasticsearch.common.unit.ByteSizeUnit;
033import org.elasticsearch.common.unit.ByteSizeValue;
034import org.elasticsearch.common.unit.TimeValue;
035import org.xbib.elasticsearch.plugin.jdbc.client.BaseIngestTransportClient;
036import org.xbib.elasticsearch.plugin.jdbc.client.BulkProcessorHelper;
037import org.xbib.elasticsearch.plugin.jdbc.client.ClientHelper;
038import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
039import org.xbib.elasticsearch.plugin.jdbc.client.Metric;
040
041import java.io.IOException;
042import java.util.Map;
043import java.util.concurrent.atomic.AtomicLong;
044
045/**
046 * Client using the BulkProcessor of Elasticsearch
047 */
048public class BulkTransportClient extends BaseIngestTransportClient implements Ingest {
049
050    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.BulkTransportClient");
051    /**
052     * The default size of a bulk request
053     */
054    private int maxActionsPerBulkRequest = 100;
055    /**
056     * The default number of maximum concurrent requests
057     */
058    private int maxConcurrentBulkRequests = Runtime.getRuntime().availableProcessors() * 2;
059    /**
060     * The maximum volume
061     */
062    private ByteSizeValue maxVolumePerBulkRequest = new ByteSizeValue(10, ByteSizeUnit.MB);
063
064    private TimeValue flushInterval = TimeValue.timeValueSeconds(30);
065
066    /**
067     * The concurrent requests
068     */
069    private final AtomicLong concurrentRequestCounter = new AtomicLong(0L);
070
071    /**
072     * The BulkProcessor
073     */
074    private BulkProcessor bulkProcessor;
075
076    private Metric metric;
077
078    private Throwable throwable;
079
080    private volatile boolean suspended = false;
081
082    private volatile boolean closed = false;
083
084    @Override
085    public BulkTransportClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) {
086        this.maxActionsPerBulkRequest = maxActionsPerBulkRequest;
087        return this;
088    }
089
090    @Override
091    public BulkTransportClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) {
092        this.maxConcurrentBulkRequests = maxConcurrentBulkRequests;
093        return this;
094    }
095
096    @Override
097    public BulkTransportClient maxVolumePerBulkRequest(ByteSizeValue maxVolumePerBulkRequest) {
098        this.maxVolumePerBulkRequest = maxVolumePerBulkRequest;
099        return this;
100    }
101
102    @Override
103    public BulkTransportClient maxRequestWait(TimeValue timeout) {
104        // ignore, not supported
105        return this;
106    }
107
108    @Override
109    public BulkTransportClient flushIngestInterval(TimeValue flushInterval) {
110        this.flushInterval = flushInterval;
111        return this;
112    }
113
114    public BulkTransportClient newClient(Client client) {
115        return this.newClient(findSettings());
116    }
117
118    @Override
119    public BulkTransportClient newClient(Map<String, String> settings) {
120        return this.newClient(ImmutableSettings.settingsBuilder().put(settings).build());
121    }
122
123    @Override
124    public BulkTransportClient newClient(Settings settings) {
125        super.newClient(settings);
126        resetSettings();
127        this.metric = new Metric();
128        metric.start();
129        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
130            @Override
131            public void beforeBulk(long executionId, BulkRequest request) {
132                long l = concurrentRequestCounter.getAndIncrement();
133                if (metric != null) {
134                    int n = request.numberOfActions();
135                    metric.getSubmitted().inc(n);
136                    metric.getCurrentIngestNumDocs().inc(n);
137                    metric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
138                }
139                logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
140                        executionId,
141                        request.numberOfActions(),
142                        request.estimatedSizeInBytes(),
143                        l);
144            }
145
146            @Override
147            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
148                long l = concurrentRequestCounter.decrementAndGet();
149                if (metric != null) {
150                    metric.getSucceeded().inc(response.getItems().length);
151                    metric.getTotalIngest().inc(response.getTookInMillis());
152                }
153                int n = 0;
154                for (BulkItemResponse itemResponse : response.getItems()) {
155                    if (itemResponse.isFailed()) {
156                        n++;
157                        metric.getSucceeded().dec(1);
158                        metric.getFailed().inc(1);
159                    }
160                }
161                logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]",
162                        executionId,
163                        metric.getSucceeded().count(),
164                        metric.getFailed().count(),
165                        response.getTook().millis(),
166                        l);
167                if (n > 0) {
168                    logger.error("bulk [{}] failed with {} failed items, failure message = {}",
169                            executionId, n, response.buildFailureMessage());
170                } else {
171                    metric.getCurrentIngestNumDocs().dec(response.getItems().length);
172                }
173            }
174
175            @Override
176            public void afterBulk(long executionId, BulkRequest requst, Throwable failure) {
177                concurrentRequestCounter.decrementAndGet();
178                throwable = failure;
179                closed = true;
180                logger.error("bulk [" + executionId + "] error", failure);
181            }
182        };
183        BulkProcessor.Builder builder = BulkProcessor.builder(client, listener)
184                .setBulkActions(maxActionsPerBulkRequest)
185                .setConcurrentRequests(maxConcurrentBulkRequests)
186                .setFlushInterval(flushInterval);
187        if (maxVolumePerBulkRequest != null) {
188            builder.setBulkSize(maxVolumePerBulkRequest);
189        }
190        this.bulkProcessor = builder.build();
191        this.closed = false;
192        return this;
193    }
194
195    @Override
196    public Client client() {
197        return client;
198    }
199
200    public BulkTransportClient shards(int value) {
201        super.shards(value);
202        return this;
203    }
204
205    public BulkTransportClient replica(int value) {
206        super.replica(value);
207        return this;
208    }
209
210    @Override
211    public BulkTransportClient newIndex(String index) {
212        if (closed) {
213            throw new ElasticsearchIllegalStateException("client is closed");
214        }
215        super.newIndex(index);
216        return this;
217    }
218
219    @Override
220    public BulkTransportClient deleteIndex(String index) {
221        if (closed) {
222            throw new ElasticsearchIllegalStateException("client is closed");
223        }
224        super.deleteIndex(index);
225        return this;
226    }
227
228    @Override
229    public BulkTransportClient startBulk(String index) throws IOException {
230        if (metric == null) {
231            return this;
232        }
233        if (!metric.isBulk(index)) {
234            metric.startBulk(index);
235            ClientHelper.disableRefresh(client, index);
236        }
237        return this;
238    }
239
240    @Override
241    public BulkTransportClient stopBulk(String index) throws IOException {
242        if (metric == null) {
243            return this;
244        }
245        if (metric.isBulk(index)) {
246            metric.stopBulk(index);
247            ClientHelper.enableRefresh(client, index);
248        }
249        return this;
250    }
251
252    @Override
253    public BulkTransportClient flush(String index) {
254        ClientHelper.flush(client, index);
255        return this;
256    }
257
258    @Override
259    public BulkTransportClient refresh(String index) {
260        ClientHelper.refresh(client, index);
261        return this;
262    }
263
264    @Override
265    public BulkTransportClient index(String index, String type, String id, String source) {
266        if (closed) {
267            throw new ElasticsearchIllegalStateException("client is closed");
268        }
269        try {
270            if (suspended) {
271                Thread.sleep(1000L);
272                return this;
273            }
274            metric.getCurrentIngest().inc();
275            bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source));
276        } catch (Exception e) {
277            throwable = e;
278            closed = true;
279            logger.error("bulk add of index request failed: " + e.getMessage(), e);
280        } finally {
281            metric.getCurrentIngest().dec();
282        }
283        return this;
284    }
285
286    @Override
287    public BulkTransportClient bulkIndex(IndexRequest indexRequest) {
288        if (closed) {
289            throw new ElasticsearchIllegalStateException("client is closed");
290        }
291        try {
292            if (suspended) {
293                Thread.sleep(1000L);
294                return this;
295            }
296            metric.getCurrentIngest().inc();
297            bulkProcessor.add(indexRequest);
298        } catch (Exception e) {
299            throwable = e;
300            closed = true;
301            logger.error("bulk add of index request failed: " + e.getMessage(), e);
302        } finally {
303            metric.getCurrentIngest().dec();
304        }
305        return this;
306    }
307
308    @Override
309    public BulkTransportClient delete(String index, String type, String id) {
310        if (closed) {
311            throw new ElasticsearchIllegalStateException("client is closed");
312        }
313        try {
314            if (suspended) {
315                Thread.sleep(1000L);
316                return this;
317            }
318            metric.getCurrentIngest().inc();
319            bulkProcessor.add(new DeleteRequest(index).type(type).id(id));
320        } catch (Exception e) {
321            throwable = e;
322            closed = true;
323            logger.error("bulk add of delete request failed: " + e.getMessage(), e);
324        } finally {
325            metric.getCurrentIngest().dec();
326        }
327        return this;
328    }
329
330    @Override
331    public BulkTransportClient bulkDelete(DeleteRequest deleteRequest) {
332        if (closed) {
333            throw new ElasticsearchIllegalStateException("client is closed");
334        }
335        try {
336            if (suspended) {
337                Thread.sleep(1000L);
338                return this;
339            }
340            metric.getCurrentIngest().inc();
341            bulkProcessor.add(deleteRequest);
342        } catch (Exception e) {
343            throwable = e;
344            closed = true;
345            logger.error("bulk add of delete request failed: " + e.getMessage(), e);
346        } finally {
347            metric.getCurrentIngest().dec();
348        }
349        return this;
350    }
351
352    @Override
353    public synchronized BulkTransportClient flushIngest() {
354        if (closed) {
355            throw new ElasticsearchIllegalStateException("client is closed");
356        }
357        if (client == null) {
358            logger.warn("no client");
359            return this;
360        }
361        logger.debug("flushing bulk processor");
362        // hacked BulkProcessor to execute the submission of remaining docs. Wait always 30 seconds at most.
363        BulkProcessorHelper.flush(bulkProcessor);
364        return this;
365    }
366
367    @Override
368    public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException {
369        if (closed) {
370            throw new ElasticsearchIllegalStateException("client is closed");
371        }
372        if (client == null) {
373            logger.warn("no client");
374            return this;
375        }
376        if (metric.getCurrentIngest().count() == 0) {
377            logger.warn("no current ingests");
378            return this;
379        }
380        BulkProcessorHelper.waitFor(bulkProcessor, maxWaitTime);
381        return this;
382    }
383
384    @Override
385    public BulkTransportClient waitForCluster(ClusterHealthStatus status, TimeValue timeValue) throws IOException {
386        ClientHelper.waitForCluster(client, status, timeValue);
387        return this;
388    }
389
390    @Override
391    public int waitForRecovery(String index) throws IOException {
392        return ClientHelper.waitForRecovery(client, index);
393    }
394
395    @Override
396    public int updateReplicaLevel(String index, int level) throws IOException {
397        return ClientHelper.updateReplicaLevel(client, index, level);
398    }
399
400    @Override
401    public synchronized void shutdown() {
402        if (closed) {
403            super.shutdown();
404            throw new ElasticsearchIllegalStateException("client is closed");
405        }
406        try {
407            if (bulkProcessor != null) {
408                logger.debug("closing bulk processor...");
409                bulkProcessor.close();
410            }
411            if (metric.indices() != null && !metric.indices().isEmpty()) {
412                logger.debug("stopping bulk mode for indices {}...", metric.indices());
413                for (String index : ImmutableSet.copyOf(metric.indices())) {
414                    stopBulk(index);
415                }
416            }
417            logger.debug("shutting down...");
418            super.shutdown();
419            logger.debug("shutting down completed");
420        } catch (Exception e) {
421            logger.error(e.getMessage(), e);
422        }
423    }
424
425    @Override
426    public void suspend() {
427        suspended = true;
428    }
429
430    @Override
431    public void resume() {
432        suspended = false;
433    }
434
435    public Metric getMetric() {
436        return metric;
437    }
438
439    @Override
440    public boolean hasThrowable() {
441        return throwable != null;
442    }
443
444    @Override
445    public Throwable getThrowable() {
446        return throwable;
447    }
448}