001package org.xbib.elasticsearch.action.websocket.pubsub; 002 003import org.elasticsearch.action.index.IndexResponse; 004import org.elasticsearch.action.search.SearchResponse; 005import org.elasticsearch.action.search.SearchType; 006import org.elasticsearch.client.Client; 007import org.elasticsearch.common.inject.Inject; 008import org.elasticsearch.common.settings.Settings; 009import org.elasticsearch.common.unit.TimeValue; 010import org.elasticsearch.common.xcontent.XContentBuilder; 011import org.elasticsearch.index.query.QueryBuilder; 012import org.elasticsearch.search.SearchHit; 013import org.elasticsearch.search.SearchHitField; 014import org.xbib.elasticsearch.websocket.InteractiveChannel; 015import org.xbib.elasticsearch.websocket.InteractiveController; 016import org.xbib.elasticsearch.websocket.InteractiveRequest; 017import org.xbib.elasticsearch.http.HttpServerTransport; 018 019import java.io.IOException; 020import java.util.Map; 021 022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 023import static org.elasticsearch.index.query.QueryBuilders.termQuery; 024 025/** 026 * Publish action 027 */ 028public class PublishAction extends PublishSubscribe { 029 030 protected final static String TYPE = "publish"; 031 032 @Inject 033 public PublishAction(Settings settings, 034 Client client, 035 HttpServerTransport transport, 036 InteractiveController controller, 037 Checkpointer service) { 038 super(settings, client, transport, controller, service); 039 controller.registerHandler(TYPE, this); 040 } 041 042 @Override 043 public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) { 044 String topic = request.hasParam("topic") ? request.paramAsString("topic") : "*"; 045 try { 046 // advertise phase - save message in the index (for disconnected subscribers) 047 final XContentBuilder messageBuilder = createPublishMessage(request); 048 final XContentBuilder responseBuilder = jsonBuilder().startObject(); 049 IndexResponse indexResponse = client.prepareIndex() 050 .setIndex(pubSubIndexName) 051 .setType(TYPE) 052 .setSource(messageBuilder) 053 .setRefresh(request.paramAsBoolean("refresh", true)) 054 .execute().actionGet(); 055 responseBuilder.field("id", indexResponse.getId()); 056 // push phase - scroll over subscribers for this topic currently connected 057 QueryBuilder queryBuilder = termQuery("topic", topic); 058 SearchResponse searchResponse = client.prepareSearch() 059 .setIndices(pubSubIndexName) 060 .setTypes("subscribe") 061 .setSearchType(SearchType.SCAN) 062 .setScroll(new TimeValue(60000)) 063 .setQuery(queryBuilder) 064 .addField("subscriber.channel") 065 .setSize(100) 066 .execute().actionGet(); 067 boolean failed = searchResponse.getFailedShards() > 0 || searchResponse.isTimedOut(); 068 if (failed) { 069 logger.error("searching for subscribers for topic {} failed: failed shards={} timeout={}", 070 topic, searchResponse.getFailedShards(), searchResponse.isTimedOut()); 071 responseBuilder.field("subscribers", 0).field("failed", true); 072 channel.sendResponse(TYPE, responseBuilder.endObject()); 073 responseBuilder.close(); 074 return; 075 } 076 // look for subscribers 077 long totalHits = searchResponse.getHits().getTotalHits(); 078 boolean zero = totalHits == 0L; 079 if (zero) { 080 responseBuilder.field("subscribers", 0).field("failed", false); 081 channel.sendResponse(TYPE, responseBuilder.endObject()); 082 responseBuilder.close(); 083 return; 084 } 085 // report the total number of subscribers online to the publisher 086 responseBuilder.field("subscribers", totalHits); 087 channel.sendResponse(TYPE, responseBuilder.endObject()); 088 messageBuilder.close(); 089 responseBuilder.close(); 090 // checkpoint topic 091 service.checkpoint(topic); 092 // push phase - write the message to the subscribers. We have 60 seconds per 100 subscribers. 093 while (true) { 094 searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()) 095 .setScroll(new TimeValue(60000)) 096 .execute().actionGet(); 097 for (SearchHit hit : searchResponse.getHits()) { 098 // for message sync - update all subscribers with the current timestamp 099 service.checkpoint(hit.getId()); 100 // find node address and channel ID 101 SearchHitField channelField = hit.field("subscriber.channel"); 102 Map<String, Object> channelfieldMap = channelField.getValue(); 103 String nodeAddress = (String) channelfieldMap.get("localAddress"); 104 Integer id = (Integer) channelfieldMap.get("id"); 105 // forward to node 106 transport.forward(nodeAddress, id, messageBuilder); 107 } 108 if (searchResponse.getHits().hits().length == 0) { 109 break; 110 } 111 } 112 service.flushCheckpoint(); 113 } catch (Exception e) { 114 logger.error("exception while processing publish request", e); 115 try { 116 channel.sendResponse(TYPE, e); 117 } catch (IOException e1) { 118 logger.error("exception while sending exception response", e1); 119 } 120 } 121 } 122 123 private XContentBuilder createPublishMessage(InteractiveRequest request) { 124 try { 125 return jsonBuilder().startObject() 126 .field("timestamp", request.paramAsLong("timestamp", System.currentTimeMillis())) 127 .field("data", request.asMap()) 128 .endObject(); 129 } catch (IOException e) { 130 return null; 131 } 132 } 133 134}