001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.pipeline.executor; 017 018import org.xbib.elasticsearch.plugin.jdbc.pipeline.Pipeline; 019import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineExecutor; 020import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineProvider; 021import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineRequest; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033 034/** 035 * A simple pipeline executor. 036 * 037 * @param <T> the pipeline result type 038 * @param <R> the pipeline request type 039 * @param <P> the pipeline type 040 */ 041public class SimplePipelineExecutor<T, R extends PipelineRequest, P extends Pipeline<T, R>> 042 implements PipelineExecutor<T, R, P> { 043 044 private ExecutorService executorService; 045 046 private Collection<P> pipelines; 047 048 private Collection<Future<T>> futures; 049 050 private PipelineProvider<P> provider; 051 052 private List<Throwable> exceptions; 053 054 private int concurrency; 055 056 @Override 057 public SimplePipelineExecutor<T, R, P> setConcurrency(int concurrency) { 058 this.concurrency = concurrency; 059 return this; 060 } 061 062 @Override 063 public SimplePipelineExecutor<T, R, P> setPipelineProvider(PipelineProvider<P> provider) { 064 this.provider = provider; 065 return this; 066 } 067 068 @Override 069 public SimplePipelineExecutor<T, R, P> prepare() { 070 if (provider == null) { 071 throw new IllegalStateException("no provider set"); 072 } 073 if (executorService == null) { 074 this.executorService = Executors.newFixedThreadPool(concurrency); 075 } 076 if (concurrency < 1) { 077 concurrency = 1; 078 } 079 this.pipelines = new LinkedList<P>(); 080 // limit to 256 to prevent unresponsive systems 081 for (int i = 0; i < Math.min(concurrency, 256); i++) { 082 pipelines.add(provider.get()); 083 } 084 return this; 085 } 086 087 /** 088 * Execute pipelines 089 * 090 * @return this executor 091 */ 092 @Override 093 public SimplePipelineExecutor<T, R, P> execute() { 094 if (pipelines == null) { 095 prepare(); 096 } 097 if (pipelines.isEmpty()) { 098 throw new IllegalStateException("pipelines empty"); 099 } 100 if (executorService == null) { 101 this.executorService = Executors.newFixedThreadPool(concurrency); 102 } 103 futures = new LinkedList<Future<T>>(); 104 for (Callable<T> pipeline : pipelines) { 105 futures.add(executorService.submit(pipeline)); 106 } 107 return this; 108 } 109 110 /** 111 * Wait for all results of the executions. 112 * 113 * @return this pipeline executor 114 * @throws InterruptedException 115 * @throws java.util.concurrent.ExecutionException 116 */ 117 @Override 118 public SimplePipelineExecutor<T, R, P> waitFor() 119 throws InterruptedException, ExecutionException { 120 if (executorService == null || pipelines == null || futures == null || futures.isEmpty()) { 121 return this; 122 } 123 exceptions = new LinkedList<Throwable>(); 124 for (Future<T> future : futures) { 125 T t = future.get(); 126 } 127 return this; 128 } 129 130 @Override 131 public void shutdown() throws IOException { 132 if (executorService == null) { 133 return; 134 } 135 for (Future<T> future : futures) { 136 future.cancel(true); 137 } 138 executorService.shutdown(); 139 try { 140 if (!executorService.awaitTermination(15, TimeUnit.SECONDS)) { 141 executorService.shutdownNow(); 142 } 143 } catch (InterruptedException e) { 144 executorService.shutdownNow(); 145 throw new IOException("interrupted while shutdown"); 146 } 147 } 148 149 /** 150 * Get the pipelines of this executor. 151 * 152 * @return the pipelines 153 */ 154 @Override 155 public Collection<P> getPipelines() { 156 return pipelines; 157 } 158 159 /** 160 * Get the collected I/O exceptions that were thrown by the pipelines. 161 * 162 * @return list of exceptions 163 */ 164 public List<Throwable> getExceptions() { 165 return exceptions; 166 } 167 168}