001package org.xbib.elasticsearch.river.jdbc.strategy.mock;
002
003import org.elasticsearch.common.logging.ESLogger;
004import org.elasticsearch.common.logging.ESLoggerFactory;
005import org.testng.annotations.Parameters;
006import org.testng.annotations.Test;
007import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject;
008import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverContext;
009import org.xbib.elasticsearch.river.jdbc.support.StringKeyValueStreamListener;
010import org.xbib.elasticsearch.river.jdbc.RiverMouth;
011import org.xbib.elasticsearch.river.jdbc.RiverSource;
012import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource;
013import org.xbib.elasticsearch.river.jdbc.strategy.simple.AbstractSimpleRiverTest;
014import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener;
015
016import java.io.IOException;
017import java.sql.PreparedStatement;
018import java.sql.ResultSet;
019import java.util.LinkedList;
020import java.util.List;
021
022public class RiverMockTests extends AbstractSimpleRiverTest {
023
024    private static final ESLogger logger = ESLoggerFactory.getLogger(RiverMockTests.class.getName());
025
026    @Override
027    public RiverSource newRiverSource() {
028        return new SimpleRiverSource();
029    }
030
031    @Override
032    public SimpleRiverContext newRiverContext() {
033        return new SimpleRiverContext();
034    }
035
036    @Test
037    @Parameters({"sql1"})
038    public void testMockBill(String sql) throws Exception {
039        List<Object> params = new LinkedList();
040        RiverMouth output = new MockRiverMouth() {
041            @Override
042            public void index(IndexableObject object, boolean create) throws IOException {
043                logger.debug("sql1 object={}", object);
044            }
045        };
046        PreparedStatement statement = source.prepareQuery(sql);
047        source.bind(statement, params);
048        ResultSet results = source.executeQuery(statement);
049        StringKeyValueStreamListener listener = new StringKeyValueStreamListener()
050                .output(output);
051        long rows = 0L;
052        source.beforeRows(results, listener);
053        while (source.nextRow(results, listener)) {
054            rows++;
055        }
056        source.afterRows(results, listener);
057        assertEquals(rows, 5);
058        source.close(results);
059        source.close(statement);
060    }
061
062    @Test
063    @Parameters({"sql2"})
064    public void testMockDepartments(String sql) throws Exception {
065        List<Object> params = new LinkedList();
066        RiverMouth output = new MockRiverMouth() {
067            @Override
068            public void index(IndexableObject object, boolean create) throws IOException {
069                logger.debug("sql2 object={}", object);
070            }
071        };
072        PreparedStatement statement = source.prepareQuery(sql);
073        source.bind(statement, params);
074        ResultSet results = source.executeQuery(statement);
075        StringKeyValueStreamListener listener = new StringKeyValueStreamListener()
076                .output(output);
077        source.beforeRows(results, listener);
078        long rows = 0L;
079        while (source.nextRow(results, listener)) {
080            rows++;
081        }
082        source.afterRows(results, listener);
083        assertEquals(rows, 11);
084        source.close(results);
085        source.close(statement);
086    }
087
088    @Test
089    @Parameters({"sql3"})
090    public void testMockHighBills(String sql) throws Exception {
091        List<Object> params = new LinkedList();
092        params.add(2.00);
093        RiverMouth output = new MockRiverMouth() {
094            @Override
095            public void index(IndexableObject object, boolean create) throws IOException {
096                logger.debug("sql3={}", object);
097            }
098        };
099        PreparedStatement statement = source.prepareQuery(sql);
100        source.bind(statement, params);
101        ResultSet results = source.executeQuery(statement);
102        KeyValueStreamListener listener = new StringKeyValueStreamListener()
103                .output(output);
104        source.beforeRows(results, listener);
105        long rows = 0L;
106        while (source.nextRow(results, listener)) {
107            rows++;
108        }
109        source.afterRows(results, listener);
110        assertEquals(rows, 2);
111        source.close(results);
112        source.close(statement);
113    }
114
115    @Test
116    @Parameters({"sql4"})
117    public void testMockTimePeriod(String sql) throws Exception {
118        List<Object> params = new LinkedList();
119        params.add("2012-06-10 00:00:00");
120        RiverMouth output = new MockRiverMouth() {
121            @Override
122            public void index(IndexableObject object, boolean create) throws IOException {
123                logger.debug("object={}", object);
124            }
125        };
126        PreparedStatement statement = source.prepareQuery(sql);
127        source.bind(statement, params);
128        ResultSet results = source.executeQuery(statement);
129        StringKeyValueStreamListener listener = new StringKeyValueStreamListener()
130                .output(output);
131        source.beforeRows(results, listener);
132        long rows = 0L;
133        while (source.nextRow(results, listener)) {
134            rows++;
135        }
136        source.afterRows(results, listener);
137        assertEquals(rows, 3);
138        source.close(results);
139        source.close(statement);
140    }
141
142    @Test
143    @Parameters({"sql5"})
144    public void testMockIndexId(String sql) throws Exception {
145        MockRiverMouth mock = new MockRiverMouth() {
146            @Override
147            public void index(IndexableObject object, boolean create) throws IOException {
148                super.index(object, create);
149                logger.debug("products={}", object);
150            }
151        };
152        //mock.setIndex("products").setType("products");
153        PreparedStatement statement = source.prepareQuery(sql);
154        ResultSet results = source.executeQuery(statement);
155        StringKeyValueStreamListener listener = new StringKeyValueStreamListener()
156                .output(mock);
157        source.beforeRows(results, listener);
158        while (source.nextRow(results, listener)) {
159            // ignore
160        }
161        source.afterRows(results, listener);
162        assertEquals(mock.getCounter(), 3);
163        source.close(results);
164        source.close(statement);
165    }
166
167
168}