001package org.xbib.elasticsearch.action.websocket.pubsub;
002
003import org.elasticsearch.action.index.IndexResponse;
004import org.elasticsearch.action.search.SearchResponse;
005import org.elasticsearch.action.search.SearchType;
006import org.elasticsearch.client.Client;
007import org.elasticsearch.common.inject.Inject;
008import org.elasticsearch.common.settings.Settings;
009import org.elasticsearch.common.unit.TimeValue;
010import org.elasticsearch.common.xcontent.XContentBuilder;
011import org.elasticsearch.index.query.QueryBuilder;
012import org.elasticsearch.search.SearchHit;
013import org.elasticsearch.search.SearchHitField;
014import org.xbib.elasticsearch.websocket.InteractiveChannel;
015import org.xbib.elasticsearch.websocket.InteractiveController;
016import org.xbib.elasticsearch.websocket.InteractiveRequest;
017import org.xbib.elasticsearch.http.HttpServerTransport;
018
019import java.io.IOException;
020import java.util.Map;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023import static org.elasticsearch.index.query.QueryBuilders.termQuery;
024
025/**
026 * Publish action
027 */
028public class PublishAction extends PublishSubscribe {
029
030    protected final static String TYPE = "publish";
031
032    @Inject
033    public PublishAction(Settings settings,
034                         Client client,
035                         HttpServerTransport transport,
036                         InteractiveController controller,
037                         Checkpointer service) {
038        super(settings, client, transport, controller, service);
039        controller.registerHandler(TYPE, this);
040    }
041
042    @Override
043    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
044        String topic = request.hasParam("topic") ? request.paramAsString("topic") : "*";
045        try {
046            // advertise phase - save message in the index (for disconnected subscribers)
047            final XContentBuilder messageBuilder = createPublishMessage(request);
048            final XContentBuilder responseBuilder = jsonBuilder().startObject();
049            IndexResponse indexResponse = client.prepareIndex()
050                    .setIndex(pubSubIndexName)
051                    .setType(TYPE)
052                    .setSource(messageBuilder)
053                    .setRefresh(request.paramAsBoolean("refresh", true))
054                    .execute().actionGet();
055            responseBuilder.field("id", indexResponse.getId());
056            // push phase - scroll over subscribers for this topic currently connected
057            QueryBuilder queryBuilder = termQuery("topic", topic);
058            SearchResponse searchResponse = client.prepareSearch()
059                    .setIndices(pubSubIndexName)
060                    .setTypes("subscribe")
061                    .setSearchType(SearchType.SCAN)
062                    .setScroll(new TimeValue(60000))
063                    .setQuery(queryBuilder)
064                    .addField("subscriber.channel")
065                    .setSize(100)
066                    .execute().actionGet();
067            boolean failed = searchResponse.getFailedShards() > 0 || searchResponse.isTimedOut();
068            if (failed) {
069                logger.error("searching for subscribers for topic {} failed: failed shards={} timeout={}",
070                        topic, searchResponse.getFailedShards(), searchResponse.isTimedOut());
071                responseBuilder.field("subscribers", 0).field("failed", true);
072                channel.sendResponse(TYPE, responseBuilder.endObject());
073                responseBuilder.close();
074                return;
075            }
076            // look for subscribers
077            long totalHits = searchResponse.getHits().getTotalHits();
078            boolean zero = totalHits == 0L;
079            if (zero) {
080                responseBuilder.field("subscribers", 0).field("failed", false);
081                channel.sendResponse(TYPE, responseBuilder.endObject());
082                responseBuilder.close();
083                return;
084            }
085            // report the total number of subscribers online to the publisher
086            responseBuilder.field("subscribers", totalHits);
087            channel.sendResponse(TYPE, responseBuilder.endObject());
088            messageBuilder.close();
089            responseBuilder.close();
090            // checkpoint topic
091            service.checkpoint(topic);
092            // push phase - write the message to the subscribers. We have 60 seconds per 100 subscribers.
093            while (true) {
094                searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
095                        .setScroll(new TimeValue(60000))
096                        .execute().actionGet();
097                for (SearchHit hit : searchResponse.getHits()) {
098                    // for message sync - update all subscribers with the current timestamp
099                    service.checkpoint(hit.getId());
100                    // find node address and channel ID
101                    SearchHitField channelField = hit.field("subscriber.channel");
102                    Map<String, Object> channelfieldMap = channelField.getValue();
103                    String nodeAddress = (String) channelfieldMap.get("localAddress");
104                    Integer id = (Integer) channelfieldMap.get("id");
105                    // forward to node
106                    transport.forward(nodeAddress, id, messageBuilder);
107                }
108                if (searchResponse.getHits().hits().length == 0) {
109                    break;
110                }
111            }
112            service.flushCheckpoint();
113        } catch (Exception e) {
114            logger.error("exception while processing publish request", e);
115            try {
116                channel.sendResponse(TYPE, e);
117            } catch (IOException e1) {
118                logger.error("exception while sending exception response", e1);
119            }
120        }
121    }
122
123    private XContentBuilder createPublishMessage(InteractiveRequest request) {
124        try {
125            return jsonBuilder().startObject()
126                    .field("timestamp", request.paramAsLong("timestamp", System.currentTimeMillis()))
127                    .field("data", request.asMap())
128                    .endObject();
129        } catch (IOException e) {
130            return null;
131        }
132    }
133
134}