001package org.xbib.elasticsearch.river.jdbc.strategy.column;
002
003import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
004import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
005import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
006import org.elasticsearch.action.index.IndexRequest;
007import org.elasticsearch.client.Client;
008import org.elasticsearch.client.Requests;
009import org.elasticsearch.common.io.Streams;
010import org.elasticsearch.common.settings.ImmutableSettings;
011import org.elasticsearch.common.xcontent.XContentBuilder;
012import org.elasticsearch.common.xcontent.XContentHelper;
013import org.elasticsearch.indices.IndexAlreadyExistsException;
014import org.elasticsearch.indices.IndexMissingException;
015import org.elasticsearch.river.RiverSettings;
016import org.testng.annotations.AfterMethod;
017import org.testng.annotations.BeforeMethod;
018import org.testng.annotations.Optional;
019import org.testng.annotations.Parameters;
020import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction;
021import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest;
022import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
024import org.xbib.elasticsearch.support.helper.AbstractNodeTestHelper;
025
026import java.io.BufferedReader;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InputStreamReader;
030import java.sql.Connection;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.util.Locale;
034import java.util.Map;
035import java.util.TimeZone;
036
037import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
038
039public abstract class AbstractColumnRiverTest extends AbstractNodeTestHelper {
040
041    private final static int SECONDS_TO_WAIT = 15;
042
043    protected static ColumnRiverSource source;
044
045    protected static ColumnRiverContext context;
046
047    public abstract ColumnRiverSource newRiverSource();
048
049    public abstract ColumnRiverContext newRiverContext();
050
051    @BeforeMethod
052    @Parameters({"starturl", "user", "password", "create"})
053    public void beforeMethod(String starturl, String user, String password, @Optional String resourceName)
054            throws Exception {
055        startNodes();
056
057        logger.info("nodes started");
058
059        waitForYellow("1");
060        try {
061            // create river index
062            client("1").admin().indices().create(new CreateIndexRequest("_river")).actionGet();
063            logger.info("river index created");
064        } catch (IndexAlreadyExistsException e) {
065            logger.warn(e.getMessage());
066        }
067        source = newRiverSource();
068        source.setUrl(starturl)
069                .setUser(user)
070                .setPassword(password)
071                .setLocale(Locale.getDefault())
072                .setTimeZone(TimeZone.getDefault());
073        context = newRiverContext();
074        context.setRiverSource(source);
075        source.setRiverContext(context);
076        logger.info("create table {}", resourceName);
077        if (resourceName == null || "".equals(resourceName)) {
078            return;
079        }
080        Connection connection = source.getConnectionForWriting();
081        if (connection == null) {
082            throw new IOException("no connection");
083        }
084        sqlScript(connection, resourceName);
085        source.closeWriting();
086    }
087
088    @AfterMethod
089    @Parameters({"stopurl", "user", "password", "delete"})
090    public void afterMethod(String stopurl, String user, String password, @Optional String resourceName)
091            throws Exception {
092
093        logger.info("remove table {}", resourceName);
094        if (resourceName == null || "".equals(resourceName)) {
095            return;
096        }
097        // before dropping tables, open read connection must be closed to avoid hangs in mysql/postgresql
098        logger.debug("closing reads...");
099        source.closeReading();
100
101        logger.debug("connecting for close...");
102        Connection connection = source.getConnectionForWriting();
103        if (connection == null) {
104            throw new IOException("no connection");
105        }
106        logger.debug("cleaning...");
107        // clean up tables
108        sqlScript(connection, resourceName);
109        logger.debug("closing writes...");
110        source.closeWriting();
111
112        // some driver can drop database by a magic 'stop' URL
113        source = newRiverSource();
114        source.setUrl(stopurl)
115                .setUser(user)
116                .setPassword(password)
117                .setLocale(Locale.getDefault())
118                .setTimeZone(TimeZone.getDefault());
119        try {
120            logger.info("connecting to stop URL...");
121            // activate stop URL
122            source.getConnectionForWriting();
123        } catch (Exception e) {
124            // exception is expected, ignore
125        }
126        // close open write connection
127        source.closeWriting();
128        logger.info("stopped");
129
130        // delete test index
131        try {
132            client("1").admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
133            logger.info("index {} deleted", index);
134        } catch (IndexMissingException e) {
135            logger.warn(e.getMessage());
136        }
137        try {
138            client("1").admin().indices().deleteMapping(new DeleteMappingRequest()
139                    .indices(new String[]{"_river"}).types("my_jdbc_river")).actionGet();
140            logger.info("river my_jdbc_river deleted");
141        } catch (Exception e) {
142            logger.warn(e.getMessage());
143        }
144        stopNodes();
145    }
146
147    protected void performRiver(String resource) throws Exception {
148        createRiver(resource);
149        waitForRiver();
150        waitForActiveRiver();
151        waitForInactiveRiver();
152    }
153
154    protected void createRiver(String resource) throws Exception {
155        waitForYellow("1");
156        byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource));
157        Map<String, Object> map = XContentHelper.convertToMap(b, false).v2();
158        XContentBuilder builder = jsonBuilder().map(map);
159        logger.info("river = {}", builder.string());
160        IndexRequest indexRequest = Requests.indexRequest("_river").type("my_jdbc_river").id("_meta")
161                .source(builder.string());
162        client("1").index(indexRequest).actionGet();
163        client("1").admin().indices().prepareRefresh("_river").execute().actionGet();
164        logger.info("river is created");
165    }
166
167    public void waitForRiver() throws Exception {
168        waitForRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
169        logger.info("river is up");
170    }
171    public void waitForActiveRiver() throws Exception {
172        waitForActiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
173        logger.info("river is active");
174    }
175
176    public void waitForInactiveRiver() throws Exception {
177        waitForInactiveRiver(client("1"), "my_jdbc_river", "jdbc", SECONDS_TO_WAIT);
178        logger.info("river is inactive");
179    }
180
181    protected RiverSettings riverSettings(String resource)
182            throws IOException {
183        InputStream in = getClass().getResourceAsStream(resource);
184        return new RiverSettings(ImmutableSettings.settingsBuilder().build(),
185                XContentHelper.convertToMap(Streams.copyToByteArray(in), false).v2());
186    }
187
188    private void sqlScript(Connection connection, String resourceName) throws Exception {
189        InputStream in = getClass().getResourceAsStream(resourceName);
190        BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
191        String sql;
192        while ((sql = br.readLine()) != null) {
193
194            try {
195                logger.trace("executing {}", sql);
196                Statement p = connection.createStatement();
197                p.execute(sql);
198                p.close();
199            } catch (SQLException e) {
200                // ignore
201                logger.error(sql + " failed. Reason: " + e.getMessage());
202            } finally {
203                connection.commit();
204            }
205        }
206        br.close();
207    }
208
209    public static RiverState waitForRiver(Client client, String riverName, String riverType, int seconds)
210            throws InterruptedException, IOException {
211        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
212                .setRiverName(riverName)
213                .setRiverType(riverType);
214        GetRiverStateResponse riverStateResponse = client.admin().cluster()
215                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
216        logger.info("waitForRiver {}/{}", riverName, riverType);
217        while (seconds-- > 0 && riverStateResponse.exists(riverName, riverType)) {
218            Thread.sleep(1000L);
219            try {
220                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
221                logger.info("waitForRiver state={}", riverStateResponse.getRiverState());
222            } catch (IndexMissingException e) {
223                logger.warn("index missing");
224            }
225        }
226        if (seconds < 0) {
227            throw new IOException("timeout waiting for river");
228        }
229        return riverStateResponse.getRiverState();
230    }
231
232    public static RiverState waitForActiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
233        long now = System.currentTimeMillis();
234        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
235                .setRiverName(riverName)
236                .setRiverType(riverType);
237        GetRiverStateResponse riverStateResponse = client.admin().cluster()
238                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
239        RiverState riverState = riverStateResponse.getRiverState();
240        long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
241        logger.info("waitForActiveRiver: now={} t0={} t0<now={} state={}",
242                now, t0, t0 < now, riverState);
243        while (seconds-- > 0 && t0 == 0 && t0 < now) {
244            Thread.sleep(1000L);
245            try {
246                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
247                riverState = riverStateResponse.getRiverState();
248                t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
249            } catch (IndexMissingException e) {
250                //
251            }
252            logger.info("waitForActiveRiver: now={} t0={} t0<now={} state={}",
253                    now, t0, t0 < now, riverState);
254        }
255        if (seconds < 0) {
256            throw new IOException("timeout waiting for active river");
257        }
258        return riverState;
259    }
260
261    public static RiverState waitForInactiveRiver(Client client, String riverName, String riverType, int seconds) throws InterruptedException, IOException {
262        long now = System.currentTimeMillis();
263        GetRiverStateRequest riverStateRequest = new GetRiverStateRequest()
264                .setRiverName(riverName)
265                .setRiverType(riverType);
266        GetRiverStateResponse riverStateResponse = client.admin().cluster()
267                .execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
268        RiverState riverState = riverStateResponse.getRiverState();
269        long t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
270        long t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L;
271        logger.info("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}",
272                now, t0 < now, t1 - t0 <= 0L, riverState);
273        while (seconds-- > 0 && t0 < now && t1 - t0 <= 0L) {
274            Thread.sleep(1000L);
275            try {
276                riverStateResponse = client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest).actionGet();
277                riverState = riverStateResponse.getRiverState();
278                t0 = riverState != null ? riverState.getLastActiveBegin().getMillis() : 0L;
279                t1 = riverState != null ? riverState.getLastActiveEnd().getMillis() : 0L;
280            } catch (IndexMissingException e) {
281                //
282            }
283            logger.info("waitForInactiveRiver: now={} t0<now={} t1-t0<=0={} state={}",
284                    now, t0 < now, t1 - t0 <= 0L, riverState);
285        }
286        if (seconds < 0) {
287            throw new IOException("timeout waiting for inactive river");
288        }
289        return riverState;
290    }
291}