001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.pipeline.executor;
017
018import org.xbib.elasticsearch.plugin.jdbc.pipeline.Pipeline;
019import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineExecutor;
020import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineProvider;
021import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineRequest;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033
034/**
035 * A simple pipeline executor.
036 *
037 * @param <T> the pipeline result type
038 * @param <R> the pipeline request type
039 * @param <P> the pipeline type
040 */
041public class SimplePipelineExecutor<T, R extends PipelineRequest, P extends Pipeline<T, R>>
042        implements PipelineExecutor<T, R, P> {
043
044    private ExecutorService executorService;
045
046    private Collection<P> pipelines;
047
048    private Collection<Future<T>> futures;
049
050    private PipelineProvider<P> provider;
051
052    private List<Throwable> exceptions;
053
054    private int concurrency;
055
056    @Override
057    public SimplePipelineExecutor<T, R, P> setConcurrency(int concurrency) {
058        this.concurrency = concurrency;
059        return this;
060    }
061
062    @Override
063    public SimplePipelineExecutor<T, R, P> setPipelineProvider(PipelineProvider<P> provider) {
064        this.provider = provider;
065        return this;
066    }
067
068    @Override
069    public SimplePipelineExecutor<T, R, P> prepare() {
070        if (provider == null) {
071            throw new IllegalStateException("no provider set");
072        }
073        if (executorService == null) {
074            this.executorService = Executors.newFixedThreadPool(concurrency);
075        }
076        if (concurrency < 1) {
077            concurrency = 1;
078        }
079        this.pipelines = new LinkedList<P>();
080        // limit to 256 to prevent unresponsive systems
081        for (int i = 0; i < Math.min(concurrency, 256); i++) {
082            pipelines.add(provider.get());
083        }
084        return this;
085    }
086
087    /**
088     * Execute pipelines
089     *
090     * @return this executor
091     */
092    @Override
093    public SimplePipelineExecutor<T, R, P> execute() {
094        if (pipelines == null) {
095            prepare();
096        }
097        if (pipelines.isEmpty()) {
098            throw new IllegalStateException("pipelines empty");
099        }
100        if (executorService == null) {
101            this.executorService = Executors.newFixedThreadPool(concurrency);
102        }
103        futures = new LinkedList<Future<T>>();
104        for (Callable<T> pipeline : pipelines) {
105            futures.add(executorService.submit(pipeline));
106        }
107        return this;
108    }
109
110    /**
111     * Wait for all results of the executions.
112     *
113     * @return this pipeline executor
114     * @throws InterruptedException
115     * @throws java.util.concurrent.ExecutionException
116     */
117    @Override
118    public SimplePipelineExecutor<T, R, P> waitFor()
119            throws InterruptedException, ExecutionException {
120        if (executorService == null || pipelines == null || futures == null || futures.isEmpty()) {
121            return this;
122        }
123        exceptions = new LinkedList<Throwable>();
124        for (Future<T> future : futures) {
125            T t = future.get();
126        }
127        return this;
128    }
129
130    @Override
131    public void shutdown() throws IOException {
132        if (executorService == null) {
133            return;
134        }
135        for (Future<T> future : futures) {
136            future.cancel(true);
137        }
138        executorService.shutdown();
139        try {
140            if (!executorService.awaitTermination(15, TimeUnit.SECONDS)) {
141                executorService.shutdownNow();
142            }
143        } catch (InterruptedException e) {
144            executorService.shutdownNow();
145            throw new IOException("interrupted while shutdown");
146        }
147    }
148
149    /**
150     * Get the pipelines of this executor.
151     *
152     * @return the pipelines
153     */
154    @Override
155    public Collection<P> getPipelines() {
156        return pipelines;
157    }
158
159    /**
160     * Get the collected I/O exceptions that were thrown by the pipelines.
161     *
162     * @return list of exceptions
163     */
164    public List<Throwable> getExceptions() {
165        return exceptions;
166    }
167
168}