001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.simple; 017 018import org.elasticsearch.common.joda.time.DateTime; 019import org.elasticsearch.common.joda.time.DateTimeZone; 020import org.elasticsearch.common.logging.ESLogger; 021import org.elasticsearch.common.logging.ESLoggerFactory; 022import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener; 023import org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener; 024import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand; 025import org.xbib.elasticsearch.river.jdbc.RiverSource; 026 027import java.io.IOException; 028import java.math.BigDecimal; 029import java.sql.Array; 030import java.sql.Blob; 031import java.sql.CallableStatement; 032import java.sql.Clob; 033import java.sql.Connection; 034import java.sql.DatabaseMetaData; 035import java.sql.Date; 036import java.sql.DriverManager; 037import java.sql.NClob; 038import java.sql.PreparedStatement; 039import java.sql.ResultSet; 040import java.sql.ResultSetMetaData; 041import java.sql.SQLDataException; 042import java.sql.SQLException; 043import java.sql.SQLFeatureNotSupportedException; 044import java.sql.SQLNonTransientConnectionException; 045import java.sql.SQLRecoverableException; 046import java.sql.SQLXML; 047import java.sql.Statement; 048import java.sql.Time; 049import java.sql.Timestamp; 050import java.sql.Types; 051import java.text.NumberFormat; 052import java.text.ParseException; 053import java.util.Calendar; 054import java.util.HashMap; 055import java.util.LinkedList; 056import java.util.List; 057import java.util.Locale; 058import java.util.Map; 059import java.util.Properties; 060import java.util.TimeZone; 061 062import static org.elasticsearch.common.collect.Lists.newLinkedList; 063 064/** 065 * Simple river source implementation. 066 * The simple river source iterates through a JDBC result set, 067 * merges the rows into Elasticsearch documents, and passes them to 068 * a bulk indexer. 069 * There are two channels open, one for reading the database, one for writing. 070 */ 071public class SimpleRiverSource<RC extends SimpleRiverContext> implements RiverSource<RC> { 072 073 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverSource"); 074 075 protected RC context; 076 077 protected String url; 078 079 protected String user; 080 081 protected String password; 082 083 protected Connection readConnection; 084 085 protected Connection writeConnection; 086 087 protected Locale locale; 088 089 protected TimeZone timezone; 090 091 protected Calendar calendar; 092 093 protected DateTimeZone dateTimeZone; 094 095 protected volatile boolean suspended; 096 097 @Override 098 public String strategy() { 099 return "simple"; 100 } 101 102 @Override 103 public SimpleRiverSource<RC> newInstance() { 104 return new SimpleRiverSource<RC>(); 105 } 106 107 @Override 108 public SimpleRiverSource setRiverContext(RC context) { 109 this.context = context; 110 return this; 111 } 112 113 @Override 114 public SimpleRiverSource setUrl(String url) { 115 this.url = url; 116 return this; 117 } 118 119 public String getUrl() { 120 return url; 121 } 122 123 @Override 124 public SimpleRiverSource setUser(String user) { 125 this.user = user; 126 return this; 127 } 128 129 @Override 130 public SimpleRiverSource setPassword(String password) { 131 this.password = password; 132 return this; 133 } 134 135 @Override 136 public SimpleRiverSource setLocale(Locale locale) { 137 this.locale = locale; 138 // initialize locale for JDBC drivers internals 139 Locale.setDefault(locale); 140 if (timezone == null) { 141 timezone = TimeZone.getTimeZone("UTC"); 142 } 143 this.calendar = Calendar.getInstance(timezone, locale); 144 logger.debug("calendar timezone for JDBC timestamps = {}", calendar.getTimeZone().getDisplayName()); 145 return this; 146 } 147 148 @Override 149 public Locale getLocale() { 150 if (locale == null) { 151 setLocale(Locale.getDefault()); 152 } 153 return locale; 154 } 155 156 @Override 157 public SimpleRiverSource setTimeZone(TimeZone timezone) { 158 this.timezone = timezone; 159 TimeZone.setDefault(timezone); // for JDBC drivers internals 160 if (locale == null) { 161 locale = Locale.getDefault(); 162 } 163 this.calendar = Calendar.getInstance(timezone, locale); 164 logger.debug("calendar timezone for JDBC timestamps = {}", calendar.getTimeZone().getDisplayName()); 165 // for formatting fetched JDBC time values 166 this.dateTimeZone = DateTimeZone.forTimeZone(timezone); 167 return this; 168 } 169 170 @Override 171 public TimeZone getTimeZone() { 172 if (timezone == null) { 173 setTimeZone(TimeZone.getDefault()); 174 } 175 return timezone; 176 } 177 178 @Override 179 public void suspend() { 180 this.suspended = true; 181 } 182 183 @Override 184 public void resume() { 185 this.suspended = false; 186 } 187 188 /** 189 * Get JDBC connection for reading 190 * 191 * @return the connection 192 * @throws SQLException when SQL execution gives an error 193 */ 194 @Override 195 public synchronized Connection getConnectionForReading() throws SQLException { 196 boolean invalid = readConnection == null || readConnection.isClosed(); 197 try { 198 invalid = invalid || !readConnection.isValid(5); 199 } catch (AbstractMethodError e) { 200 // old/buggy JDBC driver 201 logger.debug(e.getMessage()); 202 } catch (SQLFeatureNotSupportedException e) { 203 // postgresql does not support isValid() 204 logger.debug(e.getMessage()); 205 } 206 if (invalid) { 207 int retries = context.getRetries(); 208 while (retries > 0) { 209 retries--; 210 try { 211 Properties properties = new Properties(); 212 properties.put("user", user); 213 properties.put("password", password); 214 if (context.getConnectionProperties() != null) { 215 properties.putAll(context.getConnectionProperties()); 216 } 217 readConnection = DriverManager.getConnection(url, properties); 218 DatabaseMetaData metaData = readConnection.getMetaData(); 219 if (context.shouldPrepareDatabaseMetadata()) { 220 prepare(metaData); 221 } 222 if (metaData.getTimeDateFunctions().contains("TIMESTAMPDIFF")) { 223 context.setTimestampDiffSupported(true); 224 } 225 // "readonly" is required by MySQL for large result streaming 226 readConnection.setReadOnly(true); 227 // Postgresql cursor mode condition: 228 // fetchsize > 0, no scrollable result set, no auto commit, no holdable cursors over commit 229 // https://github.com/pgjdbc/pgjdbc/blob/master/org/postgresql/jdbc2/AbstractJdbc2Statement.java#L514 230 //readConnection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); 231 // many drivers don't like autocommit=true 232 readConnection.setAutoCommit(context.getAutoCommit()); 233 return readConnection; 234 } catch (SQLException e) { 235 logger.error("while opening read connection: " + url + " " + e.getMessage(), e); 236 try { 237 logger.debug("waiting for {} seconds", context.getMaxRetryWait().seconds()); 238 Thread.sleep(context.getMaxRetryWait().millis()); 239 } catch (InterruptedException ex) { 240 // do nothing 241 } 242 } 243 } 244 } 245 return readConnection; 246 } 247 248 /** 249 * Get JDBC connection for writing. FOr executing "update", "insert", callable statements 250 * 251 * @return the connection 252 * @throws SQLException when SQL execution gives an error 253 */ 254 @Override 255 public synchronized Connection getConnectionForWriting() throws SQLException { 256 boolean invalid = writeConnection == null || writeConnection.isClosed(); 257 try { 258 invalid = invalid || !writeConnection.isValid(5); 259 } catch (AbstractMethodError e) { 260 // old/buggy JDBC driver do not implement isValid() 261 } catch (SQLFeatureNotSupportedException e) { 262 // Example: postgresql does implement but not support isValid() 263 } 264 if (invalid) { 265 int retries = context.getRetries(); 266 while (retries > 0) { 267 retries--; 268 try { 269 Properties properties = new Properties(); 270 properties.put("user", user); 271 properties.put("password", password); 272 if (context.getConnectionProperties() != null) { 273 properties.putAll(context.getConnectionProperties()); 274 } 275 writeConnection = DriverManager.getConnection(url, properties); 276 // many drivers don't like autocommit=true 277 writeConnection.setAutoCommit(context.getAutoCommit()); 278 return writeConnection; 279 } catch (SQLNonTransientConnectionException e) { 280 // ignore derby drop=true silently 281 } catch (SQLException e) { 282 logger.error("while opening write connection: " + url + " " + e.getMessage(), e); 283 try { 284 Thread.sleep(context.getMaxRetryWait().millis()); 285 } catch (InterruptedException ex) { 286 // do nothing 287 } 288 } 289 } 290 } 291 return writeConnection; 292 } 293 294 @Override 295 public void beforeFetch() throws Exception { 296 context.setLastStartDate(new DateTime().getMillis()); 297 } 298 299 /** 300 * River fetch. Issue a series of SQL statements. 301 * 302 * @throws SQLException when SQL execution gives an error 303 * @throws IOException when input/output error occurs 304 */ 305 @Override 306 public void fetch() throws SQLException, IOException { 307 logger.debug("fetching, {} SQL commands", context.getStatements().size()); 308 try { 309 for (SQLCommand command : context.getStatements()) { 310 context.setLastExecutionStartDate(new DateTime().getMillis()); 311 try { 312 if (command.isCallable()) { 313 logger.debug("{} executing callable SQL: {}", this, command); 314 executeCallable(command); 315 } else if (!command.getParameters().isEmpty()) { 316 logger.debug("{} executing SQL with params: {}", this, command); 317 executeWithParameter(command); 318 } else { 319 logger.debug("{} executing SQL without params: {}", this, command); 320 execute(command); 321 } 322 context.setLastExecutionEndDate(new DateTime().getMillis()); 323 } catch (SQLRecoverableException e) { 324 long millis = context.getMaxRetryWait().getMillis(); 325 logger.warn("retrying after " + millis / 1000 + " seconds, got exception ", e); 326 Thread.sleep(context.getMaxRetryWait().getMillis()); 327 if (command.isCallable()) { 328 logger.debug("retrying, executing callable SQL: {}", command); 329 executeCallable(command); 330 } else if (!command.getParameters().isEmpty()) { 331 logger.debug("retrying, executing SQL with params: {}", command); 332 executeWithParameter(command); 333 } else { 334 logger.debug("retrying, executing SQL without params: {}", command); 335 execute(command); 336 } 337 context.setLastExecutionEndDate(new DateTime().getMillis()); 338 } 339 } 340 } catch (Exception e) { 341 throw new IOException(e); 342 } 343 } 344 345 @Override 346 public void afterFetch() throws Exception { 347 context.setLastEndDate(new DateTime().getMillis()); 348 shutdown(); 349 } 350 351 @Override 352 public void shutdown() { 353 closeReading(); 354 logger.debug("read connection closed"); 355 readConnection = null; 356 closeWriting(); 357 logger.debug("write connection closed"); 358 writeConnection = null; 359 } 360 361 /** 362 * Execute SQL query command without parameter binding. 363 * 364 * @param command the SQL command 365 * @throws SQLException when SQL execution gives an error 366 * @throws IOException when input/output error occurs 367 */ 368 private void execute(SQLCommand command) throws Exception { 369 Statement statement = null; 370 ResultSet results = null; 371 try { 372 if (command.isQuery()) { 373 // use read connection 374 // we must not use prepareStatement for Postgresql! 375 // Postgresql requires direct use of executeQuery(sql) for cursor with fetchsize set. 376 Connection connection = getConnectionForReading(); 377 if (connection != null) { 378 logger.debug("{} using read connection {} for executing query", this, connection); 379 statement = connection.createStatement(); 380 statement.setQueryTimeout(context.getQueryTimeout()); 381 results = executeQuery(statement, command.getSQL()); 382 if (context.shouldPrepareResultSetMetadata()) { 383 prepare(results.getMetaData()); 384 } 385 RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>() 386 .output(context.getRiverMouth()) 387 .shouldIgnoreNull(context.shouldIgnoreNull()); 388 merge(command, results, listener); 389 } 390 } else { 391 // use write connection 392 Connection connection = getConnectionForWriting(); 393 if (connection != null) { 394 logger.debug("{} using write connection {} for executing insert/update", this, connection); 395 statement = connection.createStatement(); 396 executeUpdate(statement, command.getSQL()); 397 } 398 } 399 } finally { 400 close(results); 401 close(statement); 402 } 403 } 404 405 /** 406 * Execute SQL query command with parameter binding. 407 * 408 * @param command the SQL command 409 * @throws SQLException when SQL execution gives an error 410 * @throws IOException when input/output error occurs 411 */ 412 private void executeWithParameter(SQLCommand command) throws Exception { 413 PreparedStatement statement = null; 414 ResultSet results = null; 415 try { 416 if (command.isQuery()) { 417 statement = prepareQuery(command.getSQL()); 418 bind(statement, command.getParameters()); 419 results = executeQuery(statement); 420 RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>() 421 .output(context.getRiverMouth()) 422 .shouldIgnoreNull(context.shouldIgnoreNull()); 423 merge(command, results, listener); 424 } else { 425 statement = prepareUpdate(command.getSQL()); 426 bind(statement, command.getParameters()); 427 executeUpdate(statement); 428 } 429 } finally { 430 close(results); 431 close(statement); 432 } 433 } 434 435 /** 436 * Execute callable SQL command 437 * 438 * @param command the SQL command 439 * @throws SQLException when SQL execution gives an error 440 * @throws IOException when input/output error occurs 441 */ 442 private void executeCallable(SQLCommand command) throws Exception { 443 // call stored procedure 444 CallableStatement statement = null; 445 try { 446 // we do not make a difference betwwen read/write and we assume 447 // it is safe to use the read connection and query the DB 448 Connection connection = getConnectionForWriting(); 449 logger.debug("{} using write connection {} for executing callable statement", this, connection); 450 if (connection != null) { 451 statement = connection.prepareCall(command.getSQL()); 452 if (!command.getParameters().isEmpty()) { 453 bind(statement, command.getParameters()); 454 } 455 if (!command.getRegister().isEmpty()) { 456 register(statement, command.getRegister()); 457 } 458 boolean hasRows = statement.execute(); 459 RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>() 460 .output(context.getRiverMouth()); 461 if (hasRows) { 462 logger.debug("callable execution created result set"); 463 while (hasRows) { 464 // merge result set, but use register 465 merge(command, statement.getResultSet(), listener); 466 hasRows = statement.getMoreResults(); 467 } 468 } else { 469 // no result set, merge from registered params only 470 merge(command, statement, listener); 471 } 472 } 473 } finally { 474 close(statement); 475 } 476 } 477 478 /** 479 * Merge key/values from JDBC result set 480 * 481 * @param results result set 482 * @param listener the value listener 483 * @throws SQLException when SQL execution gives an error 484 * @throws IOException when input/output error occurs 485 */ 486 protected void merge(SQLCommand command, ResultSet results, KeyValueStreamListener listener) 487 throws SQLException, IOException, ParseException { 488 if (listener == null) { 489 return; 490 } 491 beforeRows(command, results, listener); 492 long rows = 0L; 493 while (nextRow(command, results, listener)) { 494 rows++; 495 if (context.getMetric() != null) { 496 context.getMetric().mark(); 497 } 498 while (suspended) { 499 try { 500 Thread.sleep(1000L); 501 } catch (InterruptedException e) { 502 Thread.currentThread().interrupt(); 503 logger.warn("interrupted"); 504 } 505 } 506 } 507 context.setLastRowCount(rows); 508 if (rows > 0) { 509 logger.debug("merged {} rows", rows); 510 } else { 511 logger.debug("no rows merged "); 512 } 513 afterRows(command, results, listener); 514 } 515 516 /** 517 * Prepare a query statement 518 * 519 * @param sql the SQL statement 520 * @return a prepared statement 521 * @throws SQLException when SQL execution gives an error 522 */ 523 @Override 524 public PreparedStatement prepareQuery(String sql) throws SQLException { 525 Connection connection = getConnectionForReading(); 526 if (connection == null) { 527 throw new SQLException("can't connect to source " + url); 528 } 529 logger.debug("preparing statement with SQL {}", sql); 530 int type = "TYPE_FORWARD_ONLY".equals(context.getResultSetType()) ? 531 ResultSet.TYPE_FORWARD_ONLY : "TYPE_SCROLL_SENSITIVE".equals(context.getResultSetType()) ? 532 ResultSet.TYPE_SCROLL_SENSITIVE : "TYPE_SCROLL_INSENSITIVE".equals(context.getResultSetType()) ? 533 ResultSet.TYPE_SCROLL_INSENSITIVE : ResultSet.TYPE_FORWARD_ONLY; 534 int concurrency = "CONCUR_READ_ONLY".equals(context.getResultSetConcurrency()) ? 535 ResultSet.CONCUR_READ_ONLY : ResultSet.CONCUR_UPDATABLE; 536 return connection.prepareStatement(sql, type, concurrency); 537 } 538 539 /** 540 * Prepare an update statement 541 * 542 * @param sql the SQL statement 543 * @return a prepared statement 544 * @throws SQLException when SQL execution gives an error 545 */ 546 @Override 547 public PreparedStatement prepareUpdate(String sql) throws SQLException { 548 Connection connection = getConnectionForWriting(); 549 if (connection == null) { 550 throw new SQLException("can't connect to source " + url); 551 } 552 return connection.prepareStatement(sql); 553 } 554 555 /** 556 * Bind values to prepared statement 557 * 558 * @param statement the prepared statement 559 * @param values the values to bind 560 * @throws SQLException when SQL execution gives an error 561 */ 562 @Override 563 public SimpleRiverSource bind(PreparedStatement statement, List<Object> values) throws SQLException { 564 if (values == null) { 565 logger.warn("no values given for bind"); 566 return this; 567 } 568 for (int i = 1; i <= values.size(); i++) { 569 bind(statement, i, values.get(i - 1)); 570 } 571 return this; 572 } 573 574 /** 575 * Merge key/values from registered params of a callable statement 576 * 577 * @param statement callable statement 578 * @param listener the value listener 579 * @throws SQLException when SQL execution gives an error 580 * @throws IOException when input/output error occurs 581 */ 582 @SuppressWarnings({"unchecked"}) 583 private void merge(SQLCommand command, CallableStatement statement, KeyValueStreamListener listener) 584 throws SQLException, IOException { 585 Map<String, Object> map = command.getRegister(); 586 if (map.isEmpty()) { 587 // no register given, return without doing anything 588 return; 589 } 590 List<String> keys = newLinkedList(); 591 List<Object> values = newLinkedList(); 592 for (Map.Entry<String, Object> entry : map.entrySet()) { 593 String k = entry.getKey(); 594 Map<String, Object> v = (Map<String, Object>) entry.getValue(); 595 Integer pos = (Integer) v.get("pos"); // the parameter position of the value 596 String field = (String) v.get("field"); // the field for indexing the value (if not key name) 597 keys.add(field != null ? field : k); 598 values.add(statement.getObject(pos)); 599 } 600 logger.trace("merge callable statement result: keys={} values={}", keys, values); 601 listener.keys(keys); 602 listener.values(values); 603 listener.end(); 604 } 605 606 /** 607 * Register variables in callable statement 608 * 609 * @param statement callable statement 610 * @param values values 611 * @return this river source 612 * @throws SQLException when SQL execution gives an error 613 */ 614 @Override 615 @SuppressWarnings({"unchecked"}) 616 public SimpleRiverSource register(CallableStatement statement, Map<String, Object> values) throws SQLException { 617 if (values == null) { 618 return this; 619 } 620 for (Map.Entry<String, Object> me : values.entrySet()) { 621 // { "key" : { "pos": n, "type" : "VARCHAR", "field" : "fieldname" }, ... } 622 Map<String, Object> m = (Map<String, Object>) me.getValue(); 623 Integer n = (Integer) m.get("pos"); 624 String type = (String) m.get("type"); 625 if (n != null && type != null) { 626 logger.debug("n={} type={}", n, toJDBCType(type)); 627 try { 628 statement.registerOutParameter(n, toJDBCType(type)); 629 } catch (Throwable t) { 630 logger.warn("can't register out parameter " + n + " of type " + type); 631 } 632 } 633 } 634 return this; 635 } 636 637 /** 638 * Execute prepared query statement 639 * 640 * @param statement the prepared statement 641 * @return the result set 642 * @throws SQLException when SQL execution gives an error 643 */ 644 @Override 645 public ResultSet executeQuery(PreparedStatement statement) throws SQLException { 646 statement.setMaxRows(context.getMaxRows()); 647 statement.setFetchSize(context.getFetchSize()); 648 return statement.executeQuery(); 649 } 650 651 /** 652 * Execute query statement 653 * 654 * @param statement the statement 655 * @param sql the SQL 656 * @return the result set 657 * @throws SQLException when SQL execution gives an error 658 */ 659 @Override 660 public ResultSet executeQuery(Statement statement, String sql) throws SQLException { 661 statement.setMaxRows(context.getMaxRows()); 662 statement.setFetchSize(context.getFetchSize()); 663 return statement.executeQuery(sql); 664 } 665 666 /** 667 * Execute prepared update statement 668 * 669 * @param statement the prepared statement 670 * @return the result set 671 * @throws SQLException when SQL execution gives an error 672 */ 673 @Override 674 public RiverSource executeUpdate(PreparedStatement statement) throws SQLException { 675 statement.executeUpdate(); 676 if (!writeConnection.getAutoCommit()) { 677 writeConnection.commit(); 678 } 679 return this; 680 } 681 682 /** 683 * Execute prepared update statement 684 * 685 * @param statement the prepared statement 686 * @return the result set 687 * @throws SQLException when SQL execution gives an error 688 */ 689 @Override 690 public RiverSource executeUpdate(Statement statement, String sql) throws SQLException { 691 statement.executeUpdate(sql); 692 if (!writeConnection.getAutoCommit()) { 693 writeConnection.commit(); 694 } 695 return this; 696 } 697 698 @Override 699 public void beforeRows(ResultSet results, KeyValueStreamListener listener) 700 throws SQLException, IOException { 701 beforeRows(null, results, listener); 702 } 703 704 /** 705 * Before rows are read, let the KeyValueStreamListener know about the keys. 706 * If the SQL command was a callable statement and a register is there, look into the register map 707 * for the key names, not in the result set metadata. 708 * 709 * @param command the SQL command that created this result set 710 * @param results the result set 711 * @param listener the key/value stream listener 712 * @throws SQLException when SQL execution gives an error 713 * @throws IOException when input/output error occurs 714 */ 715 @Override 716 @SuppressWarnings({"unchecked"}) 717 public void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) 718 throws SQLException, IOException { 719 List<String> keys = new LinkedList(); 720 if (command != null && command.isCallable() && !command.getRegister().isEmpty()) { 721 for (Map.Entry<String, Object> me : command.getRegister().entrySet()) { 722 keys.add(me.getKey()); 723 } 724 } else { 725 ResultSetMetaData metadata = results.getMetaData(); 726 int columns = metadata.getColumnCount(); 727 for (int i = 1; i <= columns; i++) { 728 if (context.getColumnNameMap() == null) { 729 keys.add(metadata.getColumnLabel(i)); 730 } else { 731 keys.add(mapColumnName(metadata.getColumnLabel(i))); 732 } 733 } 734 } 735 listener.begin(); 736 listener.keys(keys); 737 } 738 739 @Override 740 public boolean nextRow(ResultSet results, KeyValueStreamListener listener) 741 throws SQLException, IOException { 742 return nextRow(null, results, listener); 743 } 744 745 /** 746 * Get next row and prepare the values for processing. The labels of each 747 * columns are used for the ValueListener as paths for JSON object merging. 748 * 749 * @param command the SQL command that created this result set 750 * @param results the result set 751 * @param listener the listener 752 * @return true if row exists and was processed, false otherwise 753 * @throws SQLException when SQL execution gives an error 754 * @throws IOException when input/output error occurs 755 */ 756 @Override 757 public boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) 758 throws SQLException, IOException { 759 if (results.next()) { 760 processRow(results, listener); 761 return true; 762 } 763 return false; 764 } 765 766 @Override 767 public void afterRows(ResultSet results, KeyValueStreamListener listener) 768 throws SQLException, IOException { 769 afterRows(null, results, listener); 770 } 771 772 /** 773 * After the rows keys and values, let the listener know about the end of 774 * the result set. 775 * 776 * @param command the SQL command that created this result set 777 * @param results the result set 778 * @param listener the key/value stream listener 779 * @throws SQLException when SQL execution gives an error 780 * @throws IOException when input/output error occurs 781 */ 782 @Override 783 public void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) 784 throws SQLException, IOException { 785 listener.end(); 786 } 787 788 @SuppressWarnings({"unchecked"}) 789 private void processRow(ResultSet results, KeyValueStreamListener listener) 790 throws SQLException, IOException { 791 List<Object> values = new LinkedList<Object>(); 792 ResultSetMetaData metadata = results.getMetaData(); 793 int columns = metadata.getColumnCount(); 794 context.setLastRow(new HashMap()); 795 for (int i = 1; i <= columns; i++) { 796 try { 797 Object value = parseType(results, i, metadata.getColumnType(i), locale); 798 logger.trace("value={} class={}", value, value != null ? value.getClass().getName() : ""); 799 values.add(value); 800 context.getLastRow().put("$row." + metadata.getColumnLabel(i), value); 801 } catch (ParseException e) { 802 logger.warn("parse error for value {}, using null instead", results.getObject(i)); 803 values.add(null); 804 } 805 } 806 if (listener != null) { 807 listener.values(values); 808 } 809 } 810 811 /** 812 * Close result set 813 * 814 * @param result the result set to be closed or null 815 * @throws SQLException when SQL execution gives an error 816 */ 817 @Override 818 public SimpleRiverSource close(ResultSet result) throws SQLException { 819 if (result != null) { 820 result.close(); 821 } 822 return this; 823 } 824 825 /** 826 * Close statement 827 * 828 * @param statement the statement to be closed or null 829 * @throws SQLException when SQL execution gives an error 830 */ 831 @Override 832 public SimpleRiverSource close(Statement statement) throws SQLException { 833 if (statement != null) { 834 statement.close(); 835 } 836 return this; 837 } 838 839 /** 840 * Close read connection 841 */ 842 @Override 843 public SimpleRiverSource closeReading() { 844 try { 845 if (readConnection != null && !readConnection.isClosed()) { 846 // always commit before close to finish cursors/transactions 847 if (!readConnection.getAutoCommit()) { 848 readConnection.commit(); 849 } 850 readConnection.close(); 851 } 852 } catch (SQLException e) { 853 logger.warn("while closing read connection: " + e.getMessage()); 854 } 855 return this; 856 } 857 858 /** 859 * Close read connection 860 */ 861 @Override 862 public SimpleRiverSource closeWriting() { 863 try { 864 if (writeConnection != null && !writeConnection.isClosed()) { 865 // always commit before close to finish cursors/transactions 866 if (!writeConnection.getAutoCommit()) { 867 writeConnection.commit(); 868 } 869 writeConnection.close(); 870 } 871 } catch (SQLException e) { 872 logger.warn("while closing write connection: " + e.getMessage()); 873 } 874 return this; 875 } 876 877 private void prepare(final DatabaseMetaData metaData) throws SQLException { 878 Map<String, Object> m = new HashMap<String, Object>() { 879 { 880 put("$meta.db.allproceduresarecallable", metaData.allProceduresAreCallable()); 881 put("$meta.db.alltablesareselectable", metaData.allTablesAreSelectable()); 882 put("$meta.db.autocommitclosesallresultsets", metaData.autoCommitFailureClosesAllResultSets()); 883 put("$meta.db.datadefinitioncasestransactioncommit", metaData.dataDefinitionCausesTransactionCommit()); 884 put("$meta.db.datadefinitionignoredintransactions", metaData.dataDefinitionIgnoredInTransactions()); 885 put("$meta.db.doesmaxrowsizeincludeblobs", metaData.doesMaxRowSizeIncludeBlobs()); 886 put("$meta.db.catalogseparator", metaData.getCatalogSeparator()); 887 put("$meta.db.catalogterm", metaData.getCatalogTerm()); 888 put("$meta.db.databasemajorversion", metaData.getDatabaseMajorVersion()); 889 put("$meta.db.databaseminorversion", metaData.getDatabaseMinorVersion()); 890 put("$meta.db.databaseproductname", metaData.getDatabaseProductName()); 891 put("$meta.db.databaseproductversion", metaData.getDatabaseProductVersion()); 892 put("$meta.db.defaulttransactionisolation", metaData.getDefaultTransactionIsolation()); 893 put("$meta.db.drivermajorversion", metaData.getDriverMajorVersion()); 894 put("$meta.db.driverminorversion", metaData.getDriverMinorVersion()); 895 put("$meta.db.drivername", metaData.getDriverName()); 896 put("$meta.db.driverversion", metaData.getDriverVersion()); 897 put("$meta.db.extranamecharacters", metaData.getExtraNameCharacters()); 898 put("$meta.db.identifierquotestring", metaData.getIdentifierQuoteString()); 899 put("$meta.db.jdbcmajorversion", metaData.getJDBCMajorVersion()); 900 put("$meta.db.jdbcminorversion", metaData.getJDBCMinorVersion()); 901 put("$meta.db.maxbinaryliterallength", metaData.getMaxBinaryLiteralLength()); 902 put("$meta.db.maxcatalognamelength", metaData.getMaxCatalogNameLength()); 903 put("$meta.db.maxcharliterallength", metaData.getMaxCharLiteralLength()); 904 put("$meta.db.maxcolumnnamelength", metaData.getMaxColumnNameLength()); 905 put("$meta.db.maxcolumnsingroupby", metaData.getMaxColumnsInGroupBy()); 906 put("$meta.db.maxcolumnsinindex", metaData.getMaxColumnsInIndex()); 907 put("$meta.db.maxcolumnsinorderby", metaData.getMaxColumnsInOrderBy()); 908 put("$meta.db.maxcolumnsinselect", metaData.getMaxColumnsInSelect()); 909 put("$meta.db.maxcolumnsintable", metaData.getMaxColumnsInTable()); 910 put("$meta.db.maxconnections", metaData.getMaxConnections()); 911 put("$meta.db.maxcursornamelength", metaData.getMaxCursorNameLength()); 912 put("$meta.db.maxindexlength", metaData.getMaxIndexLength()); 913 put("$meta.db.maxusernamelength", metaData.getMaxUserNameLength()); 914 put("$meta.db.maxprocedurenamelength", metaData.getMaxProcedureNameLength()); 915 put("$meta.db.maxrowsize", metaData.getMaxRowSize()); 916 put("$meta.db.maxschemanamelength", metaData.getMaxSchemaNameLength()); 917 put("$meta.db.maxstatementlength", metaData.getMaxStatementLength()); 918 put("$meta.db.maxstatements", metaData.getMaxStatements()); 919 put("$meta.db.maxtablenamelength", metaData.getMaxTableNameLength()); 920 put("$meta.db.maxtablesinselect", metaData.getMaxTablesInSelect()); 921 put("$meta.db.numericfunctions", metaData.getNumericFunctions()); 922 put("$meta.db.procedureterm", metaData.getProcedureTerm()); 923 put("$meta.db.resultsetholdability", metaData.getResultSetHoldability()); 924 put("$meta.db.rowidlifetime", metaData.getRowIdLifetime().name()); 925 put("$meta.db.schematerm", metaData.getSchemaTerm()); 926 put("$meta.db.searchstringescape", metaData.getSearchStringEscape()); 927 put("$meta.db.sqlkeywords", metaData.getSQLKeywords()); 928 put("$meta.db.sqlstatetype", metaData.getSQLStateType()); 929 } 930 }; 931 context.setLastDatabaseMetadata(m); 932 } 933 934 private void prepare(final ResultSetMetaData metaData) throws SQLException { 935 Map<String, Object> m = new HashMap<String, Object>() { 936 { 937 put("$meta.row.columnCount", metaData.getColumnCount()); 938 } 939 }; 940 for (int i = 0; i < metaData.getColumnCount(); i++) { 941 m.put("$meta.rs.catalogname." + i, metaData.getCatalogName(i)); 942 m.put("$meta.rs.columnclassname." + i, metaData.getColumnClassName(i)); 943 m.put("$meta.rs.columndisplaysize." + i, metaData.getColumnDisplaySize(i)); 944 m.put("$meta.rs.columnlabel." + i, metaData.getColumnLabel(i)); 945 m.put("$meta.rs.columnname." + i, metaData.getColumnName(i)); 946 m.put("$meta.rs.columntype." + i, metaData.getColumnType(i)); 947 m.put("$meta.rs.columntypename." + i, metaData.getColumnTypeName(i)); 948 m.put("$meta.rs.precision." + i, metaData.getPrecision(i)); 949 m.put("$meta.rs.scale." + i, metaData.getScale(i)); 950 m.put("$meta.rs.schemaname." + i, metaData.getSchemaName(i)); 951 m.put("$meta.rs.tablename." + i, metaData.getTableName(i)); 952 m.put("$meta.rs.isautoincrement." + i, metaData.isAutoIncrement(i)); 953 m.put("$meta.rs.iscasesensitive." + i, metaData.isCaseSensitive(i)); 954 m.put("$meta.rs.iscurrency." + i, metaData.isCurrency(i)); 955 m.put("$meta.rs.isdefinitelywritable." + i, metaData.isDefinitelyWritable(i)); 956 m.put("$meta.rs.isnullable." + i, metaData.isNullable(i)); 957 m.put("$meta.rs.isreadonly." + i, metaData.isReadOnly(i)); 958 m.put("$meta.rs.issearchable." + i, metaData.isSearchable(i)); 959 m.put("$meta.rs.issigned." + i, metaData.isSigned(i)); 960 m.put("$meta.rs.iswritable." + i, metaData.isWritable(i)); 961 } 962 context.setLastResultSetMetadata(m); 963 } 964 965 private void bind(PreparedStatement statement, int i, Object value) throws SQLException { 966 logger.trace("bind: value = {}", value); 967 if (value == null) { 968 statement.setNull(i, Types.VARCHAR); 969 } else if (value instanceof String) { 970 String s = (String) value; 971 if ("$now".equals(s)) { 972 Timestamp t = new Timestamp(new DateTime().getMillis()); 973 logger.trace("setting $now to {}", t); 974 statement.setTimestamp(i, t, calendar); 975 } else if ("$job".equals(s)) { 976 logger.trace("setting $job to {}", context.getRiverState().getCounter()); 977 statement.setInt(i, context.getRiverState().getCounter()); 978 } else if ("$count".equals(s)) { 979 logger.trace("setting $count to {}", context.getLastRowCount()); 980 statement.setLong(i, context.getLastRowCount()); 981 } else if ("$last.sql.start".equals(s)) { 982 if (context.getLastExecutionStartDate() == 0L) { 983 Timestamp riverStarted = new Timestamp(context.getRiverState().getStarted().getMillis()); 984 context.setLastExecutionStartDate(riverStarted.getTime()); 985 } 986 statement.setTimestamp(i, new Timestamp(context.getLastExecutionStartDate()), calendar); 987 } else if ("$last.sql.end".equals(s)) { 988 if (context.getLastExecutionEndDate() == 0L) { 989 Timestamp riverStarted = context.getRiverState() != null ? 990 new Timestamp(context.getRiverState().getStarted().getMillis()) : null; 991 context.setLastExecutionEndDate(riverStarted != null ? riverStarted.getTime() : new DateTime().getMillis()); 992 } 993 statement.setTimestamp(i, new Timestamp(context.getLastExecutionEndDate()), calendar); 994 } else if ("$last.sql.sequence.start".equals(s)) { 995 if (context.getLastStartDate() == 0L) { 996 Timestamp riverStarted = new Timestamp(context.getRiverState().getStarted().getMillis()); 997 context.setLastStartDate(riverStarted.getTime()); 998 } 999 statement.setTimestamp(i, new Timestamp(context.getLastStartDate()), calendar); 1000 } else if ("$last.sql.sequence.end".equals(s)) { 1001 if (context.getLastEndDate() == 0L) { 1002 Timestamp riverStarted = context.getRiverState() != null ? 1003 new Timestamp(context.getRiverState().getStarted().getMillis()) : null; 1004 context.setLastEndDate(riverStarted != null ? riverStarted.getTime() : new DateTime().getMillis()); 1005 } 1006 statement.setTimestamp(i, new Timestamp(context.getLastEndDate()), calendar); 1007 } else if ("$river.name".equals(s)) { 1008 String name = context.getRiverState().getName(); 1009 statement.setString(i, name); 1010 } else if ("$river.state.started".equals(s)) { 1011 Timestamp started = new Timestamp(context.getRiverState().getStarted().getMillis()); 1012 statement.setTimestamp(i, started, calendar); 1013 } else if ("$river.state.last_active_begin".equals(s)) { 1014 Timestamp timestamp = new Timestamp(context.getRiverState().getLastActiveBegin().getMillis()); 1015 statement.setTimestamp(i, timestamp, calendar); 1016 } else if ("$river.state.last_active_end".equals(s)) { 1017 Timestamp timestamp = new Timestamp(context.getRiverState().getLastActiveEnd().getMillis()); 1018 statement.setTimestamp(i, timestamp, calendar); 1019 } else if ("$river.state.counter".equals(s)) { 1020 Integer counter = context.getRiverState().getCounter(); 1021 if (counter != null) { 1022 statement.setInt(i, counter); 1023 } 1024 } else if (context.shouldPrepareDatabaseMetadata()) { 1025 for (String k : context.getLastDatabaseMetadata().keySet()) { 1026 if (k.equals(s)) { 1027 statement.setObject(i, context.getLastDatabaseMetadata().get(k)); 1028 } 1029 } 1030 } else if (context.shouldPrepareResultSetMetadata()) { 1031 for (String k : context.getLastResultSetMetadata().keySet()) { 1032 if (k.equals(s)) { 1033 statement.setObject(i, context.getLastResultSetMetadata().get(k)); 1034 } 1035 } 1036 } else { 1037 Object rowValue = context.getLastRow().get(s); 1038 if (rowValue != null) { 1039 statement.setObject(i, rowValue); 1040 } else { 1041 statement.setString(i, (String) value); 1042 } 1043 } 1044 } else if (value instanceof Integer) { 1045 statement.setInt(i, (Integer) value); 1046 } else if (value instanceof Long) { 1047 statement.setLong(i, (Long) value); 1048 } else if (value instanceof BigDecimal) { 1049 statement.setBigDecimal(i, (BigDecimal) value); 1050 } else if (value instanceof Date) { 1051 statement.setDate(i, (Date) value); 1052 } else if (value instanceof Timestamp) { 1053 statement.setTimestamp(i, (Timestamp) value, calendar); 1054 } else if (value instanceof Float) { 1055 statement.setFloat(i, (Float) value); 1056 } else if (value instanceof Double) { 1057 statement.setDouble(i, (Double) value); 1058 } else { 1059 statement.setObject(i, value); 1060 } 1061 } 1062 1063 /** 1064 * Parse of value of result set 1065 * 1066 * @param result the result set 1067 * @param i the offset in the result set 1068 * @param type the JDBC type 1069 * @param locale the locale to use for parsing 1070 * @return The parse value 1071 * @throws SQLException when SQL execution gives an error 1072 * @throws IOException when input/output error occurs 1073 */ 1074 @Override 1075 public Object parseType(ResultSet result, Integer i, int type, Locale locale) 1076 throws SQLException, IOException, ParseException { 1077 logger.trace("i={} type={}", i, type); 1078 switch (type) { 1079 /** 1080 * The JDBC types CHAR, VARCHAR, and LONGVARCHAR are closely 1081 * related. CHAR represents a small, fixed-length character string, 1082 * VARCHAR represents a small, variable-length character string, and 1083 * LONGVARCHAR represents a large, variable-length character string. 1084 */ 1085 case Types.CHAR: 1086 case Types.VARCHAR: 1087 case Types.LONGVARCHAR: { 1088 return result.getString(i); 1089 } 1090 case Types.NCHAR: 1091 case Types.NVARCHAR: 1092 case Types.LONGNVARCHAR: { 1093 return result.getNString(i); 1094 } 1095 /** 1096 * The JDBC types BINARY, VARBINARY, and LONGVARBINARY are closely 1097 * related. BINARY represents a small, fixed-length binary value, 1098 * VARBINARY represents a small, variable-length binary value, and 1099 * LONGVARBINARY represents a large, variable-length binary value 1100 */ 1101 case Types.BINARY: 1102 case Types.VARBINARY: 1103 case Types.LONGVARBINARY: { 1104 byte[] b = result.getBytes(i); 1105 return context.shouldTreatBinaryAsString() ? (b != null ? new String(b) : null) : b; 1106 } 1107 /** 1108 * The JDBC type ARRAY represents the SQL3 type ARRAY. 1109 * 1110 * An ARRAY value is mapped to an instance of the Array interface in 1111 * the Java programming language. If a driver follows the standard 1112 * implementation, an Array object logically points to an ARRAY 1113 * value on the server rather than containing the elements of the 1114 * ARRAY object, which can greatly increase efficiency. The Array 1115 * interface contains methods for materializing the elements of the 1116 * ARRAY object on the client in the form of either an array or a 1117 * ResultSet object. 1118 */ 1119 case Types.ARRAY: { 1120 Array arr = result.getArray(i); 1121 return arr == null ? null : arr.getArray(); 1122 } 1123 /** 1124 * The JDBC type BIGINT represents a 64-bit signed integer value 1125 * between -9223372036854775808 and 9223372036854775807. 1126 * 1127 * The corresponding SQL type BIGINT is a nonstandard extension to 1128 * SQL. In practice the SQL BIGINT type is not yet currently 1129 * implemented by any of the major databases, and we recommend that 1130 * its use be avoided in code that is intended to be portable. 1131 * 1132 * The recommended Java mapping for the BIGINT type is as a Java 1133 * long. 1134 */ 1135 case Types.BIGINT: { 1136 Object o = result.getLong(i); 1137 return result.wasNull() ? null : o; 1138 } 1139 /** 1140 * The JDBC type BIT represents a single bit value that can be zero 1141 * or one. 1142 * 1143 * SQL-92 defines an SQL BIT type. However, unlike the JDBC BIT 1144 * type, this SQL-92 BIT type can be used as a parameterized type to 1145 * define a fixed-length binary string. Fortunately, SQL-92 also 1146 * permits the use of the simple non-parameterized BIT type to 1147 * represent a single binary digit, and this usage corresponds to 1148 * the JDBC BIT type. Unfortunately, the SQL-92 BIT type is only 1149 * required in "full" SQL-92 and is currently supported by only a 1150 * subset of the major databases. Portable code may therefore prefer 1151 * to use the JDBC SMALLINT type, which is widely supported. 1152 */ 1153 case Types.BIT: { 1154 try { 1155 Object o = result.getInt(i); 1156 return result.wasNull() ? null : o; 1157 } catch (Exception e) { 1158 String exceptionClassName = e.getClass().getName(); 1159 // postgresql can not handle boolean, it will throw PSQLException, something like "Bad value for type int : t" 1160 if ("org.postgresql.util.PSQLException".equals(exceptionClassName)) { 1161 return "t".equals(result.getString(i)); 1162 } 1163 throw new IOException(e); 1164 } 1165 } 1166 /** 1167 * The JDBC type BOOLEAN, which is new in the JDBC 3.0 API, maps to 1168 * a boolean in the Java programming language. It provides a 1169 * representation of true and false, and therefore is a better match 1170 * than the JDBC type BIT, which is either 1 or 0. 1171 */ 1172 case Types.BOOLEAN: { 1173 return result.getBoolean(i); 1174 } 1175 /** 1176 * The JDBC type BLOB represents an SQL3 BLOB (Binary Large Object). 1177 * 1178 * A JDBC BLOB value is mapped to an instance of the Blob interface 1179 * in the Java programming language. If a driver follows the 1180 * standard implementation, a Blob object logically points to the 1181 * BLOB value on the server rather than containing its binary data, 1182 * greatly improving efficiency. The Blob interface provides methods 1183 * for materializing the BLOB data on the client when that is 1184 * desired. 1185 */ 1186 case Types.BLOB: { 1187 Blob blob = result.getBlob(i); 1188 if (blob != null) { 1189 long n = blob.length(); 1190 if (n > Integer.MAX_VALUE) { 1191 throw new IOException("can't process blob larger than Integer.MAX_VALUE"); 1192 } 1193 byte[] tab = blob.getBytes(1, (int) n); 1194 blob.free(); 1195 return tab; 1196 } 1197 break; 1198 } 1199 /** 1200 * The JDBC type CLOB represents the SQL3 type CLOB (Character Large 1201 * Object). 1202 * 1203 * A JDBC CLOB value is mapped to an instance of the Clob interface 1204 * in the Java programming language. If a driver follows the 1205 * standard implementation, a Clob object logically points to the 1206 * CLOB value on the server rather than containing its character 1207 * data, greatly improving efficiency. Two of the methods on the 1208 * Clob interface materialize the data of a CLOB object on the 1209 * client. 1210 */ 1211 case Types.CLOB: { 1212 Clob clob = result.getClob(i); 1213 if (clob != null) { 1214 long n = clob.length(); 1215 if (n > Integer.MAX_VALUE) { 1216 throw new IOException("can't process clob larger than Integer.MAX_VALUE"); 1217 } 1218 String str = clob.getSubString(1, (int) n); 1219 clob.free(); 1220 return str; 1221 } 1222 break; 1223 } 1224 case Types.NCLOB: { 1225 NClob nclob = result.getNClob(i); 1226 if (nclob != null) { 1227 long n = nclob.length(); 1228 if (n > Integer.MAX_VALUE) { 1229 throw new IOException("can't process nclob larger than Integer.MAX_VALUE"); 1230 } 1231 String str = nclob.getSubString(1, (int) n); 1232 nclob.free(); 1233 return str; 1234 } 1235 break; 1236 } 1237 /** 1238 * The JDBC type DATALINK, new in the JDBC 3.0 API, is a column 1239 * value that references a file that is outside of a data source but 1240 * is managed by the data source. It maps to the Java type 1241 * java.net.URL and provides a way to manage external files. For 1242 * instance, if the data source is a DBMS, the concurrency controls 1243 * it enforces on its own data can be applied to the external file 1244 * as well. 1245 * 1246 * A DATALINK value is retrieved from a ResultSet object with the 1247 * ResultSet methods getURL or getObject. If the Java platform does 1248 * not support the type of URL returned by getURL or getObject, a 1249 * DATALINK value can be retrieved as a String object with the 1250 * method getString. 1251 * 1252 * java.net.URL values are stored in a database using the method 1253 * setURL. If the Java platform does not support the type of URL 1254 * being set, the method setString can be used instead. 1255 * 1256 * 1257 */ 1258 case Types.DATALINK: { 1259 return result.getURL(i); 1260 } 1261 /** 1262 * The JDBC DATE type represents a date consisting of day, month, 1263 * and year. The corresponding SQL DATE type is defined in SQL-92, 1264 * but it is implemented by only a subset of the major databases. 1265 * Some databases offer alternative SQL types that support similar 1266 * semantics. 1267 */ 1268 case Types.DATE: { 1269 try { 1270 Date d = result.getDate(i, calendar); 1271 return d != null ? formatDate(d.getTime()) : null; 1272 } catch (SQLException e) { 1273 return null; 1274 } 1275 } 1276 case Types.TIME: { 1277 try { 1278 Time t = result.getTime(i, calendar); 1279 return t != null ? formatDate(t.getTime()) : null; 1280 } catch (SQLException e) { 1281 return null; 1282 } 1283 } 1284 case Types.TIMESTAMP: { 1285 try { 1286 Timestamp t = result.getTimestamp(i, calendar); 1287 return t != null ? formatDate(t.getTime()) : null; 1288 } catch (SQLException e) { 1289 // java.sql.SQLException: Cannot convert value '0000-00-00 00:00:00' from column ... to TIMESTAMP. 1290 return null; 1291 } 1292 } 1293 /** 1294 * The JDBC types DECIMAL and NUMERIC are very similar. They both 1295 * represent fixed-precision decimal values. 1296 * 1297 * The corresponding SQL types DECIMAL and NUMERIC are defined in 1298 * SQL-92 and are very widely implemented. These SQL types take 1299 * precision and scale parameters. The precision is the total number 1300 * of decimal digits supported, and the scale is the number of 1301 * decimal digits after the decimal point. For most DBMSs, the scale 1302 * is less than or equal to the precision. So for example, the value 1303 * "12.345" has a precision of 5 and a scale of 3, and the value 1304 * ".11" has a precision of 2 and a scale of 2. JDBC requires that 1305 * all DECIMAL and NUMERIC types support both a precision and a 1306 * scale of at least 15. 1307 * 1308 * The sole distinction between DECIMAL and NUMERIC is that the 1309 * SQL-92 specification requires that NUMERIC types be represented 1310 * with exactly the specified precision, whereas for DECIMAL types, 1311 * it allows an implementation to add additional precision beyond 1312 * that specified when the type was created. Thus a column created 1313 * with type NUMERIC(12,4) will always be represented with exactly 1314 * 12 digits, whereas a column created with type DECIMAL(12,4) might 1315 * be represented by some larger number of digits. 1316 * 1317 * The recommended Java mapping for the DECIMAL and NUMERIC types is 1318 * java.math.BigDecimal. The java.math.BigDecimal type provides math 1319 * operations to allow BigDecimal types to be added, subtracted, 1320 * multiplied, and divided with other BigDecimal types, with integer 1321 * types, and with floating point types. 1322 * 1323 * The method recommended for retrieving DECIMAL and NUMERIC values 1324 * is ResultSet.getBigDecimal. JDBC also allows access to these SQL 1325 * types as simple Strings or arrays of char. Thus, Java programmers 1326 * can use getString to receive a DECIMAL or NUMERIC result. 1327 * However, this makes the common case where DECIMAL or NUMERIC are 1328 * used for currency values rather awkward, since it means that 1329 * application writers have to perform math on strings. It is also 1330 * possible to retrieve these SQL types as any of the Java numeric 1331 * types. 1332 */ 1333 case Types.DECIMAL: 1334 case Types.NUMERIC: { 1335 BigDecimal bd = null; 1336 try { 1337 // getBigDecimal() should get obsolete. Most seem to use getString/getObject anyway... 1338 bd = result.getBigDecimal(i); 1339 } catch (NullPointerException e) { 1340 // But is it true? JDBC NPE exists since 13 years? 1341 // http://forums.codeguru.com/archive/index.php/t-32443.html 1342 // Null values are driving us nuts in JDBC: 1343 // http://stackoverflow.com/questions/2777214/when-accessing-resultsets-in-jdbc-is-there-an-elegant-way-to-distinguish-betwee 1344 } 1345 if (bd == null || result.wasNull()) { 1346 return null; 1347 } 1348 if (context.getScale() >= 0) { 1349 bd = bd.setScale(context.getScale(), context.getRounding()); 1350 try { 1351 long l = bd.longValueExact(); 1352 if (Long.toString(l).equals(result.getString(i))) { 1353 // convert to long if possible 1354 return l; 1355 } else { 1356 // convert to double (with precision loss) 1357 return bd.doubleValue(); 1358 } 1359 } catch (ArithmeticException e) { 1360 return bd.doubleValue(); 1361 } 1362 } else { 1363 return bd.toPlainString(); 1364 } 1365 } 1366 /** 1367 * The JDBC type DOUBLE represents a "double precision" floating 1368 * point number that supports 15 digits of mantissa. 1369 * 1370 * The corresponding SQL type is DOUBLE PRECISION, which is defined 1371 * in SQL-92 and is widely supported by the major databases. The 1372 * SQL-92 standard leaves the precision of DOUBLE PRECISION up to 1373 * the implementation, but in practice all the major databases 1374 * supporting DOUBLE PRECISION support a mantissa precision of at 1375 * least 15 digits. 1376 * 1377 * The recommended Java mapping for the DOUBLE type is as a Java 1378 * double. 1379 */ 1380 case Types.DOUBLE: { 1381 String s = result.getString(i); 1382 if (result.wasNull() || s == null) { 1383 return null; 1384 } 1385 NumberFormat format = NumberFormat.getInstance(locale); 1386 Number number = format.parse(s); 1387 return number.doubleValue(); 1388 } 1389 /** 1390 * The JDBC type FLOAT is basically equivalent to the JDBC type 1391 * DOUBLE. We provided both FLOAT and DOUBLE in a possibly misguided 1392 * attempt at consistency with previous database APIs. FLOAT 1393 * represents a "double precision" floating point number that 1394 * supports 15 digits of mantissa. 1395 * 1396 * The corresponding SQL type FLOAT is defined in SQL-92. The SQL-92 1397 * standard leaves the precision of FLOAT up to the implementation, 1398 * but in practice all the major databases supporting FLOAT support 1399 * a mantissa precision of at least 15 digits. 1400 * 1401 * The recommended Java mapping for the FLOAT type is as a Java 1402 * double. However, because of the potential confusion between the 1403 * double precision SQL FLOAT and the single precision Java float, 1404 * we recommend that JDBC programmers should normally use the JDBC 1405 * DOUBLE type in preference to FLOAT. 1406 */ 1407 case Types.FLOAT: { 1408 String s = result.getString(i); 1409 if (result.wasNull() || s == null) { 1410 return null; 1411 } 1412 NumberFormat format = NumberFormat.getInstance(locale); 1413 Number number = format.parse(s); 1414 return number.doubleValue(); 1415 } 1416 /** 1417 * The JDBC type JAVA_OBJECT, added in the JDBC 2.0 core API, makes 1418 * it easier to use objects in the Java programming language as 1419 * values in a database. JAVA_OBJECT is simply a type code for an 1420 * instance of a class defined in the Java programming language that 1421 * is stored as a database object. The type JAVA_OBJECT is used by a 1422 * database whose type system has been extended so that it can store 1423 * Java objects directly. The JAVA_OBJECT value may be stored as a 1424 * serialized Java object, or it may be stored in some 1425 * vendor-specific format. 1426 * 1427 * The type JAVA_OBJECT is one of the possible values for the column 1428 * DATA_TYPE in the ResultSet objects returned by various 1429 * DatabaseMetaData methods, including getTypeInfo, getColumns, and 1430 * getUDTs. The method getUDTs, part of the new JDBC 2.0 core API, 1431 * will return information about the Java objects contained in a 1432 * particular schema when it is given the appropriate parameters. 1433 * Having this information available facilitates using a Java class 1434 * as a database type. 1435 */ 1436 case Types.OTHER: 1437 case Types.JAVA_OBJECT: { 1438 return result.getObject(i); 1439 } 1440 /** 1441 * The JDBC type REAL represents a "single precision" floating point 1442 * number that supports seven digits of mantissa. 1443 * 1444 * The corresponding SQL type REAL is defined in SQL-92 and is 1445 * widely, though not universally, supported by the major databases. 1446 * The SQL-92 standard leaves the precision of REAL up to the 1447 * implementation, but in practice all the major databases 1448 * supporting REAL support a mantissa precision of at least seven 1449 * digits. 1450 * 1451 * The recommended Java mapping for the REAL type is as a Java 1452 * float. 1453 */ 1454 case Types.REAL: { 1455 String s = result.getString(i); 1456 if (result.wasNull() || s == null) { 1457 return null; 1458 } 1459 NumberFormat format = NumberFormat.getInstance(locale); 1460 Number number = format.parse(s); 1461 return number.doubleValue(); 1462 } 1463 /** 1464 * The JDBC type TINYINT represents an 8-bit integer value between 0 1465 * and 255 that may be signed or unsigned. 1466 * 1467 * The corresponding SQL type, TINYINT, is currently supported by 1468 * only a subset of the major databases. Portable code may therefore 1469 * prefer to use the JDBC SMALLINT type, which is widely supported. 1470 * 1471 * The recommended Java mapping for the JDBC TINYINT type is as 1472 * either a Java byte or a Java short. The 8-bit Java byte type 1473 * represents a signed value from -128 to 127, so it may not always 1474 * be appropriate for larger TINYINT values, whereas the 16-bit Java 1475 * short will always be able to hold all TINYINT values. 1476 */ 1477 /** 1478 * The JDBC type SMALLINT represents a 16-bit signed integer value 1479 * between -32768 and 32767. 1480 * 1481 * The corresponding SQL type, SMALLINT, is defined in SQL-92 and is 1482 * supported by all the major databases. The SQL-92 standard leaves 1483 * the precision of SMALLINT up to the implementation, but in 1484 * practice, all the major databases support at least 16 bits. 1485 * 1486 * The recommended Java mapping for the JDBC SMALLINT type is as a 1487 * Java short. 1488 */ 1489 /** 1490 * The JDBC type INTEGER represents a 32-bit signed integer value 1491 * ranging between -2147483648 and 2147483647. 1492 * 1493 * The corresponding SQL type, INTEGER, is defined in SQL-92 and is 1494 * widely supported by all the major databases. The SQL-92 standard 1495 * leaves the precision of INTEGER up to the implementation, but in 1496 * practice all the major databases support at least 32 bits. 1497 * 1498 * The recommended Java mapping for the INTEGER type is as a Java 1499 * int. 1500 */ 1501 case Types.TINYINT: 1502 case Types.SMALLINT: 1503 case Types.INTEGER: { 1504 try { 1505 Integer integer = result.getInt(i); 1506 return result.wasNull() ? null : integer; 1507 } catch (SQLDataException e) { 1508 Long l = result.getLong(i); 1509 return result.wasNull() ? null : l; 1510 } 1511 } 1512 1513 case Types.SQLXML: { 1514 SQLXML xml = result.getSQLXML(i); 1515 return xml != null ? xml.getString() : null; 1516 } 1517 1518 case Types.NULL: { 1519 return null; 1520 } 1521 /** 1522 * The JDBC type DISTINCT field (Types class)>DISTINCT represents 1523 * the SQL3 type DISTINCT. 1524 * 1525 * The standard mapping for a DISTINCT type is to the Java type to 1526 * which the base type of a DISTINCT object would be mapped. For 1527 * example, a DISTINCT type based on a CHAR would be mapped to a 1528 * String object, and a DISTINCT type based on an SQL INTEGER would 1529 * be mapped to an int. 1530 * 1531 * The DISTINCT type may optionally have a custom mapping to a class 1532 * in the Java programming language. A custom mapping consists of a 1533 * class that implements the interface SQLData and an entry in a 1534 * java.util.Map object. 1535 */ 1536 case Types.DISTINCT: { 1537 logger.warn("JDBC type not implemented: {}", type); 1538 return null; 1539 } 1540 /** 1541 * The JDBC type STRUCT represents the SQL99 structured type. An SQL 1542 * structured type, which is defined by a user with a CREATE TYPE 1543 * statement, consists of one or more attributes. These attributes 1544 * may be any SQL data type, built-in or user-defined. 1545 * 1546 * The standard mapping for the SQL type STRUCT is to a Struct 1547 * object in the Java programming language. A Struct object contains 1548 * a value for each attribute of the STRUCT value it represents. 1549 * 1550 * A STRUCT value may optionally be custom mapped to a class in the 1551 * Java programming language, and each attribute in the STRUCT may 1552 * be mapped to a field in the class. A custom mapping consists of a 1553 * class that implements the interface SQLData and an entry in a 1554 * java.util.Map object. 1555 * 1556 * 1557 */ 1558 case Types.STRUCT: { 1559 logger.warn("JDBC type not implemented: {}", type); 1560 return null; 1561 } 1562 case Types.REF: { 1563 logger.warn("JDBC type not implemented: {}", type); 1564 return null; 1565 } 1566 case Types.ROWID: { 1567 logger.warn("JDBC type not implemented: {}", type); 1568 return null; 1569 } 1570 default: { 1571 logger.warn("unknown JDBC type ignored: {}", type); 1572 return null; 1573 } 1574 } 1575 return null; 1576 } 1577 1578 private int toJDBCType(String type) { 1579 if (type == null) { 1580 return Types.NULL; 1581 } else if (type.equalsIgnoreCase("NULL")) { 1582 return Types.NULL; 1583 } else if (type.equalsIgnoreCase("TINYINT")) { 1584 return Types.TINYINT; 1585 } else if (type.equalsIgnoreCase("SMALLINT")) { 1586 return Types.SMALLINT; 1587 } else if (type.equalsIgnoreCase("INTEGER")) { 1588 return Types.INTEGER; 1589 } else if (type.equalsIgnoreCase("BIGINT")) { 1590 return Types.BIGINT; 1591 } else if (type.equalsIgnoreCase("REAL")) { 1592 return Types.REAL; 1593 } else if (type.equalsIgnoreCase("FLOAT")) { 1594 return Types.FLOAT; 1595 } else if (type.equalsIgnoreCase("DOUBLE")) { 1596 return Types.DOUBLE; 1597 } else if (type.equalsIgnoreCase("DECIMAL")) { 1598 return Types.DECIMAL; 1599 } else if (type.equalsIgnoreCase("NUMERIC")) { 1600 return Types.NUMERIC; 1601 } else if (type.equalsIgnoreCase("BIT")) { 1602 return Types.BIT; 1603 } else if (type.equalsIgnoreCase("BOOLEAN")) { 1604 return Types.BOOLEAN; 1605 } else if (type.equalsIgnoreCase("BINARY")) { 1606 return Types.BINARY; 1607 } else if (type.equalsIgnoreCase("VARBINARY")) { 1608 return Types.VARBINARY; 1609 } else if (type.equalsIgnoreCase("LONGVARBINARY")) { 1610 return Types.LONGVARBINARY; 1611 } else if (type.equalsIgnoreCase("CHAR")) { 1612 return Types.CHAR; 1613 } else if (type.equalsIgnoreCase("VARCHAR")) { 1614 return Types.VARCHAR; 1615 } else if (type.equalsIgnoreCase("LONGVARCHAR")) { 1616 return Types.LONGVARCHAR; 1617 } else if (type.equalsIgnoreCase("DATE")) { 1618 return Types.DATE; 1619 } else if (type.equalsIgnoreCase("TIME")) { 1620 return Types.TIME; 1621 } else if (type.equalsIgnoreCase("TIMESTAMP")) { 1622 return Types.TIMESTAMP; 1623 } else if (type.equalsIgnoreCase("CLOB")) { 1624 return Types.CLOB; 1625 } else if (type.equalsIgnoreCase("BLOB")) { 1626 return Types.BLOB; 1627 } else if (type.equalsIgnoreCase("ARRAY")) { 1628 return Types.ARRAY; 1629 } else if (type.equalsIgnoreCase("STRUCT")) { 1630 return Types.STRUCT; 1631 } else if (type.equalsIgnoreCase("REF")) { 1632 return Types.REF; 1633 } else if (type.equalsIgnoreCase("DATALINK")) { 1634 return Types.DATALINK; 1635 } else if (type.equalsIgnoreCase("DISTINCT")) { 1636 return Types.DISTINCT; 1637 } else if (type.equalsIgnoreCase("JAVA_OBJECT")) { 1638 return Types.JAVA_OBJECT; 1639 } else if (type.equalsIgnoreCase("SQLXML")) { 1640 return Types.SQLXML; 1641 } else if (type.equalsIgnoreCase("ROWID")) { 1642 return Types.ROWID; 1643 } 1644 return Types.OTHER; 1645 } 1646 1647 private String mapColumnName(String columnName) { 1648 // TODO JDK8: StringJoiner 1649 Map<String, Object> columnNameMap = context.getColumnNameMap(); 1650 StringBuilder sb = new StringBuilder(); 1651 String[] s = columnName.split("\\."); 1652 for (int i = 0; i < s.length; i++) { 1653 if (i > 0) { 1654 sb.append("."); 1655 } 1656 if (columnNameMap.containsKey(s[i])) { 1657 s[i] = columnNameMap.get(s[i]).toString(); 1658 } else { 1659 logger.info("no column map entry for {} in map {}", s[i], columnNameMap); 1660 } 1661 sb.append(s[i]); 1662 } 1663 return sb.toString(); 1664 } 1665 1666 private String formatDate(long millis) { 1667 return new DateTime(millis).withZone(dateTimeZone).toString(); 1668 } 1669 1670}