001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.client;
017
018import org.elasticsearch.ElasticsearchTimeoutException;
019import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
020import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
021import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
022import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
023import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
024import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
025import org.elasticsearch.client.Client;
026import org.elasticsearch.client.transport.NoNodeAvailableException;
027import org.elasticsearch.client.transport.TransportClient;
028import org.elasticsearch.cluster.node.DiscoveryNode;
029import org.elasticsearch.common.settings.ImmutableSettings;
030import org.elasticsearch.common.unit.TimeValue;
031
032import java.io.IOException;
033import java.util.List;
034
035import static org.elasticsearch.common.collect.Lists.newLinkedList;
036
037public class ClientHelper {
038
039    public static List<String> getConnectedNodes(TransportClient client) {
040        List<String> nodes = newLinkedList();
041        if (client.connectedNodes() != null) {
042            for (DiscoveryNode discoveryNode : client.connectedNodes()) {
043                nodes.add(discoveryNode.toString());
044            }
045        }
046        return nodes;
047    }
048
049    public static void updateIndexSetting(Client client, String index, String key, Object value) throws IOException {
050        if (client == null) {
051            throw new IOException("no client");
052        }
053        if (index == null) {
054            throw new IOException("no index name given");
055        }
056        if (key == null) {
057            throw new IOException("no key given");
058        }
059        if (value == null) {
060            throw new IOException("no value given");
061        }
062        ImmutableSettings.Builder settingsBuilder = ImmutableSettings.settingsBuilder();
063        settingsBuilder.put(key, value.toString());
064        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
065                .settings(settingsBuilder);
066        client.admin().indices().updateSettings(updateSettingsRequest).actionGet();
067    }
068
069    public static int waitForRecovery(Client client, String index) throws IOException {
070        if (index == null) {
071            throw new IOException("unable to waitfor recovery, index not set");
072        }
073        RecoveryResponse response = client.admin().indices().prepareRecoveries(index).execute().actionGet();
074        int shards = response.getTotalShards();
075        client.admin().cluster().prepareHealth(index).setWaitForActiveShards(shards).execute().actionGet();
076        return shards;
077    }
078
079    public static void waitForCluster(Client client, ClusterHealthStatus status, TimeValue timeout) throws IOException {
080        try {
081            ClusterHealthResponse healthResponse =
082                    client.admin().cluster().prepareHealth().setWaitForStatus(status).setTimeout(timeout).execute().actionGet();
083            if (healthResponse != null && healthResponse.isTimedOut()) {
084                throw new IOException("cluster state is " + healthResponse.getStatus().name()
085                        + " and not " + status.name()
086                        + ", cowardly refusing to continue with operations");
087            }
088        } catch (ElasticsearchTimeoutException e) {
089            throw new IOException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
090        }
091    }
092
093    public static String clusterName(Client client) {
094        try {
095            ClusterStateRequestBuilder clusterStateRequestBuilder =
096                    new ClusterStateRequestBuilder(client.admin().cluster()).all();
097            ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
098            String name = clusterStateResponse.getClusterName().value();
099            int nodeCount = clusterStateResponse.getState().getNodes().size();
100            return name + " (" + nodeCount + " nodes connected)";
101        } catch (ElasticsearchTimeoutException e) {
102            return "TIMEOUT";
103        } catch (NoNodeAvailableException e) {
104            return "DISCONNECTED";
105        } catch (Throwable t) {
106            return "[" + t.getMessage() + "]";
107        }
108    }
109
110    public static String healthColor(Client client) {
111        try {
112            ClusterHealthResponse healthResponse =
113                    client.admin().cluster().prepareHealth().setTimeout(TimeValue.timeValueSeconds(30)).execute().actionGet();
114            ClusterHealthStatus status = healthResponse.getStatus();
115            return status.name();
116        } catch (ElasticsearchTimeoutException e) {
117            return "TIMEOUT";
118        } catch (NoNodeAvailableException e) {
119            return "DISCONNECTED";
120        } catch (Throwable t) {
121            return "[" + t.getMessage() + "]";
122        }
123    }
124
125    public static int updateReplicaLevel(Client client, String index, int level) throws IOException {
126        waitForCluster(client, ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30));
127        updateIndexSetting(client, index, "number_of_replicas", level);
128        return waitForRecovery(client, index);
129    }
130
131    public static void disableRefresh(Client client, String index) throws IOException {
132        updateIndexSetting(client, index, "refresh_interval", -1);
133    }
134
135    public static void enableRefresh(Client client, String index) throws IOException {
136        updateIndexSetting(client, index, "refresh_interval", 1000);
137    }
138
139    public static void flush(Client client, String index) {
140        client.admin().indices().prepareFlush().setIndices(index).execute().actionGet();
141    }
142
143    public static void refresh(Client client, String index) {
144        client.admin().indices().prepareRefresh().setIndices(index).setForce(true).execute().actionGet();
145    }
146
147}