001package org.xbib.elasticsearch.river.jdbc.strategy.simple;
002
003import org.testng.annotations.Parameters;
004import org.testng.annotations.Test;
005import org.xbib.elasticsearch.river.jdbc.RiverSource;
006
007import java.sql.Connection;
008import java.sql.ResultSet;
009
010public class SimpleRiverJobTests extends AbstractSimpleRiverTest {
011
012    @Override
013    public RiverSource newRiverSource() {
014        return new SimpleRiverSource();
015    }
016
017    @Override
018    public SimpleRiverContext newRiverContext() {
019        return new SimpleRiverContext();
020    }
021
022    @Test
023    @Parameters({"river1", "sql1", "sql2"})
024    public void testSimpleRiverJob(String riverResource, String sql1, String sql2)
025            throws Exception {
026        createRandomProductsJob(sql2, 100);
027        Connection connection = source.getConnectionForReading();
028        ResultSet results = connection.createStatement().executeQuery(sql1);
029        if (!connection.getAutoCommit()) {
030            connection.commit();
031        }
032        int count = results.next() ? results.getInt(1) : -1;
033        source.close(results);
034        source.closeReading();
035        assertEquals(count, 100);
036        performRiver(riverResource);
037        assertHits("1", 100);
038        // count docs in source table, must be null, because river deletes them.
039        connection = source.getConnectionForReading();
040        // sql1 = select count(*)
041        results = connection.createStatement().executeQuery(sql1);
042        if (!connection.getAutoCommit()) {
043            connection.commit();
044        }
045        count = results.next() ? results.getInt(1) : -1;
046        results.close();
047        assertEquals(count, 0);
048    }
049
050}