001package org.xbib.elasticsearch.support.helper; 002 003import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; 004import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; 005import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; 006import org.elasticsearch.client.Client; 007import org.elasticsearch.common.logging.ESLogger; 008import org.elasticsearch.common.logging.ESLoggerFactory; 009import org.elasticsearch.common.network.NetworkUtils; 010import org.elasticsearch.common.settings.ImmutableSettings; 011import org.elasticsearch.common.settings.Settings; 012import org.elasticsearch.common.transport.InetSocketTransportAddress; 013import org.elasticsearch.common.transport.LocalTransportAddress; 014import org.elasticsearch.index.query.QueryBuilder; 015import org.elasticsearch.index.query.QueryBuilders; 016import org.elasticsearch.node.Node; 017import org.elasticsearch.search.SearchHit; 018import org.elasticsearch.search.SearchHits; 019import org.elasticsearch.search.sort.SortBuilder; 020import org.elasticsearch.search.sort.SortBuilders; 021import org.elasticsearch.search.sort.SortOrder; 022import org.testng.Assert; 023 024import java.io.IOException; 025import java.util.Map; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import static org.elasticsearch.common.collect.Maps.newHashMap; 029import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; 030import static org.elasticsearch.node.NodeBuilder.nodeBuilder; 031 032public abstract class AbstractNodeTestHelper extends Assert { 033 034 protected final static ESLogger logger = ESLoggerFactory.getLogger("test"); 035 036 private final static AtomicInteger clusterCount = new AtomicInteger(); 037 038 protected String cluster; 039 040 // note, this must be same name as in json river specs 041 protected final String index = "my_jdbc_river_index"; 042 043 protected final String type = "my_jdbc_river_type"; 044 045 private Map<String, Node> nodes = newHashMap(); 046 047 private Map<String, Client> clients = newHashMap(); 048 049 protected void setClusterName() { 050 this.cluster = "test-jdbc-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "-" + clusterCount.incrementAndGet(); 051 } 052 053 protected String getClusterName() { 054 return cluster; 055 } 056 057 protected Settings getNodeSettings() { 058 return ImmutableSettings 059 .settingsBuilder() 060 .put("cluster.name", getClusterName()) 061 .put("index.number_of_shards", 1) 062 .put("index.number_of_replica", 0) 063 .put("cluster.routing.schedule", "50ms") 064 .put("gateway.type", "none") 065 .put("index.store.type", "ram") 066 .put("http.enabled", false) 067 .put("discovery.zen.multicast.enabled", false) 068 .build(); 069 } 070 071 public void startNodes() throws Exception { 072 setClusterName(); 073 074 // we need more than one node, for better resilience of the river state actions 075 startNode("1"); 076 startNode("2"); 077 078 // find node address 079 NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); 080 NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); 081 Object obj = response.iterator().next().getTransport().getAddress().publishAddress(); 082 if (obj instanceof InetSocketTransportAddress) { 083 InetSocketTransportAddress address = (InetSocketTransportAddress) obj; 084 // ... do we need our transport address? 085 } 086 if (obj instanceof LocalTransportAddress) { 087 LocalTransportAddress address = (LocalTransportAddress) obj; 088 // .... do we need local transport? 089 } 090 } 091 092 public Node startNode(String id) { 093 return buildNode(id).start(); 094 } 095 096 public Node buildNode(String id) { 097 String settingsSource = getClass().getName().replace('.', '/') + ".yml"; 098 Settings finalSettings = settingsBuilder() 099 .loadFromClasspath(settingsSource) 100 .put(getNodeSettings()) 101 .put("name", id) 102 .build(); 103 Node node = nodeBuilder().local(true).settings(finalSettings).build(); 104 Client client = node.client(); 105 nodes.put(id, node); 106 clients.put(id, client); 107 return node; 108 } 109 110 public void waitForYellow(String id) throws IOException { 111 logger.info("wait for healthy cluster..."); 112 ClusterHealthResponse clusterHealthResponse = client(id).admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(); 113 if (clusterHealthResponse.isTimedOut()) { 114 throw new IOException("error, cluster health is " + clusterHealthResponse.getStatus().name()); 115 } 116 logger.info("cluster health is {}", clusterHealthResponse.getStatus().name()); 117 } 118 119 public void assertHits(String id, int expectedHits) { 120 client(id).admin().indices().prepareRefresh(index).execute().actionGet(); 121 long hitsFound = client(id).prepareSearch(index).setTypes(type).execute().actionGet().getHits().getTotalHits(); 122 logger.info("{}/{} = {} hits", index, type, hitsFound); 123 assertEquals(hitsFound, expectedHits); 124 } 125 126 public void assertTimestampSort(String id, int expectedHits) { 127 client(id).admin().indices().prepareRefresh(index).execute().actionGet(); 128 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 129 SortBuilder sortBuilder = SortBuilders.fieldSort("_timestamp").order(SortOrder.DESC); 130 SearchHits hits = client(id).prepareSearch(index).setTypes(type) 131 .setQuery(queryBuilder) 132 .addSort(sortBuilder) 133 .addFields("_source", "_timestamp") 134 .setSize(expectedHits) 135 .execute().actionGet().getHits(); 136 Long prev = Long.MAX_VALUE; 137 for (SearchHit hit : hits) { 138 if (hit.getFields().get("_timestamp") == null) { 139 logger.warn("type mapping was not correctly applied for _timestamp field"); 140 } 141 Long curr = hit.getFields().get("_timestamp").getValue(); 142 logger.info("timestamp = {}", curr); 143 assertTrue(curr <= prev); 144 prev = curr; 145 } 146 logger.info("{}/{} = {} hits", index, type, hits.getTotalHits()); 147 assertEquals(hits.getTotalHits(), expectedHits); 148 } 149 150 public Client client(String id) { 151 Client client = clients.get(id); 152 if (client == null) { 153 client = nodes.get(id).client(); 154 clients.put(id, client); 155 } 156 return client; 157 } 158 159 public void stopNodes() { 160 for (Client client : clients.values()) { 161 client.close(); 162 } 163 clients.clear(); 164 for (Node node : nodes.values()) { 165 node.stop(); 166 node.close(); 167 } 168 nodes.clear(); 169 } 170 171}