001package org.xbib.elasticsearch.river.jdbc.strategy.simple; 002 003import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; 004import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; 005import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest; 006import org.elasticsearch.action.index.IndexRequest; 007import org.elasticsearch.client.Client; 008import org.elasticsearch.client.Requests; 009import org.elasticsearch.common.io.Streams; 010import org.elasticsearch.common.xcontent.XContentHelper; 011import org.elasticsearch.indices.IndexAlreadyExistsException; 012import org.elasticsearch.indices.IndexMissingException; 013import org.testng.annotations.AfterMethod; 014import org.testng.annotations.BeforeMethod; 015import org.testng.annotations.Optional; 016import org.testng.annotations.Parameters; 017import org.elasticsearch.common.xcontent.XContentBuilder; 018import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction; 019import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest; 020import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse; 021import org.xbib.elasticsearch.plugin.jdbc.util.LocaleUtil; 022import org.xbib.elasticsearch.river.jdbc.RiverContext; 023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 024import org.xbib.elasticsearch.river.jdbc.RiverSource; 025import org.xbib.elasticsearch.support.helper.AbstractNodeTestHelper; 026 027import java.io.BufferedReader; 028import java.io.IOException; 029import java.io.InputStream; 030import java.io.InputStreamReader; 031import java.sql.Connection; 032import java.sql.PreparedStatement; 033import java.sql.SQLException; 034import java.sql.Statement; 035import java.sql.Timestamp; 036import java.util.ArrayList; 037import java.util.Calendar; 038import java.util.List; 039import java.util.Locale; 040import java.util.Map; 041import java.util.TimeZone; 042import java.util.UUID; 043 044import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 045 046public abstract class AbstractSimpleRiverTest extends AbstractNodeTestHelper { 047 048 private final static int SECONDS_TO_WAIT = 15; 049 050 protected static RiverSource source; 051 052 protected static RiverContext context; 053 054 public abstract RiverSource newRiverSource(); 055 056 public abstract RiverContext newRiverContext(); 057 058 @BeforeMethod 059 @Parameters({"starturl", "user", "password", "create"}) 060 public void beforeMethod(String starturl, String user, String password, @Optional String resourceName) 061 throws Exception { 062 startNodes(); 063 logger.info("nodes started"); 064 waitForYellow("1"); 065 try { 066 // create river index 067 client("1").admin().indices().create(new CreateIndexRequest("_river")).actionGet(); 068 logger.info("river index created"); 069 } catch (IndexAlreadyExistsException e) { 070 logger.warn(e.getMessage()); 071 } 072 source = newRiverSource() 073 .setUrl(starturl) 074 .setUser(user) 075 .setPassword(password) 076 .setLocale(Locale.getDefault()) 077 .setTimeZone(TimeZone.getDefault()); 078 context = newRiverContext(); 079 context.setRiverSource(source); 080 source.setRiverContext(context); 081 logger.info("create table {}", resourceName); 082 if (resourceName == null || "".equals(resourceName)) { 083 return; 084 } 085 Connection connection = source.getConnectionForWriting(); 086 if (connection == null) { 087 throw new IOException("no connection"); 088 } 089 sqlScript(connection, resourceName); 090 source.closeWriting(); 091 } 092 093 @AfterMethod 094 @Parameters({"stopurl", "user", "password", "delete"}) 095 public void afterMethod(String stopurl, String user, String password, @Optional String resourceName) 096 throws Exception { 097 098 logger.info("remove table {}", resourceName); 099 if (resourceName == null || "".equals(resourceName)) { 100 return; 101 } 102 // before dropping tables, open read connection must be closed to avoid hangs in mysql/postgresql 103 logger.debug("closing reads..."); 104 source.closeReading(); 105 106 logger.debug("connecting for close..."); 107 Connection connection = source.getConnectionForWriting(); 108 if (connection == null) { 109 throw new IOException("no connection"); 110 } 111 logger.debug("cleaning..."); 112 // clean up tables 113 sqlScript(connection, resourceName); 114 logger.debug("closing writes..."); 115 source.closeWriting(); 116 117 // some driver can drop database by a magic 'stop' URL 118 source = newRiverSource() 119 .setUrl(stopurl) 120 .setUser(user) 121 .setPassword(password) 122 .setLocale(Locale.getDefault()) 123 .setTimeZone(TimeZone.getDefault()); 124 try { 125 logger.info("connecting to stop URL..."); 126 // activate stop URL 127 source.getConnectionForWriting(); 128 } catch (Exception e) { 129 // exception is expected, ignore 130 } 131 // close open write connection 132 source.closeWriting(); 133 logger.info("stopped"); 134 135 // delete test index 136 try { 137 client("1").admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); 138 logger.info("index {} deleted", index); 139 } catch (IndexMissingException e) { 140 logger.warn(e.getMessage()); 141 } 142 try { 143 client("1").admin().indices().deleteMapping(new DeleteMappingRequest() 144 .indices(new String[]{"_river"}).types("my_jdbc_river")).actionGet(); 145 logger.info("river my_jdbc_river deleted"); 146 } catch (Exception e) { 147 logger.warn(e.getMessage()); 148 } 149 stopNodes(); 150 } 151 152 protected void performRiver(String resource) throws Exception { 153 createRiver(resource); 154 waitForRiver(); 155 waitForActiveRiver(); 156 waitForInactiveRiver(); 157 } 158 159 protected void createRiver(String resource) throws Exception { 160 waitForYellow("1"); 161 byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource)); 162 Map<String, Object> map = XContentHelper.convertToMap(b, false).v2(); 163 XContentBuilder builder = jsonBuilder().map(map); 164 logger.info("river = {}", builder.string()); 165 IndexRequest indexRequest = Requests.indexRequest("_river").type("my_jdbc_river").id("_meta") 166 .source(builder.string()); 167 client("1").index(indexRequest).actionGet(); 168 client("1").admin().indices().prepareRefresh("_river").execute().actionGet(); 169 logger.info("river is created"); 170 } 171 172 public void waitForRiver() throws Exception { 173 waitForRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 174 logger.info("river is up"); 175 } 176 public void waitForActiveRiver() throws Exception { 177 waitForActiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 178 logger.info("river is active"); 179 } 180 181 public void waitForInactiveRiver() throws Exception { 182 waitForInactiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT); 183 logger.info("river is inactive"); 184 } 185 186 /*protected RiverSettings riverSettings(String resource) 187 throws IOException { 188 InputStream in = getClass().getResourceAsStream(resource); 189 return new RiverSettings(ImmutableSettings.settingsBuilder().build(), 190 XContentHelper.convertToMap(Streams.copyToByteArray(in), false).v2()); 191 }*/ 192 193 protected void createRandomProducts(String sql, int size) 194 throws SQLException { 195 Connection connection = source.getConnectionForWriting(); 196 for (int i = 0; i < size; i++) { 197 long amount = Math.round(Math.random() * 1000); 198 double price = (Math.random() * 10000) / 100.00; 199 add(connection, sql, UUID.randomUUID().toString().substring(0, 32), amount, price); 200 } 201 if (!connection.getAutoCommit()) { 202 connection.commit(); 203 } 204 source.closeWriting(); 205 } 206 207 protected void createTimestampedLogs(String sql, int size, String locale, String timezone) 208 throws SQLException { 209 Connection connection = source.getConnectionForWriting(); 210 Locale l = LocaleUtil.toLocale(locale); 211 TimeZone t = TimeZone.getTimeZone(timezone); 212 source.setTimeZone(t).setLocale(l); 213 Calendar cal = Calendar.getInstance(t, l); 214 // half of log in the past, half of it in the future 215 cal.add(Calendar.HOUR, -(size/2)); 216 for (int i = 0; i < size; i++) { 217 Timestamp modified = new Timestamp(cal.getTimeInMillis()); 218 String message = "Hello world"; 219 add(connection, sql, modified, message); 220 cal.add(Calendar.HOUR, 1); 221 } 222 if (!connection.getAutoCommit()) { 223 connection.commit(); 224 } 225 source.closeWriting(); 226 } 227 228 private void add(Connection connection, String sql, final String name, final long amount, final double price) 229 throws SQLException { 230 PreparedStatement stmt = connection.prepareStatement(sql); 231 List<Object> params = new ArrayList<Object>() {{ 232 add(name); 233 add(amount); 234 add(price); 235 }}; 236 source.bind(stmt, params); 237 stmt.execute(); 238 } 239 240 private void add(Connection connection, String sql, final Timestamp ts, final String message) 241 throws SQLException { 242 PreparedStatement stmt = connection.prepareStatement(sql); 243 List<Object> params = new ArrayList<Object>() {{ 244 add(ts); 245 add(message); 246 }}; 247 source.bind(stmt, params); 248 stmt.execute(); 249 } 250 251 protected void createRandomProductsJob(String sql, int size) 252 throws SQLException { 253 Connection connection = source.getConnectionForWriting(); 254 for (int i = 0; i < size; i++) { 255 long amount = Math.round(Math.random() * 1000); 256 double price = (Math.random() * 10000) / 100.00; 257 add(connection, sql, 1L, UUID.randomUUID().toString().substring(0, 32), amount, price); 258 } 259 if (!connection.getAutoCommit()) { 260 connection.commit(); 261 } 262 source.closeWriting(); 263 } 264 265 private void add(Connection connection, String sql, final long job, final String name, final long amount, final double price) 266 throws SQLException { 267 PreparedStatement stmt = connection.prepareStatement(sql); 268 List<Object> params = new ArrayList<Object>() { 269 { 270 add(job); 271 add(name); 272 add(amount); 273 add(price); 274 } 275 }; 276 source.bind(stmt, params); 277 stmt.execute(); 278 } 279 280 private void sqlScript(Connection connection, String resourceName) throws Exception { 281 InputStream in = getClass().getResourceAsStream(resourceName); 282 BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8")); 283 String sql; 284 while ((sql = br.readLine()) != null) { 285 286 try { 287 logger.trace("executing {}", sql); 288 Statement p = connection.createStatement(); 289 p.execute(sql); 290 p.close(); 291 } catch (SQLException e) { 292 // ignore 293 logger.error(sql + " failed. Reason: " + e.getMessage()); 294 } finally { 295 connection.commit(); 296 } 297 } 298 br.close(); 299 } 300 301 public static RiverState waitForRiver(Client client, String riverName, String riverType, int seconds) 302 throws InterruptedException, IOException { 303 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 304 .setRiverName(riverName) 305 .setRiverType(riverType); 306 GetRiverStateResponse riverStateResponse = client.admin().cluster() 307 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 308 logger.debug("waitForRiver {}/{}", riverName, riverType); 309 seconds = 2 * seconds; 310 while (seconds-- > 0 && riverStateResponse.exists(riverName, riverType)) { 311 Thread.sleep(500L); 312 try { 313 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 314 logger.debug("waitForRiver state={}", riverStateResponse.getRiverState()); 315 } catch (IndexMissingException e) { 316 logger.warn("index missing"); 317 } 318 } 319 if (seconds < 0) { 320 throw new IOException("timeout waiting for river"); 321 } 322 return riverStateResponse.getRiverState(); 323 } 324 325 public static RiverState waitForActiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException { 326 long now = System.currentTimeMillis(); 327 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 328 .setRiverName(riverName) 329 .setRiverType(riverType); 330 GetRiverStateResponse riverStateResponse = client.admin().cluster() 331 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 332 RiverState riverState = riverStateResponse.getRiverState(); 333 long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 334 logger.debug("waitForActiveRiver: now={} t0={} t0<now={} state={}", now, t0, t0 < now, riverState); 335 seconds = 2 * seconds; 336 while (seconds-- > 0 && t0 == 0 && t0 < now) { 337 Thread.sleep(500L); 338 try { 339 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 340 riverState = riverStateResponse.getRiverState(); 341 t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 342 } catch (IndexMissingException e) { 343 logger.warn("index missing"); 344 } 345 logger.debug("waitForActiveRiver: now={} t0={} t0<now={} state={}", now, t0, t0 < now, riverState); 346 } 347 if (seconds < 0) { 348 throw new IOException("timeout waiting for active river"); 349 } 350 return riverState; 351 } 352 353 public static RiverState waitForInactiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException { 354 long now = System.currentTimeMillis(); 355 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest() 356 .setRiverName(riverName) 357 .setRiverType(riverType); 358 GetRiverStateResponse riverStateResponse = client.admin().cluster() 359 .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 360 RiverState riverState = riverStateResponse.getRiverState(); 361 long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 362 long t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; 363 logger.debug("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", now, t0 < now, t1 - t0 <= 0L, riverState); 364 seconds = 2 * seconds; 365 while (seconds-- > 0 && t0 < now && t1 - t0 <= 0L) { 366 Thread.sleep(500L); 367 try { 368 riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet(); 369 riverState = riverStateResponse.getRiverState(); 370 t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L; 371 t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L; 372 } catch (IndexMissingException e) { 373 logger.warn("index missing"); 374 } 375 logger.debug("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", now, t0 < now, t1 - t0 <= 0L, riverState); 376 } 377 if (seconds < 0) { 378 throw new IOException("timeout waiting for inactive river"); 379 } 380 return riverState; 381 } 382}