001package org.xbib.elasticsearch.action.websocket.pubsub;
002
003import org.elasticsearch.action.ActionListener;
004import org.elasticsearch.action.index.IndexResponse;
005import org.elasticsearch.action.search.SearchResponse;
006import org.elasticsearch.action.search.SearchType;
007import org.elasticsearch.client.Client;
008import org.elasticsearch.common.inject.Inject;
009import org.elasticsearch.common.settings.Settings;
010import org.elasticsearch.common.xcontent.XContentBuilder;
011import org.elasticsearch.index.query.QueryBuilder;
012import org.elasticsearch.index.query.RangeFilterBuilder;
013import org.elasticsearch.search.SearchHit;
014import org.jboss.netty.channel.Channel;
015import org.xbib.elasticsearch.websocket.InteractiveChannel;
016import org.xbib.elasticsearch.websocket.InteractiveController;
017import org.xbib.elasticsearch.websocket.InteractiveRequest;
018import org.xbib.elasticsearch.http.HttpServerTransport;
019import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse;
020
021import java.io.IOException;
022import java.util.Map;
023
024import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
025import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
026import static org.elasticsearch.index.query.QueryBuilders.termQuery;
027
028/**
029 * Subscribe action. It performs the subscription of a client to
030 * the pubsub index under a given topic.
031 */
032public class SubscribeAction extends PublishSubscribe {
033
034    protected final static String TYPE = "subscribe";
035
036    @Inject
037    public SubscribeAction(Settings settings,
038                           Client client,
039                           HttpServerTransport transport,
040                           InteractiveController controller,
041                           Checkpointer service) {
042        super(settings, client, transport, controller, service);
043        controller.registerHandler(TYPE, this);
044    }
045
046    @Override
047    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
048        final String topic = request.hasParam("topic") ? request.paramAsString("topic") : "*";
049        final String subscriberId = request.hasParam("subscriber") ? request.paramAsString("subscriber") : null;
050        if (subscriberId == null) {
051            try {
052                channel.sendResponse(TYPE, new IllegalArgumentException("no subscriber"));
053            } catch (IOException e) {
054                logger.error("error while sending failure response", e);
055            }
056        }
057        try {
058            client.prepareIndex()
059                    .setIndex(pubSubIndexName)
060                    .setType(TYPE)
061                    .setId(subscriberId)
062                    .setSource(createSubscriberMessage(topic, channel))
063                    .setRefresh(request.paramAsBoolean("refresh", true))
064                    .execute(new ActionListener<IndexResponse>() {
065                        @Override
066                        public void onResponse(IndexResponse response) {
067                            try {
068                                XContentBuilder builder = jsonBuilder();
069                                builder.startObject().field("ok", true).field("id", response.getId()).endObject();
070                                channel.sendResponse(TYPE, builder);
071                                // receive outstanding messages
072                                sync(subscriberId, topic, channel.getChannel());
073                            } catch (Exception e) {
074                                onFailure(e);
075                            }
076                        }
077
078                        @Override
079                        public void onFailure(Throwable e) {
080                            logger.error("error while processing subscribe request", e);
081                            try {
082                                channel.sendResponse(TYPE, e);
083                            } catch (IOException ex) {
084                                logger.error("error while sending error response", ex);
085                            }
086                        }
087                    });
088        } catch (Exception e) {
089            logger.error("exception while processing subscribe request", e);
090            try {
091                channel.sendResponse(TYPE, e);
092            } catch (IOException e1) {
093                logger.error("exception while sending exception response", e1);
094            }
095        }
096    }
097
098    private XContentBuilder createSubscriberMessage(String topic, InteractiveChannel channel) {
099        Integer channelId = channel.getChannel().getId();
100        String localAddress = channel.getChannel().getLocalAddress().toString();
101        String remoteAddress = channel.getChannel().getRemoteAddress().toString();
102        try {
103            return jsonBuilder()
104                    .startObject()
105                    .field("topic", topic)
106                    .startObject("subscriber")
107                    .startObject("channel")
108                    .field("id", channelId)
109                    .field("localAddress", localAddress)
110                    .field("remoteAddress", remoteAddress)
111                    .endObject()
112                    .endObject()
113                    .endObject();
114        } catch (IOException e) {
115            return null;
116        }
117    }
118
119    /**
120     * Synchronize the subscriber with the current messages.
121     *
122     * @param subscriberId
123     * @param topic
124     * @param channel
125     * @throws IOException
126     */
127    private void sync(final String subscriberId, final String topic, final Channel channel) throws IOException {
128        Long lastSeen = service.checkpointedAt(subscriberId);
129        Long topicSeen = service.checkpointedAt(topic);
130        // if client appearance is later than topic, do not search for any messages
131        if (lastSeen == null || topicSeen == null || lastSeen >= topicSeen) {
132            return;
133        }
134        // message sync - update subscriber with the current timestamp
135        service.checkpoint(subscriberId);
136        service.flushCheckpoint();
137        // there are unreceived messages, get all outstanding messages since last seen
138        QueryBuilder queryBuilder = termQuery("topic", topic);
139        RangeFilterBuilder filterBuilder = rangeFilter("timestamp").gte(lastSeen);
140        SearchResponse searchResponse = client.prepareSearch()
141                .setIndices(pubSubIndexName)
142                .setTypes("publish")
143                .setSearchType(SearchType.SCAN)
144                .setScroll(scrollTimeout)
145                .setQuery(queryBuilder)
146                .setPostFilter(filterBuilder)
147                .addField("data")
148                .addField("timestamp")
149                .setSize(scrollSize)
150                .execute().actionGet();
151        boolean failed = searchResponse.getFailedShards() > 0 || searchResponse.isTimedOut();
152        if (failed) {
153            logger.error("searching for messages for topic {} failed: failed shards={} timeout={}",
154                    topic, searchResponse.getFailedShards(), searchResponse.isTimedOut());
155            return;
156        }
157        // look for messages
158        long totalHits = searchResponse.getHits().getTotalHits();
159        boolean zero = totalHits == 0L;
160        if (zero) {
161            return;
162        }
163        // slurp in all outstanding messages
164        while (true) {
165            searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
166                    .setScroll(scrollTimeout)
167                    .execute().actionGet();
168            for (SearchHit hit : searchResponse.getHits()) {
169                Long timestamp = (Long) hit.field("timestamp").getValues().get(0);
170                Map<String, Object> data = hit.field("data").getValue();
171                channel.write(new NettyInteractiveResponse("message", createPublishMessage(timestamp, data)).response());
172            }
173            if (searchResponse.getHits().hits().length == 0) {
174                break;
175            }
176        }
177    }
178
179    private XContentBuilder createPublishMessage(long timestamp, Map<String, Object> data) {
180        try {
181            return jsonBuilder().startObject()
182                    .field("timestamp", timestamp)
183                    .field("data", data)
184                    .endObject();
185        } catch (IOException e) {
186            return null;
187        }
188    }
189}