001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.client.node;
017
018import org.elasticsearch.ElasticsearchIllegalStateException;
019import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
020import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
021import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
022import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
023import org.elasticsearch.action.bulk.BulkItemResponse;
024import org.elasticsearch.action.bulk.BulkProcessor;
025import org.elasticsearch.action.bulk.BulkRequest;
026import org.elasticsearch.action.bulk.BulkResponse;
027import org.elasticsearch.action.delete.DeleteRequest;
028import org.elasticsearch.action.index.IndexRequest;
029import org.elasticsearch.client.Client;
030import org.elasticsearch.common.collect.ImmutableSet;
031import org.elasticsearch.common.logging.ESLogger;
032import org.elasticsearch.common.logging.ESLoggerFactory;
033import org.elasticsearch.common.settings.ImmutableSettings;
034import org.elasticsearch.common.settings.Settings;
035import org.elasticsearch.common.unit.ByteSizeUnit;
036import org.elasticsearch.common.unit.ByteSizeValue;
037import org.elasticsearch.common.unit.TimeValue;
038import org.xbib.elasticsearch.plugin.jdbc.client.BulkProcessorHelper;
039import org.xbib.elasticsearch.plugin.jdbc.client.ClientHelper;
040import org.xbib.elasticsearch.plugin.jdbc.client.ConfigHelper;
041import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
042import org.xbib.elasticsearch.plugin.jdbc.client.Metric;
043
044import java.io.IOException;
045import java.io.InputStream;
046import java.util.Arrays;
047import java.util.List;
048import java.util.Map;
049
050/**
051 * Node client support with bulk processing
052 */
053public class BulkNodeClient implements Ingest {
054
055    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.BulkNodeClient");
056
057    private int maxActionsPerBulkRequest = 100;
058
059    private int maxConcurrentBulkRequests = Runtime.getRuntime().availableProcessors() * 2;
060
061    private ByteSizeValue maxVolume = new ByteSizeValue(10, ByteSizeUnit.MB);
062
063    private TimeValue flushInterval = TimeValue.timeValueSeconds(30);
064
065    private final ConfigHelper configHelper = new ConfigHelper();
066
067    private Client client;
068
069    private BulkProcessor bulkProcessor;
070
071    private boolean isShutdown = false;
072
073    private Metric metric = new Metric();
074
075    private boolean closed = false;
076
077    private boolean suspended = false;
078
079    private Throwable throwable;
080
081    @Override
082    public BulkNodeClient shards(int shards) {
083        configHelper.setting("index.number_of_shards", shards);
084        return this;
085    }
086
087    @Override
088    public BulkNodeClient replica(int replica) {
089        configHelper.setting("index.number_of_replica", replica);
090        return this;
091    }
092
093    @Override
094    public BulkNodeClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) {
095        this.maxActionsPerBulkRequest = maxActionsPerBulkRequest;
096        return this;
097    }
098
099    @Override
100    public BulkNodeClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) {
101        this.maxConcurrentBulkRequests = maxConcurrentBulkRequests;
102        return this;
103    }
104
105    @Override
106    public BulkNodeClient maxVolumePerBulkRequest(ByteSizeValue maxVolume) {
107        this.maxVolume = maxVolume;
108        return this;
109    }
110
111    @Override
112    public BulkNodeClient maxRequestWait(TimeValue timeValue) {
113        // ignore, not implemented
114        return this;
115    }
116
117    @Override
118    public BulkNodeClient flushIngestInterval(TimeValue flushInterval) {
119        this.flushInterval = flushInterval;
120        return this;
121    }
122
123    @Override
124    public BulkNodeClient newClient(Settings settings) {
125        throw new UnsupportedOperationException();
126    }
127
128    @Override
129    public BulkNodeClient newClient(Map<String, String> client) {
130        throw new UnsupportedOperationException();
131    }
132
133    @Override
134    public List<String> getConnectedNodes() {
135        return Arrays.asList(client.toString());
136    }
137
138    @Override
139    public BulkNodeClient newClient(Client client) {
140        this.client = client;
141        this.metric = new Metric();
142        metric.start();
143        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
144            @Override
145            public void beforeBulk(long executionId, BulkRequest request) {
146                metric.getCurrentIngest().inc();
147                long l = metric.getCurrentIngest().count();
148                int n = request.numberOfActions();
149                metric.getSubmitted().inc(n);
150                metric.getCurrentIngestNumDocs().inc(n);
151                metric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
152                logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
153                        executionId,
154                        request.numberOfActions(),
155                        request.estimatedSizeInBytes(),
156                        l);
157            }
158
159            @Override
160            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
161                metric.getCurrentIngest().dec();
162                long l = metric.getCurrentIngest().count();
163                metric.getSucceeded().inc(response.getItems().length);
164                metric.getFailed().inc(0);
165                metric.getTotalIngest().inc(response.getTookInMillis());
166                int n = 0;
167                for (BulkItemResponse itemResponse : response.getItems()) {
168                    if (itemResponse.isFailed()) {
169                        n++;
170                        metric.getSucceeded().dec(1);
171                        metric.getFailed().inc(1);
172                    }
173                }
174                logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
175                        executionId,
176                        metric.getSucceeded().count(),
177                        metric.getFailed().count(),
178                        response.getTook().millis(),
179                        l);
180                if (n > 0) {
181                    logger.error("bulk [{}] failed with {} failed items, failure message = {}",
182                            executionId, n, response.buildFailureMessage());
183                } else {
184                    metric.getCurrentIngestNumDocs().dec(response.getItems().length);
185                }
186            }
187
188            @Override
189            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
190                metric.getCurrentIngest().dec();
191                throwable = failure;
192                closed = true;
193                logger.error("after bulk [" + executionId + "] error", failure);
194            }
195        };
196        BulkProcessor.Builder builder = BulkProcessor.builder(client, listener)
197                .setBulkActions(maxActionsPerBulkRequest)  // off-by-one
198                .setConcurrentRequests(maxConcurrentBulkRequests)
199                .setFlushInterval(flushInterval);
200        if (maxVolume != null) {
201            builder.setBulkSize(maxVolume);
202        }
203        this.bulkProcessor = builder.build();
204        try {
205            waitForCluster(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30));
206            closed = false;
207        } catch (IOException e) {
208            logger.error(e.getMessage(), e);
209            closed = true;
210        }
211        return this;
212    }
213
214    @Override
215    public Client client() {
216        return client;
217    }
218
219    @Override
220    public Metric getMetric() {
221        return metric;
222    }
223
224    @Override
225    public BulkNodeClient putMapping(String index) {
226        if (client == null) {
227            logger.warn("no client for put mapping");
228            return this;
229        }
230        configHelper.putMapping(client, index);
231        return this;
232    }
233
234    @Override
235    public BulkNodeClient deleteMapping(String index, String type) {
236        if (client == null) {
237            logger.warn("no client for delete mapping");
238            return this;
239        }
240        configHelper.deleteMapping(client, index, type);
241        return this;
242    }
243
244    @Override
245    public BulkNodeClient index(String index, String type, String id, String source) {
246        if (closed) {
247            throw new ElasticsearchIllegalStateException("client is closed");
248        }
249        try {
250            if (metric != null) {
251                metric.getCurrentIngest().inc();
252            }
253            bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source));
254        } catch (Exception e) {
255            throwable = e;
256            closed = true;
257            logger.error("bulk add of index request failed: " + e.getMessage(), e);
258        } finally {
259            if (metric != null) {
260                metric.getCurrentIngest().dec();
261            }
262        }
263        return this;
264    }
265
266    @Override
267    public BulkNodeClient bulkIndex(IndexRequest indexRequest) {
268        if (closed) {
269            throw new ElasticsearchIllegalStateException("client is closed");
270        }
271        try {
272            if (suspended) {
273                Thread.sleep(1000L);
274                return this;
275            }
276            if (metric != null) {
277                metric.getCurrentIngest().inc();
278            }
279            bulkProcessor.add(indexRequest);
280        } catch (Exception e) {
281            throwable = e;
282            closed = true;
283            logger.error("bulk add of index request failed: " + e.getMessage(), e);
284        } finally {
285            if (metric != null) {
286                metric.getCurrentIngest().dec();
287            }
288        }
289        return this;
290    }
291
292    @Override
293    public BulkNodeClient delete(String index, String type, String id) {
294        if (closed) {
295            throw new ElasticsearchIllegalStateException("client is closed");
296        }
297        try {
298            if (suspended) {
299                Thread.sleep(1000L);
300                return this;
301            }
302            if (metric != null) {
303                metric.getCurrentIngest().inc();
304            }
305            bulkProcessor.add(new DeleteRequest(index).type(type).id(id));
306        } catch (Exception e) {
307            throwable = e;
308            closed = true;
309            logger.error("bulk add of delete failed: " + e.getMessage(), e);
310        } finally {
311            if (metric != null) {
312                metric.getCurrentIngest().dec();
313            }
314        }
315        return this;
316    }
317
318    @Override
319    public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) {
320        if (closed) {
321            throw new ElasticsearchIllegalStateException("client is closed");
322        }
323        try {
324            if (suspended) {
325                Thread.sleep(1000L);
326                return this;
327            }
328            if (metric != null) {
329                metric.getCurrentIngest().inc();
330            }
331            bulkProcessor.add(deleteRequest);
332        } catch (Exception e) {
333            throwable = e;
334            closed = true;
335            logger.error("bulk add of delete failed: " + e.getMessage(), e);
336        } finally {
337            if (metric != null) {
338                metric.getCurrentIngest().dec();
339            }
340        }
341        return this;
342    }
343
344    @Override
345    public BulkNodeClient flushIngest() {
346        if (closed) {
347            throw new ElasticsearchIllegalStateException("client is closed");
348        }
349        logger.debug("flushing bulk processor");
350        BulkProcessorHelper.flush(bulkProcessor);
351        return this;
352    }
353
354    @Override
355    public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException {
356        if (closed) {
357            throw new ElasticsearchIllegalStateException("client is closed");
358        }
359        if (metric.getCurrentIngest().count() == 0) {
360            logger.debug("no current ingests");
361            return this;
362        }
363        BulkProcessorHelper.waitFor(bulkProcessor, maxWaitTime);
364        return this;
365    }
366
367    @Override
368    public BulkNodeClient startBulk(String index) throws IOException {
369        if (metric == null) {
370            return this;
371        }
372        if (!metric.isBulk(index)) {
373            metric.startBulk(index);
374            ClientHelper.disableRefresh(client, index);
375        }
376        return this;
377    }
378
379    @Override
380    public BulkNodeClient stopBulk(String index) throws IOException {
381        if (metric == null) {
382            return this;
383        }
384        if (metric.isBulk(index)) {
385            metric.stopBulk(index);
386            ClientHelper.enableRefresh(client, index);
387        }
388        return this;
389    }
390
391    @Override
392    public BulkNodeClient flush(String index) {
393        ClientHelper.flush(client, index);
394        return this;
395    }
396
397    @Override
398    public BulkNodeClient refresh(String index) {
399        ClientHelper.refresh(client, index);
400        return this;
401    }
402
403    @Override
404    public int updateReplicaLevel(String index, int level) throws IOException {
405        return ClientHelper.updateReplicaLevel(client, index, level);
406    }
407
408
409    @Override
410    public BulkNodeClient waitForCluster(ClusterHealthStatus status, TimeValue timeout) throws IOException {
411        ClientHelper.waitForCluster(client, status, timeout);
412        return this;
413    }
414
415    @Override
416    public int waitForRecovery(String index) throws IOException {
417        return ClientHelper.waitForRecovery(client, index);
418    }
419
420    @Override
421    public synchronized void shutdown() {
422        try {
423            if (bulkProcessor != null) {
424                logger.debug("closing bulk processor...");
425                bulkProcessor.close();
426            }
427            if (metric != null && metric.indices() != null && !metric.indices().isEmpty()) {
428                logger.debug("stopping bulk mode for indices {}...", metric.indices());
429                for (String index : ImmutableSet.copyOf(metric.indices())) {
430                    stopBulk(index);
431                }
432            }
433            logger.debug("shutting down...");
434            client.close();
435            logger.debug("shutting down completed");
436        } catch (Exception e) {
437            logger.error(e.getMessage(), e);
438        } finally {
439            isShutdown = true;
440        }
441    }
442
443    public boolean isShutdown() {
444        return isShutdown;
445    }
446
447    @Override
448    public void suspend() {
449        suspended = true;
450    }
451
452    @Override
453    public void resume() {
454        suspended = false;
455    }
456
457    @Override
458    public BulkNodeClient newIndex(String index) throws IOException {
459        return newIndex(index, null, null);
460    }
461
462    @Override
463    public BulkNodeClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException {
464        configHelper.reset();
465        configHelper.setting(settings);
466        configHelper.mapping(type, mappings);
467        return newIndex(index, configHelper.settings(), configHelper.mappings());
468    }
469
470    @Override
471    public BulkNodeClient newIndex(String index, Settings settings, Map<String, String> mappings) throws IOException {
472        if (closed) {
473            throw new ElasticsearchIllegalStateException("client is closed");
474        }
475        if (client == null) {
476            logger.warn("no client for create index");
477            return this;
478        }
479        if (index == null) {
480            logger.warn("no index name given to create index");
481            return this;
482        }
483        CreateIndexRequestBuilder createIndexRequestBuilder =
484                new CreateIndexRequestBuilder(client.admin().indices()).setIndex(index);
485        Settings concreteSettings;
486        if (settings == null && getSettings() != null) {
487            concreteSettings = getSettings();
488        } else if (settings != null) {
489            concreteSettings = settings;
490        } else {
491            concreteSettings = null;
492        }
493        if (concreteSettings != null) {
494            logger.info("newIndex: settings = {}", concreteSettings.getAsMap());
495            createIndexRequestBuilder.setSettings(concreteSettings);
496        }
497        if (mappings == null && getMappings() != null) {
498            for (String type : getMappings().keySet()) {
499                logger.info("newIndex: type = {} mappings = {}", type, getMappings().get(type));
500                createIndexRequestBuilder.addMapping(type, getMappings().get(type));
501            }
502        } else if (mappings != null) {
503            for (String type : mappings.keySet()) {
504                logger.info("newIndex: type = {} mappings = {}", type, mappings.get(type));
505                createIndexRequestBuilder.addMapping(type, mappings.get(type));
506            }
507        }
508        CreateIndexResponse response = createIndexRequestBuilder.execute().actionGet();
509        if (response.isAcknowledged()) {
510            logger.info("index {} created", index);
511        } else {
512            throw new IOException("creation of index " + index + " not acknowledged");
513        }
514        return this;
515    }
516
517    @Override
518    public BulkNodeClient deleteIndex(String index) {
519        if (closed) {
520            throw new ElasticsearchIllegalStateException("client is closed");
521        }
522        if (client == null) {
523            logger.warn("no client");
524            return this;
525        }
526        if (index == null) {
527            logger.warn("no index name given to delete index");
528            return this;
529        }
530        DeleteIndexRequestBuilder deleteIndexRequestBuilder =
531                new DeleteIndexRequestBuilder(client.admin().indices(), index);
532        deleteIndexRequestBuilder.execute().actionGet();
533        return this;
534    }
535
536    @Override
537    public boolean hasThrowable() {
538        return throwable != null;
539    }
540
541    @Override
542    public Throwable getThrowable() {
543        return throwable;
544    }
545
546    public void setSettings(Settings settings) {
547        configHelper.settings(settings);
548    }
549
550    public Settings getSettings() {
551        return configHelper.settings();
552    }
553
554    public ImmutableSettings.Builder getSettingsBuilder() {
555        return configHelper.settingsBuilder();
556    }
557
558    public void setting(InputStream in) throws IOException {
559        configHelper.setting(in);
560    }
561
562    public void addSetting(String key, String value) {
563        configHelper.setting(key, value);
564    }
565
566    public void addSetting(String key, Boolean value) {
567        configHelper.setting(key, value);
568    }
569
570    public void addSetting(String key, Integer value) {
571        configHelper.setting(key, value);
572    }
573
574    public void mapping(String type, InputStream in) throws IOException {
575        configHelper.mapping(type, in);
576    }
577
578    public void mapping(String type, String mapping) throws IOException {
579        configHelper.mapping(type, mapping);
580    }
581
582    public Map<String, String> getMappings() {
583        return configHelper.mappings();
584    }
585
586}