001package org.xbib.elasticsearch.support.helper;
002
003import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
004import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
005import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
006import org.elasticsearch.client.Client;
007import org.elasticsearch.common.logging.ESLogger;
008import org.elasticsearch.common.logging.ESLoggerFactory;
009import org.elasticsearch.common.network.NetworkUtils;
010import org.elasticsearch.common.settings.ImmutableSettings;
011import org.elasticsearch.common.settings.Settings;
012import org.elasticsearch.common.transport.InetSocketTransportAddress;
013import org.elasticsearch.common.transport.LocalTransportAddress;
014import org.elasticsearch.index.query.QueryBuilder;
015import org.elasticsearch.index.query.QueryBuilders;
016import org.elasticsearch.node.Node;
017import org.elasticsearch.search.SearchHit;
018import org.elasticsearch.search.SearchHits;
019import org.elasticsearch.search.sort.SortBuilder;
020import org.elasticsearch.search.sort.SortBuilders;
021import org.elasticsearch.search.sort.SortOrder;
022import org.testng.Assert;
023
024import java.io.IOException;
025import java.util.Map;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import static org.elasticsearch.common.collect.Maps.newHashMap;
029import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
030import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
031
032public abstract class AbstractNodeTestHelper extends Assert {
033
034    protected final static ESLogger logger = ESLoggerFactory.getLogger("test");
035
036    private final static AtomicInteger clusterCount = new AtomicInteger();
037
038    protected String cluster;
039
040    // note, this must be same name as in json river specs
041    protected final String index = "my_jdbc_river_index";
042
043    protected final String type = "my_jdbc_river_type";
044
045    private Map<String, Node> nodes = newHashMap();
046
047    private Map<String, Client> clients = newHashMap();
048
049    protected void setClusterName() {
050        this.cluster = "test-jdbc-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "-" + clusterCount.incrementAndGet();
051    }
052
053    protected String getClusterName() {
054        return cluster;
055    }
056
057    protected Settings getNodeSettings() {
058        return ImmutableSettings
059                .settingsBuilder()
060                .put("cluster.name", getClusterName())
061                .put("index.number_of_shards", 1)
062                .put("index.number_of_replica", 0)
063                .put("cluster.routing.schedule", "50ms")
064                .put("gateway.type", "none")
065                .put("index.store.type", "ram")
066                .put("http.enabled", false)
067                .put("discovery.zen.multicast.enabled", false)
068                .build();
069    }
070
071    public void startNodes() throws Exception {
072        setClusterName();
073
074        // we need more than one node, for better resilience of the river state actions
075        startNode("1");
076        startNode("2");
077
078        // find node address
079        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
080        NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
081        Object obj = response.iterator().next().getTransport().getAddress().publishAddress();
082        if (obj instanceof InetSocketTransportAddress) {
083            InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
084            // ... do we need our transport address?
085        }
086        if (obj instanceof LocalTransportAddress) {
087            LocalTransportAddress address = (LocalTransportAddress) obj;
088            // .... do we need local transport?
089        }
090    }
091
092    public Node startNode(String id) {
093        return buildNode(id).start();
094    }
095
096    public Node buildNode(String id) {
097        String settingsSource = getClass().getName().replace('.', '/') + ".yml";
098        Settings finalSettings = settingsBuilder()
099                .loadFromClasspath(settingsSource)
100                .put(getNodeSettings())
101                .put("name", id)
102                .build();
103        Node node = nodeBuilder().local(true).settings(finalSettings).build();
104        Client client = node.client();
105        nodes.put(id, node);
106        clients.put(id, client);
107        return node;
108    }
109
110    public void waitForYellow(String id) throws IOException {
111        logger.info("wait for healthy cluster...");
112        ClusterHealthResponse clusterHealthResponse = client(id).admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
113        if (clusterHealthResponse.isTimedOut()) {
114            throw new IOException("error, cluster health is " + clusterHealthResponse.getStatus().name());
115        }
116        logger.info("cluster health is {}", clusterHealthResponse.getStatus().name());
117    }
118
119    public void assertHits(String id, int expectedHits) {
120        client(id).admin().indices().prepareRefresh(index).execute().actionGet();
121        long hitsFound = client(id).prepareSearch(index).setTypes(type).execute().actionGet().getHits().getTotalHits();
122        logger.info("{}/{} = {} hits", index, type, hitsFound);
123        assertEquals(hitsFound, expectedHits);
124    }
125
126    public void assertTimestampSort(String id, int expectedHits) {
127        client(id).admin().indices().prepareRefresh(index).execute().actionGet();
128        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
129        SortBuilder sortBuilder = SortBuilders.fieldSort("_timestamp").order(SortOrder.DESC);
130        SearchHits hits = client(id).prepareSearch(index).setTypes(type)
131                .setQuery(queryBuilder)
132                .addSort(sortBuilder)
133                .addFields("_source", "_timestamp")
134                .setSize(expectedHits)
135                .execute().actionGet().getHits();
136        Long prev = Long.MAX_VALUE;
137        for (SearchHit hit : hits) {
138            if (hit.getFields().get("_timestamp") == null) {
139                logger.warn("type mapping was not correctly applied for _timestamp field");
140            }
141            Long curr = hit.getFields().get("_timestamp").getValue();
142            logger.info("timestamp = {}", curr);
143            assertTrue(curr <= prev);
144            prev = curr;
145        }
146        logger.info("{}/{} = {} hits", index, type, hits.getTotalHits());
147        assertEquals(hits.getTotalHits(), expectedHits);
148    }
149
150    public Client client(String id) {
151        Client client = clients.get(id);
152        if (client == null) {
153            client = nodes.get(id).client();
154            clients.put(id, client);
155        }
156        return client;
157    }
158
159    public void stopNodes() {
160        for (Client client : clients.values()) {
161            client.close();
162        }
163        clients.clear();
164        for (Node node : nodes.values()) {
165            node.stop();
166            node.close();
167        }
168        nodes.clear();
169    }
170
171}