001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.river.jdbc.strategy.column;
017
018import org.elasticsearch.common.joda.time.DateTime;
019import org.elasticsearch.common.logging.ESLogger;
020import org.elasticsearch.common.logging.ESLoggerFactory;
021import org.elasticsearch.common.unit.TimeValue;
022import org.elasticsearch.common.xcontent.support.XContentMapValues;
023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
024import org.xbib.elasticsearch.river.jdbc.RiverContext;
025import org.xbib.elasticsearch.river.jdbc.RiverMouth;
026import org.xbib.elasticsearch.river.jdbc.RiverSource;
027import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow;
028
029import java.io.IOException;
030import java.util.Map;
031
032/**
033 * River flow implementation for the 'column' strategy
034 *
035 * @author <a href="piotr.sliwa@zineinc.com">Piotr Śliwa</a>
036 */
037public class ColumnRiverFlow extends SimpleRiverFlow<ColumnRiverContext> {
038
039    private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.ColumnRiverFlow");
040
041    public static final String LAST_RUN_TIME = "last_run_time";
042
043    public static final String CURRENT_RUN_STARTED_TIME = "current_run_started_time";
044
045    @Override
046    public String strategy() {
047        return "column";
048    }
049
050    @Override
051    public ColumnRiverFlow newInstance() {
052        return new ColumnRiverFlow();
053    }
054
055    @Override
056    public ColumnRiverContext newRiverContext() {
057        return new ColumnRiverContext();
058    }
059
060    @Override
061    protected ColumnRiverContext fillRiverContext(ColumnRiverContext riverContext, RiverState state,
062                                                  RiverSource riverSource,
063                                                  RiverMouth riverMouth) throws IOException {
064        ColumnRiverContext context = super.fillRiverContext(riverContext, state, riverSource, riverMouth);
065        // defaults for column strategy
066        Map<String, Object> params = riverContext.getDefinition();
067        String columnCreatedAt = XContentMapValues.nodeStringValue(params.get("created_at"), "created_at");
068        String columnUpdatedAt = XContentMapValues.nodeStringValue(params.get("updated_at"), "updated_at");
069        String columnDeletedAt = XContentMapValues.nodeStringValue(params.get("deleted_at"), null);
070        boolean columnEscape = XContentMapValues.nodeBooleanValue(params.get("column_escape"), true);
071        TimeValue lastRunTimeStampOverlap = XContentMapValues.nodeTimeValue(params.get("last_run_timestamp_overlap"),
072                TimeValue.timeValueSeconds(0));
073        context.columnCreatedAt(columnCreatedAt)
074                .columnUpdatedAt(columnUpdatedAt)
075                .columnDeletedAt(columnDeletedAt)
076                .columnEscape(columnEscape)
077                .setLastRunTimeStampOverlap(lastRunTimeStampOverlap);
078        return context;
079    }
080
081    @Override
082    protected void fetch(RiverContext riverContext) throws Exception {
083        DateTime currentTime = new DateTime();
084        riverContext.getRiverSource().fetch();
085        riverContext.getRiverState().getMap().put(LAST_RUN_TIME, currentTime);
086    }
087
088}