/*
 * Copyright (C) 2014 Jörg Prante
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.xbib.elasticsearch.plugin.jdbc.client.transport;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elasticsearch.plugin.jdbc.client.BaseIngestTransportClient;
import org.xbib.elasticsearch.plugin.jdbc.client.BulkProcessorHelper;
import org.xbib.elasticsearch.plugin.jdbc.client.ClientHelper;
import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
import org.xbib.elasticsearch.plugin.jdbc.client.Metric;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Client using the BulkProcessor of Elasticsearch.
 *
 * <p>Typical use: configure the bulk limits with the fluent setters, call one of the
 * {@code newClient(...)} methods to build the {@link BulkProcessor}, submit requests with
 * {@link #index}, {@link #bulkIndex}, {@link #delete} or {@link #bulkDelete}, and finally
 * call {@link #shutdown()}.
 *
 * <p>Once a bulk request fails with a {@link Throwable}, or adding a request throws, the
 * client marks itself closed: the failure is kept for {@link #getThrowable()} and most
 * further operations throw {@link ElasticsearchIllegalStateException}.
 */
public class BulkTransportClient extends BaseIngestTransportClient implements Ingest {

    private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.BulkTransportClient");

    /**
     * The default size of a bulk request
     */
    private int maxActionsPerBulkRequest = 100;

    /**
     * The default number of maximum concurrent requests
     */
    private int maxConcurrentBulkRequests = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * The maximum volume
     */
    private ByteSizeValue maxVolumePerBulkRequest = new ByteSizeValue(10, ByteSizeUnit.MB);

    /**
     * The flush interval handed to the BulkProcessor builder
     */
    private TimeValue flushInterval = TimeValue.timeValueSeconds(30);

    /**
     * The concurrent requests
     */
    private final AtomicLong concurrentRequestCounter = new AtomicLong(0L);

    /**
     * The BulkProcessor
     */
    private BulkProcessor bulkProcessor;

    /**
     * Ingest metrics; created in {@link #newClient(Settings)}, {@code null} before that.
     */
    private Metric metric;

    /**
     * Last failure seen. Written from the bulk listener callback as well as from the
     * submitting methods, hence volatile like the other cross-thread flags below.
     */
    private volatile Throwable throwable;

    private volatile boolean suspended = false;

    private volatile boolean closed = false;

    /**
     * Sets the number of actions handed to the BulkProcessor builder as bulk action limit.
     * Only takes effect for processors built by a later {@code newClient(...)} call.
     *
     * @param maxActionsPerBulkRequest the action limit per bulk request
     * @return this client
     */
    @Override
    public BulkTransportClient maxActionsPerBulkRequest(int maxActionsPerBulkRequest) {
        this.maxActionsPerBulkRequest = maxActionsPerBulkRequest;
        return this;
    }

    /**
     * Sets the number of concurrent requests handed to the BulkProcessor builder.
     * Only takes effect for processors built by a later {@code newClient(...)} call.
     *
     * @param maxConcurrentBulkRequests the concurrent request limit
     * @return this client
     */
    @Override
    public BulkTransportClient maxConcurrentBulkRequests(int maxConcurrentBulkRequests) {
        this.maxConcurrentBulkRequests = maxConcurrentBulkRequests;
        return this;
    }

    /**
     * Sets the bulk size handed to the BulkProcessor builder. A {@code null} value means
     * no bulk size is set on the builder.
     *
     * @param maxVolumePerBulkRequest the volume limit per bulk request, may be {@code null}
     * @return this client
     */
    @Override
    public BulkTransportClient maxVolumePerBulkRequest(ByteSizeValue maxVolumePerBulkRequest) {
        this.maxVolumePerBulkRequest = maxVolumePerBulkRequest;
        return this;
    }

    /**
     * Not supported by this client; the value is ignored.
     *
     * @param timeout ignored
     * @return this client
     */
    @Override
    public BulkTransportClient maxRequestWait(TimeValue timeout) {
        // ignore, not supported
        return this;
    }

    /**
     * Sets the flush interval handed to the BulkProcessor builder.
     *
     * @param flushInterval the flush interval
     * @return this client
     */
    @Override
    public BulkTransportClient flushIngestInterval(TimeValue flushInterval) {
        this.flushInterval = flushInterval;
        return this;
    }

    /**
     * Creates a new client from the settings returned by {@code findSettings()}.
     *
     * <p>NOTE(review): the {@code client} argument is not used by this implementation.
     *
     * @param client unused
     * @return this client
     */
    public BulkTransportClient newClient(Client client) {
        return this.newClient(findSettings());
    }

