001package org.xbib.elasticsearch.rest.action.websocket; 002 003import org.elasticsearch.action.ActionListener; 004import org.elasticsearch.action.index.IndexResponse; 005import org.elasticsearch.action.search.SearchResponse; 006import org.elasticsearch.action.search.SearchType; 007import org.elasticsearch.client.Client; 008import org.elasticsearch.common.inject.Inject; 009import org.elasticsearch.common.settings.Settings; 010import org.elasticsearch.common.unit.TimeValue; 011import org.elasticsearch.common.xcontent.XContentBuilder; 012import org.elasticsearch.common.xcontent.XContentFactory; 013import org.elasticsearch.common.xcontent.XContentParser; 014import org.elasticsearch.index.query.QueryBuilder; 015import org.elasticsearch.rest.BaseRestHandler; 016import org.elasticsearch.rest.RestChannel; 017import org.elasticsearch.rest.RestController; 018import org.elasticsearch.rest.RestRequest; 019import org.elasticsearch.search.SearchHit; 020import org.elasticsearch.search.SearchHitField; 021import org.jboss.netty.channel.Channel; 022import org.xbib.elasticsearch.action.websocket.pubsub.Checkpointer; 023import org.xbib.elasticsearch.action.websocket.pubsub.PubSubIndexName; 024import org.xbib.elasticsearch.http.HttpServerTransport; 025import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse; 026import org.xbib.elasticsearch.rest.XContentRestResponse; 027import org.xbib.elasticsearch.rest.XContentThrowableRestResponse; 028 029import java.io.IOException; 030import java.util.Map; 031 032import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 033import static org.elasticsearch.index.query.QueryBuilders.termQuery; 034import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; 035import static org.elasticsearch.rest.RestStatus.OK; 036import static org.xbib.elasticsearch.rest.RestXContentBuilder.restContentBuilder; 037 038/** 039 * Publish action for REST 040 */ 041public class RestPublishAction extends BaseRestHandler { 042 043 private final static String TYPE = "publish"; 044 045 private final String pubSubIndexName; 046 047 private final HttpServerTransport transport; 048 049 private final Checkpointer service; 050 051 @Inject 052 public RestPublishAction(Settings settings, Client client, 053 RestController restController, 054 HttpServerTransport transport, 055 Checkpointer service) { 056 super(settings, client); 057 this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings); 058 this.transport = transport; 059 this.service = service; 060 restController.registerHandler(RestRequest.Method.GET, "/_publish", this); 061 restController.registerHandler(RestRequest.Method.POST, "/_publish", this); 062 } 063 064 @Override 065 public void handleRequest(final RestRequest request, final RestChannel channel, Client client) { 066 String topic = request.hasParam("topic") ? request.param("topic") : "*"; 067 try { 068 final XContentBuilder messageBuilder = createPublishMessage(request); 069 client.prepareIndex() 070 .setIndex(pubSubIndexName) 071 .setType(TYPE) 072 .setSource(messageBuilder) 073 .setRefresh(request.paramAsBoolean("refresh", true)) 074 .execute(new ActionListener<IndexResponse>() { 075 @Override 076 public void onResponse(IndexResponse response) { 077 try { 078 XContentBuilder builder = restContentBuilder(request); 079 builder.startObject().field("ok", true).field("id", response.getId()).endObject(); 080 channel.sendResponse(new XContentRestResponse(request, OK, builder)); 081 } catch (Exception e) { 082 onFailure(e); 083 } 084 } 085 086 @Override 087 public void onFailure(Throwable e) { 088 try { 089 logger.error("Error processing publish request", e); 090 channel.sendResponse(new XContentThrowableRestResponse(request, e)); 091 } catch (IOException e1) { 092 logger.error("Failed to send failure response", e1); 093 } 094 } 095 }); 096 // push phase - scroll over subscribers for this topic currently connected 097 QueryBuilder queryBuilder = termQuery("topic", topic); 098 SearchResponse searchResponse = client.prepareSearch() 099 .setIndices(pubSubIndexName) 100 .setTypes("subscribe") 101 .setSearchType(SearchType.SCAN) 102 .setScroll(new TimeValue(60000)) 103 .setQuery(queryBuilder) 104 .addField("subscriber.channel") 105 .setSize(100) 106 .execute().actionGet(); 107 messageBuilder.close(); 108 service.checkpoint(topic); 109 // push phase - write the message to the subscribers. We have 60 seconds per 100 subscribers. 110 while (true) { 111 searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()) 112 .setScroll(new TimeValue(60000)) 113 .execute().actionGet(); 114 for (SearchHit hit : searchResponse.getHits()) { 115 service.checkpoint(hit.getId()); 116 SearchHitField channelField = hit.field("subscriber.channel"); 117 Map<String, Object> channelfieldMap = channelField.getValue(); 118 Integer id = (Integer) channelfieldMap.get("id"); 119 Channel ch = transport.channel(id); 120 if (ch != null) { 121 ch.write(new NettyInteractiveResponse("message", messageBuilder).response()); 122 } 123 } 124 if (searchResponse.getHits().hits().length == 0) { 125 break; 126 } 127 } 128 service.flushCheckpoint(); 129 } catch (Exception e) { 130 try { 131 XContentBuilder builder = restContentBuilder(request); 132 channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject())); 133 } catch (IOException e1) { 134 logger.error("Failed to send failure response", e1); 135 } 136 } 137 } 138 139 private XContentBuilder createPublishMessage(RestRequest request) { 140 try { 141 Map<String, Object> map = null; 142 String message = request.content().toUtf8(); 143 XContentParser parser = null; 144 try { 145 parser = XContentFactory.xContent(message).createParser(message); 146 map = parser.map(); 147 } catch (Exception e) { 148 logger.warn("unable to parse {}", message); 149 } finally { 150 parser = null; 151 } 152 return jsonBuilder().startObject() 153 .field("timestamp", request.param("timestamp", Long.toString(System.currentTimeMillis()))) 154 .field("message", map) 155 .endObject(); 156 } catch (IOException e) { 157 return null; 158 } 159 } 160}