001package org.xbib.elasticsearch.action.websocket.pubsub; 002 003import org.elasticsearch.action.ActionListener; 004import org.elasticsearch.action.index.IndexResponse; 005import org.elasticsearch.action.search.SearchResponse; 006import org.elasticsearch.action.search.SearchType; 007import org.elasticsearch.client.Client; 008import org.elasticsearch.common.inject.Inject; 009import org.elasticsearch.common.settings.Settings; 010import org.elasticsearch.common.xcontent.XContentBuilder; 011import org.elasticsearch.index.query.QueryBuilder; 012import org.elasticsearch.index.query.RangeFilterBuilder; 013import org.elasticsearch.search.SearchHit; 014import org.jboss.netty.channel.Channel; 015import org.xbib.elasticsearch.websocket.InteractiveChannel; 016import org.xbib.elasticsearch.websocket.InteractiveController; 017import org.xbib.elasticsearch.websocket.InteractiveRequest; 018import org.xbib.elasticsearch.http.HttpServerTransport; 019import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse; 020 021import java.io.IOException; 022import java.util.Map; 023 024import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 025import static org.elasticsearch.index.query.FilterBuilders.rangeFilter; 026import static org.elasticsearch.index.query.QueryBuilders.termQuery; 027 028/** 029 * Subscribe action. It performs the subscription of a client to 030 * the pubsub index under a given topic. 031 */ 032public class SubscribeAction extends PublishSubscribe { 033 034 protected final static String TYPE = "subscribe"; 035 036 @Inject 037 public SubscribeAction(Settings settings, 038 Client client, 039 HttpServerTransport transport, 040 InteractiveController controller, 041 Checkpointer service) { 042 super(settings, client, transport, controller, service); 043 controller.registerHandler(TYPE, this); 044 } 045 046 @Override 047 public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) { 048 final String topic = request.hasParam("topic") ? request.paramAsString("topic") : "*"; 049 final String subscriberId = request.hasParam("subscriber") ? request.paramAsString("subscriber") : null; 050 if (subscriberId == null) { 051 try { 052 channel.sendResponse(TYPE, new IllegalArgumentException("no subscriber")); 053 } catch (IOException e) { 054 logger.error("error while sending failure response", e); 055 } 056 } 057 try { 058 client.prepareIndex() 059 .setIndex(pubSubIndexName) 060 .setType(TYPE) 061 .setId(subscriberId) 062 .setSource(createSubscriberMessage(topic, channel)) 063 .setRefresh(request.paramAsBoolean("refresh", true)) 064 .execute(new ActionListener<IndexResponse>() { 065 @Override 066 public void onResponse(IndexResponse response) { 067 try { 068 XContentBuilder builder = jsonBuilder(); 069 builder.startObject().field("ok", true).field("id", response.getId()).endObject(); 070 channel.sendResponse(TYPE, builder); 071 // receive outstanding messages 072 sync(subscriberId, topic, channel.getChannel()); 073 } catch (Exception e) { 074 onFailure(e); 075 } 076 } 077 078 @Override 079 public void onFailure(Throwable e) { 080 logger.error("error while processing subscribe request", e); 081 try { 082 channel.sendResponse(TYPE, e); 083 } catch (IOException ex) { 084 logger.error("error while sending error response", ex); 085 } 086 } 087 }); 088 } catch (Exception e) { 089 logger.error("exception while processing subscribe request", e); 090 try { 091 channel.sendResponse(TYPE, e); 092 } catch (IOException e1) { 093 logger.error("exception while sending exception response", e1); 094 } 095 } 096 } 097 098 private XContentBuilder createSubscriberMessage(String topic, InteractiveChannel channel) { 099 Integer channelId = channel.getChannel().getId(); 100 String localAddress = channel.getChannel().getLocalAddress().toString(); 101 String remoteAddress = channel.getChannel().getRemoteAddress().toString(); 102 try { 103 return jsonBuilder() 104 .startObject() 105 .field("topic", topic) 106 .startObject("subscriber") 107 .startObject("channel") 108 .field("id", channelId) 109 .field("localAddress", localAddress) 110 .field("remoteAddress", remoteAddress) 111 .endObject() 112 .endObject() 113 .endObject(); 114 } catch (IOException e) { 115 return null; 116 } 117 } 118 119 /** 120 * Synchronize the subscriber with the current messages. 121 * 122 * @param subscriberId 123 * @param topic 124 * @param channel 125 * @throws IOException 126 */ 127 private void sync(final String subscriberId, final String topic, final Channel channel) throws IOException { 128 Long lastSeen = service.checkpointedAt(subscriberId); 129 Long topicSeen = service.checkpointedAt(topic); 130 // if client appearance is later than topic, do not search for any messages 131 if (lastSeen == null || topicSeen == null || lastSeen >= topicSeen) { 132 return; 133 } 134 // message sync - update subscriber with the current timestamp 135 service.checkpoint(subscriberId); 136 service.flushCheckpoint(); 137 // there are unreceived messages, get all outstanding messages since last seen 138 QueryBuilder queryBuilder = termQuery("topic", topic); 139 RangeFilterBuilder filterBuilder = rangeFilter("timestamp").gte(lastSeen); 140 SearchResponse searchResponse = client.prepareSearch() 141 .setIndices(pubSubIndexName) 142 .setTypes("publish") 143 .setSearchType(SearchType.SCAN) 144 .setScroll(scrollTimeout) 145 .setQuery(queryBuilder) 146 .setPostFilter(filterBuilder) 147 .addField("data") 148 .addField("timestamp") 149 .setSize(scrollSize) 150 .execute().actionGet(); 151 boolean failed = searchResponse.getFailedShards() > 0 || searchResponse.isTimedOut(); 152 if (failed) { 153 logger.error("searching for messages for topic {} failed: failed shards={} timeout={}", 154 topic, searchResponse.getFailedShards(), searchResponse.isTimedOut()); 155 return; 156 } 157 // look for messages 158 long totalHits = searchResponse.getHits().getTotalHits(); 159 boolean zero = totalHits == 0L; 160 if (zero) { 161 return; 162 } 163 // slurp in all outstanding messages 164 while (true) { 165 searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()) 166 .setScroll(scrollTimeout) 167 .execute().actionGet(); 168 for (SearchHit hit : searchResponse.getHits()) { 169 Long timestamp = (Long) hit.field("timestamp").getValues().get(0); 170 Map<String, Object> data = hit.field("data").getValue(); 171 channel.write(new NettyInteractiveResponse("message", createPublishMessage(timestamp, data)).response()); 172 } 173 if (searchResponse.getHits().hits().length == 0) { 174 break; 175 } 176 } 177 } 178 179 private XContentBuilder createPublishMessage(long timestamp, Map<String, Object> data) { 180 try { 181 return jsonBuilder().startObject() 182 .field("timestamp", timestamp) 183 .field("data", data) 184 .endObject(); 185 } catch (IOException e) { 186 return null; 187 } 188 } 189}