001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.column; 017 018import org.elasticsearch.common.joda.time.DateTime; 019import org.elasticsearch.common.logging.ESLogger; 020import org.elasticsearch.common.logging.ESLoggerFactory; 021import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener; 022import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject; 023import org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener; 024import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand; 025import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource; 026 027import java.io.IOException; 028import java.sql.Connection; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.sql.Timestamp; 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.LinkedList; 036import java.util.List; 037 038/** 039 * River source implementation for the 'column' strategy 040 * 041 * @author <a href="piotr.sliwa@zineinc.com">Piotr Śliwa</a> 042 */ 043public class ColumnRiverSource<RC extends ColumnRiverContext> extends SimpleRiverSource<RC> { 044 045 private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.ColumnRiverSource"); 046 047 private static final String WHERE_CLAUSE_PLACEHOLDER = "$where"; 048 049 @Override 050 public String strategy() { 051 return "column"; 052 } 053 054 @Override 055 public ColumnRiverSource<RC> newInstance() { 056 return new ColumnRiverSource<RC>(); 057 } 058 059 @Override 060 public void fetch() throws SQLException, IOException { 061 logger.info("fetch state={}", context.getRiverState()); 062 for (SQLCommand command : context.getStatements()) { 063 Connection connection = getConnectionForReading(); 064 if (connection != null) { 065 List<OpInfo> opInfos = getOpInfos(connection); 066 Timestamp lastRunTimestamp = getLastRunTimestamp(); 067 logger.debug("lastRunTimestamp={}", lastRunTimestamp); 068 for (OpInfo opInfo : opInfos) { 069 logger.debug("opinfo={}", opInfo.toString()); 070 fetch(connection, command, opInfo, lastRunTimestamp); 071 } 072 } 073 } 074 } 075 076 private List<OpInfo> getOpInfos(Connection connection) throws SQLException { 077 String quoteString = getIdentifierQuoteString(connection); 078 List<OpInfo> opInfos = new LinkedList<OpInfo>(); 079 String noDeletedWhereClause = context.columnDeletedAt() != null ? 080 " AND " + quoteColumn(context.columnDeletedAt(), quoteString) + " IS NULL" : ""; 081 if (context.isTimestampDiffSupported()) { 082 opInfos.add(new OpInfo("create", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnCreatedAt(), quoteString) + ")} >= 0" + noDeletedWhereClause)); 083 opInfos.add(new OpInfo("index", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnUpdatedAt(), quoteString) + ")} >= 0 AND (" + quoteColumn(context.columnCreatedAt(), quoteString) + " IS NULL OR {fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnCreatedAt(), quoteString) + ")} < 0) " + noDeletedWhereClause, 2)); 084 if (context.columnDeletedAt() != null) { 085 opInfos.add(new OpInfo("delete", "{fn TIMESTAMPDIFF(SQL_TSI_SECOND,?," + quoteColumn(context.columnDeletedAt(), quoteString) + ")} >= 0")); 086 } 087 } else { 088 // no TIMESTAMPDIFF support 089 opInfos.add(new OpInfo("create", quoteColumn(context.columnCreatedAt(), quoteString) + " >= ?" + noDeletedWhereClause)); 090 opInfos.add(new OpInfo("index", quoteColumn(context.columnUpdatedAt(), quoteString) + " >= ? AND (" + quoteColumn(context.columnCreatedAt(), quoteString) + " IS NULL OR " + quoteColumn(context.columnCreatedAt(), quoteString) + " < ?)" + noDeletedWhereClause, 2)); 091 092 if (context.columnDeletedAt() != null) { 093 opInfos.add(new OpInfo("delete", quoteColumn(context.columnDeletedAt(), quoteString) + " >= ?")); 094 } 095 } 096 return opInfos; 097 } 098 099 private String getIdentifierQuoteString(Connection connection) throws SQLException { 100 if (!context.columnEscape()) { 101 return ""; 102 } 103 String quoteString = connection.getMetaData().getIdentifierQuoteString(); 104 quoteString = quoteString == null ? "" : quoteString; 105 return quoteString; 106 } 107 108 private String quoteColumn(String column, String quote) { 109 return quote + column + quote; 110 } 111 112 private Timestamp getLastRunTimestamp() { 113 DateTime lastRunTime = context.getRiverState() != null ? 114 (DateTime) context.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME) : null; 115 if (lastRunTime == null) { 116 return new Timestamp(0); 117 } 118 return new Timestamp(lastRunTime.getMillis() - context.getLastRunTimeStampOverlap().millis()); 119 } 120 121 private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Timestamp lastRunTimestamp) throws IOException, SQLException { 122 String fullSql = addWhereClauseToSqlQuery(command.getSQL(), opInfo.where); 123 PreparedStatement stmt = connection.prepareStatement(fullSql); 124 List<Object> params = createQueryParams(command, lastRunTimestamp, opInfo.paramsInWhere); 125 logger.debug("sql: {}, params {}", fullSql, params); 126 ResultSet result = null; 127 try { 128 bind(stmt, params); 129 result = executeQuery(stmt); 130 KeyValueStreamListener<Object, Object> listener = 131 new ColumnKeyValueStreamListener<Object, Object>(opInfo.opType) 132 .output(context.getRiverMouth()); 133 merge(command, result, listener); 134 } catch (Exception e) { 135 throw new IOException(e); 136 } finally { 137 close(result); 138 close(stmt); 139 } 140 } 141 142 private String addWhereClauseToSqlQuery(String sql, String whereClauseToAppend) { 143 int wherePlaceholderIndex = sql.indexOf(WHERE_CLAUSE_PLACEHOLDER); 144 final String whereKeyword = "where "; 145 int whereIndex = sql.toLowerCase().indexOf(whereKeyword); 146 if (wherePlaceholderIndex >= 0) { 147 return sql.replace(WHERE_CLAUSE_PLACEHOLDER, whereClauseToAppend); 148 } else if (whereIndex >= 0) { 149 return sql.substring(0, whereIndex + whereKeyword.length()) + whereClauseToAppend + " AND " + sql.substring(whereIndex + whereKeyword.length()); 150 } else { 151 return sql + " WHERE " + whereClauseToAppend; 152 } 153 } 154 155 private List<Object> createQueryParams(SQLCommand command, Timestamp lastRunTimestamp, int lastRunTimestampParamsCount) { 156 List<Object> statementParams = command.getParameters() != null ? 157 command.getParameters() : Collections.emptyList(); 158 List<Object> params = new ArrayList<Object>(statementParams.size() + lastRunTimestampParamsCount); 159 for (int i = 0; i < lastRunTimestampParamsCount; i++) { 160 params.add(lastRunTimestamp); 161 } 162 for (Object param : statementParams) { 163 params.add(param); 164 } 165 return params; 166 } 167 168 private class OpInfo { 169 final String opType; 170 final String where; 171 final int paramsInWhere; 172 173 public OpInfo(String opType, String where, int paramsInWhere) { 174 if (where != null && !where.equals("")) { 175 where = "(" + where + ")"; 176 } 177 this.opType = opType; 178 this.where = where; 179 this.paramsInWhere = paramsInWhere; 180 } 181 182 public OpInfo(String opType, String where) { 183 this(opType, where, 1); 184 } 185 186 public String toString() { 187 return opType + " " + where + " " + paramsInWhere; 188 } 189 } 190 191 private class ColumnKeyValueStreamListener<K, V> extends RiverMouthKeyValueStreamListener<K, V> { 192 193 private String opType; 194 195 public ColumnKeyValueStreamListener(String opType) { 196 this.opType = opType; 197 } 198 199 @Override 200 public ColumnKeyValueStreamListener<K, V> end(IndexableObject object) throws IOException { 201 if (!object.source().isEmpty()) { 202 object.optype(opType); 203 } 204 super.end(object); 205 return this; 206 } 207 } 208}