001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.column; 017 018import org.elasticsearch.common.joda.time.DateTime; 019import org.elasticsearch.common.logging.ESLogger; 020import org.elasticsearch.common.logging.ESLoggerFactory; 021import org.elasticsearch.common.unit.TimeValue; 022import org.elasticsearch.common.xcontent.support.XContentMapValues; 023import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 024import org.xbib.elasticsearch.river.jdbc.RiverContext; 025import org.xbib.elasticsearch.river.jdbc.RiverMouth; 026import org.xbib.elasticsearch.river.jdbc.RiverSource; 027import org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow; 028 029import java.io.IOException; 030import java.util.Map; 031 032/** 033 * River flow implementation for the 'column' strategy 034 * 035 * @author <a href="piotr.sliwa@zineinc.com">Piotr Śliwa</a> 036 */ 037public class ColumnRiverFlow extends SimpleRiverFlow<ColumnRiverContext> { 038 039 private static final ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.ColumnRiverFlow"); 040 041 public static final String LAST_RUN_TIME = "last_run_time"; 042 043 public static final String CURRENT_RUN_STARTED_TIME = "current_run_started_time"; 044 045 @Override 046 public String strategy() { 047 return "column"; 048 } 049 050 @Override 051 public ColumnRiverFlow newInstance() { 052 return new ColumnRiverFlow(); 053 } 054 055 @Override 056 public ColumnRiverContext newRiverContext() { 057 return new ColumnRiverContext(); 058 } 059 060 @Override 061 protected ColumnRiverContext fillRiverContext(ColumnRiverContext riverContext, RiverState state, 062 RiverSource riverSource, 063 RiverMouth riverMouth) throws IOException { 064 ColumnRiverContext context = super.fillRiverContext(riverContext, state, riverSource, riverMouth); 065 // defaults for column strategy 066 Map<String, Object> params = riverContext.getDefinition(); 067 String columnCreatedAt = XContentMapValues.nodeStringValue(params.get("created_at"), "created_at"); 068 String columnUpdatedAt = XContentMapValues.nodeStringValue(params.get("updated_at"), "updated_at"); 069 String columnDeletedAt = XContentMapValues.nodeStringValue(params.get("deleted_at"), null); 070 boolean columnEscape = XContentMapValues.nodeBooleanValue(params.get("column_escape"), true); 071 TimeValue lastRunTimeStampOverlap = XContentMapValues.nodeTimeValue(params.get("last_run_timestamp_overlap"), 072 TimeValue.timeValueSeconds(0)); 073 context.columnCreatedAt(columnCreatedAt) 074 .columnUpdatedAt(columnUpdatedAt) 075 .columnDeletedAt(columnDeletedAt) 076 .columnEscape(columnEscape) 077 .setLastRunTimeStampOverlap(lastRunTimeStampOverlap); 078 return context; 079 } 080 081 @Override 082 protected void fetch(RiverContext riverContext) throws Exception { 083 DateTime currentTime = new DateTime(); 084 riverContext.getRiverSource().fetch(); 085 riverContext.getRiverState().getMap().put(LAST_RUN_TIME, currentTime); 086 } 087 088}