001package org.xbib.elasticsearch.action.websocket.pubsub; 002 003import org.elasticsearch.ElasticsearchException; 004import org.elasticsearch.action.delete.DeleteRequest; 005import org.elasticsearch.action.get.GetResponse; 006import org.elasticsearch.action.index.IndexRequest; 007import org.elasticsearch.client.Client; 008import org.elasticsearch.client.Requests; 009import org.elasticsearch.common.component.AbstractLifecycleComponent; 010import org.elasticsearch.common.inject.Inject; 011import org.elasticsearch.common.logging.ESLogger; 012import org.elasticsearch.common.logging.ESLoggerFactory; 013import org.elasticsearch.common.settings.Settings; 014import org.xbib.elasticsearch.websocket.InteractiveChannel; 015import org.xbib.elasticsearch.action.websocket.bulk.BulkHandler; 016 017import java.io.IOException; 018 019import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 020 021/* 022 * Checkpoint management for topics and subscribers. 023 */ 024public class Checkpointer extends AbstractLifecycleComponent<Checkpointer> { 025 026 private final ESLogger logger = ESLoggerFactory.getLogger(Checkpointer.class.getSimpleName()); 027 028 private final String pubSubIndexName; 029 030 private final Client client; 031 032 private final static String TYPE = "checkpoint"; 033 034 private final BulkHandler bulkHandler; 035 036 @Inject 037 public Checkpointer(Settings settings, Client client) { 038 super(settings); 039 this.client = client; 040 this.bulkHandler = new BulkHandler(settings, client); 041 this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings); 042 } 043 044 @Override 045 protected void doStart() throws ElasticsearchException { 046 } 047 048 @Override 049 protected void doStop() throws ElasticsearchException { 050 } 051 052 @Override 053 protected void doClose() throws ElasticsearchException { 054 } 055 056 /** 057 * Checkpointing a topic or a subscriber. The current timestamp is written 058 * to the checkpoint index type. Note that bulk index requests are used by 059 * checkpointing and flushCheckpoint() needs to be called after all is done. 060 * 061 * @param id topic or subscriber 062 * @throws IOException 063 */ 064 public void checkpoint(String id) throws IOException { 065 indexBulk(Requests.indexRequest(pubSubIndexName).type(TYPE).id(id) 066 .source(jsonBuilder().startObject().field("timestamp", System.currentTimeMillis()).endObject()), null); 067 } 068 069 public void flushCheckpoint() throws IOException { 070 flushBulk(null); 071 } 072 073 public Long checkpointedAt(String id) throws IOException { 074 GetResponse response = client.prepareGet(pubSubIndexName, TYPE, id) 075 .setFields("timestamp") 076 .execute().actionGet(); 077 boolean failed = !response.isExists(); 078 if (failed) { 079 logger.warn("can't get checkpoint for {}", id); 080 return null; 081 } else { 082 return (Long) response.getFields().get("timestamp").getValue(); 083 } 084 } 085 086 /** 087 * Perform bulk indexing 088 * 089 * @param request the index request 090 * @param channel the interactive channel 091 * @throws IOException 092 */ 093 public void indexBulk(IndexRequest request, InteractiveChannel channel) throws IOException { 094 bulkHandler.add(request, channel); 095 } 096 097 /** 098 * Perform bulk delete 099 * 100 * @param request the delete request 101 * @param channel the interactive channel 102 * @throws IOException 103 */ 104 public void deleteBulk(DeleteRequest request, InteractiveChannel channel) throws IOException { 105 bulkHandler.add(request, channel); 106 } 107 108 /** 109 * Flush bulk 110 * 111 * @param channel the interactive channel 112 * @throws IOException 113 */ 114 public void flushBulk(InteractiveChannel channel) throws IOException { 115 bulkHandler.flush(); 116 } 117}