/*
 * Copyright (C) 2014 Jörg Prante
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.xbib.elasticsearch.plugin.jdbc;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.xbib.elasticsearch.plugin.jdbc.pipeline.Pipeline;
import org.xbib.elasticsearch.plugin.jdbc.pipeline.element.ContextPipelineElement;
import org.xbib.elasticsearch.river.jdbc.RiverContext;
import org.xbib.elasticsearch.river.jdbc.RiverFlow;

import java.util.Queue;

/**
 * A river pipeline takes river contexts from the queue of a {@link RiverFlow} and
 * executes them one after another through that flow. It is meant to be run as one of
 * several threads that can be executed in parallel during a river run.
 */
public class RiverPipeline implements Pipeline<Boolean, ContextPipelineElement> {

    private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.RiverPipeline");

    private final RiverFlow riverFlow;

    /**
     * The river context of the most recently started request, or {@code null}
     * if no request has been started yet.
     */
    private RiverContext riverContext;

    /**
     * Checked before each request in {@link #call()}; volatile so a change made via
     * {@link #setInterrupted(boolean)} is visible to the executing thread.
     */
    private volatile boolean interrupted;

    /**
     * Create a pipeline on top of a river flow.
     *
     * @param riverFlow the river flow that supplies the queue and executes the contexts
     */
    public RiverPipeline(RiverFlow riverFlow) {
        this.riverFlow = riverFlow;
    }

    /**
     * Set the interruption flag. When {@code true}, {@link #call()} stops before
     * taking the next request. Note that {@link #call()} resets the flag on entry.
     *
     * @param interrupted the new value of the flag
     */
    public void setInterrupted(boolean interrupted) {
        this.interrupted = interrupted;
    }

    /**
     * @return the current value of the interruption flag
     */
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Call this thread. Iterate over all requests available in the queue and execute them,
     * until the queue is empty or the pipeline is interrupted.
     * Exceptions thrown while executing a single request are logged and do not stop the
     * pipeline. Afterwards, metrics are logged and the last river context is released.
     * If no river context was ever processed by this pipeline, there is nothing to release
     * and this step is skipped.
     *
     * @return always {@code true}
     * @throws Exception if pipeline execution was aborted by an unexpected failure
     */
    @Override
    public Boolean call() throws Exception {
        interrupted = false;
        while (hasNext() && !interrupted) {
            request(next());
        }
        // riverContext is still null when the queue was already empty and no request was
        // ever executed; dereferencing it unconditionally would throw a NullPointerException
        if (riverContext != null) {
            riverFlow.logMetrics(riverContext, "pipeline " + this + " complete");
            logger.debug("releasing river context");
            riverContext.release();
        }
        return true;
    }

    /**
     * Removing pipeline requests is not supported.
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("remove not supported");
    }

    /**
     * Execute a single request. Metrics of the previous river context (if any) are logged
     * before the context of this request becomes the current one and is executed.
     * Any exception is logged and swallowed so that the remaining requests are still processed.
     *
     * @param request the pipeline element carrying the river context to execute
     */
    private void request(ContextPipelineElement request) {
        try {
            RiverContext context = request.get();
            if (context == null) {
                // Queue.poll() in next() yields null when the queue was drained after
                // hasNext() was checked; keep the previous context so it can still be released
                logger.debug("no river context in request, skipping");
                return;
            }
            if (riverContext != null) {
                riverFlow.logMetrics(riverContext, "next request for pipeline " + this);
            }
            this.riverContext = context;
            riverFlow.execute(riverContext);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * @return {@code true} if the river flow queue is not empty
     */
    @Override
    public boolean hasNext() {
        return !riverFlow.getQueue().isEmpty();
    }

    /**
     * Take the head of the river flow queue and wrap it into a pipeline element.
     * The wrapped context is {@code null} if the queue is empty at the time of the call.
     *
     * @return a new pipeline element holding the polled river context
     */
    @Override
    public ContextPipelineElement next() {
        Queue<RiverContext> queue = riverFlow.getQueue();
        return new ContextPipelineElement().set(queue.poll());
    }

    /**
     * @return the river context of the most recently started request,
     * or {@code null} if none was started yet
     */
    public RiverContext getRiverContext() {
        return riverContext;
    }

}