001package org.xbib.elasticsearch.river.jdbc.strategy.column; 002 003import org.elasticsearch.common.joda.time.DateTime; 004import org.elasticsearch.common.logging.ESLogger; 005import org.elasticsearch.common.logging.ESLoggerFactory; 006import org.elasticsearch.common.unit.TimeValue; 007import org.elasticsearch.common.xcontent.support.XContentMapValues; 008import org.elasticsearch.river.RiverSettings; 009import org.testng.annotations.Parameters; 010import org.testng.annotations.Test; 011import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject; 012import org.xbib.elasticsearch.plugin.jdbc.util.PlainIndexableObject; 013import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand; 014import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 015import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth; 016 017import java.io.IOException; 018import java.sql.Connection; 019import java.sql.PreparedStatement; 020import java.sql.SQLException; 021import java.sql.Timestamp; 022import java.sql.Types; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Random; 026 027public class ColumnRiverSourceTests extends AbstractColumnRiverTest { 028 029 private final ESLogger logger = ESLoggerFactory.getLogger(ColumnRiverSourceTests.class.getName()); 030 031 private Random random = new Random(); 032 033 private DateTime LAST_RUN_TIME = new DateTime(new DateTime().getMillis() - 60 * 60 * 1000); 034 035 @Override 036 public ColumnRiverSource newRiverSource() { 037 return new ColumnRiverSource(); 038 } 039 040 @Override 041 public ColumnRiverContext newRiverContext() { 042 return new ColumnRiverContext(); 043 } 044 045 @Test 046 @Parameters({"river-existedWhereClause", "sqlInsert"}) 047 public void testCreateObjects(String riverResource, String sql) throws Exception { 048 verifyCreateObjects(riverResource, sql); 049 } 050 051 @Test 052 @Parameters({"river-whereClausePlaceholder", "sqlInsert"}) 053 public void testCreateObjects_configurationWithWherePlaceholder(String riverResource, String sql) 054 throws Exception { 055 verifyCreateObjects(riverResource, sql); 056 } 057 058 @Test 059 @Parameters({"river-sqlparams", "sqlInsert"}) 060 public void testCreateObjects_configurationWithSqlParams(String riverResource, String sql) 061 throws Exception { 062 verifyCreateObjects(riverResource, sql); 063 } 064 065 @Test 066 @Parameters({"river-sqlForTestDeletions", "sqlInsert"}) 067 public void testRemoveObjects(String riverResource, String insertSql) 068 throws Exception { 069 verifyDeleteObjects(riverResource, insertSql); 070 } 071 072 @Test 073 @Parameters({"river-sqlForTestDeletionsAndWherePlaceholder", "sqlInsert"}) 074 public void testRemoveObjects_configurationWithWherePlaceholder(String riverResource, String insertSql) 075 throws Exception { 076 verifyDeleteObjects(riverResource, insertSql); 077 } 078 079 @Test 080 @Parameters({"river-existedWhereClauseWithOverlap", "sqlInsert"}) 081 public void testCreateObjects_withLastRunTimeStampOverlap(String riverResource, String sql) 082 throws Exception { 083 final int newRecordsOutOfTimeRange = 3; 084 final int newRecordsInTimeRange = 2; 085 final int updatedRecordsInTimeRange = 4; 086 final int updatedRecordsInTimeRangeWithOverlap = 1; 087 testColumnRiver(new MockRiverMouth(), riverResource, sql, new ProductFixture[] { 088 ProductFixture.size(newRecordsOutOfTimeRange).createdAt(oldTimestamp()), 089 ProductFixture.size(newRecordsInTimeRange).createdAt(okTimestamp()), 090 ProductFixture.size(updatedRecordsInTimeRange).createdAt(oldTimestamp()).updatedAt(okTimestamp()), 091 ProductFixture.size(updatedRecordsInTimeRangeWithOverlap).createdAt(oldTimestamp()).updatedAt(overlapTimestamp()), 092 }, newRecordsInTimeRange + updatedRecordsInTimeRange + updatedRecordsInTimeRangeWithOverlap); 093 } 094 095 private void verifyCreateObjects(String riverResource, String sql) 096 throws Exception { 097 final int newRecordsOutOfTimeRange = 3; 098 final int newRecordsInTimeRange = 2; 099 final int updatedRecordsInTimeRange = 4; 100 testColumnRiver(new MockRiverMouth(), riverResource, sql, new ProductFixture[]{ 101 new ProductFixture(newRecordsOutOfTimeRange).createdAt(oldTimestamp()), 102 new ProductFixture(newRecordsInTimeRange).createdAt(okTimestamp()), 103 new ProductFixture(updatedRecordsInTimeRange).createdAt(oldTimestamp()).updatedAt(okTimestamp()), 104 }, newRecordsInTimeRange + updatedRecordsInTimeRange); 105 } 106 107 private void verifyDeleteObjects(String riverResource, String insertSql) 108 throws Exception { 109 MockRiverMouth riverMouth = new MockRiverMouth(); 110 boolean[] shouldProductsBeDeleted = new boolean[]{true, true, false}; 111 ProductFixtures productFixtures = createFixturesAndPopulateMouth(shouldProductsBeDeleted, riverMouth); 112 testColumnRiver(riverMouth, riverResource, insertSql, 113 productFixtures.fixtures, 114 productFixtures.expectedCount); 115 } 116 117 private ProductFixtures createFixturesAndPopulateMouth(boolean[] shouldProductsBeDeleted, MockRiverMouth riverMouth) 118 throws IOException { 119 ProductFixture[] fixtures = new ProductFixture[shouldProductsBeDeleted.length]; 120 int expectedExistsCountAfterRiverRun = 0; 121 for (int i = 0; i < shouldProductsBeDeleted.length; i++) { 122 IndexableObject p = new PlainIndexableObject() 123 .id(Integer.toString(i)) 124 .source(createSource(i)) 125 .optype("delete"); 126 riverMouth.index(p, false); 127 Timestamp deletedAt; 128 if (shouldProductsBeDeleted[i]) { 129 deletedAt = okTimestamp(); 130 } else { 131 deletedAt = oldTimestamp(); 132 expectedExistsCountAfterRiverRun++; 133 } 134 fixtures[i] = ProductFixture.one() 135 .setId(i) 136 .createdAt(oldTimestamp()) 137 .updatedAt(oldTimestamp()) 138 .deletedAt(deletedAt); 139 } 140 return new ProductFixtures(fixtures, expectedExistsCountAfterRiverRun); 141 } 142 143 private Map<String, Object> createSource(int id) { 144 Map<String, Object> map = new HashMap<String, Object>(); 145 map.put("id", id); 146 map.put("name", null); 147 return map; 148 } 149 150 private void testColumnRiver(MockRiverMouth riverMouth, String riverResource, String sql, ProductFixture[] fixtures, int expectedHits) 151 throws Exception { 152 createData(sql, fixtures); 153 context.setRiverMouth(riverMouth); 154 createRiverContext(riverResource); 155 source.fetch(); 156 assertEquals(riverMouth.data().size(), expectedHits); 157 } 158 159 private void createRiverContext(String riverResource) throws IOException { 160 RiverSettings riverSettings = riverSettings(riverResource); 161 Map<String, Object> settings = (Map<String, Object>) riverSettings.settings().get("jdbc"); 162 163 RiverState riverState = new RiverState(); 164 riverState.getMap().put(ColumnRiverFlow.LAST_RUN_TIME, LAST_RUN_TIME); 165 riverState.getMap().put(ColumnRiverFlow.CURRENT_RUN_STARTED_TIME, new DateTime()); 166 167 context 168 .columnCreatedAt(XContentMapValues.nodeStringValue(settings.get("column_created_at"), null)) 169 .columnUpdatedAt(XContentMapValues.nodeStringValue(settings.get("column_updated_at"), null)) 170 .columnDeletedAt(XContentMapValues.nodeStringValue(settings.get("column_deleted_at"), null)) 171 .columnEscape(true) 172 .setLastRunTimeStampOverlap(getLastRunTimestampOverlap(riverSettings)) 173 .setStatements(SQLCommand.parse(settings)) 174 .setRiverState(riverState); 175 } 176 177 private TimeValue getLastRunTimestampOverlap(RiverSettings riverSettings) { 178 TimeValue overlap = TimeValue.timeValueMillis(0); 179 Map<String, Object> settings = ((Map<String, Object>) (riverSettings.settings().get("jdbc"))); 180 if (settings != null && settings.containsKey("last_run_timestamp_overlap")) { 181 overlap = XContentMapValues.nodeTimeValue(settings.get("last_run_timestamp_overlap")); 182 } 183 return overlap; 184 } 185 186 private Timestamp okTimestamp() { 187 return new Timestamp(LAST_RUN_TIME.getMillis() + 60*2*1000); 188 } 189 190 private Timestamp oldTimestamp() { 191 return new Timestamp(LAST_RUN_TIME.getMillis() - 60*2*1000); 192 } 193 194 private Timestamp overlapTimestamp() { 195 return new Timestamp(LAST_RUN_TIME.getMillis() - 1000); 196 } 197 198 private void createData(String sql, ProductFixture[] fixtures) throws SQLException { 199 Connection conn = source.getConnectionForWriting(); 200 for (ProductFixture fixture : fixtures) { 201 createData(conn, sql, fixture); 202 } 203 source.closeWriting(); 204 } 205 206 private void createData(Connection connection, String sql, ProductFixture fixture) throws SQLException { 207 PreparedStatement stmt = connection.prepareStatement(sql); 208 logger.debug("timestamps: [" + fixture.createdAt + ", " + fixture.updatedAt + ", " + fixture.deletedAt + "]"); 209 for (int i = 0; i < fixture.size; i++) { 210 int id = fixture.id >= 0 ? fixture.id : random.nextInt(); 211 logger.debug("id={}", id); 212 stmt.setInt(1, id); 213 stmt.setNull(2, Types.VARCHAR); 214 stmt.setInt(3, 1); 215 stmt.setDouble(4, 1.1); 216 if (fixture.createdAt != null) { 217 stmt.setTimestamp(5, fixture.createdAt); 218 } else { 219 stmt.setNull(5, Types.TIMESTAMP); 220 } 221 if (fixture.updatedAt != null) { 222 stmt.setTimestamp(6, fixture.updatedAt); 223 } else { 224 stmt.setNull(6, Types.TIMESTAMP); 225 } 226 if (fixture.deletedAt != null) { 227 stmt.setTimestamp(7, fixture.deletedAt); 228 } else { 229 stmt.setNull(7, Types.TIMESTAMP); 230 } 231 stmt.execute(); 232 } 233 } 234 235 private static class ProductFixtures { 236 int expectedCount; 237 ProductFixture[] fixtures; 238 239 ProductFixtures(ProductFixture[] fixtures, int expectedCount) { 240 this.expectedCount = expectedCount; 241 this.fixtures = fixtures; 242 } 243 } 244 245 private static class ProductFixture { 246 private int id = -1; 247 private Timestamp createdAt; 248 private Timestamp deletedAt; 249 private Timestamp updatedAt; 250 private int size; 251 252 static ProductFixture one() { 253 return size(1); 254 } 255 256 static ProductFixture size(int size) { 257 return new ProductFixture(size); 258 } 259 260 ProductFixture(int size) { 261 this.size = size; 262 } 263 264 ProductFixture createdAt(Timestamp ts) { 265 this.createdAt = ts; 266 return this; 267 } 268 269 ProductFixture deletedAt(Timestamp ts) { 270 this.deletedAt = ts; 271 return this; 272 } 273 274 ProductFixture updatedAt(Timestamp ts) { 275 this.updatedAt = ts; 276 return this; 277 } 278 279 ProductFixture setId(int id) { 280 this.id = id; 281 return this; 282 } 283 } 284}