001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.simple;
017
018import org.elasticsearch.action.delete.DeleteRequest;
019import org.elasticsearch.action.index.IndexRequest;
020import org.elasticsearch.client.Requests;
021import org.elasticsearch.common.Strings;
022import org.elasticsearch.common.collect.ImmutableSet;
023import org.elasticsearch.common.joda.time.DateTime;
024import org.elasticsearch.common.joda.time.format.DateTimeFormat;
025import org.elasticsearch.common.logging.ESLogger;
026import org.elasticsearch.common.logging.ESLoggerFactory;
027import org.elasticsearch.common.settings.Settings;
028import org.elasticsearch.common.unit.TimeValue;
029import org.elasticsearch.index.VersionType;
030import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
031import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
032import org.xbib.elasticsearch.plugin.jdbc.client.Metric;
033import org.xbib.elasticsearch.plugin.jdbc.util.ControlKeys;
034import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject;
035import org.xbib.elasticsearch.river.jdbc.RiverMouth;
036
037import java.io.IOException;
038import java.util.Map;
039
040/**
041 * Simple river mouth implementation. This implementation uses bulk processing,
042 * index name housekeeping (with replica/refresh), and metrics. It understands
043 * _version, _routing, _timestamp, _parent, and _ttl metadata.
044 */
045public class SimpleRiverMouth implements RiverMouth<SimpleRiverContext> {
046
047    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverMouth");
048
049    protected SimpleRiverContext context;
050
051    protected IngestFactory ingestFactory;
052
053    protected Ingest ingest;
054
055    protected Metric metric;
056
057    protected Settings indexSettings;
058
059    protected Map<String, String> indexMappings;
060
061    protected String index;
062
063    protected String type;
064
065    protected String id;
066
067    protected volatile boolean suspended = false;
068
069    @Override
070    public String strategy() {
071        return "simple";
072    }
073
074    @Override
075    public SimpleRiverMouth newInstance() {
076        return new SimpleRiverMouth();
077    }
078
079    @Override
080    public SimpleRiverMouth setRiverContext(SimpleRiverContext context) {
081        this.context = context;
082        return this;
083    }
084
085    @Override
086    public SimpleRiverMouth setIngestFactory(IngestFactory ingestFactory) {
087        this.ingestFactory = ingestFactory;
088        this.ingest = ingestFactory.create();
089        this.metric = ingest.getMetric();
090        return this;
091    }
092
093    @Override
094    public Metric getMetric() {
095        return metric;
096    }
097
098    @Override
099    public synchronized void beforeFetch() throws IOException {
100        if (ingest == null || ingest.isShutdown()) {
101            ingest = ingestFactory.create();
102        }
103        if (!ingest.client().admin().indices().prepareExists(index).execute().actionGet().isExists()) {
104            logger.info("creating index {} with settings {} and mappings {}",
105                    index, indexSettings != null ? indexSettings.getAsMap() : "{}", indexMappings);
106            ingest.newIndex(index, indexSettings, indexMappings);
107        }
108        ingest.startBulk(index);
109    }
110
111    @Override
112    public synchronized void afterFetch() throws IOException {
113        if (ingest == null || ingest.isShutdown()) {
114            ingest = ingestFactory.create();
115        }
116        flush();
117        ingest.stopBulk(index);
118        ingest.refresh(index);
119        if (metric.indices() != null && !metric.indices().isEmpty()) {
120            for (String index : ImmutableSet.copyOf(metric.indices())) {
121                logger.info("stopping bulk mode for index {} and refreshing...", index);
122                ingest.stopBulk(index);
123                ingest.refresh(index);
124            }
125        }
126        if (!ingest.isShutdown()) {
127            ingest.shutdown();
128        }
129    }
130
131    @Override
132    public synchronized void shutdown() {
133        try {
134            flush();
135        } catch (IOException e) {
136            logger.error(e.getMessage(), e);
137        }
138        if (ingest != null && !ingest.isShutdown()) {
139            // shut down ingest and release ingest resources
140            ingest.shutdown();
141        }
142    }
143
144    @Override
145    public SimpleRiverMouth setIndexSettings(Settings indexSettings) {
146        this.indexSettings = indexSettings;
147        return this;
148    }
149
150    @Override
151    public SimpleRiverMouth setTypeMapping(Map<String, String> typeMapping) {
152        this.indexMappings = typeMapping;
153        return this;
154    }
155
156    @Override
157    public SimpleRiverMouth setIndex(String index) {
158        this.index = index.contains("'") ? DateTimeFormat.forPattern(index).print(new DateTime()) : index;
159        return this;
160    }
161
162    @Override
163    public String getIndex() {
164        return index;
165    }
166
167    @Override
168    public SimpleRiverMouth setType(String type) {
169        this.type = type;
170        return this;
171    }
172
173    @Override
174    public String getType() {
175        return type;
176    }
177
178    @Override
179    public SimpleRiverMouth setId(String id) {
180        this.id = id;
181        return this;
182    }
183
184    @Override
185    public String getId() {
186        return id;
187    }
188
189    @Override
190    public void index(IndexableObject object, boolean create) throws IOException {
191        try {
192            while (suspended) {
193                Thread.sleep(1000L);
194            }
195        } catch (InterruptedException e) {
196            Thread.currentThread().interrupt();
197            logger.warn("interrupted");
198        }
199        if (Strings.hasLength(object.index())) {
200            setIndex(object.index());
201        }
202        if (Strings.hasLength(object.type())) {
203            setType(object.type());
204        }
205        if (Strings.hasLength(object.id())) {
206            setId(object.id());
207        }
208        IndexRequest request = Requests.indexRequest(this.index)
209                .type(this.type)
210                .id(getId())
211                .source(object.build());
212        if (object.meta(ControlKeys._version.name()) != null) {
213            request.versionType(VersionType.EXTERNAL)
214                    .version(Long.parseLong(object.meta(ControlKeys._version.name())));
215        }
216        if (object.meta(ControlKeys._routing.name()) != null) {
217            request.routing(object.meta(ControlKeys._routing.name()));
218        }
219        if (object.meta(ControlKeys._parent.name()) != null) {
220            request.parent(object.meta(ControlKeys._parent.name()));
221        }
222        if (object.meta(ControlKeys._timestamp.name()) != null) {
223            request.timestamp(object.meta(ControlKeys._timestamp.name()));
224        }
225        if (object.meta(ControlKeys._ttl.name()) != null) {
226            request.ttl(Long.parseLong(object.meta(ControlKeys._ttl.name())));
227        }
228        if (logger.isTraceEnabled()) {
229            logger.trace("adding bulk index action {}", request.source().toUtf8());
230        }
231        if (ingest != null) {
232            ingest.bulkIndex(request);
233        }
234    }
235
236    @Override
237    public void delete(IndexableObject object) {
238        try {
239            while (suspended) {
240                Thread.sleep(1000L);
241            }
242        } catch (InterruptedException e) {
243            Thread.currentThread().interrupt();
244            logger.warn("interrupted");
245        }
246        if (Strings.hasLength(object.index())) {
247            this.index = object.index();
248        }
249        if (Strings.hasLength(object.type())) {
250            this.type = object.type();
251        }
252        if (Strings.hasLength(object.id())) {
253            setId(object.id());
254        }
255        if (getId() == null) {
256            return; // skip if no doc is specified to delete
257        }
258        DeleteRequest request = Requests.deleteRequest(this.index).type(this.type).id(getId());
259        if (object.meta(ControlKeys._version.name()) != null) {
260            request.versionType(VersionType.EXTERNAL)
261                    .version(Long.parseLong(object.meta(ControlKeys._version.name())));
262        }
263        if (object.meta(ControlKeys._routing.name()) != null) {
264            request.routing(object.meta(ControlKeys._routing.name()));
265        }
266        if (object.meta(ControlKeys._parent.name()) != null) {
267            request.parent(object.meta(ControlKeys._parent.name()));
268        }
269        if (logger.isTraceEnabled()) {
270            logger.trace("adding bulk delete action {}/{}/{}", request.index(), request.type(), request.id());
271        }
272        if (ingest != null) {
273            ingest.bulkDelete(request);
274        }
275    }
276
277    @Override
278    public void flush() throws IOException {
279        if (ingest != null) {
280            ingest.flushIngest();
281            // wait for all outstanding bulk requests before continue with river
282            try {
283                ingest.waitForResponses(TimeValue.timeValueSeconds(60));
284            } catch (InterruptedException e) {
285                logger.warn("interrupted while waiting for responses");
286                Thread.currentThread().interrupt();
287            }
288        }
289    }
290
291    @Override
292    public void suspend() {
293        if (ingest != null) {
294            this.suspended = true;
295            ingest.suspend();
296        }
297    }
298
299    @Override
300    public void resume() {
301        if (ingest != null) {
302            this.suspended = false;
303            ingest.resume();
304        }
305    }
306
307}