001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.simple; 017 018import org.elasticsearch.common.logging.ESLogger; 019import org.elasticsearch.common.logging.ESLoggerFactory; 020import org.elasticsearch.common.metrics.MeterMetric; 021import org.elasticsearch.common.unit.TimeValue; 022import org.elasticsearch.common.xcontent.XContentBuilder; 023import org.elasticsearch.common.xcontent.XContentHelper; 024import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 025import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand; 026import org.xbib.elasticsearch.river.jdbc.RiverContext; 027import org.xbib.elasticsearch.river.jdbc.RiverMouth; 028import org.xbib.elasticsearch.river.jdbc.RiverSource; 029 030import java.io.IOException; 031import java.math.BigDecimal; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035 036import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 037 038/** 039 * The river context consists of the parameters that span source and mouth settings. 040 * It represents the river state, for supporting the river task execution, and river scripting. 041 */ 042public class SimpleRiverContext implements RiverContext { 043 044 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverContext"); 045 046 private Map<String, Object> definition; 047 048 /** 049 * The state of the river 050 */ 051 private RiverState riverState; 052 053 /** 054 * The metrics 055 */ 056 private MeterMetric metric; 057 058 /** 059 * The source of the river 060 */ 061 private RiverSource source; 062 063 /** 064 * The target of the river 065 */ 066 private RiverMouth mouth; 067 068 /** 069 * Autocomit enabled or not 070 */ 071 private boolean autocommit; 072 073 /** 074 * The fetch size 075 */ 076 private int fetchSize; 077 078 /** 079 * The maximum numbe rof rows per statement execution 080 */ 081 private int maxRows; 082 083 /** 084 * The number of retries 085 */ 086 private int retries = 1; 087 088 /** 089 * The time to wait between retries 090 */ 091 private TimeValue maxretrywait = TimeValue.timeValueSeconds(30); 092 093 private int rounding; 094 095 private int scale = -1; 096 097 private String resultSetType = "TYPE_FORWARD_ONLY"; 098 099 private String resultSetConcurrency = "CONCUR_UPDATABLE"; 100 101 private boolean shouldIgnoreNull; 102 103 private boolean shouldPrepareResultSetMetadata; 104 105 private boolean shouldPrepareDatabaseMetadata; 106 107 private Map<String, Object> lastResultSetMetadata = new HashMap<String, Object>(); 108 109 private Map<String, Object> lastDatabaseMetadata = new HashMap<String, Object>(); 110 111 private long lastRowCount; 112 113 private long lastStartDate; 114 115 private long lastEndDate; 116 117 private long lastExecutionStartDate; 118 119 private long lastExecutionEndDate; 120 121 private Map<String, Object> columnNameMap; 122 123 private Map<String, Object> lastRow = new HashMap<String, Object>(); 124 125 private List<SQLCommand> sql; 126 127 private boolean isTimestampDiffSupported; 128 129 private int queryTimeout; 130 131 private Map<String, Object> connectionProperties = new HashMap<String, Object>(); 132 133 private boolean shouldTreatBinaryAsString; 134 135 @Override 136 public SimpleRiverContext setDefinition(Map<String, Object> definition) { 137 this.definition = definition; 138 return this; 139 } 140 141 @Override 142 public Map<String, Object> getDefinition() { 143 return definition; 144 } 145 146 @Override 147 public SimpleRiverContext setRiverState(RiverState riverState) { 148 this.riverState = riverState; 149 return this; 150 } 151 152 @Override 153 public RiverState getRiverState() { 154 return riverState; 155 } 156 157 @Override 158 public SimpleRiverContext setRiverSource(RiverSource source) { 159 this.source = source; 160 return this; 161 } 162 163 @Override 164 public RiverSource getRiverSource() { 165 return source; 166 } 167 168 public SimpleRiverContext setRiverMouth(RiverMouth mouth) { 169 this.mouth = mouth; 170 return this; 171 } 172 173 public RiverMouth getRiverMouth() { 174 return mouth; 175 } 176 177 @Override 178 public RiverContext setMetric(MeterMetric metric) { 179 this.metric = metric; 180 return this; 181 } 182 183 @Override 184 public MeterMetric getMetric() { 185 return metric; 186 } 187 188 public SimpleRiverContext setAutoCommit(boolean autocommit) { 189 this.autocommit = autocommit; 190 return this; 191 } 192 193 public boolean getAutoCommit() { 194 return autocommit; 195 } 196 197 public SimpleRiverContext setFetchSize(int fetchSize) { 198 this.fetchSize = fetchSize; 199 return this; 200 } 201 202 public int getFetchSize() { 203 return fetchSize; 204 } 205 206 public SimpleRiverContext setMaxRows(int maxRows) { 207 this.maxRows = maxRows; 208 return this; 209 } 210 211 public int getMaxRows() { 212 return maxRows; 213 } 214 215 public SimpleRiverContext setRetries(int retries) { 216 this.retries = retries; 217 return this; 218 } 219 220 public int getRetries() { 221 return retries; 222 } 223 224 public SimpleRiverContext setMaxRetryWait(TimeValue maxretrywait) { 225 this.maxretrywait = maxretrywait; 226 return this; 227 } 228 229 public TimeValue getMaxRetryWait() { 230 return maxretrywait; 231 } 232 233 public SimpleRiverContext setRounding(String rounding) { 234 if ("ceiling".equalsIgnoreCase(rounding)) { 235 this.rounding = BigDecimal.ROUND_CEILING; 236 } else if ("down".equalsIgnoreCase(rounding)) { 237 this.rounding = BigDecimal.ROUND_DOWN; 238 } else if ("floor".equalsIgnoreCase(rounding)) { 239 this.rounding = BigDecimal.ROUND_FLOOR; 240 } else if ("halfdown".equalsIgnoreCase(rounding)) { 241 this.rounding = BigDecimal.ROUND_HALF_DOWN; 242 } else if ("halfeven".equalsIgnoreCase(rounding)) { 243 this.rounding = BigDecimal.ROUND_HALF_EVEN; 244 } else if ("halfup".equalsIgnoreCase(rounding)) { 245 this.rounding = BigDecimal.ROUND_HALF_UP; 246 } else if ("unnecessary".equalsIgnoreCase(rounding)) { 247 this.rounding = BigDecimal.ROUND_UNNECESSARY; 248 } else if ("up".equalsIgnoreCase(rounding)) { 249 this.rounding = BigDecimal.ROUND_UP; 250 } 251 return this; 252 } 253 254 public int getRounding() { 255 return rounding; 256 } 257 258 public SimpleRiverContext setScale(int scale) { 259 this.scale = scale; 260 return this; 261 } 262 263 public int getScale() { 264 return scale; 265 } 266 267 public SimpleRiverContext setResultSetType(String resultSetType) { 268 this.resultSetType = resultSetType; 269 return this; 270 } 271 272 public String getResultSetType() { 273 return resultSetType; 274 } 275 276 public SimpleRiverContext setResultSetConcurrency(String resultSetConcurrency) { 277 this.resultSetConcurrency = resultSetConcurrency; 278 return this; 279 } 280 281 public String getResultSetConcurrency() { 282 return resultSetConcurrency; 283 } 284 285 public SimpleRiverContext shouldIgnoreNull(boolean shouldIgnoreNull) { 286 this.shouldIgnoreNull = shouldIgnoreNull; 287 return this; 288 } 289 290 public boolean shouldIgnoreNull() { 291 return shouldIgnoreNull; 292 } 293 294 public SimpleRiverContext shouldPrepareResultSetMetadata(boolean shouldPrepareResultSetMetadata) { 295 this.shouldPrepareResultSetMetadata = shouldPrepareResultSetMetadata; 296 return this; 297 } 298 299 public boolean shouldPrepareResultSetMetadata() { 300 return shouldPrepareResultSetMetadata; 301 } 302 303 public SimpleRiverContext shouldPrepareDatabaseMetadata(boolean shouldPrepareDatabaseMetadata) { 304 this.shouldPrepareDatabaseMetadata = shouldPrepareDatabaseMetadata; 305 return this; 306 } 307 308 public boolean shouldPrepareDatabaseMetadata() { 309 return shouldPrepareDatabaseMetadata; 310 } 311 312 public SimpleRiverContext setLastResultSetMetadata(Map<String, Object> lastResultSetMetadata) { 313 this.lastResultSetMetadata = lastResultSetMetadata; 314 return this; 315 } 316 317 public Map<String, Object> getLastResultSetMetadata() { 318 return lastResultSetMetadata; 319 } 320 321 public SimpleRiverContext setLastDatabaseMetadata(Map<String, Object> lastDatabaseMetadata) { 322 this.lastDatabaseMetadata = lastDatabaseMetadata; 323 return this; 324 } 325 326 public Map<String, Object> getLastDatabaseMetadata() { 327 return lastDatabaseMetadata; 328 } 329 330 public SimpleRiverContext setLastRowCount(long lastRowCount) { 331 this.lastRowCount = lastRowCount; 332 return this; 333 } 334 335 public long getLastRowCount() { 336 return lastRowCount; 337 } 338 339 public SimpleRiverContext setLastStartDate(long lastStartDate) { 340 this.lastStartDate = lastStartDate; 341 return this; 342 } 343 344 public long getLastStartDate() { 345 return lastStartDate; 346 } 347 348 public SimpleRiverContext setLastEndDate(long lastEndDate) { 349 this.lastEndDate = lastEndDate; 350 return this; 351 } 352 353 public long getLastEndDate() { 354 return lastEndDate; 355 } 356 357 public SimpleRiverContext setLastExecutionStartDate(long lastExecutionStartDate) { 358 this.lastExecutionStartDate = lastExecutionStartDate; 359 return this; 360 } 361 362 public long getLastExecutionStartDate() { 363 return lastExecutionStartDate; 364 } 365 366 public SimpleRiverContext setLastExecutionEndDate(long lastExecutionEndDate) { 367 this.lastExecutionEndDate = lastExecutionEndDate; 368 return this; 369 } 370 371 public long getLastExecutionEndDate() { 372 return lastExecutionEndDate; 373 } 374 375 public SimpleRiverContext setColumnNameMap(Map<String, Object> columnNameMap) { 376 this.columnNameMap = columnNameMap; 377 return this; 378 } 379 380 public Map<String, Object> getColumnNameMap() { 381 return columnNameMap; 382 } 383 384 385 public SimpleRiverContext setLastRow(Map<String, Object> lastRow) { 386 this.lastRow = lastRow; 387 return this; 388 } 389 390 public Map<String, Object> getLastRow() { 391 return lastRow; 392 } 393 394 395 public SimpleRiverContext setStatements(List<SQLCommand> sql) { 396 this.sql = sql; 397 return this; 398 } 399 400 public List<SQLCommand> getStatements() { 401 return sql; 402 } 403 404 public SimpleRiverContext setTimestampDiffSupported(boolean supported) { 405 this.isTimestampDiffSupported = supported; 406 return this; 407 } 408 409 public boolean isTimestampDiffSupported() { 410 return isTimestampDiffSupported; 411 } 412 413 public SimpleRiverContext setQueryTimeout(int queryTimeout) { 414 this.queryTimeout = queryTimeout; 415 return this; 416 } 417 418 public int getQueryTimeout() { 419 return queryTimeout; 420 } 421 422 public SimpleRiverContext setConnectionProperties(Map<String, Object> connectionProperties) { 423 this.connectionProperties = connectionProperties; 424 return this; 425 } 426 427 public Map<String, Object> getConnectionProperties() { 428 return connectionProperties; 429 } 430 431 public SimpleRiverContext shouldTreatBinaryAsString(boolean shouldTreatBinaryAsString) { 432 this.shouldTreatBinaryAsString = shouldTreatBinaryAsString; 433 return this; 434 } 435 436 public boolean shouldTreatBinaryAsString() { 437 return shouldTreatBinaryAsString; 438 } 439 440 public SimpleRiverContext release() { 441 try { 442 if (mouth != null) { 443 mouth.shutdown(); 444 mouth = null; 445 } 446 } catch (IOException e) { 447 logger.error(e.getMessage(), e); 448 } 449 try { 450 if (source != null) { 451 source.shutdown(); 452 source = null; 453 } 454 } catch (IOException e) { 455 logger.error(e.getMessage(), e); 456 } 457 return this; 458 } 459 460 public Map<String, Object> asMap() { 461 try { 462 XContentBuilder builder = jsonBuilder() 463 .startObject() 464 .field("autocommit", autocommit) 465 .field("fetchsize", fetchSize) 466 .field("maxrows", maxRows) 467 .field("retries", retries) 468 .field("maxretrywait", maxretrywait) 469 .field("resultsetconcurrency", resultSetConcurrency) 470 .field("resultsettype", resultSetType) 471 .field("rounding", rounding) 472 .field("scale", scale) 473 .field("shouldignorenull", shouldIgnoreNull) 474 .field("lastResultSetMetadata", lastResultSetMetadata) 475 .field("lastDatabaseMetadata", lastDatabaseMetadata) 476 .field("lastStartDate", lastStartDate) 477 .field("lastEndDate", lastEndDate) 478 .field("lastExecutionStartDate", lastExecutionStartDate) 479 .field("lastExecutionEndDate", lastExecutionEndDate) 480 .field("columnNameMap", columnNameMap) 481 .field("lastRow", lastRow) 482 .field("sql", sql) 483 .field("isTimestampDiffSupported", isTimestampDiffSupported) 484 .field("queryTimeout", queryTimeout) 485 .field("connectionProperties") 486 .map(connectionProperties) 487 .endObject(); 488 return XContentHelper.convertToMap(builder.bytes(), true).v2(); 489 } catch (IOException e) { 490 // should really not happen 491 return new HashMap<String, Object>(); 492 } 493 } 494 495 @Override 496 public String toString() { 497 return asMap().toString(); 498 } 499}