001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.simple;
017
018import org.elasticsearch.common.joda.time.DateTime;
019import org.elasticsearch.common.joda.time.DateTimeZone;
020import org.elasticsearch.common.logging.ESLogger;
021import org.elasticsearch.common.logging.ESLoggerFactory;
022import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener;
023import org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener;
024import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand;
025import org.xbib.elasticsearch.river.jdbc.RiverSource;
026
027import java.io.IOException;
028import java.math.BigDecimal;
029import java.sql.Array;
030import java.sql.Blob;
031import java.sql.CallableStatement;
032import java.sql.Clob;
033import java.sql.Connection;
034import java.sql.DatabaseMetaData;
035import java.sql.Date;
036import java.sql.DriverManager;
037import java.sql.NClob;
038import java.sql.PreparedStatement;
039import java.sql.ResultSet;
040import java.sql.ResultSetMetaData;
041import java.sql.SQLDataException;
042import java.sql.SQLException;
043import java.sql.SQLFeatureNotSupportedException;
044import java.sql.SQLNonTransientConnectionException;
045import java.sql.SQLRecoverableException;
046import java.sql.SQLXML;
047import java.sql.Statement;
048import java.sql.Time;
049import java.sql.Timestamp;
050import java.sql.Types;
051import java.text.NumberFormat;
052import java.text.ParseException;
053import java.util.Calendar;
054import java.util.HashMap;
055import java.util.LinkedList;
056import java.util.List;
057import java.util.Locale;
058import java.util.Map;
059import java.util.Properties;
060import java.util.TimeZone;
061
062import static org.elasticsearch.common.collect.Lists.newLinkedList;
063
064/**
065 * Simple river source implementation.
066 * The simple river source iterates through a JDBC result set,
067 * merges the rows into Elasticsearch documents, and passes them to
068 * a bulk indexer.
069 * There are two channels open, one for reading the database, one for writing.
070 */
071public class SimpleRiverSource<RC extends SimpleRiverContext> implements RiverSource<RC> {
072
073    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverSource");
074
075    protected RC context;
076
077    protected String url;
078
079    protected String user;
080
081    protected String password;
082
083    protected Connection readConnection;
084
085    protected Connection writeConnection;
086
087    protected Locale locale;
088
089    protected TimeZone timezone;
090
091    protected Calendar calendar;
092
093    protected DateTimeZone dateTimeZone;
094
095    protected volatile boolean suspended;
096
097    @Override
098    public String strategy() {
099        return "simple";
100    }
101
102    @Override
103    public SimpleRiverSource<RC> newInstance() {
104        return new SimpleRiverSource<RC>();
105    }
106
107    @Override
108    public SimpleRiverSource setRiverContext(RC context) {
109        this.context = context;
110        return this;
111    }
112
113    @Override
114    public SimpleRiverSource setUrl(String url) {
115        this.url = url;
116        return this;
117    }
118
119    public String getUrl() {
120        return url;
121    }
122
123    @Override
124    public SimpleRiverSource setUser(String user) {
125        this.user = user;
126        return this;
127    }
128
129    @Override
130    public SimpleRiverSource setPassword(String password) {
131        this.password = password;
132        return this;
133    }
134
135    @Override
136    public SimpleRiverSource setLocale(Locale locale) {
137        this.locale = locale;
138        // initialize locale for JDBC drivers internals
139        Locale.setDefault(locale);
140        if (timezone == null) {
141            timezone = TimeZone.getTimeZone("UTC");
142        }
143        this.calendar = Calendar.getInstance(timezone, locale);
144        logger.debug("calendar timezone for JDBC timestamps = {}", calendar.getTimeZone().getDisplayName());
145        return this;
146    }
147
148    @Override
149    public Locale getLocale() {
150        if (locale == null) {
151            setLocale(Locale.getDefault());
152        }
153        return locale;
154    }
155
156    @Override
157    public SimpleRiverSource setTimeZone(TimeZone timezone) {
158        this.timezone = timezone;
159        TimeZone.setDefault(timezone); // for JDBC drivers internals
160        if (locale == null) {
161            locale = Locale.getDefault();
162        }
163        this.calendar = Calendar.getInstance(timezone, locale);
164        logger.debug("calendar timezone for JDBC timestamps = {}", calendar.getTimeZone().getDisplayName());
165        // for formatting fetched JDBC time values
166        this.dateTimeZone = DateTimeZone.forTimeZone(timezone);
167        return this;
168    }
169
170    @Override
171    public TimeZone getTimeZone() {
172        if (timezone == null) {
173            setTimeZone(TimeZone.getDefault());
174        }
175        return timezone;
176    }
177
178    @Override
179    public void suspend() {
180        this.suspended = true;
181    }
182
183    @Override
184    public void resume() {
185        this.suspended = false;
186    }
187
188    /**
189     * Get JDBC connection for reading
190     *
191     * @return the connection
192     * @throws SQLException when SQL execution gives an error
193     */
194    @Override
195    public synchronized Connection getConnectionForReading() throws SQLException {
196        boolean invalid = readConnection == null || readConnection.isClosed();
197        try {
198            invalid = invalid || !readConnection.isValid(5);
199        } catch (AbstractMethodError e) {
200            // old/buggy JDBC driver
201            logger.debug(e.getMessage());
202        } catch (SQLFeatureNotSupportedException e) {
203            // postgresql does not support isValid()
204            logger.debug(e.getMessage());
205        }
206        if (invalid) {
207            int retries = context.getRetries();
208            while (retries > 0) {
209                retries--;
210                try {
211                    Properties properties = new Properties();
212                    properties.put("user", user);
213                    properties.put("password", password);
214                    if (context.getConnectionProperties() != null) {
215                        properties.putAll(context.getConnectionProperties());
216                    }
217                    readConnection = DriverManager.getConnection(url, properties);
218                    DatabaseMetaData metaData = readConnection.getMetaData();
219                    if (context.shouldPrepareDatabaseMetadata()) {
220                        prepare(metaData);
221                    }
222                    if (metaData.getTimeDateFunctions().contains("TIMESTAMPDIFF")) {
223                        context.setTimestampDiffSupported(true);
224                    }
225                    // "readonly" is required by MySQL for large result streaming
226                    readConnection.setReadOnly(true);
227                    // Postgresql cursor mode condition:
228                    // fetchsize > 0, no scrollable result set, no auto commit, no holdable cursors over commit
229                    // https://github.com/pgjdbc/pgjdbc/blob/master/org/postgresql/jdbc2/AbstractJdbc2Statement.java#L514
230                    //readConnection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
231                    // many drivers don't like autocommit=true
232                    readConnection.setAutoCommit(context.getAutoCommit());
233                    return readConnection;
234                } catch (SQLException e) {
235                    logger.error("while opening read connection: " + url + " " + e.getMessage(), e);
236                    try {
237                        logger.debug("waiting for {} seconds", context.getMaxRetryWait().seconds());
238                        Thread.sleep(context.getMaxRetryWait().millis());
239                    } catch (InterruptedException ex) {
240                        // do nothing
241                    }
242                }
243            }
244        }
245        return readConnection;
246    }
247
248    /**
249     * Get JDBC connection for writing. FOr executing "update", "insert", callable statements
250     *
251     * @return the connection
252     * @throws SQLException when SQL execution gives an error
253     */
254    @Override
255    public synchronized Connection getConnectionForWriting() throws SQLException {
256        boolean invalid = writeConnection == null || writeConnection.isClosed();
257        try {
258            invalid = invalid || !writeConnection.isValid(5);
259        } catch (AbstractMethodError e) {
260            // old/buggy JDBC driver do not implement isValid()
261        } catch (SQLFeatureNotSupportedException e) {
262            // Example: postgresql does implement but not support isValid()
263        }
264        if (invalid) {
265            int retries = context.getRetries();
266            while (retries > 0) {
267                retries--;
268                try {
269                    Properties properties = new Properties();
270                    properties.put("user", user);
271                    properties.put("password", password);
272                    if (context.getConnectionProperties() != null) {
273                        properties.putAll(context.getConnectionProperties());
274                    }
275                    writeConnection = DriverManager.getConnection(url, properties);
276                    // many drivers don't like autocommit=true
277                    writeConnection.setAutoCommit(context.getAutoCommit());
278                    return writeConnection;
279                } catch (SQLNonTransientConnectionException e) {
280                    // ignore derby drop=true silently
281                } catch (SQLException e) {
282                    logger.error("while opening write connection: " + url + " " + e.getMessage(), e);
283                    try {
284                        Thread.sleep(context.getMaxRetryWait().millis());
285                    } catch (InterruptedException ex) {
286                        // do nothing
287                    }
288                }
289            }
290        }
291        return writeConnection;
292    }
293
294    @Override
295    public void beforeFetch() throws Exception {
296        context.setLastStartDate(new DateTime().getMillis());
297    }
298
299    /**
300     * River fetch. Issue a series of SQL statements.
301     *
302     * @throws SQLException when SQL execution gives an error
303     * @throws IOException  when input/output error occurs
304     */
305    @Override
306    public void fetch() throws SQLException, IOException {
307        logger.debug("fetching, {} SQL commands", context.getStatements().size());
308        try {
309            for (SQLCommand command : context.getStatements()) {
310                context.setLastExecutionStartDate(new DateTime().getMillis());
311                try {
312                    if (command.isCallable()) {
313                        logger.debug("{} executing callable SQL: {}", this, command);
314                        executeCallable(command);
315                    } else if (!command.getParameters().isEmpty()) {
316                        logger.debug("{} executing SQL with params: {}", this, command);
317                        executeWithParameter(command);
318                    } else {
319                        logger.debug("{} executing SQL without params: {}", this, command);
320                        execute(command);
321                    }
322                    context.setLastExecutionEndDate(new DateTime().getMillis());
323                } catch (SQLRecoverableException e) {
324                    long millis = context.getMaxRetryWait().getMillis();
325                    logger.warn("retrying after " + millis / 1000 + " seconds, got exception ", e);
326                    Thread.sleep(context.getMaxRetryWait().getMillis());
327                    if (command.isCallable()) {
328                        logger.debug("retrying, executing callable SQL: {}", command);
329                        executeCallable(command);
330                    } else if (!command.getParameters().isEmpty()) {
331                        logger.debug("retrying, executing SQL with params: {}", command);
332                        executeWithParameter(command);
333                    } else {
334                        logger.debug("retrying, executing SQL without params: {}", command);
335                        execute(command);
336                    }
337                    context.setLastExecutionEndDate(new DateTime().getMillis());
338                }
339            }
340        } catch (Exception e) {
341            throw new IOException(e);
342        }
343    }
344
345    @Override
346    public void afterFetch() throws Exception {
347        context.setLastEndDate(new DateTime().getMillis());
348        shutdown();
349    }
350
351    @Override
352    public void shutdown() {
353        closeReading();
354        logger.debug("read connection closed");
355        readConnection = null;
356        closeWriting();
357        logger.debug("write connection closed");
358        writeConnection = null;
359    }
360
361    /**
362     * Execute SQL query command without parameter binding.
363     *
364     * @param command the SQL command
365     * @throws SQLException when SQL execution gives an error
366     * @throws IOException  when input/output error occurs
367     */
368    private void execute(SQLCommand command) throws Exception {
369        Statement statement = null;
370        ResultSet results = null;
371        try {
372            if (command.isQuery()) {
373                // use read connection
374                // we must not use prepareStatement for Postgresql!
375                // Postgresql requires direct use of executeQuery(sql) for cursor with fetchsize set.
376                Connection connection = getConnectionForReading();
377                if (connection != null) {
378                    logger.debug("{} using read connection {} for executing query", this, connection);
379                    statement = connection.createStatement();
380                    statement.setQueryTimeout(context.getQueryTimeout());
381                    results = executeQuery(statement, command.getSQL());
382                    if (context.shouldPrepareResultSetMetadata()) {
383                        prepare(results.getMetaData());
384                    }
385                    RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>()
386                            .output(context.getRiverMouth())
387                            .shouldIgnoreNull(context.shouldIgnoreNull());
388                    merge(command, results, listener);
389                }
390            } else {
391                // use write connection
392                Connection connection = getConnectionForWriting();
393                if (connection != null) {
394                    logger.debug("{} using write connection {} for executing insert/update", this, connection);
395                    statement = connection.createStatement();
396                    executeUpdate(statement, command.getSQL());
397                }
398            }
399        } finally {
400            close(results);
401            close(statement);
402        }
403    }
404
405    /**
406     * Execute SQL query command with parameter binding.
407     *
408     * @param command the SQL command
409     * @throws SQLException when SQL execution gives an error
410     * @throws IOException  when input/output error occurs
411     */
412    private void executeWithParameter(SQLCommand command) throws Exception {
413        PreparedStatement statement = null;
414        ResultSet results = null;
415        try {
416            if (command.isQuery()) {
417                statement = prepareQuery(command.getSQL());
418                bind(statement, command.getParameters());
419                results = executeQuery(statement);
420                RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>()
421                        .output(context.getRiverMouth())
422                        .shouldIgnoreNull(context.shouldIgnoreNull());
423                merge(command, results, listener);
424            } else {
425                statement = prepareUpdate(command.getSQL());
426                bind(statement, command.getParameters());
427                executeUpdate(statement);
428            }
429        } finally {
430            close(results);
431            close(statement);
432        }
433    }
434
435    /**
436     * Execute callable SQL command
437     *
438     * @param command the SQL command
439     * @throws SQLException when SQL execution gives an error
440     * @throws IOException  when input/output error occurs
441     */
442    private void executeCallable(SQLCommand command) throws Exception {
443        // call stored procedure
444        CallableStatement statement = null;
445        try {
446            // we do not make a difference betwwen read/write and we assume
447            // it is safe to use the read connection and query the DB
448            Connection connection = getConnectionForWriting();
449            logger.debug("{} using write connection {} for executing callable statement", this, connection);
450            if (connection != null) {
451                statement = connection.prepareCall(command.getSQL());
452                if (!command.getParameters().isEmpty()) {
453                    bind(statement, command.getParameters());
454                }
455                if (!command.getRegister().isEmpty()) {
456                    register(statement, command.getRegister());
457                }
458                boolean hasRows = statement.execute();
459                RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>()
460                        .output(context.getRiverMouth());
461                if (hasRows) {
462                    logger.debug("callable execution created result set");
463                    while (hasRows) {
464                        // merge result set, but use register
465                        merge(command, statement.getResultSet(), listener);
466                        hasRows = statement.getMoreResults();
467                    }
468                } else {
469                    // no result set, merge from registered params only
470                    merge(command, statement, listener);
471                }
472            }
473        } finally {
474            close(statement);
475        }
476    }
477
478    /**
479     * Merge key/values from JDBC result set
480     *
481     * @param results  result set
482     * @param listener the value listener
483     * @throws SQLException when SQL execution gives an error
484     * @throws IOException  when input/output error occurs
485     */
486    protected void merge(SQLCommand command, ResultSet results, KeyValueStreamListener listener)
487            throws SQLException, IOException, ParseException {
488        if (listener == null) {
489            return;
490        }
491        beforeRows(command, results, listener);
492        long rows = 0L;
493        while (nextRow(command, results, listener)) {
494            rows++;
495            if (context.getMetric() != null) {
496                context.getMetric().mark();
497            }
498            while (suspended) {
499                try {
500                    Thread.sleep(1000L);
501                } catch (InterruptedException e) {
502                    Thread.currentThread().interrupt();
503                    logger.warn("interrupted");
504                }
505            }
506        }
507        context.setLastRowCount(rows);
508        if (rows > 0) {
509            logger.debug("merged {} rows", rows);
510        } else {
511            logger.debug("no rows merged ");
512        }
513        afterRows(command, results, listener);
514    }
515
516    /**
517     * Prepare a query statement
518     *
519     * @param sql the SQL statement
520     * @return a prepared statement
521     * @throws SQLException when SQL execution gives an error
522     */
523    @Override
524    public PreparedStatement prepareQuery(String sql) throws SQLException {
525        Connection connection = getConnectionForReading();
526        if (connection == null) {
527            throw new SQLException("can't connect to source " + url);
528        }
529        logger.debug("preparing statement with SQL {}", sql);
530        int type = "TYPE_FORWARD_ONLY".equals(context.getResultSetType()) ?
531                ResultSet.TYPE_FORWARD_ONLY : "TYPE_SCROLL_SENSITIVE".equals(context.getResultSetType()) ?
532                ResultSet.TYPE_SCROLL_SENSITIVE : "TYPE_SCROLL_INSENSITIVE".equals(context.getResultSetType()) ?
533                ResultSet.TYPE_SCROLL_INSENSITIVE : ResultSet.TYPE_FORWARD_ONLY;
534        int concurrency = "CONCUR_READ_ONLY".equals(context.getResultSetConcurrency()) ?
535                ResultSet.CONCUR_READ_ONLY : ResultSet.CONCUR_UPDATABLE;
536        return connection.prepareStatement(sql, type, concurrency);
537    }
538
539    /**
540     * Prepare an update statement
541     *
542     * @param sql the SQL statement
543     * @return a prepared statement
544     * @throws SQLException when SQL execution gives an error
545     */
546    @Override
547    public PreparedStatement prepareUpdate(String sql) throws SQLException {
548        Connection connection = getConnectionForWriting();
549        if (connection == null) {
550            throw new SQLException("can't connect to source " + url);
551        }
552        return connection.prepareStatement(sql);
553    }
554
555    /**
556     * Bind values to prepared statement
557     *
558     * @param statement the prepared statement
559     * @param values    the values to bind
560     * @throws SQLException when SQL execution gives an error
561     */
562    @Override
563    public SimpleRiverSource bind(PreparedStatement statement, List<Object> values) throws SQLException {
564        if (values == null) {
565            logger.warn("no values given for bind");
566            return this;
567        }
568        for (int i = 1; i <= values.size(); i++) {
569            bind(statement, i, values.get(i - 1));
570        }
571        return this;
572    }
573
574    /**
575     * Merge key/values from registered params of a callable statement
576     *
577     * @param statement callable statement
578     * @param listener  the value listener
579     * @throws SQLException when SQL execution gives an error
580     * @throws IOException  when input/output error occurs
581     */
582    @SuppressWarnings({"unchecked"})
583    private void merge(SQLCommand command, CallableStatement statement, KeyValueStreamListener listener)
584            throws SQLException, IOException {
585        Map<String, Object> map = command.getRegister();
586        if (map.isEmpty()) {
587            // no register given, return without doing anything
588            return;
589        }
590        List<String> keys = newLinkedList();
591        List<Object> values = newLinkedList();
592        for (Map.Entry<String, Object> entry : map.entrySet()) {
593            String k = entry.getKey();
594            Map<String, Object> v = (Map<String, Object>) entry.getValue();
595            Integer pos = (Integer) v.get("pos"); // the parameter position of the value
596            String field = (String) v.get("field"); // the field for indexing the value (if not key name)
597            keys.add(field != null ? field : k);
598            values.add(statement.getObject(pos));
599        }
600        logger.trace("merge callable statement result: keys={} values={}", keys, values);
601        listener.keys(keys);
602        listener.values(values);
603        listener.end();
604    }
605
606    /**
607     * Register variables in callable statement
608     *
609     * @param statement callable statement
610     * @param values    values
611     * @return this river source
612     * @throws SQLException when SQL execution gives an error
613     */
614    @Override
615    @SuppressWarnings({"unchecked"})
616    public SimpleRiverSource register(CallableStatement statement, Map<String, Object> values) throws SQLException {
617        if (values == null) {
618            return this;
619        }
620        for (Map.Entry<String, Object> me : values.entrySet()) {
621            // { "key" : { "pos": n, "type" : "VARCHAR", "field" : "fieldname" }, ... }
622            Map<String, Object> m = (Map<String, Object>) me.getValue();
623            Integer n = (Integer) m.get("pos");
624            String type = (String) m.get("type");
625            if (n != null && type != null) {
626                logger.debug("n={} type={}", n, toJDBCType(type));
627                try {
628                    statement.registerOutParameter(n, toJDBCType(type));
629                } catch (Throwable t) {
630                    logger.warn("can't register out parameter " + n + " of type " + type);
631                }
632            }
633        }
634        return this;
635    }
636
637    /**
638     * Execute prepared query statement
639     *
640     * @param statement the prepared statement
641     * @return the result set
642     * @throws SQLException when SQL execution gives an error
643     */
644    @Override
645    public ResultSet executeQuery(PreparedStatement statement) throws SQLException {
646        statement.setMaxRows(context.getMaxRows());
647        statement.setFetchSize(context.getFetchSize());
648        return statement.executeQuery();
649    }
650
651    /**
652     * Execute query statement
653     *
654     * @param statement the statement
655     * @param sql       the SQL
656     * @return the result set
657     * @throws SQLException when SQL execution gives an error
658     */
659    @Override
660    public ResultSet executeQuery(Statement statement, String sql) throws SQLException {
661        statement.setMaxRows(context.getMaxRows());
662        statement.setFetchSize(context.getFetchSize());
663        return statement.executeQuery(sql);
664    }
665
666    /**
667     * Execute prepared update statement
668     *
669     * @param statement the prepared statement
670     * @return the result set
671     * @throws SQLException when SQL execution gives an error
672     */
673    @Override
674    public RiverSource executeUpdate(PreparedStatement statement) throws SQLException {
675        statement.executeUpdate();
676        if (!writeConnection.getAutoCommit()) {
677            writeConnection.commit();
678        }
679        return this;
680    }
681
682    /**
683     * Execute prepared update statement
684     *
685     * @param statement the prepared statement
686     * @return the result set
687     * @throws SQLException when SQL execution gives an error
688     */
689    @Override
690    public RiverSource executeUpdate(Statement statement, String sql) throws SQLException {
691        statement.executeUpdate(sql);
692        if (!writeConnection.getAutoCommit()) {
693            writeConnection.commit();
694        }
695        return this;
696    }
697
698    @Override
699    public void beforeRows(ResultSet results, KeyValueStreamListener listener)
700            throws SQLException, IOException {
701        beforeRows(null, results, listener);
702    }
703
704    /**
705     * Before rows are read, let the KeyValueStreamListener know about the keys.
706     * If the SQL command was a callable statement and a register is there, look into the register map
707     * for the key names, not in the result set metadata.
708     *
709     * @param command  the SQL command that created this result set
710     * @param results  the result set
711     * @param listener the key/value stream listener
712     * @throws SQLException when SQL execution gives an error
713     * @throws IOException  when input/output error occurs
714     */
715    @Override
716    @SuppressWarnings({"unchecked"})
717    public void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener)
718            throws SQLException, IOException {
719        List<String> keys = new LinkedList();
720        if (command != null && command.isCallable() && !command.getRegister().isEmpty()) {
721            for (Map.Entry<String, Object> me : command.getRegister().entrySet()) {
722                keys.add(me.getKey());
723            }
724        } else {
725            ResultSetMetaData metadata = results.getMetaData();
726            int columns = metadata.getColumnCount();
727            for (int i = 1; i <= columns; i++) {
728                if (context.getColumnNameMap() == null) {
729                    keys.add(metadata.getColumnLabel(i));
730                } else {
731                    keys.add(mapColumnName(metadata.getColumnLabel(i)));
732                }
733            }
734        }
735        listener.begin();
736        listener.keys(keys);
737    }
738
739    @Override
740    public boolean nextRow(ResultSet results, KeyValueStreamListener listener)
741            throws SQLException, IOException {
742        return nextRow(null, results, listener);
743    }
744
745    /**
746     * Get next row and prepare the values for processing. The labels of each
747     * columns are used for the ValueListener as paths for JSON object merging.
748     *
749     * @param command  the SQL command that created this result set
750     * @param results  the result set
751     * @param listener the listener
752     * @return true if row exists and was processed, false otherwise
753     * @throws SQLException when SQL execution gives an error
754     * @throws IOException  when input/output error occurs
755     */
756    @Override
757    public boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener)
758            throws SQLException, IOException {
759        if (results.next()) {
760            processRow(results, listener);
761            return true;
762        }
763        return false;
764    }
765
766    @Override
767    public void afterRows(ResultSet results, KeyValueStreamListener listener)
768            throws SQLException, IOException {
769        afterRows(null, results, listener);
770    }
771
772    /**
773     * After the rows keys and values, let the listener know about the end of
774     * the result set.
775     *
776     * @param command  the SQL command that created this result set
777     * @param results  the result set
778     * @param listener the key/value stream listener
779     * @throws SQLException when SQL execution gives an error
780     * @throws IOException  when input/output error occurs
781     */
782    @Override
783    public void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener)
784            throws SQLException, IOException {
785        listener.end();
786    }
787
788    @SuppressWarnings({"unchecked"})
789    private void processRow(ResultSet results, KeyValueStreamListener listener)
790            throws SQLException, IOException {
791        List<Object> values = new LinkedList<Object>();
792        ResultSetMetaData metadata = results.getMetaData();
793        int columns = metadata.getColumnCount();
794        context.setLastRow(new HashMap());
795        for (int i = 1; i <= columns; i++) {
796            try {
797                Object value = parseType(results, i, metadata.getColumnType(i), locale);
798                logger.trace("value={} class={}", value, value != null ? value.getClass().getName() : "");
799                values.add(value);
800                context.getLastRow().put("$row." + metadata.getColumnLabel(i), value);
801            } catch (ParseException e) {
802                logger.warn("parse error for value {}, using null instead", results.getObject(i));
803                values.add(null);
804            }
805        }
806        if (listener != null) {
807            listener.values(values);
808        }
809    }
810
811    /**
812     * Close result set
813     *
814     * @param result the result set to be closed or null
815     * @throws SQLException when SQL execution gives an error
816     */
817    @Override
818    public SimpleRiverSource close(ResultSet result) throws SQLException {
819        if (result != null) {
820            result.close();
821        }
822        return this;
823    }
824
825    /**
826     * Close statement
827     *
828     * @param statement the statement to be closed or null
829     * @throws SQLException when SQL execution gives an error
830     */
831    @Override
832    public SimpleRiverSource close(Statement statement) throws SQLException {
833        if (statement != null) {
834            statement.close();
835        }
836        return this;
837    }
838
839    /**
840     * Close read connection
841     */
842    @Override
843    public SimpleRiverSource closeReading() {
844        try {
845            if (readConnection != null && !readConnection.isClosed()) {
846                // always commit before close to finish cursors/transactions
847                if (!readConnection.getAutoCommit()) {
848                    readConnection.commit();
849                }
850                readConnection.close();
851            }
852        } catch (SQLException e) {
853            logger.warn("while closing read connection: " + e.getMessage());
854        }
855        return this;
856    }
857
858    /**
859     * Close read connection
860     */
861    @Override
862    public SimpleRiverSource closeWriting() {
863        try {
864            if (writeConnection != null && !writeConnection.isClosed()) {
865                // always commit before close to finish cursors/transactions
866                if (!writeConnection.getAutoCommit()) {
867                    writeConnection.commit();
868                }
869                writeConnection.close();
870            }
871        } catch (SQLException e) {
872            logger.warn("while closing write connection: " + e.getMessage());
873        }
874        return this;
875    }
876
877    private void prepare(final DatabaseMetaData metaData) throws SQLException {
878        Map<String, Object> m = new HashMap<String, Object>() {
879            {
880                put("$meta.db.allproceduresarecallable", metaData.allProceduresAreCallable());
881                put("$meta.db.alltablesareselectable", metaData.allTablesAreSelectable());
882                put("$meta.db.autocommitclosesallresultsets", metaData.autoCommitFailureClosesAllResultSets());
883                put("$meta.db.datadefinitioncasestransactioncommit", metaData.dataDefinitionCausesTransactionCommit());
884                put("$meta.db.datadefinitionignoredintransactions", metaData.dataDefinitionIgnoredInTransactions());
885                put("$meta.db.doesmaxrowsizeincludeblobs", metaData.doesMaxRowSizeIncludeBlobs());
886                put("$meta.db.catalogseparator", metaData.getCatalogSeparator());
887                put("$meta.db.catalogterm", metaData.getCatalogTerm());
888                put("$meta.db.databasemajorversion", metaData.getDatabaseMajorVersion());
889                put("$meta.db.databaseminorversion", metaData.getDatabaseMinorVersion());
890                put("$meta.db.databaseproductname", metaData.getDatabaseProductName());
891                put("$meta.db.databaseproductversion", metaData.getDatabaseProductVersion());
892                put("$meta.db.defaulttransactionisolation", metaData.getDefaultTransactionIsolation());
893                put("$meta.db.drivermajorversion", metaData.getDriverMajorVersion());
894                put("$meta.db.driverminorversion", metaData.getDriverMinorVersion());
895                put("$meta.db.drivername", metaData.getDriverName());
896                put("$meta.db.driverversion", metaData.getDriverVersion());
897                put("$meta.db.extranamecharacters", metaData.getExtraNameCharacters());
898                put("$meta.db.identifierquotestring", metaData.getIdentifierQuoteString());
899                put("$meta.db.jdbcmajorversion", metaData.getJDBCMajorVersion());
900                put("$meta.db.jdbcminorversion", metaData.getJDBCMinorVersion());
901                put("$meta.db.maxbinaryliterallength", metaData.getMaxBinaryLiteralLength());
902                put("$meta.db.maxcatalognamelength", metaData.getMaxCatalogNameLength());
903                put("$meta.db.maxcharliterallength", metaData.getMaxCharLiteralLength());
904                put("$meta.db.maxcolumnnamelength", metaData.getMaxColumnNameLength());
905                put("$meta.db.maxcolumnsingroupby", metaData.getMaxColumnsInGroupBy());
906                put("$meta.db.maxcolumnsinindex", metaData.getMaxColumnsInIndex());
907                put("$meta.db.maxcolumnsinorderby", metaData.getMaxColumnsInOrderBy());
908                put("$meta.db.maxcolumnsinselect", metaData.getMaxColumnsInSelect());
909                put("$meta.db.maxcolumnsintable", metaData.getMaxColumnsInTable());
910                put("$meta.db.maxconnections", metaData.getMaxConnections());
911                put("$meta.db.maxcursornamelength", metaData.getMaxCursorNameLength());
912                put("$meta.db.maxindexlength", metaData.getMaxIndexLength());
913                put("$meta.db.maxusernamelength", metaData.getMaxUserNameLength());
914                put("$meta.db.maxprocedurenamelength", metaData.getMaxProcedureNameLength());
915                put("$meta.db.maxrowsize", metaData.getMaxRowSize());
916                put("$meta.db.maxschemanamelength", metaData.getMaxSchemaNameLength());
917                put("$meta.db.maxstatementlength", metaData.getMaxStatementLength());
918                put("$meta.db.maxstatements", metaData.getMaxStatements());
919                put("$meta.db.maxtablenamelength", metaData.getMaxTableNameLength());
920                put("$meta.db.maxtablesinselect", metaData.getMaxTablesInSelect());
921                put("$meta.db.numericfunctions", metaData.getNumericFunctions());
922                put("$meta.db.procedureterm", metaData.getProcedureTerm());
923                put("$meta.db.resultsetholdability", metaData.getResultSetHoldability());
924                put("$meta.db.rowidlifetime", metaData.getRowIdLifetime().name());
925                put("$meta.db.schematerm", metaData.getSchemaTerm());
926                put("$meta.db.searchstringescape", metaData.getSearchStringEscape());
927                put("$meta.db.sqlkeywords", metaData.getSQLKeywords());
928                put("$meta.db.sqlstatetype", metaData.getSQLStateType());
929            }
930        };
931        context.setLastDatabaseMetadata(m);
932    }
933
934    private void prepare(final ResultSetMetaData metaData) throws SQLException {
935        Map<String, Object> m = new HashMap<String, Object>() {
936            {
937                put("$meta.row.columnCount", metaData.getColumnCount());
938            }
939        };
940        for (int i = 0; i < metaData.getColumnCount(); i++) {
941            m.put("$meta.rs.catalogname." + i, metaData.getCatalogName(i));
942            m.put("$meta.rs.columnclassname." + i, metaData.getColumnClassName(i));
943            m.put("$meta.rs.columndisplaysize." + i, metaData.getColumnDisplaySize(i));
944            m.put("$meta.rs.columnlabel." + i, metaData.getColumnLabel(i));
945            m.put("$meta.rs.columnname." + i, metaData.getColumnName(i));
946            m.put("$meta.rs.columntype." + i, metaData.getColumnType(i));
947            m.put("$meta.rs.columntypename." + i, metaData.getColumnTypeName(i));
948            m.put("$meta.rs.precision." + i, metaData.getPrecision(i));
949            m.put("$meta.rs.scale." + i, metaData.getScale(i));
950            m.put("$meta.rs.schemaname." + i, metaData.getSchemaName(i));
951            m.put("$meta.rs.tablename." + i, metaData.getTableName(i));
952            m.put("$meta.rs.isautoincrement." + i, metaData.isAutoIncrement(i));
953            m.put("$meta.rs.iscasesensitive." + i, metaData.isCaseSensitive(i));
954            m.put("$meta.rs.iscurrency." + i, metaData.isCurrency(i));
955            m.put("$meta.rs.isdefinitelywritable." + i, metaData.isDefinitelyWritable(i));
956            m.put("$meta.rs.isnullable." + i, metaData.isNullable(i));
957            m.put("$meta.rs.isreadonly." + i, metaData.isReadOnly(i));
958            m.put("$meta.rs.issearchable." + i, metaData.isSearchable(i));
959            m.put("$meta.rs.issigned." + i, metaData.isSigned(i));
960            m.put("$meta.rs.iswritable." + i, metaData.isWritable(i));
961        }
962        context.setLastResultSetMetadata(m);
963    }
964
965    private void bind(PreparedStatement statement, int i, Object value) throws SQLException {
966        logger.trace("bind: value = {}", value);
967        if (value == null) {
968            statement.setNull(i, Types.VARCHAR);
969        } else if (value instanceof String) {
970            String s = (String) value;
971            if ("$now".equals(s)) {
972                Timestamp t = new Timestamp(new DateTime().getMillis());
973                logger.trace("setting $now to {}", t);
974                statement.setTimestamp(i, t, calendar);
975            } else if ("$job".equals(s)) {
976                logger.trace("setting $job to {}", context.getRiverState().getCounter());
977                statement.setInt(i, context.getRiverState().getCounter());
978            } else if ("$count".equals(s)) {
979                logger.trace("setting $count to {}", context.getLastRowCount());
980                statement.setLong(i, context.getLastRowCount());
981            } else if ("$last.sql.start".equals(s)) {
982                if (context.getLastExecutionStartDate() == 0L) {
983                    Timestamp riverStarted = new Timestamp(context.getRiverState().getStarted().getMillis());
984                    context.setLastExecutionStartDate(riverStarted.getTime());
985                }
986                statement.setTimestamp(i, new Timestamp(context.getLastExecutionStartDate()), calendar);
987            } else if ("$last.sql.end".equals(s)) {
988                if (context.getLastExecutionEndDate() == 0L) {
989                    Timestamp riverStarted = context.getRiverState() != null ?
990                            new Timestamp(context.getRiverState().getStarted().getMillis()) : null;
991                    context.setLastExecutionEndDate(riverStarted != null ? riverStarted.getTime() : new DateTime().getMillis());
992                }
993                statement.setTimestamp(i, new Timestamp(context.getLastExecutionEndDate()), calendar);
994            } else if ("$last.sql.sequence.start".equals(s)) {
995                if (context.getLastStartDate() == 0L) {
996                    Timestamp riverStarted = new Timestamp(context.getRiverState().getStarted().getMillis());
997                    context.setLastStartDate(riverStarted.getTime());
998                }
999                statement.setTimestamp(i, new Timestamp(context.getLastStartDate()), calendar);
1000            } else if ("$last.sql.sequence.end".equals(s)) {
1001                if (context.getLastEndDate() == 0L) {
1002                    Timestamp riverStarted = context.getRiverState() != null ?
1003                            new Timestamp(context.getRiverState().getStarted().getMillis()) : null;
1004                    context.setLastEndDate(riverStarted != null ? riverStarted.getTime() : new DateTime().getMillis());
1005                }
1006                statement.setTimestamp(i, new Timestamp(context.getLastEndDate()), calendar);
1007            } else if ("$river.name".equals(s)) {
1008                String name = context.getRiverState().getName();
1009                statement.setString(i, name);
1010            } else if ("$river.state.started".equals(s)) {
1011                Timestamp started = new Timestamp(context.getRiverState().getStarted().getMillis());
1012                statement.setTimestamp(i, started, calendar);
1013            } else if ("$river.state.last_active_begin".equals(s)) {
1014                Timestamp timestamp = new Timestamp(context.getRiverState().getLastActiveBegin().getMillis());
1015                statement.setTimestamp(i, timestamp, calendar);
1016            } else if ("$river.state.last_active_end".equals(s)) {
1017                Timestamp timestamp = new Timestamp(context.getRiverState().getLastActiveEnd().getMillis());
1018                statement.setTimestamp(i, timestamp, calendar);
1019            } else if ("$river.state.counter".equals(s)) {
1020                Integer counter = context.getRiverState().getCounter();
1021                if (counter != null) {
1022                    statement.setInt(i, counter);
1023                }
1024            } else if (context.shouldPrepareDatabaseMetadata()) {
1025                for (String k : context.getLastDatabaseMetadata().keySet()) {
1026                    if (k.equals(s)) {
1027                        statement.setObject(i, context.getLastDatabaseMetadata().get(k));
1028                    }
1029                }
1030            } else if (context.shouldPrepareResultSetMetadata()) {
1031                for (String k : context.getLastResultSetMetadata().keySet()) {
1032                    if (k.equals(s)) {
1033                        statement.setObject(i, context.getLastResultSetMetadata().get(k));
1034                    }
1035                }
1036            } else {
1037                Object rowValue = context.getLastRow().get(s);
1038                if (rowValue != null) {
1039                    statement.setObject(i, rowValue);
1040                } else {
1041                    statement.setString(i, (String) value);
1042                }
1043            }
1044        } else if (value instanceof Integer) {
1045            statement.setInt(i, (Integer) value);
1046        } else if (value instanceof Long) {
1047            statement.setLong(i, (Long) value);
1048        } else if (value instanceof BigDecimal) {
1049            statement.setBigDecimal(i, (BigDecimal) value);
1050        } else if (value instanceof Date) {
1051            statement.setDate(i, (Date) value);
1052        } else if (value instanceof Timestamp) {
1053            statement.setTimestamp(i, (Timestamp) value, calendar);
1054        } else if (value instanceof Float) {
1055            statement.setFloat(i, (Float) value);
1056        } else if (value instanceof Double) {
1057            statement.setDouble(i, (Double) value);
1058        } else {
1059            statement.setObject(i, value);
1060        }
1061    }
1062
1063    /**
1064     * Parse of value of result set
1065     *
1066     * @param result the result set
1067     * @param i      the offset in the result set
1068     * @param type   the JDBC type
1069     * @param locale the locale to use for parsing
1070     * @return The parse value
1071     * @throws SQLException when SQL execution gives an error
1072     * @throws IOException  when input/output error occurs
1073     */
1074    @Override
1075    public Object parseType(ResultSet result, Integer i, int type, Locale locale)
1076            throws SQLException, IOException, ParseException {
1077        logger.trace("i={} type={}", i, type);
1078        switch (type) {
1079            /**
1080             * The JDBC types CHAR, VARCHAR, and LONGVARCHAR are closely
1081             * related. CHAR represents a small, fixed-length character string,
1082             * VARCHAR represents a small, variable-length character string, and
1083             * LONGVARCHAR represents a large, variable-length character string.
1084             */
1085            case Types.CHAR:
1086            case Types.VARCHAR:
1087            case Types.LONGVARCHAR: {
1088                return result.getString(i);
1089            }
1090            case Types.NCHAR:
1091            case Types.NVARCHAR:
1092            case Types.LONGNVARCHAR: {
1093                return result.getNString(i);
1094            }
1095            /**
1096             * The JDBC types BINARY, VARBINARY, and LONGVARBINARY are closely
1097             * related. BINARY represents a small, fixed-length binary value,
1098             * VARBINARY represents a small, variable-length binary value, and
1099             * LONGVARBINARY represents a large, variable-length binary value
1100             */
1101            case Types.BINARY:
1102            case Types.VARBINARY:
1103            case Types.LONGVARBINARY: {
1104                byte[] b = result.getBytes(i);
1105                return context.shouldTreatBinaryAsString() ? (b != null ? new String(b) : null) : b;
1106            }
1107            /**
1108             * The JDBC type ARRAY represents the SQL3 type ARRAY.
1109             *
1110             * An ARRAY value is mapped to an instance of the Array interface in
1111             * the Java programming language. If a driver follows the standard
1112             * implementation, an Array object logically points to an ARRAY
1113             * value on the server rather than containing the elements of the
1114             * ARRAY object, which can greatly increase efficiency. The Array
1115             * interface contains methods for materializing the elements of the
1116             * ARRAY object on the client in the form of either an array or a
1117             * ResultSet object.
1118             */
1119            case Types.ARRAY: {
1120                Array arr = result.getArray(i);
1121                return arr == null ? null : arr.getArray();
1122            }
1123            /**
1124             * The JDBC type BIGINT represents a 64-bit signed integer value
1125             * between -9223372036854775808 and 9223372036854775807.
1126             *
1127             * The corresponding SQL type BIGINT is a nonstandard extension to
1128             * SQL. In practice the SQL BIGINT type is not yet currently
1129             * implemented by any of the major databases, and we recommend that
1130             * its use be avoided in code that is intended to be portable.
1131             *
1132             * The recommended Java mapping for the BIGINT type is as a Java
1133             * long.
1134             */
1135            case Types.BIGINT: {
1136                Object o = result.getLong(i);
1137                return result.wasNull() ? null : o;
1138            }
1139            /**
1140             * The JDBC type BIT represents a single bit value that can be zero
1141             * or one.
1142             *
1143             * SQL-92 defines an SQL BIT type. However, unlike the JDBC BIT
1144             * type, this SQL-92 BIT type can be used as a parameterized type to
1145             * define a fixed-length binary string. Fortunately, SQL-92 also
1146             * permits the use of the simple non-parameterized BIT type to
1147             * represent a single binary digit, and this usage corresponds to
1148             * the JDBC BIT type. Unfortunately, the SQL-92 BIT type is only
1149             * required in "full" SQL-92 and is currently supported by only a
1150             * subset of the major databases. Portable code may therefore prefer
1151             * to use the JDBC SMALLINT type, which is widely supported.
1152             */
1153            case Types.BIT: {
1154                try {
1155                    Object o = result.getInt(i);
1156                    return result.wasNull() ? null : o;
1157                } catch (Exception e) {
1158                    String exceptionClassName = e.getClass().getName();
1159                    // postgresql can not handle boolean, it will throw PSQLException, something like "Bad value for type int : t"
1160                    if ("org.postgresql.util.PSQLException".equals(exceptionClassName)) {
1161                        return "t".equals(result.getString(i));
1162                    }
1163                    throw new IOException(e);
1164                }
1165            }
1166            /**
1167             * The JDBC type BOOLEAN, which is new in the JDBC 3.0 API, maps to
1168             * a boolean in the Java programming language. It provides a
1169             * representation of true and false, and therefore is a better match
1170             * than the JDBC type BIT, which is either 1 or 0.
1171             */
1172            case Types.BOOLEAN: {
1173                return result.getBoolean(i);
1174            }
1175            /**
1176             * The JDBC type BLOB represents an SQL3 BLOB (Binary Large Object).
1177             *
1178             * A JDBC BLOB value is mapped to an instance of the Blob interface
1179             * in the Java programming language. If a driver follows the
1180             * standard implementation, a Blob object logically points to the
1181             * BLOB value on the server rather than containing its binary data,
1182             * greatly improving efficiency. The Blob interface provides methods
1183             * for materializing the BLOB data on the client when that is
1184             * desired.
1185             */
1186            case Types.BLOB: {
1187                Blob blob = result.getBlob(i);
1188                if (blob != null) {
1189                    long n = blob.length();
1190                    if (n > Integer.MAX_VALUE) {
1191                        throw new IOException("can't process blob larger than Integer.MAX_VALUE");
1192                    }
1193                    byte[] tab = blob.getBytes(1, (int) n);
1194                    blob.free();
1195                    return tab;
1196                }
1197                break;
1198            }
1199            /**
1200             * The JDBC type CLOB represents the SQL3 type CLOB (Character Large
1201             * Object).
1202             *
1203             * A JDBC CLOB value is mapped to an instance of the Clob interface
1204             * in the Java programming language. If a driver follows the
1205             * standard implementation, a Clob object logically points to the
1206             * CLOB value on the server rather than containing its character
1207             * data, greatly improving efficiency. Two of the methods on the
1208             * Clob interface materialize the data of a CLOB object on the
1209             * client.
1210             */
1211            case Types.CLOB: {
1212                Clob clob = result.getClob(i);
1213                if (clob != null) {
1214                    long n = clob.length();
1215                    if (n > Integer.MAX_VALUE) {
1216                        throw new IOException("can't process clob larger than Integer.MAX_VALUE");
1217                    }
1218                    String str = clob.getSubString(1, (int) n);
1219                    clob.free();
1220                    return str;
1221                }
1222                break;
1223            }
1224            case Types.NCLOB: {
1225                NClob nclob = result.getNClob(i);
1226                if (nclob != null) {
1227                    long n = nclob.length();
1228                    if (n > Integer.MAX_VALUE) {
1229                        throw new IOException("can't process nclob larger than Integer.MAX_VALUE");
1230                    }
1231                    String str = nclob.getSubString(1, (int) n);
1232                    nclob.free();
1233                    return str;
1234                }
1235                break;
1236            }
1237            /**
1238             * The JDBC type DATALINK, new in the JDBC 3.0 API, is a column
1239             * value that references a file that is outside of a data source but
1240             * is managed by the data source. It maps to the Java type
1241             * java.net.URL and provides a way to manage external files. For
1242             * instance, if the data source is a DBMS, the concurrency controls
1243             * it enforces on its own data can be applied to the external file
1244             * as well.
1245             *
1246             * A DATALINK value is retrieved from a ResultSet object with the
1247             * ResultSet methods getURL or getObject. If the Java platform does
1248             * not support the type of URL returned by getURL or getObject, a
1249             * DATALINK value can be retrieved as a String object with the
1250             * method getString.
1251             *
1252             * java.net.URL values are stored in a database using the method
1253             * setURL. If the Java platform does not support the type of URL
1254             * being set, the method setString can be used instead.
1255             *
1256             *
1257             */
1258            case Types.DATALINK: {
1259                return result.getURL(i);
1260            }
1261            /**
1262             * The JDBC DATE type represents a date consisting of day, month,
1263             * and year. The corresponding SQL DATE type is defined in SQL-92,
1264             * but it is implemented by only a subset of the major databases.
1265             * Some databases offer alternative SQL types that support similar
1266             * semantics.
1267             */
1268            case Types.DATE: {
1269                try {
1270                    Date d = result.getDate(i, calendar);
1271                    return d != null ? formatDate(d.getTime()) : null;
1272                } catch (SQLException e) {
1273                    return null;
1274                }
1275            }
1276            case Types.TIME: {
1277                try {
1278                    Time t = result.getTime(i, calendar);
1279                    return t != null ? formatDate(t.getTime()) : null;
1280                } catch (SQLException e) {
1281                    return null;
1282                }
1283            }
1284            case Types.TIMESTAMP: {
1285                try {
1286                    Timestamp t = result.getTimestamp(i, calendar);
1287                    return t != null ? formatDate(t.getTime()) : null;
1288                } catch (SQLException e) {
1289                    // java.sql.SQLException: Cannot convert value '0000-00-00 00:00:00' from column ... to TIMESTAMP.
1290                    return null;
1291                }
1292            }
1293            /**
1294             * The JDBC types DECIMAL and NUMERIC are very similar. They both
1295             * represent fixed-precision decimal values.
1296             *
1297             * The corresponding SQL types DECIMAL and NUMERIC are defined in
1298             * SQL-92 and are very widely implemented. These SQL types take
1299             * precision and scale parameters. The precision is the total number
1300             * of decimal digits supported, and the scale is the number of
1301             * decimal digits after the decimal point. For most DBMSs, the scale
1302             * is less than or equal to the precision. So for example, the value
1303             * "12.345" has a precision of 5 and a scale of 3, and the value
1304             * ".11" has a precision of 2 and a scale of 2. JDBC requires that
1305             * all DECIMAL and NUMERIC types support both a precision and a
1306             * scale of at least 15.
1307             *
1308             * The sole distinction between DECIMAL and NUMERIC is that the
1309             * SQL-92 specification requires that NUMERIC types be represented
1310             * with exactly the specified precision, whereas for DECIMAL types,
1311             * it allows an implementation to add additional precision beyond
1312             * that specified when the type was created. Thus a column created
1313             * with type NUMERIC(12,4) will always be represented with exactly
1314             * 12 digits, whereas a column created with type DECIMAL(12,4) might
1315             * be represented by some larger number of digits.
1316             *
1317             * The recommended Java mapping for the DECIMAL and NUMERIC types is
1318             * java.math.BigDecimal. The java.math.BigDecimal type provides math
1319             * operations to allow BigDecimal types to be added, subtracted,
1320             * multiplied, and divided with other BigDecimal types, with integer
1321             * types, and with floating point types.
1322             *
1323             * The method recommended for retrieving DECIMAL and NUMERIC values
1324             * is ResultSet.getBigDecimal. JDBC also allows access to these SQL
1325             * types as simple Strings or arrays of char. Thus, Java programmers
1326             * can use getString to receive a DECIMAL or NUMERIC result.
1327             * However, this makes the common case where DECIMAL or NUMERIC are
1328             * used for currency values rather awkward, since it means that
1329             * application writers have to perform math on strings. It is also
1330             * possible to retrieve these SQL types as any of the Java numeric
1331             * types.
1332             */
1333            case Types.DECIMAL:
1334            case Types.NUMERIC: {
1335                BigDecimal bd = null;
1336                try {
1337                    // getBigDecimal() should get obsolete. Most seem to use getString/getObject anyway...
1338                    bd = result.getBigDecimal(i);
1339                } catch (NullPointerException e) {
1340                    // But is it true? JDBC NPE exists since 13 years?
1341                    // http://forums.codeguru.com/archive/index.php/t-32443.html
1342                    // Null values are driving us nuts in JDBC:
1343                    // http://stackoverflow.com/questions/2777214/when-accessing-resultsets-in-jdbc-is-there-an-elegant-way-to-distinguish-betwee
1344                }
1345                if (bd == null || result.wasNull()) {
1346                    return null;
1347                }
1348                if (context.getScale() >= 0) {
1349                    bd = bd.setScale(context.getScale(), context.getRounding());
1350                    try {
1351                        long l = bd.longValueExact();
1352                        if (Long.toString(l).equals(result.getString(i))) {
1353                            // convert to long if possible
1354                            return l;
1355                        } else {
1356                            // convert to double (with precision loss)
1357                            return bd.doubleValue();
1358                        }
1359                    } catch (ArithmeticException e) {
1360                        return bd.doubleValue();
1361                    }
1362                } else {
1363                    return bd.toPlainString();
1364                }
1365            }
1366            /**
1367             * The JDBC type DOUBLE represents a "double precision" floating
1368             * point number that supports 15 digits of mantissa.
1369             *
1370             * The corresponding SQL type is DOUBLE PRECISION, which is defined
1371             * in SQL-92 and is widely supported by the major databases. The
1372             * SQL-92 standard leaves the precision of DOUBLE PRECISION up to
1373             * the implementation, but in practice all the major databases
1374             * supporting DOUBLE PRECISION support a mantissa precision of at
1375             * least 15 digits.
1376             *
1377             * The recommended Java mapping for the DOUBLE type is as a Java
1378             * double.
1379             */
1380            case Types.DOUBLE: {
1381                String s = result.getString(i);
1382                if (result.wasNull() || s == null) {
1383                    return null;
1384                }
1385                NumberFormat format = NumberFormat.getInstance(locale);
1386                Number number = format.parse(s);
1387                return number.doubleValue();
1388            }
1389            /**
1390             * The JDBC type FLOAT is basically equivalent to the JDBC type
1391             * DOUBLE. We provided both FLOAT and DOUBLE in a possibly misguided
1392             * attempt at consistency with previous database APIs. FLOAT
1393             * represents a "double precision" floating point number that
1394             * supports 15 digits of mantissa.
1395             *
1396             * The corresponding SQL type FLOAT is defined in SQL-92. The SQL-92
1397             * standard leaves the precision of FLOAT up to the implementation,
1398             * but in practice all the major databases supporting FLOAT support
1399             * a mantissa precision of at least 15 digits.
1400             *
1401             * The recommended Java mapping for the FLOAT type is as a Java
1402             * double. However, because of the potential confusion between the
1403             * double precision SQL FLOAT and the single precision Java float,
1404             * we recommend that JDBC programmers should normally use the JDBC
1405             * DOUBLE type in preference to FLOAT.
1406             */
1407            case Types.FLOAT: {
1408                String s = result.getString(i);
1409                if (result.wasNull() || s == null) {
1410                    return null;
1411                }
1412                NumberFormat format = NumberFormat.getInstance(locale);
1413                Number number = format.parse(s);
1414                return number.doubleValue();
1415            }
1416            /**
1417             * The JDBC type JAVA_OBJECT, added in the JDBC 2.0 core API, makes
1418             * it easier to use objects in the Java programming language as
1419             * values in a database. JAVA_OBJECT is simply a type code for an
1420             * instance of a class defined in the Java programming language that
1421             * is stored as a database object. The type JAVA_OBJECT is used by a
1422             * database whose type system has been extended so that it can store
1423             * Java objects directly. The JAVA_OBJECT value may be stored as a
1424             * serialized Java object, or it may be stored in some
1425             * vendor-specific format.
1426             *
1427             * The type JAVA_OBJECT is one of the possible values for the column
1428             * DATA_TYPE in the ResultSet objects returned by various
1429             * DatabaseMetaData methods, including getTypeInfo, getColumns, and
1430             * getUDTs. The method getUDTs, part of the new JDBC 2.0 core API,
1431             * will return information about the Java objects contained in a
1432             * particular schema when it is given the appropriate parameters.
1433             * Having this information available facilitates using a Java class
1434             * as a database type.
1435             */
1436            case Types.OTHER:
1437            case Types.JAVA_OBJECT: {
1438                return result.getObject(i);
1439            }
1440            /**
1441             * The JDBC type REAL represents a "single precision" floating point
1442             * number that supports seven digits of mantissa.
1443             *
1444             * The corresponding SQL type REAL is defined in SQL-92 and is
1445             * widely, though not universally, supported by the major databases.
1446             * The SQL-92 standard leaves the precision of REAL up to the
1447             * implementation, but in practice all the major databases
1448             * supporting REAL support a mantissa precision of at least seven
1449             * digits.
1450             *
1451             * The recommended Java mapping for the REAL type is as a Java
1452             * float.
1453             */
1454            case Types.REAL: {
1455                String s = result.getString(i);
1456                if (result.wasNull() || s == null) {
1457                    return null;
1458                }
1459                NumberFormat format = NumberFormat.getInstance(locale);
1460                Number number = format.parse(s);
1461                return number.doubleValue();
1462            }
1463            /**
1464             * The JDBC type TINYINT represents an 8-bit integer value between 0
1465             * and 255 that may be signed or unsigned.
1466             *
1467             * The corresponding SQL type, TINYINT, is currently supported by
1468             * only a subset of the major databases. Portable code may therefore
1469             * prefer to use the JDBC SMALLINT type, which is widely supported.
1470             *
1471             * The recommended Java mapping for the JDBC TINYINT type is as
1472             * either a Java byte or a Java short. The 8-bit Java byte type
1473             * represents a signed value from -128 to 127, so it may not always
1474             * be appropriate for larger TINYINT values, whereas the 16-bit Java
1475             * short will always be able to hold all TINYINT values.
1476             */
1477            /**
1478             * The JDBC type SMALLINT represents a 16-bit signed integer value
1479             * between -32768 and 32767.
1480             *
1481             * The corresponding SQL type, SMALLINT, is defined in SQL-92 and is
1482             * supported by all the major databases. The SQL-92 standard leaves
1483             * the precision of SMALLINT up to the implementation, but in
1484             * practice, all the major databases support at least 16 bits.
1485             *
1486             * The recommended Java mapping for the JDBC SMALLINT type is as a
1487             * Java short.
1488             */
1489            /**
1490             * The JDBC type INTEGER represents a 32-bit signed integer value
1491             * ranging between -2147483648 and 2147483647.
1492             *
1493             * The corresponding SQL type, INTEGER, is defined in SQL-92 and is
1494             * widely supported by all the major databases. The SQL-92 standard
1495             * leaves the precision of INTEGER up to the implementation, but in
1496             * practice all the major databases support at least 32 bits.
1497             *
1498             * The recommended Java mapping for the INTEGER type is as a Java
1499             * int.
1500             */
1501            case Types.TINYINT:
1502            case Types.SMALLINT:
1503            case Types.INTEGER: {
1504                try {
1505                    Integer integer = result.getInt(i);
1506                    return result.wasNull() ? null : integer;
1507                } catch (SQLDataException e) {
1508                    Long l = result.getLong(i);
1509                    return result.wasNull() ? null : l;
1510                }
1511            }
1512
1513            case Types.SQLXML: {
1514                SQLXML xml = result.getSQLXML(i);
1515                return xml != null ? xml.getString() : null;
1516            }
1517
1518            case Types.NULL: {
1519                return null;
1520            }
1521            /**
1522             * The JDBC type DISTINCT field (Types class)>DISTINCT represents
1523             * the SQL3 type DISTINCT.
1524             *
1525             * The standard mapping for a DISTINCT type is to the Java type to
1526             * which the base type of a DISTINCT object would be mapped. For
1527             * example, a DISTINCT type based on a CHAR would be mapped to a
1528             * String object, and a DISTINCT type based on an SQL INTEGER would
1529             * be mapped to an int.
1530             *
1531             * The DISTINCT type may optionally have a custom mapping to a class
1532             * in the Java programming language. A custom mapping consists of a
1533             * class that implements the interface SQLData and an entry in a
1534             * java.util.Map object.
1535             */
1536            case Types.DISTINCT: {
1537                logger.warn("JDBC type not implemented: {}", type);
1538                return null;
1539            }
1540            /**
1541             * The JDBC type STRUCT represents the SQL99 structured type. An SQL
1542             * structured type, which is defined by a user with a CREATE TYPE
1543             * statement, consists of one or more attributes. These attributes
1544             * may be any SQL data type, built-in or user-defined.
1545             *
1546             * The standard mapping for the SQL type STRUCT is to a Struct
1547             * object in the Java programming language. A Struct object contains
1548             * a value for each attribute of the STRUCT value it represents.
1549             *
1550             * A STRUCT value may optionally be custom mapped to a class in the
1551             * Java programming language, and each attribute in the STRUCT may
1552             * be mapped to a field in the class. A custom mapping consists of a
1553             * class that implements the interface SQLData and an entry in a
1554             * java.util.Map object.
1555             *
1556             *
1557             */
1558            case Types.STRUCT: {
1559                logger.warn("JDBC type not implemented: {}", type);
1560                return null;
1561            }
1562            case Types.REF: {
1563                logger.warn("JDBC type not implemented: {}", type);
1564                return null;
1565            }
1566            case Types.ROWID: {
1567                logger.warn("JDBC type not implemented: {}", type);
1568                return null;
1569            }
1570            default: {
1571                logger.warn("unknown JDBC type ignored: {}", type);
1572                return null;
1573            }
1574        }
1575        return null;
1576    }
1577
1578    private int toJDBCType(String type) {
1579        if (type == null) {
1580            return Types.NULL;
1581        } else if (type.equalsIgnoreCase("NULL")) {
1582            return Types.NULL;
1583        } else if (type.equalsIgnoreCase("TINYINT")) {
1584            return Types.TINYINT;
1585        } else if (type.equalsIgnoreCase("SMALLINT")) {
1586            return Types.SMALLINT;
1587        } else if (type.equalsIgnoreCase("INTEGER")) {
1588            return Types.INTEGER;
1589        } else if (type.equalsIgnoreCase("BIGINT")) {
1590            return Types.BIGINT;
1591        } else if (type.equalsIgnoreCase("REAL")) {
1592            return Types.REAL;
1593        } else if (type.equalsIgnoreCase("FLOAT")) {
1594            return Types.FLOAT;
1595        } else if (type.equalsIgnoreCase("DOUBLE")) {
1596            return Types.DOUBLE;
1597        } else if (type.equalsIgnoreCase("DECIMAL")) {
1598            return Types.DECIMAL;
1599        } else if (type.equalsIgnoreCase("NUMERIC")) {
1600            return Types.NUMERIC;
1601        } else if (type.equalsIgnoreCase("BIT")) {
1602            return Types.BIT;
1603        } else if (type.equalsIgnoreCase("BOOLEAN")) {
1604            return Types.BOOLEAN;
1605        } else if (type.equalsIgnoreCase("BINARY")) {
1606            return Types.BINARY;
1607        } else if (type.equalsIgnoreCase("VARBINARY")) {
1608            return Types.VARBINARY;
1609        } else if (type.equalsIgnoreCase("LONGVARBINARY")) {
1610            return Types.LONGVARBINARY;
1611        } else if (type.equalsIgnoreCase("CHAR")) {
1612            return Types.CHAR;
1613        } else if (type.equalsIgnoreCase("VARCHAR")) {
1614            return Types.VARCHAR;
1615        } else if (type.equalsIgnoreCase("LONGVARCHAR")) {
1616            return Types.LONGVARCHAR;
1617        } else if (type.equalsIgnoreCase("DATE")) {
1618            return Types.DATE;
1619        } else if (type.equalsIgnoreCase("TIME")) {
1620            return Types.TIME;
1621        } else if (type.equalsIgnoreCase("TIMESTAMP")) {
1622            return Types.TIMESTAMP;
1623        } else if (type.equalsIgnoreCase("CLOB")) {
1624            return Types.CLOB;
1625        } else if (type.equalsIgnoreCase("BLOB")) {
1626            return Types.BLOB;
1627        } else if (type.equalsIgnoreCase("ARRAY")) {
1628            return Types.ARRAY;
1629        } else if (type.equalsIgnoreCase("STRUCT")) {
1630            return Types.STRUCT;
1631        } else if (type.equalsIgnoreCase("REF")) {
1632            return Types.REF;
1633        } else if (type.equalsIgnoreCase("DATALINK")) {
1634            return Types.DATALINK;
1635        } else if (type.equalsIgnoreCase("DISTINCT")) {
1636            return Types.DISTINCT;
1637        } else if (type.equalsIgnoreCase("JAVA_OBJECT")) {
1638            return Types.JAVA_OBJECT;
1639        } else if (type.equalsIgnoreCase("SQLXML")) {
1640            return Types.SQLXML;
1641        } else if (type.equalsIgnoreCase("ROWID")) {
1642            return Types.ROWID;
1643        }
1644        return Types.OTHER;
1645    }
1646
1647    private String mapColumnName(String columnName) {
1648        // TODO JDK8: StringJoiner
1649        Map<String, Object> columnNameMap = context.getColumnNameMap();
1650        StringBuilder sb = new StringBuilder();
1651        String[] s = columnName.split("\\.");
1652        for (int i = 0; i < s.length; i++) {
1653            if (i > 0) {
1654                sb.append(".");
1655            }
1656            if (columnNameMap.containsKey(s[i])) {
1657                s[i] = columnNameMap.get(s[i]).toString();
1658            } else {
1659                logger.info("no column map entry for {} in map {}", s[i], columnNameMap);
1660            }
1661            sb.append(s[i]);
1662        }
1663        return sb.toString();
1664    }
1665
1666    private String formatDate(long millis) {
1667        return new DateTime(millis).withZone(dateTimeZone).toString();
1668    }
1669
1670}