001package org.xbib.elasticsearch.river.jdbc.strategy.simple;
002
003import org.elasticsearch.common.logging.ESLogger;
004import org.elasticsearch.common.logging.ESLoggerFactory;
005import org.testng.annotations.Optional;
006import org.testng.annotations.Parameters;
007import org.testng.annotations.Test;
008import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject;
009import org.xbib.elasticsearch.river.jdbc.support.StringKeyValueStreamListener;
010import org.xbib.elasticsearch.plugin.jdbc.util.Values;
011import org.xbib.elasticsearch.river.jdbc.RiverMouth;
012import org.xbib.elasticsearch.river.jdbc.RiverSource;
013import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth;
014import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener;
015
016import java.io.IOException;
017import java.sql.Connection;
018import java.sql.PreparedStatement;
019import java.sql.ResultSet;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023
024public class SimpleRiverSourceTests extends AbstractSimpleRiverTest {
025
026    private static final ESLogger logger = ESLoggerFactory.getLogger(SimpleRiverSourceTests.class.getName());
027
028    @Override
029    public RiverSource newRiverSource() {
030        return new SimpleRiverSource();
031    }
032
033    @Override
034    public SimpleRiverContext newRiverContext() {
035        return new SimpleRiverContext();
036    }
037
038    @Test
039    public void testSimpleConnectionClose() throws Exception {
040        Connection connection = source.getConnectionForReading();
041        assertFalse(connection.isClosed());
042        source.closeReading();
043        assertTrue(connection.isClosed());
044        source.getConnectionForReading();
045    }
046
047    @Test
048    @Parameters({"sql1"})
049    public void testSimpleSQL(String sql) throws Exception {
050        PreparedStatement statement = source.prepareQuery(sql);
051        ResultSet results = source.executeQuery(statement);
052        for (int i = 0; i < 5; i++) {
053            assertTrue(results.next());
054        }
055        source.close(results);
056        source.close(statement);
057    }
058
059    @Test
060    @Parameters({"sql2", "n"})
061    public void testSimpleStarQuery(String sql, @Optional Integer n) throws Exception {
062        List<Object> params = new LinkedList<Object>();
063        RiverMouth output = new MockRiverMouth() {
064            @Override
065            public void index(IndexableObject object, boolean create) throws IOException {
066                logger.debug("object={}", object);
067            }
068        };
069        PreparedStatement statement = source.prepareQuery(sql);
070        source.bind(statement, params);
071        ResultSet results = source.executeQuery(statement);
072        KeyValueStreamListener listener = new StringKeyValueStreamListener()
073                .output(output);
074        long rows = 0L;
075        source.beforeRows(results, listener);
076        while (source.nextRow(results, listener)) {
077            rows++;
078        }
079        source.afterRows(results, listener);
080        assertEquals(rows, n == null ? 5 : n);
081        source.close(results);
082        source.close(statement);
083    }
084
085    @Test
086    @Parameters({"sql3"})
087    public void testSimpleNullInteger(String sql) throws Exception {
088        List<Object> params = new LinkedList<Object>();
089        RiverMouth mouth = new MockRiverMouth() {
090            @Override
091            public void index(IndexableObject object, boolean create) throws IOException {
092                if (object == null || object.source() == null) {
093                    throw new IllegalArgumentException("object missing");
094                }
095                Values o = (Values) object.source().get("amount");
096                if (o == null) {
097                    o = (Values) object.source().get("AMOUNT"); // hsqldb is uppercase
098                }
099                if (!o.isNull()) {
100                    throw new IllegalArgumentException("amount not null??? " + o.getClass().getName());
101                }
102            }
103        };
104        PreparedStatement statement = source.prepareQuery(sql);
105        source.bind(statement, params);
106        ResultSet results = source.executeQuery(statement);
107        KeyValueStreamListener listener = new StringKeyValueStreamListener()
108                .output(mouth);
109        long rows = 0L;
110        source.beforeRows(results, listener);
111        if (source.nextRow(results, listener)) {
112            // only one row
113            rows++;
114        }
115        source.afterRows(results, listener);
116        assertEquals(rows, 1);
117        source.close(results);
118        source.close(statement);
119    }
120
121    /**
122     * Test JDBC Array to structured object array
123     *
124     * @param sql the array select statement
125     * @throws Exception if test fails
126     */
127    @Test
128    @Parameters({"sql4", "res1", "res2"})
129    public void testSimpleArray(@Optional String sql, @Optional String res1, @Optional String res2) throws Exception {
130        if (sql == null) {
131            return;
132        }
133        List<Object> params = new LinkedList<Object>();
134        final List<IndexableObject> result = new LinkedList<IndexableObject>();
135        RiverMouth mouth = new MockRiverMouth() {
136            @Override
137            public void index(IndexableObject object, boolean create) throws IOException {
138                if (object == null || object.source() == null) {
139                    throw new IllegalArgumentException("object missing");
140                }
141                result.add(object);
142            }
143        };
144        PreparedStatement statement = source.prepareQuery(sql);
145        source.bind(statement, params);
146        ResultSet results = source.executeQuery(statement);
147        KeyValueStreamListener listener = new StringKeyValueStreamListener()
148                .output(mouth);
149        long rows = 0L;
150        source.beforeRows(results, listener);
151        while (source.nextRow(results, listener)) {
152            rows++;
153        }
154        source.afterRows(results, listener);
155        assertEquals(rows, 2);
156        source.close(results);
157        source.close(statement);
158        Iterator<IndexableObject> it = result.iterator();
159        assertEquals(it.next().source().toString(), res1);
160        assertEquals(it.next().source().toString(), res2);
161    }
162
163}