001package org.xbib.elasticsearch.rest.action.websocket;
002
003import org.elasticsearch.action.ActionListener;
004import org.elasticsearch.action.index.IndexResponse;
005import org.elasticsearch.action.search.SearchResponse;
006import org.elasticsearch.action.search.SearchType;
007import org.elasticsearch.client.Client;
008import org.elasticsearch.common.inject.Inject;
009import org.elasticsearch.common.settings.Settings;
010import org.elasticsearch.common.unit.TimeValue;
011import org.elasticsearch.common.xcontent.XContentBuilder;
012import org.elasticsearch.common.xcontent.XContentFactory;
013import org.elasticsearch.common.xcontent.XContentParser;
014import org.elasticsearch.index.query.QueryBuilder;
015import org.elasticsearch.rest.BaseRestHandler;
016import org.elasticsearch.rest.RestChannel;
017import org.elasticsearch.rest.RestController;
018import org.elasticsearch.rest.RestRequest;
019import org.elasticsearch.search.SearchHit;
020import org.elasticsearch.search.SearchHitField;
021import org.jboss.netty.channel.Channel;
022import org.xbib.elasticsearch.action.websocket.pubsub.Checkpointer;
023import org.xbib.elasticsearch.action.websocket.pubsub.PubSubIndexName;
024import org.xbib.elasticsearch.http.HttpServerTransport;
025import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse;
026import org.xbib.elasticsearch.rest.XContentRestResponse;
027import org.xbib.elasticsearch.rest.XContentThrowableRestResponse;
028
029import java.io.IOException;
030import java.util.Map;
031
032import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
033import static org.elasticsearch.index.query.QueryBuilders.termQuery;
034import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
035import static org.elasticsearch.rest.RestStatus.OK;
036import static org.xbib.elasticsearch.rest.RestXContentBuilder.restContentBuilder;
037
038/**
039 * Publish action for REST
040 */
041public class RestPublishAction extends BaseRestHandler {
042
043    private final static String TYPE = "publish";
044
045    private final String pubSubIndexName;
046
047    private final HttpServerTransport transport;
048
049    private final Checkpointer service;
050
051    @Inject
052    public RestPublishAction(Settings settings, Client client,
053                             RestController restController,
054                             HttpServerTransport transport,
055                             Checkpointer service) {
056        super(settings, client);
057        this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings);
058        this.transport = transport;
059        this.service = service;
060        restController.registerHandler(RestRequest.Method.GET, "/_publish", this);
061        restController.registerHandler(RestRequest.Method.POST, "/_publish", this);
062    }
063
064    @Override
065    public void handleRequest(final RestRequest request, final RestChannel channel, Client client) {
066        String topic = request.hasParam("topic") ? request.param("topic") : "*";
067        try {
068            final XContentBuilder messageBuilder = createPublishMessage(request);
069            client.prepareIndex()
070                    .setIndex(pubSubIndexName)
071                    .setType(TYPE)
072                    .setSource(messageBuilder)
073                    .setRefresh(request.paramAsBoolean("refresh", true))
074                    .execute(new ActionListener<IndexResponse>() {
075                        @Override
076                        public void onResponse(IndexResponse response) {
077                            try {
078                                XContentBuilder builder = restContentBuilder(request);
079                                builder.startObject().field("ok", true).field("id", response.getId()).endObject();
080                                channel.sendResponse(new XContentRestResponse(request, OK, builder));
081                            } catch (Exception e) {
082                                onFailure(e);
083                            }
084                        }
085
086                        @Override
087                        public void onFailure(Throwable e) {
088                            try {
089                                logger.error("Error processing publish request", e);
090                                channel.sendResponse(new XContentThrowableRestResponse(request, e));
091                            } catch (IOException e1) {
092                                logger.error("Failed to send failure response", e1);
093                            }
094                        }
095                    });
096            // push phase - scroll over subscribers for this topic currently connected
097            QueryBuilder queryBuilder = termQuery("topic", topic);
098            SearchResponse searchResponse = client.prepareSearch()
099                    .setIndices(pubSubIndexName)
100                    .setTypes("subscribe")
101                    .setSearchType(SearchType.SCAN)
102                    .setScroll(new TimeValue(60000))
103                    .setQuery(queryBuilder)
104                    .addField("subscriber.channel")
105                    .setSize(100)
106                    .execute().actionGet();
107            messageBuilder.close();
108            service.checkpoint(topic);
109            // push phase - write the message to the subscribers. We have 60 seconds per 100 subscribers.
110            while (true) {
111                searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
112                        .setScroll(new TimeValue(60000))
113                        .execute().actionGet();
114                for (SearchHit hit : searchResponse.getHits()) {
115                    service.checkpoint(hit.getId());
116                    SearchHitField channelField = hit.field("subscriber.channel");
117                    Map<String, Object> channelfieldMap = channelField.getValue();
118                    Integer id = (Integer) channelfieldMap.get("id");
119                    Channel ch = transport.channel(id);
120                    if (ch != null) {
121                        ch.write(new NettyInteractiveResponse("message", messageBuilder).response());
122                    }
123                }
124                if (searchResponse.getHits().hits().length == 0) {
125                    break;
126                }
127            }
128            service.flushCheckpoint();
129        } catch (Exception e) {
130            try {
131                XContentBuilder builder = restContentBuilder(request);
132                channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject()));
133            } catch (IOException e1) {
134                logger.error("Failed to send failure response", e1);
135            }
136        }
137    }
138
139    private XContentBuilder createPublishMessage(RestRequest request) {
140        try {
141            Map<String, Object> map = null;
142            String message = request.content().toUtf8();
143            XContentParser parser = null;
144            try {
145                parser = XContentFactory.xContent(message).createParser(message);
146                map = parser.map();
147            } catch (Exception e) {
148                logger.warn("unable to parse {}", message);
149            } finally {
150                parser = null;
151            }
152            return jsonBuilder().startObject()
153                    .field("timestamp", request.param("timestamp", Long.toString(System.currentTimeMillis())))
154                    .field("message", map)
155                    .endObject();
156        } catch (IOException e) {
157            return null;
158        }
159    }
160}