001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.simple;
017
018import org.elasticsearch.common.logging.ESLogger;
019import org.elasticsearch.common.logging.ESLoggerFactory;
020import org.elasticsearch.common.metrics.MeterMetric;
021import org.elasticsearch.common.unit.TimeValue;
022import org.elasticsearch.common.xcontent.XContentBuilder;
023import org.elasticsearch.common.xcontent.XContentHelper;
024import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
025import org.xbib.elasticsearch.plugin.jdbc.util.SQLCommand;
026import org.xbib.elasticsearch.river.jdbc.RiverContext;
027import org.xbib.elasticsearch.river.jdbc.RiverMouth;
028import org.xbib.elasticsearch.river.jdbc.RiverSource;
029
030import java.io.IOException;
031import java.math.BigDecimal;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035
036import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
037
038/**
039 * The river context consists of the parameters that span source and mouth settings.
040 * It represents the river state, for supporting the river task execution, and river scripting.
041 */
042public class SimpleRiverContext implements RiverContext {
043
044    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverContext");
045
046    private Map<String, Object> definition;
047
048    /**
049     * The state of the river
050     */
051    private RiverState riverState;
052
053    /**
054     * The metrics
055     */
056    private MeterMetric metric;
057
058    /**
059     * The source of the river
060     */
061    private RiverSource source;
062
063    /**
064     * The target of the river
065     */
066    private RiverMouth mouth;
067
068    /**
069     * Autocomit enabled or not
070     */
071    private boolean autocommit;
072
073    /**
074     * The fetch size
075     */
076    private int fetchSize;
077
078    /**
079     * The maximum numbe rof rows per statement execution
080     */
081    private int maxRows;
082
083    /**
084     * The number of retries
085     */
086    private int retries = 1;
087
088    /**
089     * The time to wait between retries
090     */
091    private TimeValue maxretrywait = TimeValue.timeValueSeconds(30);
092
093    private int rounding;
094
095    private int scale = -1;
096
097    private String resultSetType = "TYPE_FORWARD_ONLY";
098
099    private String resultSetConcurrency = "CONCUR_UPDATABLE";
100
101    private boolean shouldIgnoreNull;
102
103    private boolean shouldPrepareResultSetMetadata;
104
105    private boolean shouldPrepareDatabaseMetadata;
106
107    private Map<String, Object> lastResultSetMetadata = new HashMap<String, Object>();
108
109    private Map<String, Object> lastDatabaseMetadata = new HashMap<String, Object>();
110
111    private long lastRowCount;
112
113    private long lastStartDate;
114
115    private long lastEndDate;
116
117    private long lastExecutionStartDate;
118
119    private long lastExecutionEndDate;
120
121    private Map<String, Object> columnNameMap;
122
123    private Map<String, Object> lastRow = new HashMap<String, Object>();
124
125    private List<SQLCommand> sql;
126
127    private boolean isTimestampDiffSupported;
128
129    private int queryTimeout;
130
131    private Map<String, Object> connectionProperties = new HashMap<String, Object>();
132
133    private boolean shouldTreatBinaryAsString;
134
135    @Override
136    public SimpleRiverContext setDefinition(Map<String, Object> definition) {
137        this.definition = definition;
138        return this;
139    }
140
141    @Override
142    public Map<String, Object> getDefinition() {
143        return definition;
144    }
145
146    @Override
147    public SimpleRiverContext setRiverState(RiverState riverState) {
148        this.riverState = riverState;
149        return this;
150    }
151
152    @Override
153    public RiverState getRiverState() {
154        return riverState;
155    }
156
157    @Override
158    public SimpleRiverContext setRiverSource(RiverSource source) {
159        this.source = source;
160        return this;
161    }
162
163    @Override
164    public RiverSource getRiverSource() {
165        return source;
166    }
167
168    public SimpleRiverContext setRiverMouth(RiverMouth mouth) {
169        this.mouth = mouth;
170        return this;
171    }
172
173    public RiverMouth getRiverMouth() {
174        return mouth;
175    }
176
177    @Override
178    public RiverContext setMetric(MeterMetric metric) {
179        this.metric = metric;
180        return this;
181    }
182
183    @Override
184    public MeterMetric getMetric() {
185        return metric;
186    }
187
188    public SimpleRiverContext setAutoCommit(boolean autocommit) {
189        this.autocommit = autocommit;
190        return this;
191    }
192
193    public boolean getAutoCommit() {
194        return autocommit;
195    }
196
197    public SimpleRiverContext setFetchSize(int fetchSize) {
198        this.fetchSize = fetchSize;
199        return this;
200    }
201
202    public int getFetchSize() {
203        return fetchSize;
204    }
205
206    public SimpleRiverContext setMaxRows(int maxRows) {
207        this.maxRows = maxRows;
208        return this;
209    }
210
211    public int getMaxRows() {
212        return maxRows;
213    }
214
215    public SimpleRiverContext setRetries(int retries) {
216        this.retries = retries;
217        return this;
218    }
219
220    public int getRetries() {
221        return retries;
222    }
223
224    public SimpleRiverContext setMaxRetryWait(TimeValue maxretrywait) {
225        this.maxretrywait = maxretrywait;
226        return this;
227    }
228
229    public TimeValue getMaxRetryWait() {
230        return maxretrywait;
231    }
232
233    public SimpleRiverContext setRounding(String rounding) {
234        if ("ceiling".equalsIgnoreCase(rounding)) {
235            this.rounding = BigDecimal.ROUND_CEILING;
236        } else if ("down".equalsIgnoreCase(rounding)) {
237            this.rounding = BigDecimal.ROUND_DOWN;
238        } else if ("floor".equalsIgnoreCase(rounding)) {
239            this.rounding = BigDecimal.ROUND_FLOOR;
240        } else if ("halfdown".equalsIgnoreCase(rounding)) {
241            this.rounding = BigDecimal.ROUND_HALF_DOWN;
242        } else if ("halfeven".equalsIgnoreCase(rounding)) {
243            this.rounding = BigDecimal.ROUND_HALF_EVEN;
244        } else if ("halfup".equalsIgnoreCase(rounding)) {
245            this.rounding = BigDecimal.ROUND_HALF_UP;
246        } else if ("unnecessary".equalsIgnoreCase(rounding)) {
247            this.rounding = BigDecimal.ROUND_UNNECESSARY;
248        } else if ("up".equalsIgnoreCase(rounding)) {
249            this.rounding = BigDecimal.ROUND_UP;
250        }
251        return this;
252    }
253
254    public int getRounding() {
255        return rounding;
256    }
257
258    public SimpleRiverContext setScale(int scale) {
259        this.scale = scale;
260        return this;
261    }
262
263    public int getScale() {
264        return scale;
265    }
266
267    public SimpleRiverContext setResultSetType(String resultSetType) {
268        this.resultSetType = resultSetType;
269        return this;
270    }
271
272    public String getResultSetType() {
273        return resultSetType;
274    }
275
276    public SimpleRiverContext setResultSetConcurrency(String resultSetConcurrency) {
277        this.resultSetConcurrency = resultSetConcurrency;
278        return this;
279    }
280
281    public String getResultSetConcurrency() {
282        return resultSetConcurrency;
283    }
284
285    public SimpleRiverContext shouldIgnoreNull(boolean shouldIgnoreNull) {
286        this.shouldIgnoreNull = shouldIgnoreNull;
287        return this;
288    }
289
290    public boolean shouldIgnoreNull() {
291        return shouldIgnoreNull;
292    }
293
294    public SimpleRiverContext shouldPrepareResultSetMetadata(boolean shouldPrepareResultSetMetadata) {
295        this.shouldPrepareResultSetMetadata = shouldPrepareResultSetMetadata;
296        return this;
297    }
298
299    public boolean shouldPrepareResultSetMetadata() {
300        return shouldPrepareResultSetMetadata;
301    }
302
303    public SimpleRiverContext shouldPrepareDatabaseMetadata(boolean shouldPrepareDatabaseMetadata) {
304        this.shouldPrepareDatabaseMetadata = shouldPrepareDatabaseMetadata;
305        return this;
306    }
307
308    public boolean shouldPrepareDatabaseMetadata() {
309        return shouldPrepareDatabaseMetadata;
310    }
311
312    public SimpleRiverContext setLastResultSetMetadata(Map<String, Object> lastResultSetMetadata) {
313        this.lastResultSetMetadata = lastResultSetMetadata;
314        return this;
315    }
316
317    public Map<String, Object> getLastResultSetMetadata() {
318        return lastResultSetMetadata;
319    }
320
321    public SimpleRiverContext setLastDatabaseMetadata(Map<String, Object> lastDatabaseMetadata) {
322        this.lastDatabaseMetadata = lastDatabaseMetadata;
323        return this;
324    }
325
326    public Map<String, Object> getLastDatabaseMetadata() {
327        return lastDatabaseMetadata;
328    }
329
330    public SimpleRiverContext setLastRowCount(long lastRowCount) {
331        this.lastRowCount = lastRowCount;
332        return this;
333    }
334
335    public long getLastRowCount() {
336        return lastRowCount;
337    }
338
339    public SimpleRiverContext setLastStartDate(long lastStartDate) {
340        this.lastStartDate = lastStartDate;
341        return this;
342    }
343
344    public long getLastStartDate() {
345        return lastStartDate;
346    }
347
348    public SimpleRiverContext setLastEndDate(long lastEndDate) {
349        this.lastEndDate = lastEndDate;
350        return this;
351    }
352
353    public long getLastEndDate() {
354        return lastEndDate;
355    }
356
357    public SimpleRiverContext setLastExecutionStartDate(long lastExecutionStartDate) {
358        this.lastExecutionStartDate = lastExecutionStartDate;
359        return this;
360    }
361
362    public long getLastExecutionStartDate() {
363        return lastExecutionStartDate;
364    }
365
366    public SimpleRiverContext setLastExecutionEndDate(long lastExecutionEndDate) {
367        this.lastExecutionEndDate = lastExecutionEndDate;
368        return this;
369    }
370
371    public long getLastExecutionEndDate() {
372        return lastExecutionEndDate;
373    }
374
375    public SimpleRiverContext setColumnNameMap(Map<String, Object> columnNameMap) {
376        this.columnNameMap = columnNameMap;
377        return this;
378    }
379
380    public Map<String, Object> getColumnNameMap() {
381        return columnNameMap;
382    }
383
384
385    public SimpleRiverContext setLastRow(Map<String, Object> lastRow) {
386        this.lastRow = lastRow;
387        return this;
388    }
389
390    public Map<String, Object> getLastRow() {
391        return lastRow;
392    }
393
394
395    public SimpleRiverContext setStatements(List<SQLCommand> sql) {
396        this.sql = sql;
397        return this;
398    }
399
400    public List<SQLCommand> getStatements() {
401        return sql;
402    }
403
404    public SimpleRiverContext setTimestampDiffSupported(boolean supported) {
405        this.isTimestampDiffSupported = supported;
406        return this;
407    }
408
409    public boolean isTimestampDiffSupported() {
410        return isTimestampDiffSupported;
411    }
412
413    public SimpleRiverContext setQueryTimeout(int queryTimeout) {
414        this.queryTimeout = queryTimeout;
415        return this;
416    }
417
418    public int getQueryTimeout() {
419        return queryTimeout;
420    }
421
422    public SimpleRiverContext setConnectionProperties(Map<String, Object> connectionProperties) {
423        this.connectionProperties = connectionProperties;
424        return this;
425    }
426
427    public Map<String, Object> getConnectionProperties() {
428        return connectionProperties;
429    }
430
431    public SimpleRiverContext shouldTreatBinaryAsString(boolean shouldTreatBinaryAsString) {
432        this.shouldTreatBinaryAsString = shouldTreatBinaryAsString;
433        return this;
434    }
435
436    public boolean shouldTreatBinaryAsString() {
437        return shouldTreatBinaryAsString;
438    }
439
440    public SimpleRiverContext release() {
441        try {
442            if (mouth != null) {
443                mouth.shutdown();
444                mouth = null;
445            }
446        } catch (IOException e) {
447            logger.error(e.getMessage(), e);
448        }
449        try {
450            if (source != null) {
451                source.shutdown();
452                source = null;
453            }
454        } catch (IOException e) {
455            logger.error(e.getMessage(), e);
456        }
457        return this;
458    }
459
460    public Map<String, Object> asMap() {
461        try {
462            XContentBuilder builder = jsonBuilder()
463                    .startObject()
464                    .field("autocommit", autocommit)
465                    .field("fetchsize", fetchSize)
466                    .field("maxrows", maxRows)
467                    .field("retries", retries)
468                    .field("maxretrywait", maxretrywait)
469                    .field("resultsetconcurrency", resultSetConcurrency)
470                    .field("resultsettype", resultSetType)
471                    .field("rounding", rounding)
472                    .field("scale", scale)
473                    .field("shouldignorenull", shouldIgnoreNull)
474                    .field("lastResultSetMetadata", lastResultSetMetadata)
475                    .field("lastDatabaseMetadata", lastDatabaseMetadata)
476                    .field("lastStartDate", lastStartDate)
477                    .field("lastEndDate", lastEndDate)
478                    .field("lastExecutionStartDate", lastExecutionStartDate)
479                    .field("lastExecutionEndDate", lastExecutionEndDate)
480                    .field("columnNameMap", columnNameMap)
481                    .field("lastRow", lastRow)
482                    .field("sql", sql)
483                    .field("isTimestampDiffSupported", isTimestampDiffSupported)
484                    .field("queryTimeout", queryTimeout)
485                    .field("connectionProperties")
486                    .map(connectionProperties)
487                    .endObject();
488            return XContentHelper.convertToMap(builder.bytes(), true).v2();
489        } catch (IOException e) {
490            // should really not happen
491            return new HashMap<String, Object>();
492        }
493    }
494
495    @Override
496    public String toString() {
497        return asMap().toString();
498    }
499}