/*
 * Copyright (C) 2014 Jörg Prante
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.xbib.elasticsearch.plugin.jdbc.river;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.metrics.MeterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.loader.JsonSettingsLoader;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder;
import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
import org.xbib.elasticsearch.plugin.jdbc.RiverThread;
import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
import org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronExpression;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronThreadPoolExecutor;
import org.xbib.elasticsearch.plugin.jdbc.execute.RunnableRiver;
import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
import org.xbib.elasticsearch.plugin.jdbc.state.StatefulRiver;
import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader;
import org.xbib.elasticsearch.river.jdbc.RiverFlow;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * JDBC river.
 *
 * Reads the "jdbc" definition(s) from the river settings, creates the
 * {@link RiverFlow} registered for the configured strategy, and executes that
 * flow either on cron expressions ("schedule"), at a fixed rate ("interval",
 * in seconds), or exactly once when neither is configured.
 */
public class JDBCRiver extends AbstractRiverComponent implements StatefulRiver, RunnableRiver {

    private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.JDBCRiver");

    private final Client client;

    private RiverFlow riverFlow;

    /** The "jdbc" definition maps handed to every {@link RiverThread}. */
    private List<Map<String, Object>> definitions;

    /** Executor currently running the river thread; null when nothing is scheduled. */
    private ThreadPoolExecutor threadPoolExecutor;

    private volatile Thread riverThread;

    private volatile boolean closed;

    /** Cron expressions; takes precedence over {@link #interval} when non-empty. */
    private String[] schedule;

    private List<Future<?>> futures;

    /** Fixed-rate period in seconds; values less than or equal to zero disable fixed-rate scheduling. */
    private long interval;

    /**
     * Creates the river and configures its river flow from the river settings.
     *
     * The settings must contain a "jdbc" entry, either a single map or a list of maps.
     * "strategy", "schedule" and "interval" are read from the top level of the river
     * settings; for a single-map "jdbc" definition the same keys inside that map
     * override the top-level values (legacy mode).
     *
     * @param riverName     the river name
     * @param riverSettings the river settings
     * @param client        the client used for ingesting and for fetching the river state
     * @throws IllegalArgumentException if the river settings contain no "jdbc" entry
     * @throws IllegalStateException    if the river settings can not be converted
     */
    @Inject
    @SuppressWarnings({"unchecked"})
    public JDBCRiver(RiverName riverName, RiverSettings riverSettings, Client client) {
        super(riverName, riverSettings);
        this.client = client;
        if (!riverSettings.settings().containsKey("jdbc")) {
            throw new IllegalArgumentException("no 'jdbc' settings in river settings?");
        }
        try {
            Map<String, String> loadedSettings = new JsonSettingsLoader()
                    .load(jsonBuilder().map(riverSettings.settings()).string());
            // named flowSettings so the inherited 'settings' field is not shadowed
            Settings flowSettings = settingsBuilder().put(loadedSettings).build();
            String strategy = XContentMapValues.nodeStringValue(riverSettings.settings().get("strategy"), "simple");
            this.schedule = flowSettings.getAsArray("schedule");
            this.interval = flowSettings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds();
            this.definitions = newLinkedList();
            Object definition = riverSettings.settings().get("jdbc");
            if (definition instanceof Map) {
                Map<String, Object> map = (Map<String, Object>) definition;
                definitions.add(map);
                // legacy mode: check for "strategy", "schedule", "interval" in the "jdbc" definition part
                if (map.containsKey("strategy")) {
                    // falls back to the current strategy when the value is an explicit null
                    strategy = XContentMapValues.nodeStringValue(map.get("strategy"), strategy);
                }
                if (map.containsKey("schedule")) {
                    this.schedule = settingsBuilder().put(new JsonSettingsLoader()
                            .load(jsonBuilder().map(map).string())).build().getAsArray("schedule");
                }
                if (map.containsKey("interval")) {
                    this.interval = XContentMapValues.nodeLongValue(map.get("interval"), 0L);
                }
            } else if (definition instanceof List) {
                definitions.addAll((List<Map<String, Object>>) definition);
            }
            this.riverFlow = RiverServiceLoader.newRiverFlow(strategy);
            logger.debug("strategy {}: river flow class {} found, settings = {}",
                    strategy, riverFlow.getClass().getName(), flowSettings.getAsMap());
            riverFlow.setRiverName(riverName)
                    .setSettings(flowSettings)
                    .setClient(client)
                    .setIngestFactory(createIngestFactory(flowSettings))
                    .setMetric(new MeterMetric(Executors.newScheduledThreadPool(1), TimeUnit.SECONDS))
                    .setQueue(new ConcurrentLinkedQueue<Map<String, Object>>());
        } catch (IOException e) {
            // Fail fast: without a river flow this instance can not be started, and
            // merely logging here would surface later as an unrelated NullPointerException.
            throw new IllegalStateException("unable to load river settings: " + e.getMessage(), e);
        }
    }

    /**
     * Called from Elasticsearch after recovery of indices when a node has initialized.
     */
    @Override
    public void start() {
        closed = false;
        this.riverThread = newRiverThread();
        this.futures = schedule(riverThread);
        // we do not wait for futures here, instead, we return to Elasticsearch
    }

