001package org.xbib.elasticsearch.action.websocket.pubsub;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.action.delete.DeleteRequest;
005import org.elasticsearch.action.get.GetResponse;
006import org.elasticsearch.action.index.IndexRequest;
007import org.elasticsearch.client.Client;
008import org.elasticsearch.client.Requests;
009import org.elasticsearch.common.component.AbstractLifecycleComponent;
010import org.elasticsearch.common.inject.Inject;
011import org.elasticsearch.common.logging.ESLogger;
012import org.elasticsearch.common.logging.ESLoggerFactory;
013import org.elasticsearch.common.settings.Settings;
014import org.xbib.elasticsearch.websocket.InteractiveChannel;
015import org.xbib.elasticsearch.action.websocket.bulk.BulkHandler;
016
017import java.io.IOException;
018
019import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
020
021/*
022 * Checkpoint management for topics and subscribers.
023 */
024public class Checkpointer extends AbstractLifecycleComponent<Checkpointer> {
025
026    private final ESLogger logger = ESLoggerFactory.getLogger(Checkpointer.class.getSimpleName());
027
028    private final String pubSubIndexName;
029
030    private final Client client;
031
032    private final static String TYPE = "checkpoint";
033
034    private final BulkHandler bulkHandler;
035
036    @Inject
037    public Checkpointer(Settings settings, Client client) {
038        super(settings);
039        this.client = client;
040        this.bulkHandler = new BulkHandler(settings, client);
041        this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings);
042    }
043
044    @Override
045    protected void doStart() throws ElasticsearchException {
046    }
047
048    @Override
049    protected void doStop() throws ElasticsearchException {
050    }
051
052    @Override
053    protected void doClose() throws ElasticsearchException {
054    }
055
056    /**
057     * Checkpointing a topic or a subscriber. The current timestamp is written
058     * to the checkpoint index type. Note that bulk index requests are used by
059     * checkpointing and flushCheckpoint() needs to be called after all is done.
060     *
061     * @param id topic or subscriber
062     * @throws IOException
063     */
064    public void checkpoint(String id) throws IOException {
065        indexBulk(Requests.indexRequest(pubSubIndexName).type(TYPE).id(id)
066                .source(jsonBuilder().startObject().field("timestamp", System.currentTimeMillis()).endObject()), null);
067    }
068
069    public void flushCheckpoint() throws IOException {
070        flushBulk(null);
071    }
072
073    public Long checkpointedAt(String id) throws IOException {
074        GetResponse response = client.prepareGet(pubSubIndexName, TYPE, id)
075                .setFields("timestamp")
076                .execute().actionGet();
077        boolean failed = !response.isExists();
078        if (failed) {
079            logger.warn("can't get checkpoint for {}", id);
080            return null;
081        } else {
082            return (Long) response.getFields().get("timestamp").getValue();
083        }
084    }
085
086    /**
087     * Perform bulk indexing
088     *
089     * @param request the index request
090     * @param channel the interactive channel
091     * @throws IOException
092     */
093    public void indexBulk(IndexRequest request, InteractiveChannel channel) throws IOException {
094        bulkHandler.add(request, channel);
095    }
096
097    /**
098     * Perform bulk delete
099     *
100     * @param request the delete request
101     * @param channel the interactive channel
102     * @throws IOException
103     */
104    public void deleteBulk(DeleteRequest request, InteractiveChannel channel) throws IOException {
105        bulkHandler.add(request, channel);
106    }
107
108    /**
109     * Flush bulk
110     *
111     * @param channel the interactive channel
112     * @throws IOException
113     */
114    public void flushBulk(InteractiveChannel channel) throws IOException {
115        bulkHandler.flush();
116    }
117}