001package org.xbib.elasticsearch.river.jdbc.strategy.simple; 002 003import org.testng.annotations.Parameters; 004import org.testng.annotations.Test; 005import org.xbib.elasticsearch.river.jdbc.RiverSource; 006 007import java.sql.Connection; 008import java.sql.ResultSet; 009 010public class SimpleRiverJobTests extends AbstractSimpleRiverTest { 011 012 @Override 013 public RiverSource newRiverSource() { 014 return new SimpleRiverSource(); 015 } 016 017 @Override 018 public SimpleRiverContext newRiverContext() { 019 return new SimpleRiverContext(); 020 } 021 022 @Test 023 @Parameters({"river1", "sql1", "sql2"}) 024 public void testSimpleRiverJob(String riverResource, String sql1, String sql2) 025 throws Exception { 026 createRandomProductsJob(sql2, 100); 027 Connection connection = source.getConnectionForReading(); 028 ResultSet results = connection.createStatement().executeQuery(sql1); 029 if (!connection.getAutoCommit()) { 030 connection.commit(); 031 } 032 int count = results.next() ? results.getInt(1) : -1; 033 source.close(results); 034 source.closeReading(); 035 assertEquals(count, 100); 036 performRiver(riverResource); 037 assertHits("1", 100); 038 // count docs in source table, must be null, because river deletes them. 039 connection = source.getConnectionForReading(); 040 // sql1 = select count(*) 041 results = connection.createStatement().executeQuery(sql1); 042 if (!connection.getAutoCommit()) { 043 connection.commit(); 044 } 045 count = results.next() ? results.getInt(1) : -1; 046 results.close(); 047 assertEquals(count, 0); 048 } 049 050}