    /**
     * Creates a new client from a settings map.
     *
     * @param settings the settings as key/value pairs
     * @return this client
     */
    @Override
    public BulkTransportClient newClient(Map<String, String> settings) {
        return this.newClient(ImmutableSettings.settingsBuilder().put(settings).build());
    }

    /**
     * Creates the underlying client via the superclass, starts a fresh {@link Metric} and
     * builds the {@link BulkProcessor} with the currently configured limits. Clears the
     * closed flag.
     *
     * @param settings the client settings
     * @return this client
     */
    @Override
    public BulkTransportClient newClient(Settings settings) {
        super.newClient(settings);
        resetSettings();
        this.metric = new Metric();
        metric.start();
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // value BEFORE the increment, i.e. requests already in flight
                long inFlight = concurrentRequestCounter.getAndIncrement();
                if (metric != null) {
                    int n = request.numberOfActions();
                    metric.getSubmitted().inc(n);
                    metric.getCurrentIngestNumDocs().inc(n);
                    metric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
                }
                logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
                        executionId,
                        request.numberOfActions(),
                        request.estimatedSizeInBytes(),
                        inFlight);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                long inFlight = concurrentRequestCounter.decrementAndGet();
                BulkItemResponse[] items = response.getItems();
                int failedItems = 0;
                for (BulkItemResponse itemResponse : items) {
                    if (itemResponse.isFailed()) {
                        failedItems++;
                    }
                }
                // All metric access sits behind one guard; previously only the first two
                // updates were null-checked while the rest dereferenced metric directly.
                if (metric != null) {
                    metric.getSucceeded().inc(items.length - failedItems);
                    if (failedItems > 0) {
                        metric.getFailed().inc(failedItems);
                    }
                    metric.getTotalIngest().inc(response.getTookInMillis());
                    logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]",
                            executionId,
                            metric.getSucceeded().count(),
                            metric.getFailed().count(),
                            response.getTook().millis(),
                            inFlight);
                }
                if (failedItems > 0) {
                    logger.error("bulk [{}] failed with {} failed items, failure message = {}",
                            executionId, failedItems, response.buildFailureMessage());
                } else if (metric != null) {
                    // the in-progress document count is only reduced for fully successful bulks
                    metric.getCurrentIngestNumDocs().dec(items.length);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                concurrentRequestCounter.decrementAndGet();
                // a failed bulk request closes the client; the cause is kept for getThrowable()
                throwable = failure;
                closed = true;
                logger.error("bulk [" + executionId + "] error", failure);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder(client, listener)
                .setBulkActions(maxActionsPerBulkRequest)
                .setConcurrentRequests(maxConcurrentBulkRequests)
                .setFlushInterval(flushInterval);
        if (maxVolumePerBulkRequest != null) {
            builder.setBulkSize(maxVolumePerBulkRequest);
        }
        this.bulkProcessor = builder.build();
        this.closed = false;
        return this;
    }

    /**
     * @return the underlying Elasticsearch client
     */
    @Override
    public Client client() {
        return client;
    }

    /**
     * Delegates to the superclass and returns this client for chaining.
     *
     * @param value the shard count
     * @return this client
     */
    public BulkTransportClient shards(int value) {
        super.shards(value);
        return this;
    }

    /**
     * Delegates to the superclass and returns this client for chaining.
     *
     * @param value the replica level
     * @return this client
     */
    public BulkTransportClient replica(int value) {
        super.replica(value);
        return this;
    }

    /**
     * Creates a new index via the superclass.
     *
     * @param index the index name
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient newIndex(String index) {
        ensureOpen();
        super.newIndex(index);
        return this;
    }

    /**
     * Deletes an index via the superclass.
     *
     * @param index the index name
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient deleteIndex(String index) {
        ensureOpen();
        super.deleteIndex(index);
        return this;
    }

    /**
     * Marks the index as being in bulk mode in the metric and disables refresh for it,
     * unless it is already in bulk mode. Does nothing when no metric exists yet.
     *
     * @param index the index name
     * @return this client
     * @throws IOException if disabling refresh fails
     */
    @Override
    public BulkTransportClient startBulk(String index) throws IOException {
        if (metric == null) {
            return this;
        }
        if (!metric.isBulk(index)) {
            metric.startBulk(index);
            ClientHelper.disableRefresh(client, index);
        }
        return this;
    }

    /**
     * Ends bulk mode for the index in the metric and enables refresh again, if the index
     * is in bulk mode. Does nothing when no metric exists yet.
     *
     * @param index the index name
     * @return this client
     * @throws IOException if enabling refresh fails
     */
    @Override
    public BulkTransportClient stopBulk(String index) throws IOException {
        if (metric == null) {
            return this;
        }
        if (metric.isBulk(index)) {
            metric.stopBulk(index);
            ClientHelper.enableRefresh(client, index);
        }
        return this;
    }

    /**
     * Flushes the index through {@link ClientHelper}.
     *
     * @param index the index name
     * @return this client
     */
    @Override
    public BulkTransportClient flush(String index) {
        ClientHelper.flush(client, index);
        return this;
    }

    /**
     * Refreshes the index through {@link ClientHelper}.
     *
     * @param index the index name
     * @return this client
     */
    @Override
    public BulkTransportClient refresh(String index) {
        ClientHelper.refresh(client, index);
        return this;
    }

    /**
     * Adds an index request for the given document to the bulk processor.
     *
     * <p>While the client is suspended the request is NOT added: the call sleeps for one
     * second and returns. Any exception while adding is recorded (see
     * {@link #getThrowable()}), logged, and closes the client; it is not rethrown.
     *
     * @param index  the index name
     * @param type   the document type
     * @param id     the document id
     * @param source the document source
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient index(String index, String type, String id, String source) {
        ensureOpen();
        if (sleepIfSuspended("index")) {
            return this;
        }
        // inc/dec are paired around the add only; the suspended path above touches neither
        metric.getCurrentIngest().inc();
        try {
            bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source));
        } catch (Exception e) {
            recordAddFailure("index", e);
        } finally {
            metric.getCurrentIngest().dec();
        }
        return this;
    }

    /**
     * Adds a prepared index request to the bulk processor. Suspension and failure
     * handling are the same as for {@link #index(String, String, String, String)}.
     *
     * @param indexRequest the index request
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient bulkIndex(IndexRequest indexRequest) {
        ensureOpen();
        if (sleepIfSuspended("index")) {
            return this;
        }
        metric.getCurrentIngest().inc();
        try {
            bulkProcessor.add(indexRequest);
        } catch (Exception e) {
            recordAddFailure("index", e);
        } finally {
            metric.getCurrentIngest().dec();
        }
        return this;
    }

    /**
     * Adds a delete request for the given document to the bulk processor. Suspension and
     * failure handling are the same as for {@link #index(String, String, String, String)}.
     *
     * @param index the index name
     * @param type  the document type
     * @param id    the document id
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient delete(String index, String type, String id) {
        ensureOpen();
        if (sleepIfSuspended("delete")) {
            return this;
        }
        metric.getCurrentIngest().inc();
        try {
            bulkProcessor.add(new DeleteRequest(index).type(type).id(id));
        } catch (Exception e) {
            recordAddFailure("delete", e);
        } finally {
            metric.getCurrentIngest().dec();
        }
        return this;
    }

    /**
     * Adds a prepared delete request to the bulk processor. Suspension and failure
     * handling are the same as for {@link #index(String, String, String, String)}.
     *
     * @param deleteRequest the delete request
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public BulkTransportClient bulkDelete(DeleteRequest deleteRequest) {
        ensureOpen();
        if (sleepIfSuspended("delete")) {
            return this;
        }
        metric.getCurrentIngest().inc();
        try {
            bulkProcessor.add(deleteRequest);
        } catch (Exception e) {
            recordAddFailure("delete", e);
        } finally {
            metric.getCurrentIngest().dec();
        }
        return this;
    }

    /**
     * Asks the bulk processor, via {@link BulkProcessorHelper}, to submit what it has
     * buffered. Only logs a warning when there is no client.
     *
     * @return this client
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public synchronized BulkTransportClient flushIngest() {
        ensureOpen();
        if (client == null) {
            logger.warn("no client");
            return this;
        }
        logger.debug("flushing bulk processor");
        // hacked BulkProcessor to execute the submission of remaining docs. Wait always 30 seconds at most.
        BulkProcessorHelper.flush(bulkProcessor);
        return this;
    }

    /**
     * Waits on the bulk processor via {@link BulkProcessorHelper} for at most the given
     * time. Returns immediately, with a warning, when there is no client or when the
     * current ingest counter is zero.
     *
     * @param maxWaitTime the maximum time to wait
     * @return this client
     * @throws InterruptedException               if the wait is interrupted
     * @throws ElasticsearchIllegalStateException if the client is closed
     */
    @Override
    public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException {
        ensureOpen();
        if (client == null) {
            logger.warn("no client");
            return this;
        }
        if (metric.getCurrentIngest().count() == 0) {
            logger.warn("no current ingests");
            return this;
        }
        BulkProcessorHelper.waitFor(bulkProcessor, maxWaitTime);
        return this;
    }

    /**
     * Waits for the cluster to reach the given health status through {@link ClientHelper}.
     *
     * @param status    the health status to wait for
     * @param timeValue the timeout
     * @return this client
     * @throws IOException if waiting fails
     */
    @Override
    public BulkTransportClient waitForCluster(ClusterHealthStatus status, TimeValue timeValue) throws IOException {
        ClientHelper.waitForCluster(client, status, timeValue);
        return this;
    }

    /**
     * Waits for recovery of the index through {@link ClientHelper}.
     *
     * @param index the index name
     * @return the value returned by {@code ClientHelper.waitForRecovery}
     * @throws IOException if waiting fails
     */
    @Override
    public int waitForRecovery(String index) throws IOException {
        return ClientHelper.waitForRecovery(client, index);
    }

    /**
     * Updates the replica level of the index through {@link ClientHelper}.
     *
     * @param index the index name
     * @param level the new replica level
     * @return the value returned by {@code ClientHelper.updateReplicaLevel}
     * @throws IOException if the update fails
     */
    @Override
    public int updateReplicaLevel(String index, int level) throws IOException {
        return ClientHelper.updateReplicaLevel(client, index, level);
    }

    /**
     * Closes the bulk processor, ends bulk mode for all indices still registered in the
     * metric, and shuts down the underlying client.
     *
     * <p>Errors during bulk-side cleanup are logged and do not prevent the underlying
     * client from being shut down.
     *
     * @throws ElasticsearchIllegalStateException if the client was already closed by a
     *                                            failure; the underlying client is shut
     *                                            down before the exception is thrown
     */
    @Override
    public synchronized void shutdown() {
        if (closed) {
            super.shutdown();
            throw new ElasticsearchIllegalStateException("client is closed");
        }
        try {
            if (bulkProcessor != null) {
                logger.debug("closing bulk processor...");
                bulkProcessor.close();
            }
            // metric is null when no client was ever created; nothing to stop in that case
            if (metric != null && metric.indices() != null && !metric.indices().isEmpty()) {
                logger.debug("stopping bulk mode for indices {}...", metric.indices());
                // iterate over a copy: stopBulk() modifies the metric's index bookkeeping
                for (String index : ImmutableSet.copyOf(metric.indices())) {
                    stopBulk(index);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        // separate step so that a failure above cannot leave the underlying client open
        try {
            logger.debug("shutting down...");
            super.shutdown();
            logger.debug("shutting down completed");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Suspends ingesting: requests submitted while suspended are not added to the bulk
     * processor.
     */
    @Override
    public void suspend() {
        suspended = true;
    }

    /**
     * Resumes ingesting after {@link #suspend()}.
     */
    @Override
    public void resume() {
        suspended = false;
    }

    /**
     * @return the ingest metric, or {@code null} if no client has been created yet
     */
    public Metric getMetric() {
        return metric;
    }

    /**
     * @return {@code true} if a failure has been recorded
     */
    @Override
    public boolean hasThrowable() {
        return throwable != null;
    }

    /**
     * @return the last recorded failure, or {@code null} if there is none
     */
    @Override
    public Throwable getThrowable() {
        return throwable;
    }

    /**
     * Throws if this client has been marked closed.
     */
    private void ensureOpen() {
        if (closed) {
            throw new ElasticsearchIllegalStateException("client is closed");
        }
    }

    /**
     * If the client is suspended, sleeps for one second and reports that the caller must
     * skip the request. An interrupt during the sleep is recorded like any other add
     * failure and the thread's interrupt status is restored.
     *
     * @param requestKind "index" or "delete", used in the failure log message
     * @return {@code true} if the client is suspended and the request must be skipped
     */
    private boolean sleepIfSuspended(String requestKind) {
        if (!suspended) {
            return false;
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            recordAddFailure(requestKind, e);
        }
        return true;
    }

    /**
     * Records a failure to add a request: keeps the cause, closes the client, logs it.
     *
     * @param requestKind "index" or "delete", used in the log message
     * @param e           the failure
     */
    private void recordAddFailure(String requestKind, Exception e) {
        throwable = e;
        closed = true;
        logger.error("bulk add of " + requestKind + " request failed: " + e.getMessage(), e);
    }
}