001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc;
017
018import org.elasticsearch.common.logging.ESLogger;
019import org.elasticsearch.common.logging.ESLoggerFactory;
020import org.elasticsearch.common.unit.TimeValue;
021import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder;
022import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
023import org.xbib.elasticsearch.plugin.jdbc.pipeline.Pipeline;
024import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineProvider;
025import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineRequest;
026import org.xbib.elasticsearch.plugin.jdbc.pipeline.executor.MetricSimplePipelineExecutor;
027import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
028import org.xbib.elasticsearch.river.jdbc.RiverContext;
029import org.xbib.elasticsearch.river.jdbc.RiverFlow;
030
031import java.io.IOException;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.concurrent.ScheduledThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037
038/**
039 * A thread that spawns a single step of river operation.
040 *
041 * @param <T> a pipeline result type
042 * @param <R> a pipeline request type
043 * @param <P> a pipeline type
044 */
045public class RiverThread<T, R extends PipelineRequest, P extends Pipeline<T, R>>
046        implements Runnable {
047
048    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.RiverThread");
049
050    private final RiverFlow riverFlow;
051
052    private final List<Map<String, Object>> input;
053
054    private MetricSimplePipelineExecutor executor;
055
056    private List<RiverPipeline> pipelines;
057
058    private ScheduledThreadPoolExecutor metricsThreadPoolExecutor;
059
060    private ScheduledThreadPoolExecutor suspensionThreadPoolExecutor;
061
062    private volatile Thread metricsThread;
063
064    private volatile Thread suspensionThread;
065
066    public RiverThread(RiverFlow riverFlow, List<Map<String, Object>> input) {
067        this.riverFlow = riverFlow;
068        this.input = input;
069        this.pipelines = new LinkedList<RiverPipeline>();
070    }
071
072    /**
073     * Before the pipelines are executed, put the river defintions on the queue
074     *
075     * @throws IOException
076     */
077    protected void beforePipelineExecutions() throws IOException {
078        for (Map<String, Object> definition : input) {
079            RiverContext riverContext = riverFlow.newRiverContext();
080            riverContext.setDefinition(definition);
081            riverFlow.getQueue().offer(riverContext);
082        }
083        this.metricsThread = new Thread(new MetricThread());
084        // schedule river metrics thread
085        long metricsInterval = riverFlow.getSettings().getAsTime("metrics_interval", TimeValue.timeValueSeconds(60)).getSeconds();
086        this.metricsThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
087        metricsThreadPoolExecutor.scheduleAtFixedRate(metricsThread, 10L, metricsInterval, TimeUnit.SECONDS);
088        logger.info("scheduled metrics thread at {} seconds", metricsInterval);
089        this.suspensionThread = new Thread(new SuspendThread());
090        // schedule suspension thread
091        long suspensionCheckInterval = TimeValue.timeValueSeconds(1).getSeconds();
092        this.suspensionThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
093        suspensionThreadPoolExecutor.scheduleAtFixedRate(suspensionThread, 10L,
094                suspensionCheckInterval,
095                TimeUnit.SECONDS);
096        logger.info("scheduled suspend check thread at {} seconds", suspensionCheckInterval);
097    }
098
099    @Override
100    public void run() {
101        logger.debug("river flow {} thread is starting", riverFlow);
102        try {
103            beforePipelineExecutions();
104            this.executor = new MetricSimplePipelineExecutor<T, R, P>(riverFlow.getMetric())
105                    .setConcurrency(riverFlow.getSettings().getAsInt("concurrency", 1))
106                    .setPipelineProvider(new PipelineProvider<P>() {
107                        @Override
108                        @SuppressWarnings("unchecked")
109                        public P get() {
110                            RiverPipeline pipeline = new RiverPipeline(riverFlow);
111                            pipelines.add(pipeline);
112                            return (P) pipeline;
113                        }
114                    });
115            executor.prepare()
116                    .execute()
117                    .waitFor();
118        } catch (InterruptedException e) {
119            for (RiverPipeline pipeline : pipelines) {
120                pipeline.setInterrupted(true);
121            }
122            if (metricsThread != null) {
123                logger.debug("interrupting metrics thread");
124                metricsThread.interrupt();
125            }
126            Thread.currentThread().interrupt();
127            logger.warn("interrupted");
128        } catch (Throwable t) {
129            logger.error(t.getMessage(), t);
130        } finally {
131            try {
132                afterPipelineExecutions();
133            } catch (Exception e) {
134                logger.error(e.getMessage(), e);
135            }
136        }
137        logger.debug("river flow {} thread is finished", riverFlow);
138    }
139
140    protected void afterPipelineExecutions() throws Exception {
141        if (executor != null) {
142            executor.shutdown();
143        }
144        if (metricsThreadPoolExecutor != null) {
145            logger.debug("shutting down metrics thread scheduler");
146            metricsThreadPoolExecutor.shutdownNow();
147            metricsThreadPoolExecutor = null;
148        }
149        if (metricsThread != null) {
150            metricsThread.interrupt();
151        }
152        if (suspensionThreadPoolExecutor != null) {
153            logger.debug("shutting down suspension thread scheduler");
154            suspensionThreadPoolExecutor.shutdownNow();
155            suspensionThreadPoolExecutor = null;
156        }
157        if (suspensionThread != null) {
158            suspensionThread.interrupt();
159        }
160    }
161
162    /**
163     * Should be called regularly to check for a new river state, if the river is suspended.
164     * If so, enter suspension mode. This is a bit tricky for a list of pipelines: first, all
165     * pipeline sources and mouths are suspended. Then, a state change is monitored for resuming.
166     * At last, all pipeline sources and mouths are resumed.
167     */
168    public void checkForSuspension() {
169        GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(riverFlow.getClient().admin().cluster())
170                .setRiverName(riverFlow.getRiverName().getName())
171                .setRiverType(riverFlow.getRiverName().getType());
172        GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet();
173        RiverState riverState = riverStateResponse.getRiverState();
174        if (riverState.isSuspended()) {
175            // suspend all sources and mouths
176            for (RiverPipeline pipeline : pipelines) {
177                RiverContext riverContext = pipeline.getRiverContext();
178                try {
179                    if (riverContext.getRiverSource() != null) {
180                        riverContext.getRiverSource().suspend();
181                    }
182                } catch (Exception e) {
183                    logger.error(e.getMessage(), e);
184                }
185                try {
186                    if (riverContext.getRiverMouth() != null) {
187                        riverContext.getRiverMouth().suspend();
188                    }
189                } catch (Exception e) {
190                    logger.error(e.getMessage(), e);
191                }
192            }
193            // wait for resume
194            try {
195                do {
196                    riverStateRequestBuilder = new GetRiverStateRequestBuilder(riverFlow.getClient().admin().cluster())
197                            .setRiverName(riverFlow.getRiverName().getName())
198                            .setRiverType(riverFlow.getRiverName().getType());
199                    riverStateResponse = riverStateRequestBuilder.execute().actionGet();
200                    riverState = riverStateResponse.getRiverState();
201                    if (riverState.isSuspended()) {
202                        Thread.sleep(1000L);
203                    }
204                } while (riverState.isSuspended());
205            } catch (InterruptedException e) {
206                Thread.currentThread().interrupt();
207                logger.warn("interrupted");
208            } catch (Exception e) {
209                logger.error(e.getMessage(), e);
210            }
211            // resume all sources and mouths
212            for (RiverPipeline pipeline : pipelines) {
213                RiverContext riverContext = pipeline.getRiverContext();
214                try {
215                    if (riverContext.getRiverSource() != null) {
216                        riverContext.getRiverSource().resume();
217                    }
218                } catch (Exception e) {
219                    logger.error(e.getMessage(), e);
220                }
221                try {
222                    if (riverContext.getRiverMouth() != null) {
223                        riverContext.getRiverMouth().resume();
224                    }
225                } catch (Exception e) {
226                    logger.error(e.getMessage(), e);
227                }
228            }
229        }
230    }
231
232    class MetricThread implements Runnable {
233
234        @Override
235        public void run() {
236            for (RiverPipeline pipeline : pipelines) {
237                riverFlow.logMetrics(pipeline.getRiverContext(), "pipeline " + pipeline + " is running");
238            }
239        }
240    }
241
242    class SuspendThread implements Runnable {
243
244        @Override
245        public void run() {
246            checkForSuspension();
247        }
248    }
249
250}