001package org.xbib.elasticsearch.river.jdbc.strategy.column;
002
003import org.elasticsearch.common.joda.time.DateTime;
004import org.elasticsearch.common.logging.ESLogger;
005import org.elasticsearch.common.logging.ESLoggerFactory;
006import org.elasticsearch.common.unit.TimeValue;
007import org.elasticsearch.common.xcontent.support.XContentMapValues;
008import org.elasticsearch.river.RiverSettings;
009import org.testng.annotations.Parameters;
010import org.testng.annotations.Test;
011import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject;
012import org.xbib.elasticsearch.plugin.jdbc.util.PlainIndexableObject;
013import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand;
014import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
015import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth;
016
017import java.io.IOException;
018import java.sql.Connection;
019import java.sql.PreparedStatement;
020import java.sql.SQLException;
021import java.sql.Timestamp;
022import java.sql.Types;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Random;
026
027public class ColumnRiverSourceTests extends AbstractColumnRiverTest {
028
029    private final ESLogger logger = ESLoggerFactory.getLogger(ColumnRiverSourceTests.class.getName());
030
031    private Random random = new Random();
032
033    private DateTime LAST_RUN_TIME = new DateTime(new DateTime().getMillis() - 60 * 60 * 1000);
034
035    @Override
036    public ColumnRiverSource newRiverSource() {
037        return new ColumnRiverSource();
038    }
039
040    @Override
041    public ColumnRiverContext newRiverContext() {
042        return new ColumnRiverContext();
043    }
044
045    @Test
046    @Parameters({"river-existedWhereClause", "sqlInsert"})
047    public void testCreateObjects(String riverResource, String sql) throws Exception {
048        verifyCreateObjects(riverResource, sql);
049    }
050
051    @Test
052    @Parameters({"river-whereClausePlaceholder", "sqlInsert"})
053    public void testCreateObjects_configurationWithWherePlaceholder(String riverResource, String sql)
054            throws Exception {
055        verifyCreateObjects(riverResource, sql);
056    }
057
058    @Test
059    @Parameters({"river-sqlparams", "sqlInsert"})
060    public void testCreateObjects_configurationWithSqlParams(String riverResource, String sql)
061            throws Exception {
062        verifyCreateObjects(riverResource, sql);
063    }
064
065    @Test
066    @Parameters({"river-sqlForTestDeletions", "sqlInsert"})
067    public void testRemoveObjects(String riverResource, String insertSql)
068            throws Exception {
069        verifyDeleteObjects(riverResource, insertSql);
070    }
071
072    @Test
073    @Parameters({"river-sqlForTestDeletionsAndWherePlaceholder", "sqlInsert"})
074    public void testRemoveObjects_configurationWithWherePlaceholder(String riverResource, String insertSql)
075            throws Exception {
076        verifyDeleteObjects(riverResource, insertSql);
077    }
078
079    @Test
080    @Parameters({"river-existedWhereClauseWithOverlap", "sqlInsert"})
081    public void testCreateObjects_withLastRunTimeStampOverlap(String riverResource, String sql)
082        throws Exception {
083        final int newRecordsOutOfTimeRange = 3;
084        final int newRecordsInTimeRange = 2;
085        final int updatedRecordsInTimeRange = 4;
086        final int updatedRecordsInTimeRangeWithOverlap = 1;
087        testColumnRiver(new MockRiverMouth(), riverResource, sql, new ProductFixture[] {
088            ProductFixture.size(newRecordsOutOfTimeRange).createdAt(oldTimestamp()),
089            ProductFixture.size(newRecordsInTimeRange).createdAt(okTimestamp()),
090            ProductFixture.size(updatedRecordsInTimeRange).createdAt(oldTimestamp()).updatedAt(okTimestamp()),
091            ProductFixture.size(updatedRecordsInTimeRangeWithOverlap).createdAt(oldTimestamp()).updatedAt(overlapTimestamp()),
092        }, newRecordsInTimeRange + updatedRecordsInTimeRange + updatedRecordsInTimeRangeWithOverlap);
093    }
094
095    private void verifyCreateObjects(String riverResource, String sql)
096            throws Exception {
097        final int newRecordsOutOfTimeRange = 3;
098        final int newRecordsInTimeRange = 2;
099        final int updatedRecordsInTimeRange = 4;
100        testColumnRiver(new MockRiverMouth(), riverResource, sql, new ProductFixture[]{
101                new ProductFixture(newRecordsOutOfTimeRange).createdAt(oldTimestamp()),
102                new ProductFixture(newRecordsInTimeRange).createdAt(okTimestamp()),
103                new ProductFixture(updatedRecordsInTimeRange).createdAt(oldTimestamp()).updatedAt(okTimestamp()),
104        }, newRecordsInTimeRange + updatedRecordsInTimeRange);
105    }
106
107    private void verifyDeleteObjects(String riverResource, String insertSql)
108            throws Exception {
109        MockRiverMouth riverMouth = new MockRiverMouth();
110        boolean[] shouldProductsBeDeleted = new boolean[]{true, true, false};
111        ProductFixtures productFixtures = createFixturesAndPopulateMouth(shouldProductsBeDeleted, riverMouth);
112        testColumnRiver(riverMouth, riverResource, insertSql,
113                productFixtures.fixtures,
114                productFixtures.expectedCount);
115    }
116
117    private ProductFixtures createFixturesAndPopulateMouth(boolean[] shouldProductsBeDeleted, MockRiverMouth riverMouth)
118            throws IOException {
119        ProductFixture[] fixtures = new ProductFixture[shouldProductsBeDeleted.length];
120        int expectedExistsCountAfterRiverRun = 0;
121        for (int i = 0; i < shouldProductsBeDeleted.length; i++) {
122            IndexableObject p = new PlainIndexableObject()
123                    .id(Integer.toString(i))
124                    .source(createSource(i))
125                    .optype("delete");
126            riverMouth.index(p, false);
127            Timestamp deletedAt;
128            if (shouldProductsBeDeleted[i]) {
129                deletedAt = okTimestamp();
130            } else {
131                deletedAt = oldTimestamp();
132                expectedExistsCountAfterRiverRun++;
133            }
134            fixtures[i] = ProductFixture.one()
135                    .setId(i)
136                    .createdAt(oldTimestamp())
137                    .updatedAt(oldTimestamp())
138                    .deletedAt(deletedAt);
139        }
140        return new ProductFixtures(fixtures, expectedExistsCountAfterRiverRun);
141    }
142
143    private Map<String, Object> createSource(int id) {
144        Map<String, Object> map = new HashMap<String, Object>();
145        map.put("id", id);
146        map.put("name", null);
147        return map;
148    }
149
150    private void testColumnRiver(MockRiverMouth riverMouth, String riverResource, String sql, ProductFixture[] fixtures, int expectedHits)
151            throws Exception {
152        createData(sql, fixtures);
153        context.setRiverMouth(riverMouth);
154        createRiverContext(riverResource);
155        source.fetch();
156        assertEquals(riverMouth.data().size(), expectedHits);
157    }
158
159    private void createRiverContext(String riverResource) throws IOException {
160        RiverSettings riverSettings = riverSettings(riverResource);
161        Map<String, Object> settings = (Map<String, Object>) riverSettings.settings().get("jdbc");
162
163        RiverState riverState = new RiverState();
164        riverState.getMap().put(ColumnRiverFlow.LAST_RUN_TIME, LAST_RUN_TIME);
165        riverState.getMap().put(ColumnRiverFlow.CURRENT_RUN_STARTED_TIME, new DateTime());
166
167        context
168            .columnCreatedAt(XContentMapValues.nodeStringValue(settings.get("column_created_at"), null))
169            .columnUpdatedAt(XContentMapValues.nodeStringValue(settings.get("column_updated_at"), null))
170            .columnDeletedAt(XContentMapValues.nodeStringValue(settings.get("column_deleted_at"), null))
171            .columnEscape(true)
172            .setLastRunTimeStampOverlap(getLastRunTimestampOverlap(riverSettings))
173            .setStatements(SQLCommand.parse(settings))
174            .setRiverState(riverState);
175    }
176
177    private TimeValue getLastRunTimestampOverlap(RiverSettings riverSettings) {
178        TimeValue overlap = TimeValue.timeValueMillis(0);
179        Map<String, Object> settings = ((Map<String, Object>) (riverSettings.settings().get("jdbc")));
180        if (settings != null && settings.containsKey("last_run_timestamp_overlap")) {
181            overlap = XContentMapValues.nodeTimeValue(settings.get("last_run_timestamp_overlap"));
182        }
183        return overlap;
184    }
185
186    private Timestamp okTimestamp() {
187        return new Timestamp(LAST_RUN_TIME.getMillis() + 60*2*1000);
188    }
189
190    private Timestamp oldTimestamp() {
191        return new Timestamp(LAST_RUN_TIME.getMillis() - 60*2*1000);
192    }
193
194    private Timestamp overlapTimestamp() {
195        return new Timestamp(LAST_RUN_TIME.getMillis() - 1000);
196    }
197
198    private void createData(String sql, ProductFixture[] fixtures) throws SQLException {
199        Connection conn = source.getConnectionForWriting();
200        for (ProductFixture fixture : fixtures) {
201            createData(conn, sql, fixture);
202        }
203        source.closeWriting();
204    }
205
206    private void createData(Connection connection, String sql, ProductFixture fixture) throws SQLException {
207        PreparedStatement stmt = connection.prepareStatement(sql);
208        logger.debug("timestamps: [" + fixture.createdAt + ", " + fixture.updatedAt + ", " + fixture.deletedAt + "]");
209        for (int i = 0; i < fixture.size; i++) {
210            int id = fixture.id >= 0 ? fixture.id : random.nextInt();
211            logger.debug("id={}", id);
212            stmt.setInt(1, id);
213            stmt.setNull(2, Types.VARCHAR);
214            stmt.setInt(3, 1);
215            stmt.setDouble(4, 1.1);
216            if (fixture.createdAt != null) {
217                stmt.setTimestamp(5, fixture.createdAt);
218            } else {
219                stmt.setNull(5, Types.TIMESTAMP);
220            }
221            if (fixture.updatedAt != null) {
222                stmt.setTimestamp(6, fixture.updatedAt);
223            } else {
224                stmt.setNull(6, Types.TIMESTAMP);
225            }
226            if (fixture.deletedAt != null) {
227                stmt.setTimestamp(7, fixture.deletedAt);
228            } else {
229                stmt.setNull(7, Types.TIMESTAMP);
230            }
231            stmt.execute();
232        }
233    }
234
235    private static class ProductFixtures {
236        int expectedCount;
237        ProductFixture[] fixtures;
238
239        ProductFixtures(ProductFixture[] fixtures, int expectedCount) {
240            this.expectedCount = expectedCount;
241            this.fixtures = fixtures;
242        }
243    }
244
245    private static class ProductFixture {
246        private int id = -1;
247        private Timestamp createdAt;
248        private Timestamp deletedAt;
249        private Timestamp updatedAt;
250        private int size;
251
252        static ProductFixture one() {
253            return size(1);
254        }
255
256        static ProductFixture size(int size) {
257            return new ProductFixture(size);
258        }
259
260        ProductFixture(int size) {
261            this.size = size;
262        }
263
264        ProductFixture createdAt(Timestamp ts) {
265            this.createdAt = ts;
266            return this;
267        }
268
269        ProductFixture deletedAt(Timestamp ts) {
270            this.deletedAt = ts;
271            return this;
272        }
273
274        ProductFixture updatedAt(Timestamp ts) {
275            this.updatedAt = ts;
276            return this;
277        }
278
279        ProductFixture setId(int id) {
280            this.id = id;
281            return this;
282        }
283    }
284}