001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.client.node; 017 018import org.elasticsearch.ElasticsearchIllegalStateException; 019import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; 020import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; 021import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; 022import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; 023import org.elasticsearch.action.bulk.BulkItemResponse; 024import org.elasticsearch.action.bulk.BulkProcessor; 025import org.elasticsearch.action.bulk.BulkRequest; 026import org.elasticsearch.action.bulk.BulkResponse; 027import org.elasticsearch.action.delete.DeleteRequest; 028import org.elasticsearch.action.index.IndexRequest; 029import org.elasticsearch.client.Client; 030import org.elasticsearch.common.collect.ImmutableSet; 031import org.elasticsearch.common.logging.ESLogger; 032import org.elasticsearch.common.logging.ESLoggerFactory; 033import org.elasticsearch.common.settings.ImmutableSettings; 034import org.elasticsearch.common.settings.Settings; 035import org.elasticsearch.common.unit.ByteSizeUnit; 036import org.elasticsearch.common.unit.ByteSizeValue; 037import org.elasticsearch.common.unit.TimeValue; 038import org.xbib.elasticsearch.plugin.jdbc.client.BulkProcessorHelper; 039import org.xbib.elasticsearch.plugin.jdbc.client.ClientHelper; 040import org.xbib.elasticsearch.plugin.jdbc.client.ConfigHelper; 041import org.xbib.elasticsearch.plugin.jdbc.client.Ingest; 042import org.xbib.elasticsearch.plugin.jdbc.client.Metric; 043 044import java.io.IOException; 045import java.io.InputStream; 046import java.util.Arrays; 047import java.util.List; 048import java.util.Map; 049 050/** 051 * Node client support with bulk processing 052 */ 053public class BulkNodeClient implements Ingest { 054 055 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.BulkNodeClient"); 056 057 private int maxActionsPerBulkRequest = 100; 058 059 private int maxConcurrentBulkRequests = Runtime.getRuntime().availableProcessors() * 2; 060 061 private ByteSizeValue maxVolume = new ByteSizeValue(10, ByteSizeUnit.MB); 062 063 private TimeValue flushInterval = TimeValue.timeValueSeconds(30); 064 065 private final ConfigHelper configHelper = new ConfigHelper(); 066 067 private Client client; 068 069 private BulkProcessor bulkProcessor; 070 071 private boolean isShutdown = false; 072 073 private Metric metric = new Metric(); 074 075 private boolean closed = false; 076 077 private boolean suspended = false; 078 079 private Throwable throwable; 080 081 @Override 082 public BulkNodeClient shards(int shards) { 083 configHelper.setting("index.number_of_shards", shards); 084 return this; 085 } 086 087 @Override 088 public BulkNodeClient replica(int replica) { 089 configHelper.setting("index.number_of_replica", replica); 090 return this; 091 } 092 093 @Override 094 public BulkNodeClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) { 095 this.maxActionsPerBulkRequest = maxActionsPerBulkRequest; 096 return this; 097 } 098 099 @Override 100 public BulkNodeClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) { 101 this.maxConcurrentBulkRequests = maxConcurrentBulkRequests; 102 return this; 103 } 104 105 @Override 106 public BulkNodeClient maxVolumePerBulkRequest(ByteSizeValue maxVolume) { 107 this.maxVolume = maxVolume; 108 return this; 109 } 110 111 @Override 112 public BulkNodeClient maxRequestWait(TimeValue timeValue) { 113 // ignore, not implemented 114 return this; 115 } 116 117 @Override 118 public BulkNodeClient flushIngestInterval(TimeValue flushInterval) { 119 this.flushInterval = flushInterval; 120 return this; 121 } 122 123 @Override 124 public BulkNodeClient newClient(Settings settings) { 125 throw new UnsupportedOperationException(); 126 } 127 128 @Override 129 public BulkNodeClient newClient(Map<String, String> client) { 130 throw new UnsupportedOperationException(); 131 } 132 133 @Override 134 public List<String> getConnectedNodes() { 135 return Arrays.asList(client.toString()); 136 } 137 138 @Override 139 public BulkNodeClient newClient(Client client) { 140 this.client = client; 141 this.metric = new Metric(); 142 metric.start(); 143 BulkProcessor.Listener listener = new BulkProcessor.Listener() { 144 @Override 145 public void beforeBulk(long executionId, BulkRequest request) { 146 metric.getCurrentIngest().inc(); 147 long l = metric.getCurrentIngest().count(); 148 int n = request.numberOfActions(); 149 metric.getSubmitted().inc(n); 150 metric.getCurrentIngestNumDocs().inc(n); 151 metric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); 152 logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", 153 executionId, 154 request.numberOfActions(), 155 request.estimatedSizeInBytes(), 156 l); 157 } 158 159 @Override 160 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 161 metric.getCurrentIngest().dec(); 162 long l = metric.getCurrentIngest().count(); 163 metric.getSucceeded().inc(response.getItems().length); 164 metric.getFailed().inc(0); 165 metric.getTotalIngest().inc(response.getTookInMillis()); 166 int n = 0; 167 for (BulkItemResponse itemResponse : response.getItems()) { 168 if (itemResponse.isFailed()) { 169 n++; 170 metric.getSucceeded().dec(1); 171 metric.getFailed().inc(1); 172 } 173 } 174 logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", 175 executionId, 176 metric.getSucceeded().count(), 177 metric.getFailed().count(), 178 response.getTook().millis(), 179 l); 180 if (n > 0) { 181 logger.error("bulk [{}] failed with {} failed items, failure message = {}", 182 executionId, n, response.buildFailureMessage()); 183 } else { 184 metric.getCurrentIngestNumDocs().dec(response.getItems().length); 185 } 186 } 187 188 @Override 189 public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 190 metric.getCurrentIngest().dec(); 191 throwable = failure; 192 closed = true; 193 logger.error("after bulk [" + executionId + "] error", failure); 194 } 195 }; 196 BulkProcessor.Builder builder = BulkProcessor.builder(client, listener) 197 .setBulkActions(maxActionsPerBulkRequest) // off-by-one 198 .setConcurrentRequests(maxConcurrentBulkRequests) 199 .setFlushInterval(flushInterval); 200 if (maxVolume != null) { 201 builder.setBulkSize(maxVolume); 202 } 203 this.bulkProcessor = builder.build(); 204 try { 205 waitForCluster(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30)); 206 closed = false; 207 } catch (IOException e) { 208 logger.error(e.getMessage(), e); 209 closed = true; 210 } 211 return this; 212 } 213 214 @Override 215 public Client client() { 216 return client; 217 } 218 219 @Override 220 public Metric getMetric() { 221 return metric; 222 } 223 224 @Override 225 public BulkNodeClient putMapping(String index) { 226 if (client == null) { 227 logger.warn("no client for put mapping"); 228 return this; 229 } 230 configHelper.putMapping(client, index); 231 return this; 232 } 233 234 @Override 235 public BulkNodeClient deleteMapping(String index, String type) { 236 if (client == null) { 237 logger.warn("no client for delete mapping"); 238 return this; 239 } 240 configHelper.deleteMapping(client, index, type); 241 return this; 242 } 243 244 @Override 245 public BulkNodeClient index(String index, String type, String id, String source) { 246 if (closed) { 247 throw new ElasticsearchIllegalStateException("client is closed"); 248 } 249 try { 250 if (metric != null) { 251 metric.getCurrentIngest().inc(); 252 } 253 bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source)); 254 } catch (Exception e) { 255 throwable = e; 256 closed = true; 257 logger.error("bulk add of index request failed: " + e.getMessage(), e); 258 } finally { 259 if (metric != null) { 260 metric.getCurrentIngest().dec(); 261 } 262 } 263 return this; 264 } 265 266 @Override 267 public BulkNodeClient bulkIndex(IndexRequest indexRequest) { 268 if (closed) { 269 throw new ElasticsearchIllegalStateException("client is closed"); 270 } 271 try { 272 if (suspended) { 273 Thread.sleep(1000L); 274 return this; 275 } 276 if (metric != null) { 277 metric.getCurrentIngest().inc(); 278 } 279 bulkProcessor.add(indexRequest); 280 } catch (Exception e) { 281 throwable = e; 282 closed = true; 283 logger.error("bulk add of index request failed: " + e.getMessage(), e); 284 } finally { 285 if (metric != null) { 286 metric.getCurrentIngest().dec(); 287 } 288 } 289 return this; 290 } 291 292 @Override 293 public BulkNodeClient delete(String index, String type, String id) { 294 if (closed) { 295 throw new ElasticsearchIllegalStateException("client is closed"); 296 } 297 try { 298 if (suspended) { 299 Thread.sleep(1000L); 300 return this; 301 } 302 if (metric != null) { 303 metric.getCurrentIngest().inc(); 304 } 305 bulkProcessor.add(new DeleteRequest(index).type(type).id(id)); 306 } catch (Exception e) { 307 throwable = e; 308 closed = true; 309 logger.error("bulk add of delete failed: " + e.getMessage(), e); 310 } finally { 311 if (metric != null) { 312 metric.getCurrentIngest().dec(); 313 } 314 } 315 return this; 316 } 317 318 @Override 319 public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) { 320 if (closed) { 321 throw new ElasticsearchIllegalStateException("client is closed"); 322 } 323 try { 324 if (suspended) { 325 Thread.sleep(1000L); 326 return this; 327 } 328 if (metric != null) { 329 metric.getCurrentIngest().inc(); 330 } 331 bulkProcessor.add(deleteRequest); 332 } catch (Exception e) { 333 throwable = e; 334 closed = true; 335 logger.error("bulk add of delete failed: " + e.getMessage(), e); 336 } finally { 337 if (metric != null) { 338 metric.getCurrentIngest().dec(); 339 } 340 } 341 return this; 342 } 343 344 @Override 345 public BulkNodeClient flushIngest() { 346 if (closed) { 347 throw new ElasticsearchIllegalStateException("client is closed"); 348 } 349 logger.debug("flushing bulk processor"); 350 BulkProcessorHelper.flush(bulkProcessor); 351 return this; 352 } 353 354 @Override 355 public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException { 356 if (closed) { 357 throw new ElasticsearchIllegalStateException("client is closed"); 358 } 359 if (metric.getCurrentIngest().count() == 0) { 360 logger.debug("no current ingests"); 361 return this; 362 } 363 BulkProcessorHelper.waitFor(bulkProcessor, maxWaitTime); 364 return this; 365 } 366 367 @Override 368 public BulkNodeClient startBulk(String index) throws IOException { 369 if (metric == null) { 370 return this; 371 } 372 if (!metric.isBulk(index)) { 373 metric.startBulk(index); 374 ClientHelper.disableRefresh(client, index); 375 } 376 return this; 377 } 378 379 @Override 380 public BulkNodeClient stopBulk(String index) throws IOException { 381 if (metric == null) { 382 return this; 383 } 384 if (metric.isBulk(index)) { 385 metric.stopBulk(index); 386 ClientHelper.enableRefresh(client, index); 387 } 388 return this; 389 } 390 391 @Override 392 public BulkNodeClient flush(String index) { 393 ClientHelper.flush(client, index); 394 return this; 395 } 396 397 @Override 398 public BulkNodeClient refresh(String index) { 399 ClientHelper.refresh(client, index); 400 return this; 401 } 402 403 @Override 404 public int updateReplicaLevel(String index, int level) throws IOException { 405 return ClientHelper.updateReplicaLevel(client, index, level); 406 } 407 408 409 @Override 410 public BulkNodeClient waitForCluster(ClusterHealthStatus status, TimeValue timeout) throws IOException { 411 ClientHelper.waitForCluster(client, status, timeout); 412 return this; 413 } 414 415 @Override 416 public int waitForRecovery(String index) throws IOException { 417 return ClientHelper.waitForRecovery(client, index); 418 } 419 420 @Override 421 public synchronized void shutdown() { 422 try { 423 if (bulkProcessor != null) { 424 logger.debug("closing bulk processor..."); 425 bulkProcessor.close(); 426 } 427 if (metric != null && metric.indices() != null && !metric.indices().isEmpty()) { 428 logger.debug("stopping bulk mode for indices {}...", metric.indices()); 429 for (String index : ImmutableSet.copyOf(metric.indices())) { 430 stopBulk(index); 431 } 432 } 433 logger.debug("shutting down..."); 434 client.close(); 435 logger.debug("shutting down completed"); 436 } catch (Exception e) { 437 logger.error(e.getMessage(), e); 438 } finally { 439 isShutdown = true; 440 } 441 } 442 443 public boolean isShutdown() { 444 return isShutdown; 445 } 446 447 @Override 448 public void suspend() { 449 suspended = true; 450 } 451 452 @Override 453 public void resume() { 454 suspended = false; 455 } 456 457 @Override 458 public BulkNodeClient newIndex(String index) throws IOException { 459 return newIndex(index, null, null); 460 } 461 462 @Override 463 public BulkNodeClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException { 464 configHelper.reset(); 465 configHelper.setting(settings); 466 configHelper.mapping(type, mappings); 467 return newIndex(index, configHelper.settings(), configHelper.mappings()); 468 } 469 470 @Override 471 public BulkNodeClient newIndex(String index, Settings settings, Map<String, String> mappings) throws IOException { 472 if (closed) { 473 throw new ElasticsearchIllegalStateException("client is closed"); 474 } 475 if (client == null) { 476 logger.warn("no client for create index"); 477 return this; 478 } 479 if (index == null) { 480 logger.warn("no index name given to create index"); 481 return this; 482 } 483 CreateIndexRequestBuilder createIndexRequestBuilder = 484 new CreateIndexRequestBuilder(client.admin().indices()).setIndex(index); 485 Settings concreteSettings; 486 if (settings == null && getSettings() != null) { 487 concreteSettings = getSettings(); 488 } else if (settings != null) { 489 concreteSettings = settings; 490 } else { 491 concreteSettings = null; 492 } 493 if (concreteSettings != null) { 494 logger.info("newIndex: settings = {}", concreteSettings.getAsMap()); 495 createIndexRequestBuilder.setSettings(concreteSettings); 496 } 497 if (mappings == null && getMappings() != null) { 498 for (String type : getMappings().keySet()) { 499 logger.info("newIndex: type = {} mappings = {}", type, getMappings().get(type)); 500 createIndexRequestBuilder.addMapping(type, getMappings().get(type)); 501 } 502 } else if (mappings != null) { 503 for (String type : mappings.keySet()) { 504 logger.info("newIndex: type = {} mappings = {}", type, mappings.get(type)); 505 createIndexRequestBuilder.addMapping(type, mappings.get(type)); 506 } 507 } 508 CreateIndexResponse response = createIndexRequestBuilder.execute().actionGet(); 509 if (response.isAcknowledged()) { 510 logger.info("index {} created", index); 511 } else { 512 throw new IOException("creation of index " + index + " not acknowledged"); 513 } 514 return this; 515 } 516 517 @Override 518 public BulkNodeClient deleteIndex(String index) { 519 if (closed) { 520 throw new ElasticsearchIllegalStateException("client is closed"); 521 } 522 if (client == null) { 523 logger.warn("no client"); 524 return this; 525 } 526 if (index == null) { 527 logger.warn("no index name given to delete index"); 528 return this; 529 } 530 DeleteIndexRequestBuilder deleteIndexRequestBuilder = 531 new DeleteIndexRequestBuilder(client.admin().indices(), index); 532 deleteIndexRequestBuilder.execute().actionGet(); 533 return this; 534 } 535 536 @Override 537 public boolean hasThrowable() { 538 return throwable != null; 539 } 540 541 @Override 542 public Throwable getThrowable() { 543 return throwable; 544 } 545 546 public void setSettings(Settings settings) { 547 configHelper.settings(settings); 548 } 549 550 public Settings getSettings() { 551 return configHelper.settings(); 552 } 553 554 public ImmutableSettings.Builder getSettingsBuilder() { 555 return configHelper.settingsBuilder(); 556 } 557 558 public void setting(InputStream in) throws IOException { 559 configHelper.setting(in); 560 } 561 562 public void addSetting(String key, String value) { 563 configHelper.setting(key, value); 564 } 565 566 public void addSetting(String key, Boolean value) { 567 configHelper.setting(key, value); 568 } 569 570 public void addSetting(String key, Integer value) { 571 configHelper.setting(key, value); 572 } 573 574 public void mapping(String type, InputStream in) throws IOException { 575 configHelper.mapping(type, in); 576 } 577 578 public void mapping(String type, String mapping) throws IOException { 579 configHelper.mapping(type, mapping); 580 } 581 582 public Map<String, String> getMappings() { 583 return configHelper.mappings(); 584 } 585 586}