package org.xbib.elasticsearch.action.websocket.bulk;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.xbib.elasticsearch.websocket.BaseInteractiveHandler;
import org.xbib.elasticsearch.websocket.InteractiveChannel;
import org.xbib.elasticsearch.websocket.InteractiveRequest;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * The bulk handler is derived from the BulkProcessor, but
 * offers explicit flushing and can receive requests from multiple threads
 * into a global action queue.
 * It supports websocket bulk actions and can write back the response to an
 * interactive channel.
 * The bulk volume is not controlled.
 * <p/>
 * A handler created with {@link #BulkHandler(Settings, Client)} uses a
 * concurrency of 32 and 100 actions per bulk. A handler created through the
 * {@link Builder} defaults to a concurrency of 1 and 100 actions per bulk.
 * <p/>
 * Note that the action queue is static and therefore shared by all handler
 * instances, while the {@code synchronized} methods lock a single instance
 * only. The queue is always drained element by element so that every queued
 * request ends up in exactly one bulk request.
 */
public class BulkHandler extends BaseInteractiveHandler {

    private final BulkHandler.Listener listener;

    private final int concurrentRequests;

    private final int bulkActions;

    private final TimeValue flushInterval;

    private final Semaphore semaphore;

    private final ScheduledThreadPoolExecutor scheduler;

    private final ScheduledFuture<?> scheduledFuture;

    private final AtomicLong executionIdGen = new AtomicLong();

    /** Global queue of pending actions, shared by all handler instances. */
    private final static Queue<ActionRequest> bulk = ConcurrentCollections.newQueue();

    private volatile boolean closed = false;

    @Override
    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
        // will be overridden by bulk action
    }

    /**
     * A listener for the execution.
     */
    public interface Listener {

        /**
         * Callback before the bulk is executed.
         */
        void beforeBulk(long executionId, BulkRequest request);

        /**
         * Callback after a successful execution of bulk request.
         */
        void afterBulk(long executionId, BulkRequest request, BulkResponse response);

        /**
         * Callback after a failed execution of bulk request.
         */
        void afterBulk(long executionId, BulkRequest request, Throwable failure);
    }

    /**
     * A listener adapter whose callbacks do nothing.
     */
    static class ListenerAdapter implements BulkHandler.Listener {

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        }
    }

    /**
     * A builder used to create an instance of a bulk handler.
     */
    public static class Builder {

        private final Client client;
        private final BulkHandler.Listener listener;
        private int concurrentRequests = 1;
        private int bulkActions = 100;
        private TimeValue flushInterval = null;

        /**
         * Creates a builder of a bulk handler with the client to use and the
         * listener that will be notified on the completion of bulk
         * requests.
         */
        public Builder(Client client, BulkHandler.Listener listener) {
            this.client = client;
            this.listener = listener;
        }

        /**
         * Sets the number of concurrent requests allowed to be executed. A
         * value of 0 means that only a single request will be allowed to be
         * executed, in a blocking fashion. A value of 1 means 1 concurrent
         * request is allowed to be executed while accumulating new bulk
         * requests. Defaults to <tt>1</tt>.
         */
        public BulkHandler.Builder setConcurrentRequests(int concurrentRequests) {
            this.concurrentRequests = concurrentRequests;
            return this;
        }

        /**
         * Sets when to flush a new bulk request based on the number of actions
         * currently added. Defaults to <tt>100</tt>. Can be set to <tt>-1</tt>
         * to disable it.
         */
        public BulkHandler.Builder setBulkActions(int bulkActions) {
            this.bulkActions = bulkActions;
            return this;
        }

        /**
         * Sets a flush interval flushing *any* bulk actions pending if the
         * interval passes. Defaults to not set.
         * <p/>
         * Note, {@link #setBulkActions(int)} can
         * be set to <tt>-1</tt> with the flush interval set allowing for
         * complete async processing of bulk actions.
         */
        public BulkHandler.Builder setFlushInterval(TimeValue flushInterval) {
            this.flushInterval = flushInterval;
            return this;
        }

        /**
         * Builds a new bulk handler. The settings handed to the handler are
         * the settings of the client, because the builder offers no way to
         * supply other settings.
         */
        public BulkHandler build() {
            Settings settings = client != null ? client.settings() : null;
            return new BulkHandler(settings, client, listener, concurrentRequests, bulkActions, flushInterval);
        }
    }

    /**
     * Creates a new {@link Builder} for the given client and listener.
     */
    public static BulkHandler.Builder builder(Client client, BulkHandler.Listener listener) {
        return new BulkHandler.Builder(client, listener);
    }

    /**
     * Creates a handler with a no-op listener, a concurrency of 32,
     * 100 actions per bulk and no time based flushing.
     */
    public BulkHandler(Settings settings, Client client) {
        super(settings, client);
        this.listener = new ListenerAdapter();
        this.concurrentRequests = 32;
        this.bulkActions = 100;
        this.semaphore = new Semaphore(concurrentRequests);
        this.flushInterval = null;
        this.scheduler = null;
        this.scheduledFuture = null;
    }

    /**
     * Creates a handler from builder values. If a flush interval is given,
     * a single daemon thread flushes pending actions periodically.
     */
    BulkHandler(Settings settings, Client client, BulkHandler.Listener listener, int concurrentRequests, int bulkActions, @Nullable TimeValue flushInterval) {
        super(settings, client);
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.bulkActions = bulkActions;
        this.semaphore = new Semaphore(concurrentRequests);
        this.flushInterval = flushInterval;
        if (flushInterval != null) {
            this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), "websocket_bulk_processor"));
            this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new BulkHandler.Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
        } else {
            this.scheduler = null;
            this.scheduledFuture = null;
        }
    }

    public BulkHandler.Listener getListener() {
        return listener;
    }

    /**
     * Flushes open bulk actions. Does nothing if the handler is closed.
     */
    public synchronized void flush() {
        flushPending(null);
    }

    /**
     * Flushes open bulk actions and, in asynchronous mode, writes the bulk
     * response back to the given channel. Does nothing if the handler is
     * closed.
     */
    public synchronized void flush(InteractiveChannel channel) {
        flushPending(channel);
    }

    /**
     * Closes the handler. If flushing by time is enabled, the scheduler is
     * shut down. Any remaining bulk actions are flushed.
     */
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        if (!bulk.isEmpty()) {
            execute(null);
        }
    }

    /**
     * Adds an {@link IndexRequest} to the list of actions to execute. Follows
     * the same behavior of {@link IndexRequest} (for example, if no id is
     * provided, one will be generated, or usage of the create flag).
     */
    public BulkHandler add(IndexRequest request) {
        return add((ActionRequest) request);
    }

    /**
     * Adds an {@link DeleteRequest} to the list of actions to execute.
     */
    public BulkHandler add(DeleteRequest request) {
        return add((ActionRequest) request);
    }

    /**
     * Adds a request to the list of actions to execute.
     *
     * @throws ElasticsearchIllegalStateException if the handler is closed
     */
    public BulkHandler add(ActionRequest request) {
        internalAdd(request, null);
        return this;
    }

    /**
     * Adds an {@link IndexRequest} to the list of actions to execute. Follows
     * the same behavior of {@link IndexRequest} (for example, if no id is
     * provided, one will be generated, or usage of the create flag).
     */
    public BulkHandler add(IndexRequest request, InteractiveChannel channel) {
        return add((ActionRequest) request, channel);
    }

    /**
     * Adds an {@link DeleteRequest} to the list of actions to execute.
     */
    public BulkHandler add(DeleteRequest request, InteractiveChannel channel) {
        return add((ActionRequest) request, channel);
    }

    /**
     * Adds a request to the list of actions to execute. If this request
     * triggers a bulk execution in asynchronous mode, the bulk response is
     * written back to the given channel.
     *
     * @throws ElasticsearchIllegalStateException if the handler is closed
     */
    public BulkHandler add(ActionRequest request, InteractiveChannel channel) {
        internalAdd(request, channel);
        return this;
    }

    /**
     * Enqueues the request and executes a bulk if the action limit is reached.
     * The closed check comes first so that a rejected request is never left
     * behind in the queue.
     */
    private synchronized void internalAdd(ActionRequest request, InteractiveChannel channel) {
        if (closed) {
            throw new ElasticsearchIllegalStateException("bulk process already closed");
        }
        bulk.add(request);
        if (isOverTheLimit()) {
            execute(channel);
        }
    }

    /**
     * Executes a bulk with all pending actions unless the handler is closed
     * or nothing is pending.
     */
    private synchronized void flushPending(InteractiveChannel channel) {
        if (closed) {
            return;
        }
        if (!bulk.isEmpty()) {
            execute(channel);
        }
    }

    /**
     * Drains the global queue into one bulk request and executes it.
     * (currently) needs to be executed under a lock.
     */
    private void execute(final InteractiveChannel channel) {
        final BulkRequest bulkRequest = new BulkRequest();
        // Drain with poll(): copying the queue and clearing it afterwards would
        // drop any request that was enqueued between the two calls.
        ActionRequest pending;
        while ((pending = bulk.poll()) != null) {
            bulkRequest.add(pending);
        }
        if (bulkRequest.numberOfActions() == 0) {
            // the queue was emptied in the meantime, nothing to send
            return;
        }

        final long executionId = executionIdGen.incrementAndGet();

        if (concurrentRequests == 0) {
            executeBlocking(executionId, bulkRequest);
        } else {
            executeAsync(executionId, bulkRequest, channel);
        }
    }

    /**
     * Executes the bulk request in a blocking fashion. No response is written
     * to a channel in this mode.
     */
    private void executeBlocking(long executionId, BulkRequest bulkRequest) {
        try {
            listener.beforeBulk(executionId, bulkRequest);
            listener.afterBulk(executionId, bulkRequest, client.bulk(bulkRequest).actionGet());
        } catch (Exception e) {
            listener.afterBulk(executionId, bulkRequest, e);
        }
    }

    /**
     * Executes the bulk request asynchronously. A semaphore permit limits the
     * number of bulk requests in flight; it is released when the request
     * completes, or immediately if the request could not be submitted.
     */
    private void executeAsync(final long executionId, final BulkRequest bulkRequest, final InteractiveChannel channel) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // keep the interrupt visible to the calling thread
            Thread.currentThread().interrupt();
            listener.afterBulk(executionId, bulkRequest, e);
            return;
        }
        // guards against releasing the permit more than once
        final AtomicBoolean permitReleased = new AtomicBoolean(false);
        boolean submitted = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                        if (channel != null) {
                            channel.sendResponse("bulkresponse", buildResponse(response));
                        }
                    } catch (IOException e) {
                        logger.error("error while sending bulk response", e);
                    } finally {
                        releasePermit(permitReleased);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, t);
                        if (channel != null) {
                            channel.sendResponse("bulkresponse", t);
                        }
                    } catch (IOException e) {
                        logger.error("error while sending bulk response", e);
                    } finally {
                        releasePermit(permitReleased);
                    }
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                // beforeBulk or the submission itself threw: no callback will
                // release the permit, so do it here
                releasePermit(permitReleased);
            }
        }
    }

    /**
     * Releases one semaphore permit unless the flag says it was released already.
     */
    private void releasePermit(AtomicBoolean released) {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    /**
     * Checks whether the number of pending actions has reached the
     * configured bulk size. A bulk size of -1 disables the check.
     */
    private boolean isOverTheLimit() {
        return bulkActions != -1 && bulk.size() >= bulkActions;
    }

    /**
     * Periodic task that flushes pending actions.
     */
    class Flush implements Runnable {

        @Override
        public void run() {
            try {
                flushPending(null);
            } catch (RuntimeException e) {
                // an exception escaping a periodic task would suppress all
                // subsequent executions of the task
                logger.error("error while flushing bulk actions", e);
            }
        }
    }

    /**
     * Taken from the REST bulk action.
     *
     * @param response the bulk response
     * @return a content builder with the response
     * @throws IOException if the content can not be built
     */
    private XContentBuilder buildResponse(BulkResponse response) throws IOException {
        XContentBuilder builder = jsonBuilder();
        builder.startObject();
        builder.field(Fields.TOOK, response.getTookInMillis());
        builder.startArray(Fields.ITEMS);
        for (BulkItemResponse itemResponse : response) {
            builder.startObject();
            builder.startObject(itemResponse.getOpType());
            builder.field(Fields._INDEX, itemResponse.getIndex());
            builder.field(Fields._TYPE, itemResponse.getType());
            builder.field(Fields._ID, itemResponse.getId());
            long version = itemResponse.getVersion();
            if (version != -1) {
                builder.field(Fields._VERSION, version);
            }
            if (itemResponse.isFailed()) {
                builder.field(Fields.ERROR, itemResponse.getFailureMessage());
            } else {
                builder.field(Fields.OK, true);
            }
            builder.endObject();
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
        return builder;
    }

    static final class Fields {

        static final XContentBuilderString ITEMS = new XContentBuilderString("items");
        static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _ID = new XContentBuilderString("_id");
        static final XContentBuilderString ERROR = new XContentBuilderString("error");
        static final XContentBuilderString OK = new XContentBuilderString("ok");
        static final XContentBuilderString TOOK = new XContentBuilderString("took");
        static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
        static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
    }
}