001package org.xbib.elasticsearch.river.jdbc.strategy.column; 002 003import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; 004import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 005import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest; 006import org.elasticsearch.action.index.IndexRequest; 007import org.elasticsearch.client.Client; 008import org.elasticsearch.client.Requests; 009import org.elasticsearch.common.io.Streams; 010import org.elasticsearch.common.settings.ImmutableSettings; 011import org.elasticsearch.common.xcontent.XContentBuilder; 012import org.elasticsearch.common.xcontent.XContentHelper; 013import org.elasticsearch.indices.IndexAlreadyExistsException; 014import org.elasticsearch.indices.IndexMissingException; 015import org.elasticsearch.river.RiverSettings; 016import org.testng.annotations.AfterMethod; 017import org.testng.annotations.BeforeMethod; 018import org.testng.annotations.Optional; 019import org.testng.annotations.Parameters; 020import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction; 021import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest; 022import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse; 023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 024import org.xbib.elasticsearch.support.helper.AbstractNodeTestHelper; 025 026import java.io.BufferedReader; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InputStreamReader; 030import java.sql.Connection; 031import java.sql.SQLException; 032import java.sql.Statement; 033import java.util.Locale; 034import java.util.Map; 035import java.util.TimeZone; 036 037import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 038 039public abstract class AbstractColumnRiverTest extends AbstractNodeTestHelper { 040 041 private final static int SECONDS_TO_WAIT = 15; 042 043 protected static ColumnRiverSource source; 044 045 protected static ColumnRiverContext context; 046 047 public abstract ColumnRiverSource newRiverSource(); 048 049 public abstract ColumnRiverContext newRiverContext(); 050 051 @BeforeMethod 052 @Parameters({"starturl", "user", "password", "create"}) 053 public void beforeMethod(String starturl, String user, String password, @Optional String resourceName) 054 throws Exception { 055 startNodes(); 056 057 logger.info("nodes started"); 058 059 waitForYellow("1"); 060 try { 061 // create river index 062 client("1").admin().indices().create(new CreateIndexRequest("_river")).actionGet(); 063 logger.info("river index created"); 064 } catch (IndexAlreadyExistsException e) { 065 logger.warn(e.getMessage()); 066 } 067 source = newRiverSource(); 068 source.setUrl(starturl) 069 .setUser(user) 070 .setPassword(password) 071 .setLocale(Locale.getDefault()) 072 .setTimeZone(TimeZone.getDefault()); 073 context = newRiverContext(); 074 context.setRiverSource(source); 075 source.setRiverContext(context); 076 logger.info("create table {}", resourceName); 077 if (resourceName == null || "".equals(resourceName)) { 078 return; 079 } 080 Connection connection = source.getConnectionForWriting(); 081 if (connection == null) { 082 throw new IOException("no connection"); 083 } 084 sqlScript(connection, resourceName); 085 source.closeWriting(); 086 } 087 088 @AfterMethod 089 @Parameters({"stopurl", "user", "password", "delete"}) 090 public void afterMethod(String stopurl, String user, String password, @Optional String resourceName) 091 throws Exception { 092 093 logger.info("remove table {}", resourceName); 094 if (resourceName == null || "".equals(resourceName)) { 095 return; 096 } 097 // before dropping tables, open read connection must be closed to avoid hangs in mysql/postgresql 098 logger.debug("closing reads..."); 099 source.closeReading(); 100 101 logger.debug("connecting for close..."); 102 Connection connection = source.getConnectionForWriting(); 103 if (connection == null) { 104 throw new IOException("no connection"); 105 } 106 logger.debug("cleaning..."); 107 // clean up tables 108 sqlScript(connection, resourceName); 109 logger.debug("closing writes..."); 110 source.closeWriting(); 111 112 // some driver can drop database by a magic 'stop' URL 113 source = newRiverSource(); 114 source.setUrl(stopurl) 115 .setUser(user) 116 .setPassword(password) 117 .setLocale(Locale.getDefault()) 118 .setTimeZone(TimeZone.getDefault()); 119 try { 120 logger.info("connecting to stop URL..."); 121 // activate stop URL 122 source.getConnectionForWriting(); 123 } catch (Exception e) { 124 // exception is expected, ignore 125 } 126 // close open write connection 127 source.closeWriting(); 128 logger.info("stopped"); 129 130 // delete test index 131 try { 132 client("1").admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); 133 logger.info("index {} deleted", index); 134 } catch (IndexMissingException e) { 135 logger.warn(e.getMessage()); 136 } 137 try { 138 client("1").admin().indices().deleteMapping(new DeleteMappingRequest() 139 .indices(new String[]{"_river"}).types("my_jdbc_river")).actionGet(); 140 logger.info("river my_jdbc_river deleted"); 141 } catch (Exception e) { 142 logger.warn(e.getMessage()); 143 } 144 stopNodes(); 145 } 146 147 protected void performRiver(String resource) throws Exception { 148 createRiver(resource); 149 waitForRiver(); 150 waitForActiveRiver(); 151 waitForInactiveRiver(); 152 } 153 154 protected void createRiver(String resource) throws Exception { 155 waitForYellow("1"); 156 byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource)); 157 Map<String, Object> map = XContentHelper.convertToMap(b, false).v2(); 158 XContentBuilder builder = jsonBuilder().map(map); 159 logger.info("river = {}", builder.string()); 160 IndexRequest indexRequest = Requests.indexRequest("_river").type("my_jdbc_river").id("_meta") 161 .source(builder.string()); 162 client("1").index(indexRequest).actionGet(); 163 client("1").admin().indices().prepareRefresh("_river").execute().actionGet(); 164 logger.info("river is created"); 165 } 166 167 public void waitForRiver() throws Exception { 168 waitForRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 169 logger.info("river is up"); 170 } 171 public void waitForActiveRiver() throws Exception { 172 waitForActiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 173 logger.info("river is active"); 174 } 175 176 public void waitForInactiveRiver() throws Exception { 177 waitForInactiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 178 logger.info("river is inactive"); 179 } 180 181 protected RiverSettings riverSettings(String resource) 182 throws IOException { 183 InputStream in = getClass().getResourceAsStream(resource); 184 return new RiverSettings(ImmutableSettings.settingsBuilder().build(), 185 XContentHelper.convertToMap(Streams.copyToByteArray(in), false).v2()); 186 } 187 188 private void sqlScript(Connection connection, String resourceName) throws Exception { 189 InputStream in = getClass().getResourceAsStream(resourceName); 190 BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8")); 191 String sql; 192 while ((sql = br.readLine()) != null) { 193 194 try { 195 logger.trace("executing {}", sql); 196 Statement p = connection.createStatement(); 197 p.execute(sql); 198 p.close(); 199 } catch (SQLException e) { 200 // ignore 201 logger.error(sql + " failed. Reason: " + e.getMessage()); 202 } finally { 203 connection.commit(); 204 } 205 } 206 br.close(); 207 } 208 209 public static RiverState waitForRiver(Client client, String riverName, String riverType, int seconds) 210 throws InterruptedException, IOException { 211 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 212 .setRiverName(riverName) 213 .setRiverType(riverType); 214 GetRiverStateResponse riverStateResponse = client.admin().cluster() 215 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 216 logger.info("waitForRiver {}/{}", riverName, riverType); 217 while (seconds-- > 0 && riverStateResponse.exists(riverName, riverType)) { 218 Thread.sleep(1000L); 219 try { 220 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 221 logger.info("waitForRiver state={}", riverStateResponse.getRiverState()); 222 } catch (IndexMissingException e) { 223 logger.warn("index missing"); 224 } 225 } 226 if (seconds < 0) { 227 throw new IOException("timeout waiting for river"); 228 } 229 return riverStateResponse.getRiverState(); 230 } 231 232 public static RiverState waitForActiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException { 233 long now = System.currentTimeMillis(); 234 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 235 .setRiverName(riverName) 236 .setRiverType(riverType); 237 GetRiverStateResponse riverStateResponse = client.admin().cluster() 238 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 239 RiverState riverState = riverStateResponse.getRiverState(); 240 long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 241 logger.info("waitForActiveRiver: now={} t0={} t0<now={} state={}", 242 now, t0, t0 < now, riverState); 243 while (seconds-- > 0 && t0 == 0 && t0 < now) { 244 Thread.sleep(1000L); 245 try { 246 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 247 riverState = riverStateResponse.getRiverState(); 248 t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 249 } catch (IndexMissingException e) { 250 // 251 } 252 logger.info("waitForActiveRiver: now={} t0={} t0<now={} state={}", 253 now, t0, t0 < now, riverState); 254 } 255 if (seconds < 0) { 256 throw new IOException("timeout waiting for active river"); 257 } 258 return riverState; 259 } 260 261 public static RiverState waitForInactiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException { 262 long now = System.currentTimeMillis(); 263 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 264 .setRiverName(riverName) 265 .setRiverType(riverType); 266 GetRiverStateResponse riverStateResponse = client.admin().cluster() 267 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 268 RiverState riverState = riverStateResponse.getRiverState(); 269 long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 270 long t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; 271 logger.info("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", 272 now, t0 < now, t1 - t0 <= 0L, riverState); 273 while (seconds-- > 0 && t0 < now && t1 - t0 <= 0L) { 274 Thread.sleep(1000L); 275 try { 276 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 277 riverState = riverStateResponse.getRiverState(); 278 t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 279 t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; 280 } catch (IndexMissingException e) { 281 // 282 } 283 logger.info("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", 284 now, t0 < now, t1 - t0 <= 0L, riverState); 285 } 286 if (seconds < 0) { 287 throw new IOException("timeout waiting for inactive river"); 288 } 289 return riverState; 290 } 291}