001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc; 017 018import org.elasticsearch.common.logging.ESLogger; 019import org.elasticsearch.common.logging.ESLoggerFactory; 020import org.elasticsearch.common.unit.TimeValue; 021import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder; 022import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse; 023import org.xbib.elasticsearch.plugin.jdbc.pipeline.Pipeline; 024import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineProvider; 025import org.xbib.elasticsearch.plugin.jdbc.pipeline.PipelineRequest; 026import org.xbib.elasticsearch.plugin.jdbc.pipeline.executor.MetricSimplePipelineExecutor; 027import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 028import org.xbib.elasticsearch.river.jdbc.RiverContext; 029import org.xbib.elasticsearch.river.jdbc.RiverFlow; 030 031import java.io.IOException; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.concurrent.ScheduledThreadPoolExecutor; 036import java.util.concurrent.TimeUnit; 037 038/** 039 * A thread that spawns a single step of river operation. 040 * 041 * @param <T> a pipeline result type 042 * @param <R> a pipeline request type 043 * @param <P> a pipeline type 044 */ 045public class RiverThread<T, R extends PipelineRequest, P extends Pipeline<T, R>> 046 implements Runnable { 047 048 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.RiverThread"); 049 050 private final RiverFlow riverFlow; 051 052 private final List<Map<String, Object>> input; 053 054 private MetricSimplePipelineExecutor executor; 055 056 private List<RiverPipeline> pipelines; 057 058 private ScheduledThreadPoolExecutor metricsThreadPoolExecutor; 059 060 private ScheduledThreadPoolExecutor suspensionThreadPoolExecutor; 061 062 private volatile Thread metricsThread; 063 064 private volatile Thread suspensionThread; 065 066 public RiverThread(RiverFlow riverFlow, List<Map<String, Object>> input) { 067 this.riverFlow = riverFlow; 068 this.input = input; 069 this.pipelines = new LinkedList<RiverPipeline>(); 070 } 071 072 /** 073 * Before the pipelines are executed, put the river defintions on the queue 074 * 075 * @throws IOException 076 */ 077 protected void beforePipelineExecutions() throws IOException { 078 for (Map<String, Object> definition : input) { 079 RiverContext riverContext = riverFlow.newRiverContext(); 080 riverContext.setDefinition(definition); 081 riverFlow.getQueue().offer(riverContext); 082 } 083 this.metricsThread = new Thread(new MetricThread()); 084 // schedule river metrics thread 085 long metricsInterval = riverFlow.getSettings().getAsTime("metrics_interval", TimeValue.timeValueSeconds(60)).getSeconds(); 086 this.metricsThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); 087 metricsThreadPoolExecutor.scheduleAtFixedRate(metricsThread, 10L, metricsInterval, TimeUnit.SECONDS); 088 logger.info("scheduled metrics thread at {} seconds", metricsInterval); 089 this.suspensionThread = new Thread(new SuspendThread()); 090 // schedule suspension thread 091 long suspensionCheckInterval = TimeValue.timeValueSeconds(1).getSeconds(); 092 this.suspensionThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); 093 suspensionThreadPoolExecutor.scheduleAtFixedRate(suspensionThread, 10L, 094 suspensionCheckInterval, 095 TimeUnit.SECONDS); 096 logger.info("scheduled suspend check thread at {} seconds", suspensionCheckInterval); 097 } 098 099 @Override 100 public void run() { 101 logger.debug("river flow {} thread is starting", riverFlow); 102 try { 103 beforePipelineExecutions(); 104 this.executor = new MetricSimplePipelineExecutor<T, R, P>(riverFlow.getMetric()) 105 .setConcurrency(riverFlow.getSettings().getAsInt("concurrency", 1)) 106 .setPipelineProvider(new PipelineProvider<P>() { 107 @Override 108 @SuppressWarnings("unchecked") 109 public P get() { 110 RiverPipeline pipeline = new RiverPipeline(riverFlow); 111 pipelines.add(pipeline); 112 return (P) pipeline; 113 } 114 }); 115 executor.prepare() 116 .execute() 117 .waitFor(); 118 } catch (InterruptedException e) { 119 for (RiverPipeline pipeline : pipelines) { 120 pipeline.setInterrupted(true); 121 } 122 if (metricsThread != null) { 123 logger.debug("interrupting metrics thread"); 124 metricsThread.interrupt(); 125 } 126 Thread.currentThread().interrupt(); 127 logger.warn("interrupted"); 128 } catch (Throwable t) { 129 logger.error(t.getMessage(), t); 130 } finally { 131 try { 132 afterPipelineExecutions(); 133 } catch (Exception e) { 134 logger.error(e.getMessage(), e); 135 } 136 } 137 logger.debug("river flow {} thread is finished", riverFlow); 138 } 139 140 protected void afterPipelineExecutions() throws Exception { 141 if (executor != null) { 142 executor.shutdown(); 143 } 144 if (metricsThreadPoolExecutor != null) { 145 logger.debug("shutting down metrics thread scheduler"); 146 metricsThreadPoolExecutor.shutdownNow(); 147 metricsThreadPoolExecutor = null; 148 } 149 if (metricsThread != null) { 150 metricsThread.interrupt(); 151 } 152 if (suspensionThreadPoolExecutor != null) { 153 logger.debug("shutting down suspension thread scheduler"); 154 suspensionThreadPoolExecutor.shutdownNow(); 155 suspensionThreadPoolExecutor = null; 156 } 157 if (suspensionThread != null) { 158 suspensionThread.interrupt(); 159 } 160 } 161 162 /** 163 * Should be called regularly to check for a new river state, if the river is suspended. 164 * If so, enter suspension mode. This is a bit tricky for a list of pipelines: first, all 165 * pipeline sources and mouths are suspended. Then, a state change is monitored for resuming. 166 * At last, all pipeline sources and mouths are resumed. 167 */ 168 public void checkForSuspension() { 169 GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(riverFlow.getClient().admin().cluster()) 170 .setRiverName(riverFlow.getRiverName().getName()) 171 .setRiverType(riverFlow.getRiverName().getType()); 172 GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet(); 173 RiverState riverState = riverStateResponse.getRiverState(); 174 if (riverState.isSuspended()) { 175 // suspend all sources and mouths 176 for (RiverPipeline pipeline : pipelines) { 177 RiverContext riverContext = pipeline.getRiverContext(); 178 try { 179 if (riverContext.getRiverSource() != null) { 180 riverContext.getRiverSource().suspend(); 181 } 182 } catch (Exception e) { 183 logger.error(e.getMessage(), e); 184 } 185 try { 186 if (riverContext.getRiverMouth() != null) { 187 riverContext.getRiverMouth().suspend(); 188 } 189 } catch (Exception e) { 190 logger.error(e.getMessage(), e); 191 } 192 } 193 // wait for resume 194 try { 195 do { 196 riverStateRequestBuilder = new GetRiverStateRequestBuilder(riverFlow.getClient().admin().cluster()) 197 .setRiverName(riverFlow.getRiverName().getName()) 198 .setRiverType(riverFlow.getRiverName().getType()); 199 riverStateResponse = riverStateRequestBuilder.execute().actionGet(); 200 riverState = riverStateResponse.getRiverState(); 201 if (riverState.isSuspended()) { 202 Thread.sleep(1000L); 203 } 204 } while (riverState.isSuspended()); 205 } catch (InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 logger.warn("interrupted"); 208 } catch (Exception e) { 209 logger.error(e.getMessage(), e); 210 } 211 // resume all sources and mouths 212 for (RiverPipeline pipeline : pipelines) { 213 RiverContext riverContext = pipeline.getRiverContext(); 214 try { 215 if (riverContext.getRiverSource() != null) { 216 riverContext.getRiverSource().resume(); 217 } 218 } catch (Exception e) { 219 logger.error(e.getMessage(), e); 220 } 221 try { 222 if (riverContext.getRiverMouth() != null) { 223 riverContext.getRiverMouth().resume(); 224 } 225 } catch (Exception e) { 226 logger.error(e.getMessage(), e); 227 } 228 } 229 } 230 } 231 232 class MetricThread implements Runnable { 233 234 @Override 235 public void run() { 236 for (RiverPipeline pipeline : pipelines) { 237 riverFlow.logMetrics(pipeline.getRiverContext(), "pipeline " + pipeline + " is running"); 238 } 239 } 240 } 241 242 class SuspendThread implements Runnable { 243 244 @Override 245 public void run() { 246 checkForSuspension(); 247 } 248 } 249 250}