001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.simple; 017 018import org.elasticsearch.client.Client; 019import org.elasticsearch.common.joda.time.DateTime; 020import org.elasticsearch.common.logging.ESLogger; 021import org.elasticsearch.common.logging.ESLoggerFactory; 022import org.elasticsearch.common.metrics.MeterMetric; 023import org.elasticsearch.common.settings.Settings; 024import org.elasticsearch.common.settings.loader.JsonSettingsLoader; 025import org.elasticsearch.common.unit.TimeValue; 026import org.elasticsearch.common.xcontent.XContentBuilder; 027import org.elasticsearch.common.xcontent.support.XContentMapValues; 028import org.elasticsearch.river.RiverName; 029import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder; 030import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse; 031import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateRequestBuilder; 032import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateResponse; 033import org.xbib.elasticsearch.action.plugin.jdbc.state.put.PutRiverStateRequestBuilder; 034import org.xbib.elasticsearch.action.plugin.jdbc.state.put.PutRiverStateResponse; 035import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory; 036import org.xbib.elasticsearch.plugin.jdbc.client.Metric; 037import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 038import org.xbib.elasticsearch.plugin.jdbc.util.DurationFormatUtil; 039import org.xbib.elasticsearch.plugin.jdbc.util.LocaleUtil; 040import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader; 041import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand; 042import org.xbib.elasticsearch.plugin.jdbc.util.VolumeFormatUtil; 043import org.xbib.elasticsearch.river.jdbc.RiverContext; 044import org.xbib.elasticsearch.river.jdbc.RiverFlow; 045import org.xbib.elasticsearch.river.jdbc.RiverMouth; 046import org.xbib.elasticsearch.river.jdbc.RiverSource; 047 048import java.io.IOException; 049import java.text.NumberFormat; 050import java.util.Collections; 051import java.util.List; 052import java.util.Locale; 053import java.util.Map; 054import java.util.Queue; 055import java.util.TimeZone; 056 057import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; 058import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 059 060/** 061 * Simple river flow implementation 062 */ 063public class SimpleRiverFlow<RC extends RiverContext> implements RiverFlow<RC> { 064 065 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverFlow"); 066 067 private final static ESLogger metricsLogger = ESLoggerFactory.getLogger("river.jdbc.RiverMetrics"); 068 069 private RiverName riverName; 070 071 private Settings settings; 072 073 private IngestFactory ingestFactory; 074 075 private Client client; 076 077 private Queue<RiverContext> queue; 078 079 private MeterMetric meterMetric; 080 081 @Override 082 public String strategy() { 083 return "simple"; 084 } 085 086 @Override 087 public SimpleRiverFlow<RC> newInstance() { 088 return new SimpleRiverFlow<RC>(); 089 } 090 091 @Override 092 public RC newRiverContext() { 093 return (RC) new SimpleRiverContext(); 094 } 095 096 @Override 097 public RiverFlow setRiverName(RiverName riverName) { 098 this.riverName = riverName; 099 return this; 100 } 101 102 @Override 103 public RiverName getRiverName() { 104 return riverName; 105 } 106 107 @Override 108 public RiverFlow setSettings(Settings settings) { 109 this.settings = settings; 110 return this; 111 } 112 113 @Override 114 public Settings getSettings() { 115 return settings; 116 } 117 118 @Override 119 public RiverFlow setIngestFactory(IngestFactory ingestFactory) { 120 this.ingestFactory = ingestFactory; 121 return this; 122 } 123 124 public RiverFlow setClient(Client client) { 125 this.client = client; 126 return this; 127 } 128 129 @Override 130 public Client getClient() { 131 return client; 132 } 133 134 @Override 135 public void execute(RC riverContext) throws Exception { 136 logger.debug("execute: {}/{} with context {}", riverName.getName(), riverName.getType(), riverContext); 137 try { 138 beforeFetch(riverContext); 139 fetch(riverContext); 140 } finally { 141 afterFetch(riverContext); 142 } 143 } 144 145 /** 146 * Before a river task (or river run) starts, this method is called. 147 */ 148 protected void beforeFetch(RC riverContext) throws Exception { 149 logger.debug("before fetch: getting river state for {}/{}", riverName.getName(), riverName.getType()); 150 GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(client.admin().cluster()) 151 .setRiverName(riverName.getName()) 152 .setRiverType(riverName.getType()); 153 GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet(); 154 RiverState riverState = riverStateResponse.getRiverState(); 155 // if river state was not defined yet, define it now 156 if (riverState == null) { 157 riverState = new RiverState() 158 .setName(riverName.getName()) 159 .setType(riverName.getType()) 160 .setStarted(new DateTime()); 161 PutRiverStateRequestBuilder putRiverStateRequestBuilder = new PutRiverStateRequestBuilder(client.admin().cluster()) 162 .setRiverName(riverName.getName()) 163 .setRiverType(riverName.getType()) 164 .setRiverState(riverState); 165 PutRiverStateResponse putRiverStateResponse = putRiverStateRequestBuilder.execute().actionGet(); 166 if (putRiverStateResponse.isAcknowledged()) { 167 logger.debug("before fetch: put initial state {}", riverState); 168 } else { 169 logger.error("befor fetch: initial state not acknowledged", riverState); 170 } 171 } 172 RiverSource riverSource = createRiverSource(riverContext.getDefinition()); 173 RiverMouth riverMouth = createRiverMouth(riverContext.getDefinition()); 174 riverContext = fillRiverContext(riverContext, riverState, riverSource, riverMouth); 175 logger.debug("before fetch: created source = {}, mouth = {}, context = {}", 176 riverSource, riverMouth, riverContext); 177 Integer counter = riverState.getCounter() + 1; 178 riverContext.setRiverState(riverState.setCounter(counter).setLastActive(new DateTime(), null)); 179 PostRiverStateRequestBuilder postRiverStateRequestBuilder = new PostRiverStateRequestBuilder(client.admin().cluster()) 180 .setRiverName(riverName.getName()) 181 .setRiverType(riverName.getType()) 182 .setRiverState(riverContext.getRiverState()); 183 PostRiverStateResponse postRiverStateResponse = postRiverStateRequestBuilder.execute().actionGet(); 184 if (!postRiverStateResponse.isAcknowledged()) { 185 logger.warn("post river state not acknowledged: {}/{}", riverName.getName(), riverName.getType()); 186 } 187 logger.debug("before fetch: state posted = {}", riverState); 188 // call river source "before fetch" 189 try { 190 riverContext.getRiverSource().beforeFetch(); 191 } catch (Exception e) { 192 logger.error(e.getMessage(), e); 193 } 194 // call river mouth "before fetch" 195 try { 196 riverContext.getRiverMouth().beforeFetch(); 197 } catch (Exception e) { 198 logger.error(e.getMessage(), e); 199 } 200 } 201 202 /** 203 * After river context and state setup, when data should be fetched from river source, this method is called. 204 * The default is to invoke the fetch() method of the river source. 205 * 206 * @throws Exception 207 */ 208 protected void fetch(RiverContext riverContext) throws Exception { 209 riverContext.getRiverSource().fetch(); 210 } 211 212 @Override 213 public RiverFlow setMetric(MeterMetric meterMetric) { 214 this.meterMetric = meterMetric; 215 return this; 216 } 217 218 @Override 219 public MeterMetric getMetric() { 220 return meterMetric; 221 } 222 223 224 @Override 225 public RiverFlow setQueue(Queue<RiverContext> queue) { 226 this.queue = queue; 227 return this; 228 } 229 230 @Override 231 public Queue<RiverContext> getQueue() { 232 return queue; 233 } 234 235 /** 236 * After the river task has completed a single run, this method is called. 237 */ 238 protected void afterFetch(RiverContext riverContext) throws Exception { 239 if (riverContext == null) { 240 return; 241 } 242 try { 243 riverContext.getRiverMouth().afterFetch(); 244 } catch (Exception e) { 245 logger.error(e.getMessage(), e); 246 } 247 try { 248 riverContext.getRiverSource().afterFetch(); 249 } catch (Exception e) { 250 logger.error(e.getMessage(), e); 251 } 252 RiverState riverState = riverContext.getRiverState() 253 .setLastActive(riverContext.getRiverState().getLastActiveBegin(), new DateTime()); 254 PostRiverStateRequestBuilder postRiverStateRequestBuilder = new PostRiverStateRequestBuilder(client.admin().cluster()) 255 .setRiverName(riverName.getName()) 256 .setRiverType(riverName.getType()) 257 .setRiverState(riverState); 258 PostRiverStateResponse postRiverStateResponse = postRiverStateRequestBuilder.execute().actionGet(); 259 if (!postRiverStateResponse.isAcknowledged()) { 260 logger.warn("post river state not acknowledged: {}/{}", riverName.getName(), riverName.getType()); 261 } 262 logger.debug("after fetch: state posted = {}", riverState); 263 } 264 265 protected RiverSource createRiverSource(Map<String, Object> params) { 266 RiverSource riverSource = RiverServiceLoader.newRiverSource(strategy()); 267 logger.debug("found river source class {}, params = {}", riverSource, strategy(), params); 268 String url = XContentMapValues.nodeStringValue(params.get("url"), null); 269 String user = XContentMapValues.nodeStringValue(params.get("user"), null); 270 String password = XContentMapValues.nodeStringValue(params.get("password"), null); 271 String locale = XContentMapValues.nodeStringValue(params.get("locale"), LocaleUtil.fromLocale(Locale.getDefault())); 272 String timezone = XContentMapValues.nodeStringValue(params.get("timezone"), TimeZone.getDefault().getID()); 273 riverSource.setUrl(url) 274 .setUser(user) 275 .setPassword(password) 276 .setLocale(LocaleUtil.toLocale(locale)) 277 .setTimeZone(TimeZone.getTimeZone(timezone)); 278 return riverSource; 279 } 280 281 protected RiverMouth createRiverMouth(Map<String, Object> params) throws IOException { 282 RiverMouth riverMouth = RiverServiceLoader.newRiverMouth(strategy()); 283 logger.debug("found river mouth class {}, params = {}", riverMouth, strategy(), params); 284 String index = XContentMapValues.nodeStringValue(params.get("index"), "jdbc"); 285 String type = XContentMapValues.nodeStringValue(params.get("type"), "jdbc"); 286 riverMouth.setIndex(index) 287 .setType(type) 288 .setIngestFactory(ingestFactory); 289 if (params.get("index_settings") != null) { 290 Map<String, String> loadedSettings = new JsonSettingsLoader() 291 .load(jsonBuilder().map((Map<String, Object>) params.get("index_settings")).string()); 292 riverMouth.setIndexSettings(settingsBuilder().put(loadedSettings).build()); 293 } 294 if (params.get("type_mapping") != null) { 295 XContentBuilder builder = jsonBuilder().map((Map<String, Object>) params.get("type_mapping")); 296 riverMouth.setTypeMapping(Collections.singletonMap(type, builder.string())); 297 } 298 return riverMouth; 299 } 300 301 protected RC fillRiverContext(RC riverContext, RiverState state, 302 RiverSource riverSource, 303 RiverMouth riverMouth) throws IOException { 304 Map<String, Object> params = riverContext.getDefinition(); 305 List<SQLCommand> sql = SQLCommand.parse(params); 306 String rounding = XContentMapValues.nodeStringValue(params.get("rounding"), null); 307 int scale = XContentMapValues.nodeIntegerValue(params.get("scale"), 2); 308 boolean autocommit = XContentMapValues.nodeBooleanValue(params.get("autocommit"), false); 309 int fetchsize = 10; 310 String fetchSizeStr = XContentMapValues.nodeStringValue(params.get("fetchsize"), null); 311 if ("min".equals(fetchSizeStr)) { 312 fetchsize = Integer.MIN_VALUE; // for MySQL streaming mode 313 } else if (fetchSizeStr != null) { 314 fetchsize = Integer.parseInt(fetchSizeStr); 315 } 316 int maxrows = XContentMapValues.nodeIntegerValue(params.get("max_rows"), 0); 317 int maxretries = XContentMapValues.nodeIntegerValue(params.get("max_retries"), 3); 318 TimeValue maxretrywait = XContentMapValues.nodeTimeValue(params.get("max_retries_wait"), TimeValue.timeValueSeconds(30)); 319 String resultSetType = XContentMapValues.nodeStringValue(params.get("resultset_type"), "TYPE_FORWARD_ONLY"); 320 String resultSetConcurrency = XContentMapValues.nodeStringValue(params.get("resultset_concurrency"), "CONCUR_UPDATABLE"); 321 boolean shouldIgnoreNull = XContentMapValues.nodeBooleanValue(params.get("ignore_null_values"), false); 322 boolean shouldPrepareDatabaseMetadata = XContentMapValues.nodeBooleanValue(params.get("prepare_database_metadata"), false); 323 boolean shouldPrepareResultSetMetadata = XContentMapValues.nodeBooleanValue(params.get("prepare_resultset_metadata"), false); 324 Map<String, Object> columnNameMap = (Map<String, Object>) params.get("column_name_map"); 325 int queryTimeout = XContentMapValues.nodeIntegerValue(params.get("query_timeout"), 1800); 326 Map<String, Object> connectionProperties = (Map<String, Object>) params.get("connection_properties"); 327 boolean shouldTreatBinaryAsString = XContentMapValues.nodeBooleanValue(params.get("treat_binary_as_string"), false); 328 329 riverContext.setRiverState(state) 330 .setRiverSource(riverSource) 331 .setRiverMouth(riverMouth) 332 .setMetric(meterMetric) 333 .setRounding(rounding) 334 .setScale(scale) 335 .setStatements(sql) 336 .setAutoCommit(autocommit) 337 .setMaxRows(maxrows) 338 .setFetchSize(fetchsize) 339 .setRetries(maxretries) 340 .setMaxRetryWait(maxretrywait) 341 .setResultSetType(resultSetType) 342 .setResultSetConcurrency(resultSetConcurrency) 343 .shouldIgnoreNull(shouldIgnoreNull) 344 .shouldPrepareDatabaseMetadata(shouldPrepareDatabaseMetadata) 345 .shouldPrepareResultSetMetadata(shouldPrepareResultSetMetadata) 346 .setColumnNameMap(columnNameMap) 347 .setQueryTimeout(queryTimeout) 348 .setConnectionProperties(connectionProperties) 349 .shouldTreatBinaryAsString(shouldTreatBinaryAsString); 350 riverSource.setRiverContext(riverContext); 351 riverMouth.setRiverContext(riverContext); 352 return riverContext; 353 } 354 355 @Override 356 public void logMetrics(RiverContext riverContext, String cause) { 357 MeterMetric metric = getMetric(); 358 if (metric == null) { 359 return; 360 } 361 if (riverContext == null || riverContext.getRiverMouth() == null) { 362 return; 363 } 364 Metric mouthMetric = riverContext.getRiverMouth().getMetric(); 365 if (mouthMetric == null) { 366 return; 367 } 368 long ticks = metric.count(); 369 double mean = metric.meanRate(); 370 double oneminute = metric.oneMinuteRate(); 371 double fiveminute = metric.fiveMinuteRate(); 372 double fifteenminute = metric.fifteenMinuteRate(); 373 374 long bytes = mouthMetric.getTotalIngestSizeInBytes().count(); 375 long elapsed = mouthMetric.elapsed() / 1000000; 376 String elapsedhuman = DurationFormatUtil.formatDurationWords(elapsed, true, true); 377 //double dps = ticks * 1000 / elapsed; 378 double avg = bytes / (ticks + 1); // avoid div by zero 379 double mbps = (bytes * 1000.0 / elapsed) / (1024.0 * 1024.0); 380 NumberFormat formatter = NumberFormat.getNumberInstance(); 381 metricsLogger.info("{}: river {}/{} metrics: {} rows, {} mean, ({} {} {}), ingest metrics: elapsed {}, {} bytes, {} avg, {} MB/s", 382 cause, 383 riverName.getType(), 384 riverName.getName(), 385 ticks, 386 mean, 387 oneminute, 388 fiveminute, 389 fifteenminute, 390 elapsedhuman, 391 VolumeFormatUtil.convertFileSize(bytes), 392 VolumeFormatUtil.convertFileSize(avg), 393 formatter.format(mbps) 394 ); 395 } 396 397}