001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.state;
017
018import org.elasticsearch.common.io.stream.StreamInput;
019import org.elasticsearch.common.io.stream.StreamOutput;
020import org.elasticsearch.common.io.stream.Streamable;
021import org.elasticsearch.common.joda.time.DateTime;
022import org.elasticsearch.common.joda.time.DateTimeZone;
023import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
024import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
025import org.elasticsearch.common.xcontent.ToXContent;
026import org.elasticsearch.common.xcontent.XContentBuilder;
027import org.elasticsearch.common.xcontent.XContentParser;
028
029import java.io.IOException;
030import java.util.Map;
031
032import static org.elasticsearch.common.collect.Maps.newHashMap;
033import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
034import static org.elasticsearch.common.xcontent.XContentParser.Token.END_OBJECT;
035import static org.elasticsearch.common.xcontent.XContentParser.Token.FIELD_NAME;
036import static org.elasticsearch.common.xcontent.XContentParser.Token.START_OBJECT;
037import static org.elasticsearch.common.xcontent.XContentParser.Token.VALUE_NULL;
038
039/**
040 * A river state represents a point in time when a river has a defined behavior with a set of parameters
041 */
042public class RiverState implements Streamable, ToXContent, Comparable<RiverState> {
043
044    private final static DateTime EMPTY_DATETIME = new DateTime(0L);
045
046    /**
047     * The name of the river instance
048     */
049    private String name;
050
051    /**
052     * The type of the river instance
053     */
054    private String type;
055
056    /**
057     * The time the river instance was started
058     */
059    private DateTime started;
060
061    /*
062     * The time of the last river activity
063     */
064    private DateTime begin;
065
066    /*
067     * The time when the last river activity ended
068     */
069    private DateTime end;
070
071    /**
072     * A custom map for more information about the river
073     */
074    private Map<String, Object> map = newHashMap();
075
076    public RiverState() {
077    }
078
079    public RiverState setName(String name) {
080        this.name = name;
081        return this;
082    }
083
084    public String getName() {
085        return name;
086    }
087
088    public RiverState setType(String type) {
089        this.type = type;
090        return this;
091    }
092
093    public String getType() {
094        return type;
095    }
096
097    public RiverState setMap(Map<String, Object> map) {
098        this.map = map;
099        return this;
100    }
101
102    public Map<String, Object> getMap() {
103        return map;
104    }
105
106    public RiverState setStarted(DateTime started) {
107        this.started = started;
108        return this;
109    }
110
111    public DateTime getStarted() {
112        return started;
113    }
114
115    /**
116     * Set the last river activity. Null means, time is unknown
117     *
118     * @param begin when the last river activity began
119     * @param end   when the last river activity ended
120     * @return this state
121     */
122    public RiverState setLastActive(DateTime begin, DateTime end) {
123        if (begin != null) {
124            this.begin = begin;
125        }
126        if (end != null) {
127            this.end = end;
128        }
129        return this;
130    }
131
132    /**
133     * @return the begin of the last river activity
134     */
135    public DateTime getLastActiveBegin() {
136        return begin != null ? begin : EMPTY_DATETIME;
137    }
138
139    /**
140     * @return the end of the last river activity
141     */
142    public DateTime getLastActiveEnd() {
143        return end != null ? end : EMPTY_DATETIME;
144    }
145
146    /**
147     * Was the river active at a certain time? Only the last activity can be checked.
148     *
149     * @param instant the time to check
150     * @return true if river was active, false if not
151     */
152    public boolean wasActiveAt(DateTime instant) {
153        return instant != null
154                && begin != null && begin.getMillis() != 0L && begin.isBefore(instant)
155                && (end == null || end.getMillis() == 0L || end.isAfter(instant));
156    }
157
158    public boolean wasInactiveAt(DateTime instant) {
159        return !wasActiveAt(instant);
160    }
161
162    public RiverState setCounter(Integer counter) {
163        map.put("counter", counter);
164        return this;
165    }
166
167    public Integer getCounter() {
168        return map.containsKey("counter") ? (Integer) map.get("counter") : 0;
169    }
170
171    public RiverState setCustom(Map<String, Object> custom) {
172        this.map.put("custom", custom);
173        return this;
174    }
175
176    @SuppressWarnings("unchecked")
177    public Map<String, Object> getCustom() {
178        return (Map<String, Object>) this.map.get("custom");
179    }
180
181    public boolean isAborted() {
182        return map.containsKey("aborted") ? (Boolean) map.get("aborted") : false;
183    }
184
185    public boolean isSuspended() {
186        return map.containsKey("suspended") ? (Boolean) map.get("suspended") : false;
187    }
188
189    public RiverState fromXContent(XContentParser parser) throws IOException {
190        DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.dateOptionalTimeParser().withZone(DateTimeZone.UTC);
191        Long startTimestamp = 0L;
192        Long begin = 0L;
193        Long end = 0L;
194        String name = null;
195        String type = null;
196        String currentFieldName = null;
197        Map<String, Object> map = null;
198        XContentParser.Token token;
199        while ((token = parser.nextToken()) != END_OBJECT) {
200            if (token == FIELD_NAME) {
201                currentFieldName = parser.currentName();
202            } else if (token.isValue() || token == VALUE_NULL) {
203                switch (currentFieldName) {
204                    case "name":
205                        name = parser.text();
206                        break;
207                    case "type":
208                        type = parser.text();
209                        break;
210                    case "started":
211                        startTimestamp = parser.text() != null && !"null".equals(parser.text()) ?
212                                dateTimeFormatter.parseMillis(parser.text()) : 0L;
213                        break;
214                    case "last_active_begin":
215                        begin = parser.text() != null && !"null".equals(parser.text()) ?
216                                dateTimeFormatter.parseMillis(parser.text()) : 0L;
217                        break;
218                    case "last_active_end":
219                        end = parser.text() != null && !"null".equals(parser.text()) ?
220                                dateTimeFormatter.parseMillis(parser.text()) : 0L;
221                        break;
222                }
223            } else if (token == START_OBJECT) {
224                map = parser.map();
225            }
226        }
227        return new RiverState()
228                .setName(name)
229                .setType(type)
230                .setStarted(new DateTime(startTimestamp))
231                .setLastActive(new DateTime(begin), new DateTime(end))
232                .setMap(map);
233    }
234
235    @Override
236    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
237        builder.startObject()
238                .field("name", name)
239                .field("type", type)
240                .field("started", getStarted())
241                .field("last_active_begin", getLastActiveBegin())
242                .field("last_active_end", getLastActiveEnd())
243                .field("map", map);
244        builder.endObject();
245        return builder;
246    }
247
248
249    @Override
250    public void readFrom(StreamInput in) throws IOException {
251        this.name = in.readOptionalString();
252        this.type = in.readOptionalString();
253        this.started = new DateTime(in.readLong());
254        this.begin = new DateTime(in.readLong());
255        this.end = new DateTime(in.readLong());
256        map = in.readMap();
257    }
258
259    @Override
260    public void writeTo(StreamOutput out) throws IOException {
261        out.writeOptionalString(name);
262        out.writeOptionalString(type);
263        out.writeLong(started != null ? started.getMillis() : 0L);
264        out.writeLong(begin != null ? begin.getMillis() : 0L);
265        out.writeLong(end != null ? end.getMillis() : 0L);
266        out.writeMap(map);
267    }
268
269    @Override
270    public int compareTo(RiverState o) {
271        return (getName() + "/" + getType()).compareTo(o.getName() + "/" + o.getType());
272    }
273
274    @Override
275    public String toString() {
276        try {
277            return toXContent(jsonBuilder(), EMPTY_PARAMS).string();
278        } catch (IOException e) {
279            // ignore
280        }
281        return "";
282    }
283}