001package org.xbib.elasticsearch.river.jdbc.strategy.simple;
002
003import org.testng.annotations.Parameters;
004import org.testng.annotations.Test;
005import org.xbib.elasticsearch.river.jdbc.RiverSource;
006
007public class SimpleRiverScheduleTests extends AbstractSimpleRiverTest {
008
009    @Override
010    public RiverSource newRiverSource() {
011        return new SimpleRiverSource();
012    }
013
014    @Override
015    public SimpleRiverContext newRiverContext() {
016        return new SimpleRiverContext();
017    }
018
019    /**
020     * Product table star select, scheduled for more than two runs
021     * @param riverResource the river resource
022     * @param sql the SQL statement
023     * @throws Exception if test fails
024     */
025    @Test
026    @Parameters({"river6", "sql1"})
027    public void testSimpleSchedule(String riverResource, String sql) throws Exception {
028        createRandomProducts(sql, 100);
029        createRiver(riverResource);
030        waitForRiver();
031        waitForActiveRiver();
032        Thread.sleep(12500L); // run more than twice
033        client("1").admin().indices().prepareRefresh(index).execute().actionGet();
034        long hits = client("1").prepareSearch(index).execute().actionGet().getHits().getTotalHits();
035        assertTrue(hits > 100L);
036    }
037
038    /**
039     * Test read and write of timestamps in a table. We create 100 timestamps over hour interval,
040     * current timestamp $now is in the center.
041     * Selecting timestamps from $now, there should be at least 50 rows/hits per run, if $now works.
042     *
043     * @param riverResource the river JSON resource
044     * @param sql the sql statement to select timestamps
045     * @throws Exception
046     */
047    @Test
048    @Parameters({"river7", "sql2"})
049    public void testTimestamps(String riverResource, String sql) throws Exception {
050        createTimestampedLogs(sql, 100, "iw_IL", "Asia/Jerusalem"); // TODO make timezone/locale configurable
051        createRiver(riverResource);
052        waitForRiver();
053        waitForActiveRiver();
054        Thread.sleep(12500L); // ensure at least two runs
055        client("1").admin().indices().prepareRefresh(index).execute().actionGet();
056        long hits = client("1").prepareSearch(index).execute().actionGet().getHits().getTotalHits();
057        // just an estimation, at least two runs should deliver 50 hits each.
058        assertTrue(hits > 99L);
059    }
060
061}