/*
 * Copyright (C) 2014 Jörg Prante
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.xbib.elasticsearch.plugin.jdbc.feeder;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.metrics.MeterMetric;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.loader.JsonSettingsLoader;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.river.RiverName;
import org.xbib.elasticsearch.plugin.jdbc.RiverThread;
import org.xbib.elasticsearch.plugin.jdbc.classloader.uri.URIClassLoader;
import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
import org.xbib.elasticsearch.plugin.jdbc.client.transport.BulkTransportClient;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronExpression;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronThreadPoolExecutor;
import org.xbib.elasticsearch.plugin.jdbc.state.RiverStatesMetaData;
import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader;
import org.xbib.elasticsearch.river.jdbc.RiverFlow;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Standalone feeder for JDBC
 */
public class JDBCFeeder implements CommandLineInterpreter {

    private final static ESLogger logger = ESLoggerFactory.getLogger(JDBCFeeder.class.getSimpleName());

    /**
     * Register the metadata factory in Elasticsearch so that a
     * ClusterStateResponse carrying RiverStatesMetaData can be decoded
     */
    static {
        MetaData.registerFactory(RiverStatesMetaData.TYPE, RiverStatesMetaData.FACTORY);
    }

    protected Reader reader;

    protected Writer writer;

    protected PrintStream printStream;

    protected IngestFactory ingestFactory;

    /**
     * This ingest is the client for the river flow state operations
     */
    private Ingest ingest;

    private RiverFlow riverFlow;

    private List<Map<String, Object>> definitions;

    private ThreadPoolExecutor threadPoolExecutor;

    private volatile Thread feederThread;

    private volatile boolean closed;

    /**
     * Constructor for running this from the command line
     */
    public JDBCFeeder() {
    }
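    /**
     * Reads the feeder definition from a JSON document. This class only
     * requires the "jdbc" key, holding either a single definition object or an
     * array of them; all other keys are flattened into the feeder settings.
     * A minimal sketch of the expected shape; the keys inside "jdbc" are
     * interpreted by the selected river strategy and are illustrative here:
     * <pre>{@code
     * {
     *     "elasticsearch" : {
     *         "cluster" : "elasticsearch",
     *         "host" : "localhost",
     *         "port" : 9300
     *     },
     *     "jdbc" : {
     *         "url" : "jdbc:...",
     *         "user" : "...",
     *         "password" : "...",
     *         "sql" : "..."
     *     }
     * }
     * }</pre>
     *
     * @param reader the reader for the JSON definition
     * @return this feeder
     */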
    @SuppressWarnings("unchecked")
    @Override
    public JDBCFeeder readFrom(Reader reader) {
        this.reader = reader;
        try {
            Map<String, Object> map = XContentFactory.xContent(XContentType.JSON).createParser(reader).mapOrderedAndClose();
            Settings settings = settingsBuilder()
                    .put(new JsonSettingsLoader().load(jsonBuilder().map(map).string()))
                    .build();
            this.definitions = newLinkedList();
            Object pipeline = map.get("jdbc");
            if (pipeline instanceof Map) {
                definitions.add((Map<String, Object>) pipeline);
            }
            if (pipeline instanceof List) {
                definitions.addAll((List<Map<String, Object>>) pipeline);
            }
            // before running, create the river flow
            createRiverFlow(map, settings);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return this;
    }

    protected RiverFlow createRiverFlow(Map<String, Object> spec, Settings settings) throws IOException {
        String strategy = XContentMapValues.nodeStringValue(spec.get("strategy"), "simple");
        this.riverFlow = RiverServiceLoader.newRiverFlow(strategy);
        logger.debug("strategy {}: river flow class {}, spec = {} settings = {}",
                strategy, riverFlow.getClass().getName(), spec, settings.getAsMap());
        this.ingestFactory = createIngestFactory(settings);
        // our private ingest, needed for having a client in the river flow
        this.ingest = ingestFactory.create();
        riverFlow.setRiverName(new RiverName("jdbc", "feeder"))
                .setSettings(settings)
                .setClient(ingest.client())
                .setIngestFactory(ingestFactory)
                .setMetric(new MeterMetric(Executors.newScheduledThreadPool(1), TimeUnit.SECONDS))
                .setQueue(new ConcurrentLinkedDeque<Map<String, Object>>());
        return riverFlow;
    }

    @Override
    public JDBCFeeder writeTo(Writer writer) {
        this.writer = writer;
        return this;
    }

    @Override
    public JDBCFeeder errorsTo(PrintStream printStream) {
        this.printStream = printStream;
        return this;
    }

    @Override
    public JDBCFeeder start() throws Exception {
        this.closed = false;
        if (ingest.getConnectedNodes().isEmpty()) {
            throw new IOException("no nodes connected, can't continue");
        }
        this.feederThread = new Thread(new RiverThread(riverFlow, definitions));
        List<Future<?>> futures = schedule(feederThread);
        // block until all scheduled executions have completed
        for (Future<?> future : futures) {
            future.get();
        }
        ingest.shutdown();
        return this;
    }
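    /**
     * Schedules the feeder thread according to the feeder settings, checked in
     * this order: a "schedule" array of cron expressions run on a
     * CronThreadPoolExecutor, a fixed-rate "interval" time value run on a
     * ScheduledThreadPoolExecutor, or a single immediate execution.
     * "threadpoolsize" (default 4) sizes the executor in the first two modes.
     * A sketch of the corresponding settings; the cron value is illustrative
     * and assumes the Quartz-style syntax of the bundled CronExpression:
     * <pre>{@code
     * "schedule" : [ "0 0-59 0-23 ? * *" ],
     * "interval" : "1h",
     * "threadpoolsize" : 4
     * }</pre>
     *
     * @param thread the feeder thread
     * @return the futures of the scheduled executions
     */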
    private List<Future<?>> schedule(Thread thread) {
        Settings settings = riverFlow.getSettings();
        String[] schedule = settings.getAsArray("schedule");
        List<Future<?>> futures = newLinkedList();
        Long seconds = settings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds();
        if (schedule != null && schedule.length > 0) {
            CronThreadPoolExecutor cronThreadPoolExecutor =
                    new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
            for (String cron : schedule) {
                futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
            }
            this.threadPoolExecutor = cronThreadPoolExecutor;
            logger.debug("scheduled feeder instance with cron expressions {}", Arrays.asList(schedule));
        } else if (seconds > 0L) {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
            futures.add(scheduledThreadPoolExecutor.scheduleAtFixedRate(thread, 0L, seconds, TimeUnit.SECONDS));
            logger.debug("scheduled feeder instance at fixed rate of {} seconds", seconds);
            this.threadPoolExecutor = scheduledThreadPoolExecutor;
        } else {
            this.threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            futures.add(threadPoolExecutor.submit(thread));
            logger.debug("started feeder instance");
        }
        return futures;
    }

    /**
     * Builds the thread that is registered as a JVM shutdown hook, so the
     * feeder instance can be shut down by Ctrl-C
     *
     * @return the shutdown thread
     */
    @Override
    public Thread shutdownHook() {
        return new Thread() {
            @Override
            public void run() {
                try {
                    shutdown();
                } catch (Exception e) {
                    e.printStackTrace(printStream);
                }
            }
        };
    }

    @Override
    public synchronized void shutdown() throws Exception {
        if (closed) {
            return;
        }
        closed = true;
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdownNow();
            threadPoolExecutor = null;
        }
        if (feederThread != null) {
            feederThread.interrupt();
        }
        if (ingest != null && !ingest.isShutdown()) {
            ingest.shutdown();
        }
        // guard against partially initialized state, e.g. when the shutdown
        // hook fires before readFrom/writeTo/errorsTo were called
        if (reader != null) {
            reader.close();
        }
        if (writer != null) {
            writer.close();
        }
        if (printStream != null) {
            printStream.close();
        }
    }
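    /**
     * Creates the factory for bulk ingest clients backed by a
     * BulkTransportClient. Bulk behavior is taken from the feeder settings,
     * with the defaults shown below; values are illustrative:
     * <pre>{@code
     * "max_bulk_actions" : 100,
     * "max_concurrent_bulk_requests" : 8,   // default: 2 * available processors
     * "max_bulk_volume" : "10m",
     * "max_request_wait" : "60s",
     * "flush_interval" : "5s",
     * "elasticsearch.cluster" : "elasticsearch",
     * "elasticsearch.host" : "localhost",
     * "elasticsearch.port" : 9300
     * }</pre>
     *
     * @param settings the feeder settings
     * @return the ingest factory
     */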
    private IngestFactory createIngestFactory(final Settings settings) {
        return new IngestFactory() {
            @Override
            public Ingest create() {
                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
                Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                        Runtime.getRuntime().availableProcessors() * 2);
                ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
                TimeValue maxrequestwait = settings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
                TimeValue flushinterval = settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
                File home = new File(settings.get("home", "."));
                BulkTransportClient ingest = new BulkTransportClient();
                Settings clientSettings = ImmutableSettings.settingsBuilder()
                        .put("cluster.name", settings.get("elasticsearch.cluster", "elasticsearch"))
                        .put("host", settings.get("elasticsearch.host", "localhost"))
                        .put("port", settings.getAsInt("elasticsearch.port", 9300))
                        .put("sniff", settings.getAsBoolean("elasticsearch.sniff", false))
                        // prevents lookup of names.txt, which we don't have, and marks this node as "feeder".
                        // See also module load skipping in JDBCRiverPlugin
                        .put("name", "feeder")
                        .put("client.transport.ignore_cluster_name", true) // ignore cluster name setting
                        .put("client.transport.ping_timeout", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(10))) // ping timeout
                        .put("client.transport.nodes_sampler_interval", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(5))) // for sniff sampling
                        .put("path.plugins", ".dontexist") // pointing to a non-existent folder disables loading of site plugins
                        // adding our custom class loader is tricky, actions may not be registered to ActionService
                        .classLoader(getClassLoader(getClass().getClassLoader(), home))
                        .build();
                ingest.maxActionsPerBulkRequest(maxbulkactions)
                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
                        .maxVolumePerBulkRequest(maxvolume)
                        .maxRequestWait(maxrequestwait)
                        .flushIngestInterval(flushinterval)
                        .newClient(clientSettings);
                return ingest;
            }
        };
    }

    /**
     * We have to add Elasticsearch to our classpath, but exclude all JVM plugins
     * for starting our TransportClient.
     *
     * @param parent the parent class loader
     * @param home   ES_HOME
     * @return a custom class loader with our dependencies
     */
    private ClassLoader getClassLoader(ClassLoader parent, File home) {
        URIClassLoader classLoader = new URIClassLoader(parent);
        File[] libs = new File(home, "lib").listFiles();
        if (libs != null) {
            for (File file : libs) {
                if (file.getName().toLowerCase().endsWith(".jar")) {
                    classLoader.addURI(file.toURI());
                }
            }
        }
        return classLoader;
    }

}