001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.client;
017
018import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
019import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
020import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
021import org.elasticsearch.common.logging.ESLogger;
022import org.elasticsearch.common.logging.ESLoggerFactory;
023import org.elasticsearch.common.settings.Settings;
024import org.elasticsearch.common.unit.TimeValue;
025
026import java.io.IOException;
027import java.io.InputStream;
028import java.util.Map;
029
030public abstract class BaseIngestTransportClient extends BaseTransportClient
031        implements Ingest {
032
033    private final static ESLogger logger = ESLoggerFactory.getLogger("");
034
035    public Ingest newClient(Settings settings) {
036        super.createClient(settings);
037        return this;
038    }
039
040    @Override
041    public BaseIngestTransportClient shards(int shards) {
042        super.addSetting("index.number_of_shards", shards);
043        return this;
044    }
045
046    @Override
047    public BaseIngestTransportClient replica(int replica) {
048        super.addSetting("index.number_of_replicas", replica);
049        return this;
050    }
051
052    @Override
053    public BaseIngestTransportClient newIndex(String index) {
054        return newIndex(index, null, null);
055    }
056
057    @Override
058    public BaseIngestTransportClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException {
059        configHelper.reset();
060        configHelper.setting(settings);
061        configHelper.mapping(type, mappings);
062        return newIndex(index, configHelper.settings(), configHelper.mappings());
063    }
064
065    @Override
066    public BaseIngestTransportClient newIndex(String index, Settings settings, Map<String, String> mappings) {
067        if (client == null) {
068            logger.warn("no client for create index");
069            return this;
070        }
071        if (index == null) {
072            logger.warn("no index name given to create index");
073            return this;
074        }
075        CreateIndexRequestBuilder createIndexRequestBuilder =
076                new CreateIndexRequestBuilder(client.admin().indices()).setIndex(index);
077        Settings concreteSettings;
078        if (settings == null && getSettings() != null) {
079            concreteSettings = getSettings();
080        } else if (settings != null) {
081            concreteSettings = settings;
082        } else {
083            concreteSettings = null;
084        }
085        if (concreteSettings != null) {
086            createIndexRequestBuilder.setSettings(getSettings());
087        }
088        if (mappings == null && getMappings() != null) {
089            for (String type : getMappings().keySet()) {
090                createIndexRequestBuilder.addMapping(type, getMappings().get(type));
091            }
092        } else if (mappings != null) {
093            for (String type : mappings.keySet()) {
094                createIndexRequestBuilder.addMapping(type, mappings.get(type));
095            }
096        }
097        createIndexRequestBuilder.execute().actionGet();
098        logger.info("index {} created with settings {} and {} mappings", index,
099                concreteSettings != null ? concreteSettings.getAsMap() : "",
100                mappings != null ? mappings.size() : 0);
101        return this;
102    }
103
104    @Override
105    public synchronized BaseIngestTransportClient deleteIndex(String index) {
106        if (client == null) {
107            logger.warn("no client for delete index");
108            return this;
109        }
110        if (index == null) {
111            logger.warn("no index name given to delete index");
112            return this;
113        }
114        DeleteIndexRequestBuilder deleteIndexRequestBuilder =
115                new DeleteIndexRequestBuilder(client.admin().indices(), index);
116        deleteIndexRequestBuilder.execute().actionGet();
117        return this;
118    }
119
120    public BaseIngestTransportClient putMapping(String index) {
121        if (client == null) {
122            logger.warn("no client for put mapping");
123            return this;
124        }
125        configHelper.putMapping(client, index);
126        return this;
127    }
128
129    public BaseIngestTransportClient deleteMapping(String index, String type) {
130        if (client == null) {
131            logger.warn("no client for delete mapping");
132            return this;
133        }
134        configHelper.deleteMapping(client, index, type);
135        return this;
136    }
137
138    @Override
139    public BaseIngestTransportClient waitForCluster(ClusterHealthStatus status, TimeValue timeValue) throws IOException {
140        ClientHelper.waitForCluster(client, status, timeValue);
141        return this;
142    }
143
144}