001package org.xbib.elasticsearch.river.jdbc.strategy.simple; 002 003import org.elasticsearch.common.logging.ESLogger; 004import org.elasticsearch.common.logging.ESLoggerFactory; 005import org.testng.annotations.Optional; 006import org.testng.annotations.Parameters; 007import org.testng.annotations.Test; 008import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject; 009import org.xbib.elasticsearch.river.jdbc.support.StringKeyValueStreamListener; 010import org.xbib.elasticsearch.plugin.jdbc.util.Values; 011import org.xbib.elasticsearch.river.jdbc.RiverMouth; 012import org.xbib.elasticsearch.river.jdbc.RiverSource; 013import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth; 014import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener; 015 016import java.io.IOException; 017import java.sql.Connection; 018import java.sql.PreparedStatement; 019import java.sql.ResultSet; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.List; 023 024public class SimpleRiverSourceTests extends AbstractSimpleRiverTest { 025 026 private static final ESLogger logger = ESLoggerFactory.getLogger(SimpleRiverSourceTests.class.getName()); 027 028 @Override 029 public RiverSource newRiverSource() { 030 return new SimpleRiverSource(); 031 } 032 033 @Override 034 public SimpleRiverContext newRiverContext() { 035 return new SimpleRiverContext(); 036 } 037 038 @Test 039 public void testSimpleConnectionClose() throws Exception { 040 Connection connection = source.getConnectionForReading(); 041 assertFalse(connection.isClosed()); 042 source.closeReading(); 043 assertTrue(connection.isClosed()); 044 source.getConnectionForReading(); 045 } 046 047 @Test 048 @Parameters({"sql1"}) 049 public void testSimpleSQL(String sql) throws Exception { 050 PreparedStatement statement = source.prepareQuery(sql); 051 ResultSet results = source.executeQuery(statement); 052 for (int i = 0; i < 5; i++) { 053 assertTrue(results.next()); 054 } 055 source.close(results); 056 source.close(statement); 057 } 058 059 @Test 060 @Parameters({"sql2", "n"}) 061 public void testSimpleStarQuery(String sql, @Optional Integer n) throws Exception { 062 List<Object> params = new LinkedList<Object>(); 063 RiverMouth output = new MockRiverMouth() { 064 @Override 065 public void index(IndexableObject object, boolean create) throws IOException { 066 logger.debug("object={}", object); 067 } 068 }; 069 PreparedStatement statement = source.prepareQuery(sql); 070 source.bind(statement, params); 071 ResultSet results = source.executeQuery(statement); 072 KeyValueStreamListener listener = new StringKeyValueStreamListener() 073 .output(output); 074 long rows = 0L; 075 source.beforeRows(results, listener); 076 while (source.nextRow(results, listener)) { 077 rows++; 078 } 079 source.afterRows(results, listener); 080 assertEquals(rows, n == null ? 5 : n); 081 source.close(results); 082 source.close(statement); 083 } 084 085 @Test 086 @Parameters({"sql3"}) 087 public void testSimpleNullInteger(String sql) throws Exception { 088 List<Object> params = new LinkedList<Object>(); 089 RiverMouth mouth = new MockRiverMouth() { 090 @Override 091 public void index(IndexableObject object, boolean create) throws IOException { 092 if (object == null || object.source() == null) { 093 throw new IllegalArgumentException("object missing"); 094 } 095 Values o = (Values) object.source().get("amount"); 096 if (o == null) { 097 o = (Values) object.source().get("AMOUNT"); // hsqldb is uppercase 098 } 099 if (!o.isNull()) { 100 throw new IllegalArgumentException("amount not null??? " + o.getClass().getName()); 101 } 102 } 103 }; 104 PreparedStatement statement = source.prepareQuery(sql); 105 source.bind(statement, params); 106 ResultSet results = source.executeQuery(statement); 107 KeyValueStreamListener listener = new StringKeyValueStreamListener() 108 .output(mouth); 109 long rows = 0L; 110 source.beforeRows(results, listener); 111 if (source.nextRow(results, listener)) { 112 // only one row 113 rows++; 114 } 115 source.afterRows(results, listener); 116 assertEquals(rows, 1); 117 source.close(results); 118 source.close(statement); 119 } 120 121 /** 122 * Test JDBC Array to structured object array 123 * 124 * @param sql the array select statement 125 * @throws Exception if test fails 126 */ 127 @Test 128 @Parameters({"sql4", "res1", "res2"}) 129 public void testSimpleArray(@Optional String sql, @Optional String res1, @Optional String res2) throws Exception { 130 if (sql == null) { 131 return; 132 } 133 List<Object> params = new LinkedList<Object>(); 134 final List<IndexableObject> result = new LinkedList<IndexableObject>(); 135 RiverMouth mouth = new MockRiverMouth() { 136 @Override 137 public void index(IndexableObject object, boolean create) throws IOException { 138 if (object == null || object.source() == null) { 139 throw new IllegalArgumentException("object missing"); 140 } 141 result.add(object); 142 } 143 }; 144 PreparedStatement statement = source.prepareQuery(sql); 145 source.bind(statement, params); 146 ResultSet results = source.executeQuery(statement); 147 KeyValueStreamListener listener = new StringKeyValueStreamListener() 148 .output(mouth); 149 long rows = 0L; 150 source.beforeRows(results, listener); 151 while (source.nextRow(results, listener)) { 152 rows++; 153 } 154 source.afterRows(results, listener); 155 assertEquals(rows, 2); 156 source.close(results); 157 source.close(statement); 158 Iterator<IndexableObject> it = result.iterator(); 159 assertEquals(it.next().source().toString(), res1); 160 assertEquals(it.next().source().toString(), res2); 161 } 162 163}