001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.client; 017 018import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; 019import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; 020import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; 021import org.elasticsearch.common.logging.ESLogger; 022import org.elasticsearch.common.logging.ESLoggerFactory; 023import org.elasticsearch.common.settings.Settings; 024import org.elasticsearch.common.unit.TimeValue; 025 026import java.io.IOException; 027import java.io.InputStream; 028import java.util.Map; 029 030public abstract class BaseIngestTransportClient extends BaseTransportClient 031 implements Ingest { 032 033 private final static ESLogger logger = ESLoggerFactory.getLogger(""); 034 035 public Ingest newClient(Settings settings) { 036 super.createClient(settings); 037 return this; 038 } 039 040 @Override 041 public BaseIngestTransportClient shards(int shards) { 042 super.addSetting("index.number_of_shards", shards); 043 return this; 044 } 045 046 @Override 047 public BaseIngestTransportClient replica(int replica) { 048 super.addSetting("index.number_of_replicas", replica); 049 return this; 050 } 051 052 @Override 053 public BaseIngestTransportClient newIndex(String index) { 054 return newIndex(index, null, null); 055 } 056 057 @Override 058 public BaseIngestTransportClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException { 059 configHelper.reset(); 060 configHelper.setting(settings); 061 configHelper.mapping(type, mappings); 062 return newIndex(index, configHelper.settings(), configHelper.mappings()); 063 } 064 065 @Override 066 public BaseIngestTransportClient newIndex(String index, Settings settings, Map<String, String> mappings) { 067 if (client == null) { 068 logger.warn("no client for create index"); 069 return this; 070 } 071 if (index == null) { 072 logger.warn("no index name given to create index"); 073 return this; 074 } 075 CreateIndexRequestBuilder createIndexRequestBuilder = 076 new CreateIndexRequestBuilder(client.admin().indices()).setIndex(index); 077 Settings concreteSettings; 078 if (settings == null && getSettings() != null) { 079 concreteSettings = getSettings(); 080 } else if (settings != null) { 081 concreteSettings = settings; 082 } else { 083 concreteSettings = null; 084 } 085 if (concreteSettings != null) { 086 createIndexRequestBuilder.setSettings(getSettings()); 087 } 088 if (mappings == null && getMappings() != null) { 089 for (String type : getMappings().keySet()) { 090 createIndexRequestBuilder.addMapping(type, getMappings().get(type)); 091 } 092 } else if (mappings != null) { 093 for (String type : mappings.keySet()) { 094 createIndexRequestBuilder.addMapping(type, mappings.get(type)); 095 } 096 } 097 createIndexRequestBuilder.execute().actionGet(); 098 logger.info("index {} created with settings {} and {} mappings", index, 099 concreteSettings != null ? concreteSettings.getAsMap() : "", 100 mappings != null ? mappings.size() : 0); 101 return this; 102 } 103 104 @Override 105 public synchronized BaseIngestTransportClient deleteIndex(String index) { 106 if (client == null) { 107 logger.warn("no client for delete index"); 108 return this; 109 } 110 if (index == null) { 111 logger.warn("no index name given to delete index"); 112 return this; 113 } 114 DeleteIndexRequestBuilder deleteIndexRequestBuilder = 115 new DeleteIndexRequestBuilder(client.admin().indices(), index); 116 deleteIndexRequestBuilder.execute().actionGet(); 117 return this; 118 } 119 120 public BaseIngestTransportClient putMapping(String index) { 121 if (client == null) { 122 logger.warn("no client for put mapping"); 123 return this; 124 } 125 configHelper.putMapping(client, index); 126 return this; 127 } 128 129 public BaseIngestTransportClient deleteMapping(String index, String type) { 130 if (client == null) { 131 logger.warn("no client for delete mapping"); 132 return this; 133 } 134 configHelper.deleteMapping(client, index, type); 135 return this; 136 } 137 138 @Override 139 public BaseIngestTransportClient waitForCluster(ClusterHealthStatus status, TimeValue timeValue) throws IOException { 140 ClientHelper.waitForCluster(client, status, timeValue); 141 return this; 142 } 143 144}