001package org.xbib.elasticsearch.river.jdbc.strategy.simple;
002
003import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
004import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
005import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
006import org.elasticsearch.action.index.IndexRequest;
007import org.elasticsearch.client.Client;
008import org.elasticsearch.client.Requests;
009import org.elasticsearch.common.io.Streams;
010import org.elasticsearch.common.xcontent.XContentHelper;
011import org.elasticsearch.indices.IndexAlreadyExistsException;
012import org.elasticsearch.indices.IndexMissingException;
013import org.testng.annotations.AfterMethod;
014import org.testng.annotations.BeforeMethod;
015import org.testng.annotations.Optional;
016import org.testng.annotations.Parameters;
017import org.elasticsearch.common.xcontent.XContentBuilder;
018import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction;
019import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest;
020import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
021import org.xbib.elasticsearch.plugin.jdbc.util.LocaleUtil;
022import org.xbib.elasticsearch.river.jdbc.RiverContext;
023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
024import org.xbib.elasticsearch.river.jdbc.RiverSource;
025import org.xbib.elasticsearch.support.helper.AbstractNodeTestHelper;
026
027import java.io.BufferedReader;
028import java.io.IOException;
029import java.io.InputStream;
030import java.io.InputStreamReader;
031import java.sql.Connection;
032import java.sql.PreparedStatement;
033import java.sql.SQLException;
034import java.sql.Statement;
035import java.sql.Timestamp;
036import java.util.ArrayList;
037import java.util.Calendar;
038import java.util.List;
039import java.util.Locale;
040import java.util.Map;
041import java.util.TimeZone;
042import java.util.UUID;
043
044import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
045
046public abstract class AbstractSimpleRiverTest extends AbstractNodeTestHelper {
047
048    private final static int SECONDS_TO_WAIT = 15;
049
050    protected static RiverSource source;
051
052    protected static RiverContext context;
053
054    public abstract RiverSource newRiverSource();
055
056    public abstract RiverContext newRiverContext();
057
058    @BeforeMethod
059    @Parameters({"starturl", "user", "password", "create"})
060    public void beforeMethod(String starturl, String user, String password, @Optional String resourceName)
061            throws Exception {
062        startNodes();
063        logger.info("nodes started");
064        waitForYellow("1");
065        try {
066            // create river index
067            client("1").admin().indices().create(new CreateIndexRequest("_river")).actionGet();
068            logger.info("river index created");
069        } catch (IndexAlreadyExistsException e) {
070            logger.warn(e.getMessage());
071        }
072        source = newRiverSource()
073                .setUrl(starturl)
074                .setUser(user)
075                .setPassword(password)
076                .setLocale(Locale.getDefault())
077                .setTimeZone(TimeZone.getDefault());
078        context = newRiverContext();
079        context.setRiverSource(source);
080        source.setRiverContext(context);
081        logger.info("create table {}", resourceName);
082        if (resourceName == null || "".equals(resourceName)) {
083            return;
084        }
085        Connection connection = source.getConnectionForWriting();
086        if (connection == null) {
087            throw new IOException("no connection");
088        }
089        sqlScript(connection, resourceName);
090        source.closeWriting();
091    }
092
093    @AfterMethod
094    @Parameters({"stopurl", "user", "password", "delete"})
095    public void afterMethod(String stopurl, String user, String password, @Optional String resourceName)
096            throws Exception {
097
098        logger.info("remove table {}", resourceName);
099        if (resourceName == null || "".equals(resourceName)) {
100            return;
101        }
102        // before dropping tables, open read connection must be closed to avoid hangs in mysql/postgresql
103        logger.debug("closing reads...");
104        source.closeReading();
105
106        logger.debug("connecting for close...");
107        Connection connection = source.getConnectionForWriting();
108        if (connection == null) {
109            throw new IOException("no connection");
110        }
111        logger.debug("cleaning...");
112        // clean up tables
113        sqlScript(connection, resourceName);
114        logger.debug("closing writes...");
115        source.closeWriting();
116
117        // some driver can drop database by a magic 'stop' URL
118        source = newRiverSource()
119                .setUrl(stopurl)
120                .setUser(user)
121                .setPassword(password)
122                .setLocale(Locale.getDefault())
123                .setTimeZone(TimeZone.getDefault());
124        try {
125            logger.info("connecting to stop URL...");
126            // activate stop URL
127            source.getConnectionForWriting();
128        } catch (Exception e) {
129            // exception is expected, ignore
130        }
131        // close open write connection
132        source.closeWriting();
133        logger.info("stopped");
134
135        // delete test index
136        try {
137            client("1").admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
138            logger.info("index {} deleted", index);
139        } catch (IndexMissingException e) {
140            logger.warn(e.getMessage());
141        }
142        try {
143            client("1").admin().indices().deleteMapping(new DeleteMappingRequest()
144                    .indices(new String[]{"_river"}).types("my_jdbc_river")).actionGet();
145            logger.info("river my_jdbc_river deleted");
146        } catch (Exception e) {
147            logger.warn(e.getMessage());
148        }
149        stopNodes();
150    }
151
152    protected void performRiver(String resource) throws Exception {
153        createRiver(resource);
154        waitForRiver();
155        waitForActiveRiver();
156        waitForInactiveRiver();
157    }
158
159    protected void createRiver(String resource) throws Exception {
160        waitForYellow("1");
161        byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource));
162        Map<String, Object> map = XContentHelper.convertToMap(b, false).v2();
163        XContentBuilder builder = jsonBuilder().map(map);
164        logger.info("river = {}", builder.string());
165        IndexRequest indexRequest = Requests.indexRequest("_river").type("my_jdbc_river").id("_meta")
166                .source(builder.string());
167        client("1").index(indexRequest).actionGet();
168        client("1").admin().indices().prepareRefresh("_river").execute().actionGet();
169        logger.info("river is created");
170    }
171
172    public void waitForRiver() throws Exception {
173        waitForRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
174        logger.info("river is up");
175    }
176    public void waitForActiveRiver() throws Exception {
177        waitForActiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
178        logger.info("river is active");
179    }
180
181    public void waitForInactiveRiver() throws Exception {
182        waitForInactiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
183        logger.info("river is inactive");
184    }
185
186    /*protected RiverSettings riverSettings(String resource)
187            throws IOException {
188        InputStream in = getClass().getResourceAsStream(resource);
189        return new RiverSettings(ImmutableSettings.settingsBuilder().build(),
190                XContentHelper.convertToMap(Streams.copyToByteArray(in), false).v2());
191    }*/
192
193    protected void createRandomProducts(String sql, int size)
194            throws SQLException {
195        Connection connection = source.getConnectionForWriting();
196        for (int i = 0; i < size; i++) {
197            long amount = Math.round(Math.random() * 1000);
198            double price = (Math.random() * 10000) / 100.00;
199            add(connection, sql, UUID.randomUUID().toString().substring(0, 32), amount, price);
200        }
201        if (!connection.getAutoCommit()) {
202            connection.commit();
203        }
204        source.closeWriting();
205    }
206
207    protected void createTimestampedLogs(String sql, int size, String locale, String timezone)
208            throws SQLException {
209        Connection connection = source.getConnectionForWriting();
210        Locale l = LocaleUtil.toLocale(locale);
211        TimeZone t = TimeZone.getTimeZone(timezone);
212        source.setTimeZone(t).setLocale(l);
213        Calendar cal = Calendar.getInstance(t, l);
214        // half of log in the past, half of it in the future
215        cal.add(Calendar.HOUR, -(size/2));
216        for (int i = 0; i < size; i++) {
217            Timestamp modified = new Timestamp(cal.getTimeInMillis());
218            String message = "Hello world";
219            add(connection, sql, modified, message);
220            cal.add(Calendar.HOUR, 1);
221        }
222        if (!connection.getAutoCommit()) {
223            connection.commit();
224        }
225        source.closeWriting();
226    }
227
228    private void add(Connection connection, String sql, final String name, final long amount, final double price)
229            throws SQLException {
230        PreparedStatement stmt = connection.prepareStatement(sql);
231        List<Object> params = new ArrayList<Object>() {{
232            add(name);
233            add(amount);
234            add(price);
235        }};
236        source.bind(stmt, params);
237        stmt.execute();
238    }
239
240    private void add(Connection connection, String sql, final Timestamp ts, final String message)
241            throws SQLException {
242        PreparedStatement stmt = connection.prepareStatement(sql);
243        List<Object> params = new ArrayList<Object>() {{
244            add(ts);
245            add(message);
246        }};
247        source.bind(stmt, params);
248        stmt.execute();
249    }
250
251    protected void createRandomProductsJob(String sql, int size)
252            throws SQLException {
253        Connection connection = source.getConnectionForWriting();
254        for (int i = 0; i < size; i++) {
255            long amount = Math.round(Math.random() * 1000);
256            double price = (Math.random() * 10000) / 100.00;
257            add(connection, sql, 1L, UUID.randomUUID().toString().substring(0, 32), amount, price);
258        }
259        if (!connection.getAutoCommit()) {
260            connection.commit();
261        }
262        source.closeWriting();
263    }
264
265    private void add(Connection connection, String sql, final long job, final String name, final long amount, final double price)
266            throws SQLException {
267        PreparedStatement stmt = connection.prepareStatement(sql);
268        List<Object> params = new ArrayList<Object>() {
269            {
270                add(job);
271                add(name);
272                add(amount);
273                add(price);
274            }
275        };
276        source.bind(stmt, params);
277        stmt.execute();
278    }
279
280    private void sqlScript(Connection connection, String resourceName) throws Exception {
281        InputStream in = getClass().getResourceAsStream(resourceName);
282        BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
283        String sql;
284        while ((sql = br.readLine()) != null) {
285
286            try {
287                logger.trace("executing {}", sql);
288                Statement p = connection.createStatement();
289                p.execute(sql);
290                p.close();
291            } catch (SQLException e) {
292                // ignore
293                logger.error(sql + " failed. Reason: " + e.getMessage());
294            } finally {
295                connection.commit();
296            }
297        }
298        br.close();
299    }
300
301    public static RiverState waitForRiver(Client client, String riverName, String riverType, int seconds)
302            throws InterruptedException, IOException {
303        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
304                .setRiverName(riverName)
305                .setRiverType(riverType);
306        GetRiverStateResponse riverStateResponse = client.admin().cluster()
307                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
308        logger.debug("waitForRiver {}/{}", riverName, riverType);
309        seconds = 2 * seconds;
310        while (seconds-- > 0 && riverStateResponse.exists(riverName, riverType)) {
311            Thread.sleep(500L);
312            try {
313                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
314                logger.debug("waitForRiver state={}", riverStateResponse.getRiverState());
315            } catch (IndexMissingException e) {
316                logger.warn("index missing");
317            }
318        }
319        if (seconds < 0) {
320            throw new IOException("timeout waiting for river");
321        }
322        return riverStateResponse.getRiverState();
323    }
324
325    public static RiverState waitForActiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
326        long now = System.currentTimeMillis();
327        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
328                .setRiverName(riverName)
329                .setRiverType(riverType);
330        GetRiverStateResponse riverStateResponse = client.admin().cluster()
331                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
332        RiverState riverState = riverStateResponse.getRiverState();
333        long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
334        logger.debug("waitForActiveRiver: now={} t0={} t0<now={} state={}", now, t0, t0 < now, riverState);
335        seconds = 2 * seconds;
336        while (seconds-- > 0 && t0 == 0 && t0 < now) {
337            Thread.sleep(500L);
338            try {
339                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
340                riverState = riverStateResponse.getRiverState();
341                t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
342            } catch (IndexMissingException e) {
343                logger.warn("index missing");
344            }
345            logger.debug("waitForActiveRiver: now={} t0={} t0<now={} state={}", now, t0, t0 < now, riverState);
346        }
347        if (seconds < 0) {
348            throw new IOException("timeout waiting for active river");
349        }
350        return riverState;
351    }
352
353    public static RiverState waitForInactiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
354        long now = System.currentTimeMillis();
355        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
356                .setRiverName(riverName)
357                .setRiverType(riverType);
358        GetRiverStateResponse riverStateResponse = client.admin().cluster()
359                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
360        RiverState riverState = riverStateResponse.getRiverState();
361        long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
362        long t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L;
363        logger.debug("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", now, t0 < now, t1 - t0 <= 0L, riverState);
364        seconds = 2 * seconds;
365        while (seconds-- > 0 && t0 < now && t1 - t0 <= 0L) {
366            Thread.sleep(500L);
367            try {
368                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
369                riverState = riverStateResponse.getRiverState();
370                t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
371                t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L;
372            } catch (IndexMissingException e) {
373                logger.warn("index missing");
374            }
375            logger.debug("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}", now, t0 < now, t1 - t0 <= 0L, riverState);
376        }
377        if (seconds < 0) {
378            throw new IOException("timeout waiting for inactive river");
379        }
380        return riverState;
381    }
382}