001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.client; 017 018import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; 019import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; 020import org.elasticsearch.client.Client; 021import org.elasticsearch.client.transport.TransportClient; 022import org.elasticsearch.cluster.node.DiscoveryNode; 023import org.elasticsearch.cluster.node.DiscoveryNodes; 024import org.elasticsearch.common.logging.ESLogger; 025import org.elasticsearch.common.logging.ESLoggerFactory; 026import org.elasticsearch.common.settings.ImmutableSettings; 027import org.elasticsearch.common.settings.Settings; 028import org.elasticsearch.common.transport.InetSocketTransportAddress; 029import org.elasticsearch.common.unit.TimeValue; 030 031import java.io.IOException; 032import java.io.InputStream; 033import java.net.Inet4Address; 034import java.net.Inet6Address; 035import java.net.InetAddress; 036import java.net.NetworkInterface; 037import java.net.SocketException; 038import java.net.UnknownHostException; 039import java.util.Collections; 040import java.util.Enumeration; 041import java.util.List; 042import java.util.Map; 043import java.util.Set; 044 045import static org.elasticsearch.common.collect.Sets.newHashSet; 046 047public abstract class BaseTransportClient { 048 049 private final static ESLogger logger = ESLoggerFactory.getLogger(BaseTransportClient.class.getSimpleName()); 050 051 private final Set<InetSocketTransportAddress> addresses = newHashSet(); 052 053 protected TransportClient client; 054 055 protected ConfigHelper configHelper = new ConfigHelper(); 056 057 private boolean isShutdown; 058 059 protected void createClient(Settings settings) { 060 if (client != null) { 061 logger.warn("client is open, closing..."); 062 client.close(); 063 client.threadPool().shutdown(); 064 logger.warn("client is closed"); 065 client = null; 066 } 067 if (settings != null) { 068 logger.info("creating transport client, java version {}, effective settings {}", 069 System.getProperty("java.version"), settings.getAsMap()); 070 // false = do not load config settings from environment 071 this.client = new TransportClient(settings, false); 072 logger.info("transport client settings = {}", client.settings().getAsMap()); 073 } else { 074 logger.info("creating transport client, java version {}, using default settings", 075 System.getProperty("java.version")); 076 this.client = new TransportClient(); 077 } 078 try { 079 connect(settings); 080 } catch (UnknownHostException e) { 081 logger.error(e.getMessage(), e); 082 } catch (SocketException e) { 083 logger.error(e.getMessage(), e); 084 } catch (IOException e) { 085 logger.error(e.getMessage(), e); 086 } 087 } 088 089 public Client client() { 090 return client; 091 } 092 093 public List<String> getConnectedNodes() { 094 return ClientHelper.getConnectedNodes(client); 095 } 096 097 public synchronized void shutdown() { 098 if (client != null) { 099 logger.debug("shutdown started"); 100 client.close(); 101 client.threadPool().shutdown(); 102 client = null; 103 logger.debug("shutdown complete"); 104 } 105 addresses.clear(); 106 isShutdown = true; 107 } 108 109 public boolean isShutdown() { 110 return isShutdown; 111 } 112 113 protected Settings findSettings() { 114 ImmutableSettings.Builder settingsBuilder = ImmutableSettings.settingsBuilder(); 115 settingsBuilder.put("host", "localhost"); 116 try { 117 String hostname = InetAddress.getLocalHost().getHostName(); 118 logger.debug("the hostname is {}", hostname); 119 settingsBuilder.put("host", hostname) 120 .put("port", 9300); 121 } catch (UnknownHostException e) { 122 logger.warn("can't resolve host name, probably something wrong with network config: " + e.getMessage(), e); 123 } catch (Exception e) { 124 logger.warn(e.getMessage(), e); 125 } 126 return settingsBuilder.build(); 127 } 128 129 protected void connect(Settings settings) throws IOException { 130 String hostname = settings.get("host"); 131 int port = settings.getAsInt("port", 9300); 132 switch (hostname) { 133 case "hostname": { 134 InetSocketTransportAddress address = new InetSocketTransportAddress(InetAddress.getLocalHost().getHostName(), port); 135 if (!addresses.contains(address)) { 136 logger.info("adding hostname address for transport client: {}", address); 137 client.addTransportAddress(address); 138 addresses.add(address); 139 } 140 break; 141 } 142 case "interfaces": { 143 Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); 144 for (NetworkInterface netint : Collections.list(nets)) { 145 logger.info("checking network interface = {}", netint.getName()); 146 Enumeration<InetAddress> inetAddresses = netint.getInetAddresses(); 147 for (InetAddress addr : Collections.list(inetAddresses)) { 148 logger.info("checking address = {}", addr.getHostAddress()); 149 InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port); 150 if (!addresses.contains(address)) { 151 logger.info("adding address to transport client: {}", address); 152 client.addTransportAddress(address); 153 addresses.add(address); 154 } 155 } 156 } 157 break; 158 } 159 case "inet4": { 160 Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); 161 for (NetworkInterface netint : Collections.list(nets)) { 162 logger.info("checking network interface = {}", netint.getName()); 163 Enumeration<InetAddress> inetAddresses = netint.getInetAddresses(); 164 for (InetAddress addr : Collections.list(inetAddresses)) { 165 if (addr instanceof Inet4Address) { 166 logger.info("checking address = {}", addr.getHostAddress()); 167 InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port); 168 if (!addresses.contains(address)) { 169 logger.info("adding address for transport client: {}", address); 170 client.addTransportAddress(address); 171 addresses.add(address); 172 } 173 } 174 } 175 } 176 break; 177 } 178 case "inet6": { 179 Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); 180 for (NetworkInterface netint : Collections.list(nets)) { 181 logger.info("checking network interface = {}", netint.getName()); 182 Enumeration<InetAddress> inetAddresses = netint.getInetAddresses(); 183 for (InetAddress addr : Collections.list(inetAddresses)) { 184 if (addr instanceof Inet6Address) { 185 logger.info("checking address = {}", addr.getHostAddress()); 186 InetSocketTransportAddress address = new InetSocketTransportAddress(addr, port); 187 if (!addresses.contains(address)) { 188 logger.info("adding address for transport client: {}", address); 189 client.addTransportAddress(address); 190 addresses.add(address); 191 } 192 } 193 } 194 } 195 break; 196 } 197 default: { 198 InetSocketTransportAddress address = new InetSocketTransportAddress(hostname, port); 199 if (!addresses.contains(address)) { 200 logger.info("adding custom address for transport client: {}", address); 201 client.addTransportAddress(address); 202 addresses.add(address); 203 } 204 break; 205 } 206 } 207 // 10 seconds is used because it is longer than 5 seconds 208 long timeout = settings.getAsTime("timeout", settings.getAsTime("client.transport.ping_timeout", 209 TimeValue.timeValueSeconds(10))).millis(); 210 logger.info("configured addresses to connect = {}, waiting for {} to connect ...", addresses, 211 TimeValue.timeValueMillis(timeout).format()); 212 if (client.connectedNodes() != null) { 213 List<DiscoveryNode> nodes = client.connectedNodes().asList(); 214 logger.info("connected nodes = {}", nodes); 215 for (DiscoveryNode node : nodes) { 216 logger.info("new connection to {}", node); 217 } 218 if (!nodes.isEmpty()) { 219 if (settings.get("sniff") != null || settings.get("es.sniff") != null || settings.get("client.transport.sniff") != null) { 220 try { 221 connectMore(); 222 } catch (Exception e) { 223 logger.error("error while connecting to more nodes", e); 224 } 225 } 226 } 227 } 228 } 229 230 public ImmutableSettings.Builder getSettingsBuilder() { 231 return configHelper.settingsBuilder(); 232 } 233 234 public void resetSettings() { 235 configHelper.reset(); 236 } 237 238 public void setting(InputStream in) throws IOException { 239 configHelper.setting(in); 240 } 241 242 public void addSetting(String key, String value) { 243 configHelper.setting(key, value); 244 } 245 246 public void addSetting(String key, Boolean value) { 247 configHelper.setting(key, value); 248 } 249 250 public void addSetting(String key, Integer value) { 251 configHelper.setting(key, value); 252 } 253 254 public void setSettings(Settings settings) { 255 configHelper.settings(settings); 256 } 257 258 public Settings getSettings() { 259 return configHelper.settings(); 260 } 261 262 public void mapping(String type, String mapping) throws IOException { 263 configHelper.mapping(type, mapping); 264 } 265 266 public void mapping(String type, InputStream in) throws IOException { 267 configHelper.mapping(type, in); 268 } 269 270 public Map<String, String> getMappings() { 271 return configHelper.mappings(); 272 } 273 274 private void connectMore() throws IOException { 275 logger.debug("trying to discover more nodes..."); 276 ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); 277 DiscoveryNodes nodes = clusterStateResponse.getState().getNodes(); 278 for (DiscoveryNode node : nodes) { 279 logger.debug("adding discovered node {}", node); 280 try { 281 client.addTransportAddress(node.address()); 282 } catch (Exception e) { 283 logger.warn("can't add node " + node, e); 284 } 285 } 286 logger.debug("... discovery done"); 287 } 288 289}