001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.column;
017
018import org.elasticsearch.common.joda.time.DateTime;
019import org.elasticsearch.common.logging.ESLogger;
020import org.elasticsearch.common.logging.ESLoggerFactory;
021import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener;
022import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject;
023import org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener;
024import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand;
025import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource;
026
027import java.io.IOException;
028import java.sql.Connection;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Timestamp;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.LinkedList;
036import java.util.List;
037
/**
 * River source implementation for the 'column' strategy.
 *
 * @author <a href="mailto:piotr.sliwa@zineinc.com">Piotr Śliwa</a>
 */
043public class ColumnRiverSource<RC extends ColumnRiverContext> extends SimpleRiverSource<RC> {
044
045    private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.ColumnRiverSource");
046
047    private static final String WHERE_CLAUSE_PLACEHOLDER = "$where";
048
049    @Override
050    public String strategy() {
051        return "column";
052    }
053
054    @Override
055    public ColumnRiverSource<RC> newInstance() {
056        return new ColumnRiverSource<RC>();
057    }
058
059    @Override
060    public void fetch() throws SQLException, IOException {
061        logger.info("fetch state={}", context.getRiverState());
062        for (SQLCommand command : context.getStatements()) {
063            Connection connection = getConnectionForReading();
064            if (connection != null) {
065                List<OpInfo> opInfos = getOpInfos(connection);
066                Timestamp lastRunTimestamp = getLastRunTimestamp();
067                logger.debug("lastRunTimestamp={}", lastRunTimestamp);
068                for (OpInfo opInfo : opInfos) {
069                    logger.debug("opinfo={}", opInfo.toString());
070                    fetch(connection, command, opInfo, lastRunTimestamp);
071                }
072            }
073        }
074    }
075
076    private List<OpInfo> getOpInfos(Connection connection) throws SQLException {
077        String quoteString = getIdentifierQuoteString(connection);
078        List<OpInfo> opInfos = new LinkedList<OpInfo>();
079        String noDeletedWhereClause = context.columnDeletedAt() != null ?
080                " AND " + quoteColumn(context.columnDeletedAt(), quoteString) + " IS NULL" : "";
081        if (context.isTimestampDiffSupported()) {
082            opInfos.add(new OpInfo("create", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnCreatedAt(), quoteString) + ")} >= 0" + noDeletedWhereClause));
083            opInfos.add(new OpInfo("index", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnUpdatedAt(), quoteString) + ")} >= 0 AND (" + quoteColumn(context.columnCreatedAt(), quoteString) + " IS NULL OR {fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnCreatedAt(), quoteString) + ")} < 0) " + noDeletedWhereClause, 2));
084            if (context.columnDeletedAt() != null) {
085                opInfos.add(new OpInfo("delete", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnDeletedAt(), quoteString) + ")} >= 0"));
086            }
087        } else {
088            // no TIMESTAMPDIFF support
089            opInfos.add(new OpInfo("create", quoteColumn(context.columnCreatedAt(), quoteString) + " >= ?" + noDeletedWhereClause));
090            opInfos.add(new OpInfo("index", quoteColumn(context.columnUpdatedAt(), quoteString) + " >= ? AND (" + quoteColumn(context.columnCreatedAt(), quoteString) + " IS NULL OR " + quoteColumn(context.columnCreatedAt(), quoteString) + " < ?)" + noDeletedWhereClause, 2));
091
092            if (context.columnDeletedAt() != null) {
093                opInfos.add(new OpInfo("delete", quoteColumn(context.columnDeletedAt(), quoteString) + " >= ?"));
094            }
095        }
096        return opInfos;
097    }
098
099    private String getIdentifierQuoteString(Connection connection) throws SQLException {
100        if (!context.columnEscape()) {
101            return "";
102        }
103        String quoteString = connection.getMetaData().getIdentifierQuoteString();
104        quoteString = quoteString == null ? "" : quoteString;
105        return quoteString;
106    }
107
108    private String quoteColumn(String column, String quote) {
109        return quote + column + quote;
110    }
111
112    private Timestamp getLastRunTimestamp() {
113        DateTime lastRunTime = context.getRiverState() != null ?
114                (DateTime) context.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME) : null;
115        if (lastRunTime == null) {
116            return new Timestamp(0);
117        }
118        return new Timestamp(lastRunTime.getMillis() - context.getLastRunTimeStampOverlap().millis());
119    }
120
121    private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Timestamp lastRunTimestamp) throws IOException, SQLException {
122        String fullSql = addWhereClauseToSqlQuery(command.getSQL(), opInfo.where);
123        PreparedStatement stmt = connection.prepareStatement(fullSql);
124        List<Object> params = createQueryParams(command, lastRunTimestamp, opInfo.paramsInWhere);
125        logger.debug("sql: {}, params {}", fullSql, params);
126        ResultSet result = null;
127        try {
128            bind(stmt, params);
129            result = executeQuery(stmt);
130            KeyValueStreamListener<Object, Object> listener =
131                    new ColumnKeyValueStreamListener<Object, Object>(opInfo.opType)
132                            .output(context.getRiverMouth());
133            merge(command, result, listener);
134        } catch (Exception e) {
135            throw new IOException(e);
136        } finally {
137            close(result);
138            close(stmt);
139        }
140    }
141
142    private String addWhereClauseToSqlQuery(String sql, String whereClauseToAppend) {
143        int wherePlaceholderIndex = sql.indexOf(WHERE_CLAUSE_PLACEHOLDER);
144        final String whereKeyword = "where ";
145        int whereIndex = sql.toLowerCase().indexOf(whereKeyword);
146        if (wherePlaceholderIndex >= 0) {
147            return sql.replace(WHERE_CLAUSE_PLACEHOLDER, whereClauseToAppend);
148        } else if (whereIndex >= 0) {
149            return sql.substring(0, whereIndex + whereKeyword.length()) + whereClauseToAppend + " AND " + sql.substring(whereIndex + whereKeyword.length());
150        } else {
151            return sql + " WHERE " + whereClauseToAppend;
152        }
153    }
154
155    private List<Object> createQueryParams(SQLCommand command, Timestamp lastRunTimestamp, int lastRunTimestampParamsCount) {
156        List<Object> statementParams = command.getParameters() != null ?
157                command.getParameters() : Collections.emptyList();
158        List<Object> params = new ArrayList<Object>(statementParams.size() + lastRunTimestampParamsCount);
159        for (int i = 0; i < lastRunTimestampParamsCount; i++) {
160            params.add(lastRunTimestamp);
161        }
162        for (Object param : statementParams) {
163            params.add(param);
164        }
165        return params;
166    }
167
168    private class OpInfo {
169        final String opType;
170        final String where;
171        final int paramsInWhere;
172
173        public OpInfo(String opType, String where, int paramsInWhere) {
174            if (where != null && !where.equals("")) {
175                where = "(" + where + ")";
176            }
177            this.opType = opType;
178            this.where = where;
179            this.paramsInWhere = paramsInWhere;
180        }
181
182        public OpInfo(String opType, String where) {
183            this(opType, where, 1);
184        }
185
186        public String toString() {
187            return opType + " " + where + " " + paramsInWhere;
188        }
189    }
190
191    private class ColumnKeyValueStreamListener<K, V> extends RiverMouthKeyValueStreamListener<K, V> {
192
193        private String opType;
194
195        public ColumnKeyValueStreamListener(String opType) {
196            this.opType = opType;
197        }
198
199        @Override
200        public ColumnKeyValueStreamListener<K, V> end(IndexableObject object) throws IOException {
201            if (!object.source().isEmpty()) {
202                object.optype(opType);
203            }
204            super.end(object);
205            return this;
206        }
207    }
208}