001package org.xbib.elasticsearch.river.jdbc.strategy.mock; 002 003import org.elasticsearch.common.logging.ESLogger; 004import org.elasticsearch.common.logging.ESLoggerFactory; 005import org.testng.annotations.Parameters; 006import org.testng.annotations.Test; 007import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject; 008import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverContext; 009import org.xbib.elasticsearch.river.jdbc.support.StringKeyValueStreamListener; 010import org.xbib.elasticsearch.river.jdbc.RiverMouth; 011import org.xbib.elasticsearch.river.jdbc.RiverSource; 012import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource; 013import org.xbib.elasticsearch.river.jdbc.strategy.simple.AbstractSimpleRiverTest; 014import org.xbib.elasticsearch.plugin.jdbc.keyvalue.KeyValueStreamListener; 015 016import java.io.IOException; 017import java.sql.PreparedStatement; 018import java.sql.ResultSet; 019import java.util.LinkedList; 020import java.util.List; 021 022public class RiverMockTests extends AbstractSimpleRiverTest { 023 024 private static final ESLogger logger = ESLoggerFactory.getLogger(RiverMockTests.class.getName()); 025 026 @Override 027 public RiverSource newRiverSource() { 028 return new SimpleRiverSource(); 029 } 030 031 @Override 032 public SimpleRiverContext newRiverContext() { 033 return new SimpleRiverContext(); 034 } 035 036 @Test 037 @Parameters({"sql1"}) 038 public void testMockBill(String sql) throws Exception { 039 List<Object> params = new LinkedList(); 040 RiverMouth output = new MockRiverMouth() { 041 @Override 042 public void index(IndexableObject object, boolean create) throws IOException { 043 logger.debug("sql1 object={}", object); 044 } 045 }; 046 PreparedStatement statement = source.prepareQuery(sql); 047 source.bind(statement, params); 048 ResultSet results = source.executeQuery(statement); 049 StringKeyValueStreamListener listener = new StringKeyValueStreamListener() 050 .output(output); 051 long rows = 0L; 052 source.beforeRows(results, listener); 053 while (source.nextRow(results, listener)) { 054 rows++; 055 } 056 source.afterRows(results, listener); 057 assertEquals(rows, 5); 058 source.close(results); 059 source.close(statement); 060 } 061 062 @Test 063 @Parameters({"sql2"}) 064 public void testMockDepartments(String sql) throws Exception { 065 List<Object> params = new LinkedList(); 066 RiverMouth output = new MockRiverMouth() { 067 @Override 068 public void index(IndexableObject object, boolean create) throws IOException { 069 logger.debug("sql2 object={}", object); 070 } 071 }; 072 PreparedStatement statement = source.prepareQuery(sql); 073 source.bind(statement, params); 074 ResultSet results = source.executeQuery(statement); 075 StringKeyValueStreamListener listener = new StringKeyValueStreamListener() 076 .output(output); 077 source.beforeRows(results, listener); 078 long rows = 0L; 079 while (source.nextRow(results, listener)) { 080 rows++; 081 } 082 source.afterRows(results, listener); 083 assertEquals(rows, 11); 084 source.close(results); 085 source.close(statement); 086 } 087 088 @Test 089 @Parameters({"sql3"}) 090 public void testMockHighBills(String sql) throws Exception { 091 List<Object> params = new LinkedList(); 092 params.add(2.00); 093 RiverMouth output = new MockRiverMouth() { 094 @Override 095 public void index(IndexableObject object, boolean create) throws IOException { 096 logger.debug("sql3={}", object); 097 } 098 }; 099 PreparedStatement statement = source.prepareQuery(sql); 100 source.bind(statement, params); 101 ResultSet results = source.executeQuery(statement); 102 KeyValueStreamListener listener = new StringKeyValueStreamListener() 103 .output(output); 104 source.beforeRows(results, listener); 105 long rows = 0L; 106 while (source.nextRow(results, listener)) { 107 rows++; 108 } 109 source.afterRows(results, listener); 110 assertEquals(rows, 2); 111 source.close(results); 112 source.close(statement); 113 } 114 115 @Test 116 @Parameters({"sql4"}) 117 public void testMockTimePeriod(String sql) throws Exception { 118 List<Object> params = new LinkedList(); 119 params.add("2012-06-10 00:00:00"); 120 RiverMouth output = new MockRiverMouth() { 121 @Override 122 public void index(IndexableObject object, boolean create) throws IOException { 123 logger.debug("object={}", object); 124 } 125 }; 126 PreparedStatement statement = source.prepareQuery(sql); 127 source.bind(statement, params); 128 ResultSet results = source.executeQuery(statement); 129 StringKeyValueStreamListener listener = new StringKeyValueStreamListener() 130 .output(output); 131 source.beforeRows(results, listener); 132 long rows = 0L; 133 while (source.nextRow(results, listener)) { 134 rows++; 135 } 136 source.afterRows(results, listener); 137 assertEquals(rows, 3); 138 source.close(results); 139 source.close(statement); 140 } 141 142 @Test 143 @Parameters({"sql5"}) 144 public void testMockIndexId(String sql) throws Exception { 145 MockRiverMouth mock = new MockRiverMouth() { 146 @Override 147 public void index(IndexableObject object, boolean create) throws IOException { 148 super.index(object, create); 149 logger.debug("products={}", object); 150 } 151 }; 152 //mock.setIndex("products").setType("products"); 153 PreparedStatement statement = source.prepareQuery(sql); 154 ResultSet results = source.executeQuery(statement); 155 StringKeyValueStreamListener listener = new StringKeyValueStreamListener() 156 .output(mock); 157 source.beforeRows(results, listener); 158 while (source.nextRow(results, listener)) { 159 // ignore 160 } 161 source.afterRows(results, listener); 162 assertEquals(mock.getCounter(), 3); 163 source.close(results); 164 source.close(statement); 165 } 166 167 168}