001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.state; 017 018import org.elasticsearch.common.io.stream.StreamInput; 019import org.elasticsearch.common.io.stream.StreamOutput; 020import org.elasticsearch.common.io.stream.Streamable; 021import org.elasticsearch.common.joda.time.DateTime; 022import org.elasticsearch.common.joda.time.DateTimeZone; 023import org.elasticsearch.common.joda.time.format.DateTimeFormatter; 024import org.elasticsearch.common.joda.time.format.ISODateTimeFormat; 025import org.elasticsearch.common.xcontent.ToXContent; 026import org.elasticsearch.common.xcontent.XContentBuilder; 027import org.elasticsearch.common.xcontent.XContentParser; 028 029import java.io.IOException; 030import java.util.Map; 031 032import static org.elasticsearch.common.collect.Maps.newHashMap; 033import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 034import static org.elasticsearch.common.xcontent.XContentParser.Token.END_OBJECT; 035import static org.elasticsearch.common.xcontent.XContentParser.Token.FIELD_NAME; 036import static org.elasticsearch.common.xcontent.XContentParser.Token.START_OBJECT; 037import static org.elasticsearch.common.xcontent.XContentParser.Token.VALUE_NULL; 038 039/** 040 * A river state represents a point in time when a river has a defined behavior with a set of parameters 041 */ 042public class RiverState implements Streamable, ToXContent, Comparable<RiverState> { 043 044 private final static DateTime EMPTY_DATETIME = new DateTime(0L); 045 046 /** 047 * The name of the river instance 048 */ 049 private String name; 050 051 /** 052 * The type of the river instance 053 */ 054 private String type; 055 056 /** 057 * The time the river instance was started 058 */ 059 private DateTime started; 060 061 /* 062 * The time of the last river activity 063 */ 064 private DateTime begin; 065 066 /* 067 * The time when the last river activity ended 068 */ 069 private DateTime end; 070 071 /** 072 * A custom map for more information about the river 073 */ 074 private Map<String, Object> map = newHashMap(); 075 076 public RiverState() { 077 } 078 079 public RiverState setName(String name) { 080 this.name = name; 081 return this; 082 } 083 084 public String getName() { 085 return name; 086 } 087 088 public RiverState setType(String type) { 089 this.type = type; 090 return this; 091 } 092 093 public String getType() { 094 return type; 095 } 096 097 public RiverState setMap(Map<String, Object> map) { 098 this.map = map; 099 return this; 100 } 101 102 public Map<String, Object> getMap() { 103 return map; 104 } 105 106 public RiverState setStarted(DateTime started) { 107 this.started = started; 108 return this; 109 } 110 111 public DateTime getStarted() { 112 return started; 113 } 114 115 /** 116 * Set the last river activity. Null means, time is unknown 117 * 118 * @param begin when the last river activity began 119 * @param end when the last river activity ended 120 * @return this state 121 */ 122 public RiverState setLastActive(DateTime begin, DateTime end) { 123 if (begin != null) { 124 this.begin = begin; 125 } 126 if (end != null) { 127 this.end = end; 128 } 129 return this; 130 } 131 132 /** 133 * @return the begin of the last river activity 134 */ 135 public DateTime getLastActiveBegin() { 136 return begin != null ? begin : EMPTY_DATETIME; 137 } 138 139 /** 140 * @return the end of the last river activity 141 */ 142 public DateTime getLastActiveEnd() { 143 return end != null ? end : EMPTY_DATETIME; 144 } 145 146 /** 147 * Was the river active at a certain time? Only the last activity can be checked. 148 * 149 * @param instant the time to check 150 * @return true if river was active, false if not 151 */ 152 public boolean wasActiveAt(DateTime instant) { 153 return instant != null 154 && begin != null && begin.getMillis() != 0L && begin.isBefore(instant) 155 && (end == null || end.getMillis() == 0L || end.isAfter(instant)); 156 } 157 158 public boolean wasInactiveAt(DateTime instant) { 159 return !wasActiveAt(instant); 160 } 161 162 public RiverState setCounter(Integer counter) { 163 map.put("counter", counter); 164 return this; 165 } 166 167 public Integer getCounter() { 168 return map.containsKey("counter") ? (Integer) map.get("counter") : 0; 169 } 170 171 public RiverState setCustom(Map<String, Object> custom) { 172 this.map.put("custom", custom); 173 return this; 174 } 175 176 @SuppressWarnings("unchecked") 177 public Map<String, Object> getCustom() { 178 return (Map<String, Object>) this.map.get("custom"); 179 } 180 181 public boolean isAborted() { 182 return map.containsKey("aborted") ? (Boolean) map.get("aborted") : false; 183 } 184 185 public boolean isSuspended() { 186 return map.containsKey("suspended") ? (Boolean) map.get("suspended") : false; 187 } 188 189 public RiverState fromXContent(XContentParser parser) throws IOException { 190 DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.dateOptionalTimeParser().withZone(DateTimeZone.UTC); 191 Long startTimestamp = 0L; 192 Long begin = 0L; 193 Long end = 0L; 194 String name = null; 195 String type = null; 196 String currentFieldName = null; 197 Map<String, Object> map = null; 198 XContentParser.Token token; 199 while ((token = parser.nextToken()) != END_OBJECT) { 200 if (token == FIELD_NAME) { 201 currentFieldName = parser.currentName(); 202 } else if (token.isValue() || token == VALUE_NULL) { 203 switch (currentFieldName) { 204 case "name": 205 name = parser.text(); 206 break; 207 case "type": 208 type = parser.text(); 209 break; 210 case "started": 211 startTimestamp = parser.text() != null && !"null".equals(parser.text()) ? 212 dateTimeFormatter.parseMillis(parser.text()) : 0L; 213 break; 214 case "last_active_begin": 215 begin = parser.text() != null && !"null".equals(parser.text()) ? 216 dateTimeFormatter.parseMillis(parser.text()) : 0L; 217 break; 218 case "last_active_end": 219 end = parser.text() != null && !"null".equals(parser.text()) ? 220 dateTimeFormatter.parseMillis(parser.text()) : 0L; 221 break; 222 } 223 } else if (token == START_OBJECT) { 224 map = parser.map(); 225 } 226 } 227 return new RiverState() 228 .setName(name) 229 .setType(type) 230 .setStarted(new DateTime(startTimestamp)) 231 .setLastActive(new DateTime(begin), new DateTime(end)) 232 .setMap(map); 233 } 234 235 @Override 236 public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { 237 builder.startObject() 238 .field("name", name) 239 .field("type", type) 240 .field("started", getStarted()) 241 .field("last_active_begin", getLastActiveBegin()) 242 .field("last_active_end", getLastActiveEnd()) 243 .field("map", map); 244 builder.endObject(); 245 return builder; 246 } 247 248 249 @Override 250 public void readFrom(StreamInput in) throws IOException { 251 this.name = in.readOptionalString(); 252 this.type = in.readOptionalString(); 253 this.started = new DateTime(in.readLong()); 254 this.begin = new DateTime(in.readLong()); 255 this.end = new DateTime(in.readLong()); 256 map = in.readMap(); 257 } 258 259 @Override 260 public void writeTo(StreamOutput out) throws IOException { 261 out.writeOptionalString(name); 262 out.writeOptionalString(type); 263 out.writeLong(started != null ? started.getMillis() : 0L); 264 out.writeLong(begin != null ? begin.getMillis() : 0L); 265 out.writeLong(end != null ? end.getMillis() : 0L); 266 out.writeMap(map); 267 } 268 269 @Override 270 public int compareTo(RiverState o) { 271 return (getName() + "/" + getType()).compareTo(o.getName() + "/" + o.getType()); 272 } 273 274 @Override 275 public String toString() { 276 try { 277 return toXContent(jsonBuilder(), EMPTY_PARAMS).string(); 278 } catch (IOException e) { 279 // ignore 280 } 281 return ""; 282 } 283}