    /**
     * Called from Elasticsearch when river is deleted.
     *
     * Shuts down the executor that runs the river thread. Calling this method
     * on an already closed river does nothing.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (threadPoolExecutor != null) {
            logger.debug("shutting down river thread scheduler");
            threadPoolExecutor.shutdownNow();
            threadPoolExecutor = null;
        }
        if (riverThread != null) {
            // NOTE(review): within this class the river thread is only handed to executors
            // as a Runnable and never started itself, so this interrupt may have no effect;
            // shutdownNow() above is what stops the running worker - confirm.
            logger.debug("interrupting river thread");
            riverThread.interrupt();
        }
        logger.info("river closed [{}/{}]", riverName.getType(), riverName.getName());
    }

    /**
     * Execute river once from execute API, no matter if schedule/interval is defined.
     * If river is running, previous instances are interrupted and closed.
     */
    public void run() {
        if (!closed) {
            close();
        }
        // The river is active again: without resetting the flag, a later close() or run()
        // would return early / skip the shutdown and leak the executor created below.
        closed = false;
        this.riverThread = newRiverThread();
        this.threadPoolExecutor = newSingleRunExecutor();
        if (futures == null) {
            // run() may be invoked before start() has ever created the list
            futures = newLinkedList();
        }
        futures.add(threadPoolExecutor.submit(riverThread));
        logger.info("started river instance for single run");
    }

    /**
     * Creates a new, not yet started daemon thread wrapping a {@link RiverThread}
     * for the river flow and definitions of this river.
     */
    private Thread newRiverThread() {
        return EsExecutors.daemonThreadFactory(settings.globalSettings(),
                "river(" + riverName().getType() + "/" + riverName().getName() + ")")
                .newThread(new RiverThread(riverFlow, definitions));
    }

    /**
     * Creates the single-threaded executor used for one-off river runs.
     */
    private ThreadPoolExecutor newSingleRunExecutor() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Schedules the given river thread and remembers the executor in
     * {@link #threadPoolExecutor} so that {@link #close()} can shut it down.
     *
     * Cron expressions take precedence over a fixed-rate interval; if neither
     * is configured, the thread is submitted for a single run.
     *
     * @param thread the river thread to execute
     * @return the futures of all scheduled executions
     */
    private List<Future<?>> schedule(Thread thread) {
        Settings flowSettings = riverFlow.getSettings();
        List<Future<?>> scheduled = newLinkedList();
        if (schedule != null && schedule.length > 0) {
            CronThreadPoolExecutor cronThreadPoolExecutor =
                    new CronThreadPoolExecutor(flowSettings.getAsInt("threadpoolsize", 4));
            for (String cron : schedule) {
                scheduled.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
            }
            this.threadPoolExecutor = cronThreadPoolExecutor;
            logger.info("scheduled river instance with cron expressions {}", Arrays.asList(schedule));
        } else if (interval > 0L) {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(flowSettings.getAsInt("threadpoolsize", 4));
            scheduled.add(scheduledThreadPoolExecutor.scheduleAtFixedRate(thread, 0L, interval, TimeUnit.SECONDS));
            this.threadPoolExecutor = scheduledThreadPoolExecutor;
            logger.info("scheduled river instance at fixed rate of {} seconds", interval);
        } else {
            this.threadPoolExecutor = newSingleRunExecutor();
            scheduled.add(threadPoolExecutor.submit(thread));
            logger.info("started river instance for single run");
        }
        return scheduled;
    }

    /**
     * Fetches the state of this river, identified by river name and type,
     * through the cluster admin client. Blocks until the response has arrived.
     *
     * @return the river state contained in the response
     */
    @Override
    public RiverState getRiverState() {
        GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(client.admin().cluster())
                .setRiverName(riverName.getName())
                .setRiverType(riverName.getType());
        GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet();
        return riverStateResponse.getRiverState();
    }

    /**
     * Creates a factory producing {@link BulkNodeClient} based ingest clients.
     * The bulk parameters are read from the given settings each time
     * {@link IngestFactory#create()} is called.
     *
     * @param flowSettings the settings holding the bulk parameters
     * @return the ingest factory
     */
    private IngestFactory createIngestFactory(final Settings flowSettings) {
        return new IngestFactory() {
            @Override
            public Ingest create() {
                Integer maxbulkactions = flowSettings.getAsInt("max_bulk_actions", 100);
                Integer maxconcurrentbulkrequests = flowSettings.getAsInt("max_concurrent_bulk_requests",
                        Runtime.getRuntime().availableProcessors() * 2);
                ByteSizeValue maxvolume = flowSettings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
                TimeValue maxrequestwait = flowSettings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
                TimeValue flushinterval = flowSettings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
                return new BulkNodeClient()
                        .maxActionsPerBulkRequest(maxbulkactions)
                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
                        .maxRequestWait(maxrequestwait)
                        .maxVolumePerBulkRequest(maxvolume)
                        .flushIngestInterval(flushinterval)
                        .newClient(client);
            }
        };
    }
}