001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.simple;
017
018import org.elasticsearch.client.Client;
019import org.elasticsearch.common.joda.time.DateTime;
020import org.elasticsearch.common.logging.ESLogger;
021import org.elasticsearch.common.logging.ESLoggerFactory;
022import org.elasticsearch.common.metrics.MeterMetric;
023import org.elasticsearch.common.settings.Settings;
024import org.elasticsearch.common.settings.loader.JsonSettingsLoader;
025import org.elasticsearch.common.unit.TimeValue;
026import org.elasticsearch.common.xcontent.XContentBuilder;
027import org.elasticsearch.common.xcontent.support.XContentMapValues;
028import org.elasticsearch.river.RiverName;
029import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder;
030import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
031import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateRequestBuilder;
032import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateResponse;
033import org.xbib.elasticsearch.action.plugin.jdbc.state.put.PutRiverStateRequestBuilder;
034import org.xbib.elasticsearch.action.plugin.jdbc.state.put.PutRiverStateResponse;
035import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
036import org.xbib.elasticsearch.plugin.jdbc.client.Metric;
037import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
038import org.xbib.elasticsearch.plugin.jdbc.util.DurationFormatUtil;
039import org.xbib.elasticsearch.plugin.jdbc.util.LocaleUtil;
040import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader;
041import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand;
042import org.xbib.elasticsearch.plugin.jdbc.util.VolumeFormatUtil;
043import org.xbib.elasticsearch.river.jdbc.RiverContext;
044import org.xbib.elasticsearch.river.jdbc.RiverFlow;
045import org.xbib.elasticsearch.river.jdbc.RiverMouth;
046import org.xbib.elasticsearch.river.jdbc.RiverSource;
047
048import java.io.IOException;
049import java.text.NumberFormat;
050import java.util.Collections;
051import java.util.List;
052import java.util.Locale;
053import java.util.Map;
054import java.util.Queue;
055import java.util.TimeZone;
056
057import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
058import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
059
060/**
061 * Simple river flow implementation
062 */
063public class SimpleRiverFlow<RC extends RiverContext> implements RiverFlow<RC> {
064
065    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverFlow");
066
067    private final static ESLogger metricsLogger = ESLoggerFactory.getLogger("river.jdbc.RiverMetrics");
068
069    private RiverName riverName;
070
071    private Settings settings;
072
073    private IngestFactory ingestFactory;
074
075    private Client client;
076
077    private Queue<RiverContext> queue;
078
079    private MeterMetric meterMetric;
080
081    @Override
082    public String strategy() {
083        return "simple";
084    }
085
086    @Override
087    public SimpleRiverFlow<RC> newInstance() {
088        return new SimpleRiverFlow<RC>();
089    }
090
091    @Override
092    public RC newRiverContext() {
093        return (RC) new SimpleRiverContext();
094    }
095
096    @Override
097    public RiverFlow setRiverName(RiverName riverName) {
098        this.riverName = riverName;
099        return this;
100    }
101
102    @Override
103    public RiverName getRiverName() {
104        return riverName;
105    }
106
107    @Override
108    public RiverFlow setSettings(Settings settings) {
109        this.settings = settings;
110        return this;
111    }
112
113    @Override
114    public Settings getSettings() {
115        return settings;
116    }
117
118    @Override
119    public RiverFlow setIngestFactory(IngestFactory ingestFactory) {
120        this.ingestFactory = ingestFactory;
121        return this;
122    }
123
124    public RiverFlow setClient(Client client) {
125        this.client = client;
126        return this;
127    }
128
129    @Override
130    public Client getClient() {
131        return client;
132    }
133
134    @Override
135    public void execute(RC riverContext) throws Exception {
136        logger.debug("execute: {}/{} with context {}", riverName.getName(), riverName.getType(), riverContext);
137        try {
138            beforeFetch(riverContext);
139            fetch(riverContext);
140        } finally {
141            afterFetch(riverContext);
142        }
143    }
144
145    /**
146     * Before a river task (or river run) starts, this method is called.
147     */
148    protected void beforeFetch(RC riverContext) throws Exception {
149        logger.debug("before fetch: getting river state for {}/{}", riverName.getName(), riverName.getType());
150        GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(client.admin().cluster())
151                .setRiverName(riverName.getName())
152                .setRiverType(riverName.getType());
153        GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet();
154        RiverState riverState = riverStateResponse.getRiverState();
155        // if river state was not defined yet, define it now
156        if (riverState == null) {
157            riverState = new RiverState()
158                    .setName(riverName.getName())
159                    .setType(riverName.getType())
160                    .setStarted(new DateTime());
161            PutRiverStateRequestBuilder putRiverStateRequestBuilder = new PutRiverStateRequestBuilder(client.admin().cluster())
162                    .setRiverName(riverName.getName())
163                    .setRiverType(riverName.getType())
164                    .setRiverState(riverState);
165            PutRiverStateResponse putRiverStateResponse = putRiverStateRequestBuilder.execute().actionGet();
166            if (putRiverStateResponse.isAcknowledged()) {
167                logger.debug("before fetch: put initial state {}", riverState);
168            } else {
169                logger.error("befor fetch: initial state not acknowledged", riverState);
170            }
171        }
172        RiverSource riverSource = createRiverSource(riverContext.getDefinition());
173        RiverMouth riverMouth = createRiverMouth(riverContext.getDefinition());
174        riverContext = fillRiverContext(riverContext, riverState, riverSource, riverMouth);
175        logger.debug("before fetch: created source = {}, mouth = {}, context = {}",
176                riverSource, riverMouth, riverContext);
177        Integer counter = riverState.getCounter() + 1;
178        riverContext.setRiverState(riverState.setCounter(counter).setLastActive(new DateTime(), null));
179        PostRiverStateRequestBuilder postRiverStateRequestBuilder = new PostRiverStateRequestBuilder(client.admin().cluster())
180                .setRiverName(riverName.getName())
181                .setRiverType(riverName.getType())
182                .setRiverState(riverContext.getRiverState());
183        PostRiverStateResponse postRiverStateResponse = postRiverStateRequestBuilder.execute().actionGet();
184        if (!postRiverStateResponse.isAcknowledged()) {
185            logger.warn("post river state not acknowledged: {}/{}", riverName.getName(), riverName.getType());
186        }
187        logger.debug("before fetch: state posted = {}", riverState);
188        // call river source "before fetch"
189        try {
190            riverContext.getRiverSource().beforeFetch();
191        } catch (Exception e) {
192            logger.error(e.getMessage(), e);
193        }
194        // call river mouth "before fetch"
195        try {
196            riverContext.getRiverMouth().beforeFetch();
197        } catch (Exception e) {
198            logger.error(e.getMessage(), e);
199        }
200    }
201
202    /**
203     * After river context and state setup, when data should be fetched from river source, this method is called.
204     * The default is to invoke the fetch() method of the river source.
205     *
206     * @throws Exception
207     */
208    protected void fetch(RiverContext riverContext) throws Exception {
209        riverContext.getRiverSource().fetch();
210    }
211
212    @Override
213    public RiverFlow setMetric(MeterMetric meterMetric) {
214        this.meterMetric = meterMetric;
215        return this;
216    }
217
218    @Override
219    public MeterMetric getMetric() {
220        return meterMetric;
221    }
222
223
224    @Override
225    public RiverFlow setQueue(Queue<RiverContext> queue) {
226        this.queue = queue;
227        return this;
228    }
229
230    @Override
231    public Queue<RiverContext> getQueue() {
232        return queue;
233    }
234
235    /**
236     * After the river task has completed a single run, this method is called.
237     */
238    protected void afterFetch(RiverContext riverContext) throws Exception {
239        if (riverContext == null) {
240            return;
241        }
242        try {
243            riverContext.getRiverMouth().afterFetch();
244        } catch (Exception e) {
245            logger.error(e.getMessage(), e);
246        }
247        try {
248            riverContext.getRiverSource().afterFetch();
249        } catch (Exception e) {
250            logger.error(e.getMessage(), e);
251        }
252        RiverState riverState = riverContext.getRiverState()
253                .setLastActive(riverContext.getRiverState().getLastActiveBegin(), new DateTime());
254        PostRiverStateRequestBuilder postRiverStateRequestBuilder = new PostRiverStateRequestBuilder(client.admin().cluster())
255                .setRiverName(riverName.getName())
256                .setRiverType(riverName.getType())
257                .setRiverState(riverState);
258        PostRiverStateResponse postRiverStateResponse = postRiverStateRequestBuilder.execute().actionGet();
259        if (!postRiverStateResponse.isAcknowledged()) {
260            logger.warn("post river state not acknowledged: {}/{}", riverName.getName(), riverName.getType());
261        }
262        logger.debug("after fetch: state posted = {}", riverState);
263    }
264
265    protected RiverSource createRiverSource(Map<String, Object> params) {
266        RiverSource riverSource = RiverServiceLoader.newRiverSource(strategy());
267        logger.debug("found river source class {}, params = {}", riverSource, strategy(), params);
268        String url = XContentMapValues.nodeStringValue(params.get("url"), null);
269        String user = XContentMapValues.nodeStringValue(params.get("user"), null);
270        String password = XContentMapValues.nodeStringValue(params.get("password"), null);
271        String locale = XContentMapValues.nodeStringValue(params.get("locale"), LocaleUtil.fromLocale(Locale.getDefault()));
272        String timezone = XContentMapValues.nodeStringValue(params.get("timezone"), TimeZone.getDefault().getID());
273        riverSource.setUrl(url)
274                .setUser(user)
275                .setPassword(password)
276                .setLocale(LocaleUtil.toLocale(locale))
277                .setTimeZone(TimeZone.getTimeZone(timezone));
278        return riverSource;
279    }
280
281    protected RiverMouth createRiverMouth(Map<String, Object> params) throws IOException {
282        RiverMouth riverMouth = RiverServiceLoader.newRiverMouth(strategy());
283        logger.debug("found river mouth class {}, params = {}", riverMouth, strategy(), params);
284        String index = XContentMapValues.nodeStringValue(params.get("index"), "jdbc");
285        String type = XContentMapValues.nodeStringValue(params.get("type"), "jdbc");
286        riverMouth.setIndex(index)
287                .setType(type)
288                .setIngestFactory(ingestFactory);
289        if (params.get("index_settings") != null) {
290            Map<String, String> loadedSettings = new JsonSettingsLoader()
291                    .load(jsonBuilder().map((Map<String, Object>) params.get("index_settings")).string());
292            riverMouth.setIndexSettings(settingsBuilder().put(loadedSettings).build());
293        }
294        if (params.get("type_mapping") != null) {
295            XContentBuilder builder = jsonBuilder().map((Map<String, Object>) params.get("type_mapping"));
296            riverMouth.setTypeMapping(Collections.singletonMap(type, builder.string()));
297        }
298        return riverMouth;
299    }
300
301    protected RC fillRiverContext(RC riverContext, RiverState state,
302                                  RiverSource riverSource,
303                                  RiverMouth riverMouth) throws IOException {
304        Map<String, Object> params = riverContext.getDefinition();
305        List<SQLCommand> sql = SQLCommand.parse(params);
306        String rounding = XContentMapValues.nodeStringValue(params.get("rounding"), null);
307        int scale = XContentMapValues.nodeIntegerValue(params.get("scale"), 2);
308        boolean autocommit = XContentMapValues.nodeBooleanValue(params.get("autocommit"), false);
309        int fetchsize = 10;
310        String fetchSizeStr = XContentMapValues.nodeStringValue(params.get("fetchsize"), null);
311        if ("min".equals(fetchSizeStr)) {
312            fetchsize = Integer.MIN_VALUE; // for MySQL streaming mode
313        } else if (fetchSizeStr != null) {
314            fetchsize = Integer.parseInt(fetchSizeStr);
315        }
316        int maxrows = XContentMapValues.nodeIntegerValue(params.get("max_rows"), 0);
317        int maxretries = XContentMapValues.nodeIntegerValue(params.get("max_retries"), 3);
318        TimeValue maxretrywait = XContentMapValues.nodeTimeValue(params.get("max_retries_wait"), TimeValue.timeValueSeconds(30));
319        String resultSetType = XContentMapValues.nodeStringValue(params.get("resultset_type"), "TYPE_FORWARD_ONLY");
320        String resultSetConcurrency = XContentMapValues.nodeStringValue(params.get("resultset_concurrency"), "CONCUR_UPDATABLE");
321        boolean shouldIgnoreNull = XContentMapValues.nodeBooleanValue(params.get("ignore_null_values"), false);
322        boolean shouldPrepareDatabaseMetadata = XContentMapValues.nodeBooleanValue(params.get("prepare_database_metadata"), false);
323        boolean shouldPrepareResultSetMetadata = XContentMapValues.nodeBooleanValue(params.get("prepare_resultset_metadata"), false);
324        Map<String, Object> columnNameMap = (Map<String, Object>) params.get("column_name_map");
325        int queryTimeout = XContentMapValues.nodeIntegerValue(params.get("query_timeout"), 1800);
326        Map<String, Object> connectionProperties = (Map<String, Object>) params.get("connection_properties");
327        boolean shouldTreatBinaryAsString = XContentMapValues.nodeBooleanValue(params.get("treat_binary_as_string"), false);
328
329        riverContext.setRiverState(state)
330                .setRiverSource(riverSource)
331                .setRiverMouth(riverMouth)
332                .setMetric(meterMetric)
333                .setRounding(rounding)
334                .setScale(scale)
335                .setStatements(sql)
336                .setAutoCommit(autocommit)
337                .setMaxRows(maxrows)
338                .setFetchSize(fetchsize)
339                .setRetries(maxretries)
340                .setMaxRetryWait(maxretrywait)
341                .setResultSetType(resultSetType)
342                .setResultSetConcurrency(resultSetConcurrency)
343                .shouldIgnoreNull(shouldIgnoreNull)
344                .shouldPrepareDatabaseMetadata(shouldPrepareDatabaseMetadata)
345                .shouldPrepareResultSetMetadata(shouldPrepareResultSetMetadata)
346                .setColumnNameMap(columnNameMap)
347                .setQueryTimeout(queryTimeout)
348                .setConnectionProperties(connectionProperties)
349                .shouldTreatBinaryAsString(shouldTreatBinaryAsString);
350        riverSource.setRiverContext(riverContext);
351        riverMouth.setRiverContext(riverContext);
352        return riverContext;
353    }
354
355    @Override
356    public void logMetrics(RiverContext riverContext, String cause) {
357        MeterMetric metric = getMetric();
358        if (metric == null) {
359            return;
360        }
361        if (riverContext == null || riverContext.getRiverMouth() == null) {
362            return;
363        }
364        Metric mouthMetric = riverContext.getRiverMouth().getMetric();
365        if (mouthMetric == null) {
366            return;
367        }
368        long ticks = metric.count();
369        double mean = metric.meanRate();
370        double oneminute = metric.oneMinuteRate();
371        double fiveminute = metric.fiveMinuteRate();
372        double fifteenminute = metric.fifteenMinuteRate();
373
374        long bytes = mouthMetric.getTotalIngestSizeInBytes().count();
375        long elapsed = mouthMetric.elapsed() / 1000000;
376        String elapsedhuman = DurationFormatUtil.formatDurationWords(elapsed, true, true);
377        //double dps = ticks * 1000 / elapsed;
378        double avg = bytes / (ticks + 1); // avoid div by zero
379        double mbps = (bytes * 1000.0 / elapsed) / (1024.0 * 1024.0);
380        NumberFormat formatter = NumberFormat.getNumberInstance();
381        metricsLogger.info("{}: river {}/{} metrics: {} rows, {} mean, ({} {} {}), ingest metrics: elapsed {}, {} bytes, {} avg, {} MB/s",
382                cause,
383                riverName.getType(),
384                riverName.getName(),
385                ticks,
386                mean,
387                oneminute,
388                fiveminute,
389                fifteenminute,
390                elapsedhuman,
391                VolumeFormatUtil.convertFileSize(bytes),
392                VolumeFormatUtil.convertFileSize(avg),
393                formatter.format(mbps)
394        );
395    }
396
397}