001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.client;
017
018import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
019import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
020import org.elasticsearch.client.Client;
021import org.elasticsearch.client.transport.TransportClient;
022import org.elasticsearch.cluster.node.DiscoveryNode;
023import org.elasticsearch.cluster.node.DiscoveryNodes;
024import org.elasticsearch.common.logging.ESLogger;
025import org.elasticsearch.common.logging.ESLoggerFactory;
026import org.elasticsearch.common.settings.ImmutableSettings;
027import org.elasticsearch.common.settings.Settings;
028import org.elasticsearch.common.transport.InetSocketTransportAddress;
029import org.elasticsearch.common.unit.TimeValue;
030
031import java.io.IOException;
032import java.io.InputStream;
033import java.net.Inet4Address;
034import java.net.Inet6Address;
035import java.net.InetAddress;
036import java.net.NetworkInterface;
037import java.net.SocketException;
038import java.net.UnknownHostException;
039import java.util.Collections;
040import java.util.Enumeration;
041import java.util.List;
042import java.util.Map;
043import java.util.Set;
044
045import static org.elasticsearch.common.collect.Sets.newHashSet;
046
047public abstract class BaseTransportClient {
048
049    private final static ESLogger logger = ESLoggerFactory.getLogger(BaseTransportClient.class.getSimpleName());
050
051    private final Set<InetSocketTransportAddress> addresses = newHashSet();
052
053    protected TransportClient client;
054
055    protected ConfigHelper configHelper = new ConfigHelper();
056
057    private boolean isShutdown;
058
059    protected void createClient(Settings settings) {
060        if (client != null) {
061            logger.warn("client is open, closing...");
062            client.close();
063            client.threadPool().shutdown();
064            logger.warn("client is closed");
065            client = null;
066        }
067        if (settings != null) {
068            logger.info("creating transport client, java version {}, effective settings {}",
069                    System.getProperty("java.version"), settings.getAsMap());
070            // false = do not load config settings from environment
071            this.client = new TransportClient(settings, false);
072            logger.info("transport client settings = {}", client.settings().getAsMap());
073        } else {
074            logger.info("creating transport client, java version {}, using default settings",
075                    System.getProperty("java.version"));
076            this.client = new TransportClient();
077        }
078        try {
079            connect(settings);
080        } catch (UnknownHostException e) {
081            logger.error(e.getMessage(), e);
082        } catch (SocketException e) {
083            logger.error(e.getMessage(), e);
084        } catch (IOException e) {
085            logger.error(e.getMessage(), e);
086        }
087    }
088
089    public Client client() {
090        return client;
091    }
092
093    public List<String> getConnectedNodes() {
094        return ClientHelper.getConnectedNodes(client);
095    }
096
097    public synchronized void shutdown() {
098        if (client != null) {
099            logger.debug("shutdown started");
100            client.close();
101            client.threadPool().shutdown();
102            client = null;
103            logger.debug("shutdown complete");
104        }
105        addresses.clear();
106        isShutdown = true;
107    }
108
109    public boolean isShutdown() {
110        return isShutdown;
111    }
112
113    protected Settings findSettings() {
114        ImmutableSettings.Builder settingsBuilder = ImmutableSettings.settingsBuilder();
115        settingsBuilder.put("host", "localhost");
116        try {
117            String hostname = InetAddress.getLocalHost().getHostName();
118            logger.debug("the hostname is {}", hostname);
119            settingsBuilder.put("host", hostname)
120                    .put("port", 9300);
121        } catch (UnknownHostException e) {
122            logger.warn("can't resolve host name, probably something wrong with network config: " + e.getMessage(), e);
123        } catch (Exception e) {
124            logger.warn(e.getMessage(), e);
125        }
126        return settingsBuilder.build();
127    }
128
129    protected void connect(Settings settings) throws IOException {
130        String hostname = settings.get("host");
131        int port = settings.getAsInt("port", 9300);
132        switch (hostname) {
133            case "hostname": {
134                InetSocketTransportAddress address = new InetSocketTransportAddress(InetAddress.getLocalHost().getHostName(), port);
135                if (!addresses.contains(address)) {
136                    logger.info("adding hostname address for transport client: {}", address);
137                    client.addTransportAddress(address);
138                    addresses.add(address);
139                }
140                break;
141            }
142            case "interfaces": {
143                Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
144                for (NetworkInterface netint : Collections.list(nets)) {
145                    logger.info("checking network interface = {}", netint.getName());
146                    Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
147                    for (InetAddress addr : Collections.list(inetAddresses)) {
148                        logger.info("checking address = {}", addr.getHostAddress());
149                        InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port);
150                        if (!addresses.contains(address)) {
151                            logger.info("adding address to transport client: {}", address);
152                            client.addTransportAddress(address);
153                            addresses.add(address);
154                        }
155                    }
156                }
157                break;
158            }
159            case "inet4": {
160                Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
161                for (NetworkInterface netint : Collections.list(nets)) {
162                    logger.info("checking network interface = {}", netint.getName());
163                    Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
164                    for (InetAddress addr : Collections.list(inetAddresses)) {
165                        if (addr instanceof Inet4Address) {
166                            logger.info("checking address = {}", addr.getHostAddress());
167                            InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port);
168                            if (!addresses.contains(address)) {
169                                logger.info("adding address for transport client: {}", address);
170                                client.addTransportAddress(address);
171                                addresses.add(address);
172                            }
173                        }
174                    }
175                }
176                break;
177            }
178            case "inet6": {
179                Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
180                for (NetworkInterface netint : Collections.list(nets)) {
181                    logger.info("checking network interface = {}", netint.getName());
182                    Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
183                    for (InetAddress addr : Collections.list(inetAddresses)) {
184                        if (addr instanceof Inet6Address) {
185                            logger.info("checking address = {}", addr.getHostAddress());
186                            InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port);
187                            if (!addresses.contains(address)) {
188                                logger.info("adding address for transport client: {}", address);
189                                client.addTransportAddress(address);
190                                addresses.add(address);
191                            }
192                        }
193                    }
194                }
195                break;
196            }
197            default: {
198                InetSocketTransportAddress address = new InetSocketTransportAddress(hostname, port);
199                if (!addresses.contains(address)) {
200                    logger.info("adding custom address for transport client: {}", address);
201                    client.addTransportAddress(address);
202                    addresses.add(address);
203                }
204                break;
205            }
206        }
207        // 10 seconds is used because it is longer than 5 seconds
208        long timeout = settings.getAsTime("timeout", settings.getAsTime("client.transport.ping_timeout",
209                TimeValue.timeValueSeconds(10))).millis();
210        logger.info("configured addresses to connect = {}, waiting for {} to connect ...", addresses,
211                TimeValue.timeValueMillis(timeout).format());
212        if (client.connectedNodes() != null) {
213            List<DiscoveryNode> nodes = client.connectedNodes().asList();
214            logger.info("connected nodes = {}", nodes);
215            for (DiscoveryNode node : nodes) {
216                logger.info("new connection to {}", node);
217            }
218            if (!nodes.isEmpty()) {
219                if (settings.get("sniff") != null || settings.get("es.sniff") != null || settings.get("client.transport.sniff") != null) {
220                    try {
221                        connectMore();
222                    } catch (Exception e) {
223                        logger.error("error while connecting to more nodes", e);
224                    }
225                }
226            }
227        }
228    }
229
230    public ImmutableSettings.Builder getSettingsBuilder() {
231        return configHelper.settingsBuilder();
232    }
233
234    public void resetSettings() {
235        configHelper.reset();
236    }
237
238    public void setting(InputStream in) throws IOException {
239        configHelper.setting(in);
240    }
241
242    public void addSetting(String key, String value) {
243        configHelper.setting(key, value);
244    }
245
246    public void addSetting(String key, Boolean value) {
247        configHelper.setting(key, value);
248    }
249
250    public void addSetting(String key, Integer value) {
251        configHelper.setting(key, value);
252    }
253
254    public void setSettings(Settings settings) {
255        configHelper.settings(settings);
256    }
257
258    public Settings getSettings() {
259        return configHelper.settings();
260    }
261
262    public void mapping(String type, String mapping) throws IOException {
263        configHelper.mapping(type, mapping);
264    }
265
266    public void mapping(String type, InputStream in) throws IOException {
267        configHelper.mapping(type, in);
268    }
269
270    public Map<String, String> getMappings() {
271        return configHelper.mappings();
272    }
273
274    private void connectMore() throws IOException {
275        logger.debug("trying to discover more nodes...");
276        ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet();
277        DiscoveryNodes nodes = clusterStateResponse.getState().getNodes();
278        for (DiscoveryNode node : nodes) {
279            logger.debug("adding discovered node {}", node);
280            try {
281                client.addTransportAddress(node.address());
282            } catch (Exception e) {
283                logger.warn("can't add node " + node, e);
284            }
285        }
286        logger.debug("... discovery done");
287    }
288
289}