001package org.xbib.elasticsearch.action.websocket.pubsub;
002
003import org.elasticsearch.action.ActionListener;
004import org.elasticsearch.action.delete.DeleteResponse;
005import org.elasticsearch.client.Client;
006import org.elasticsearch.common.inject.Inject;
007import org.elasticsearch.common.settings.Settings;
008import org.elasticsearch.common.xcontent.XContentBuilder;
009import org.xbib.elasticsearch.websocket.BaseInteractiveHandler;
010import org.xbib.elasticsearch.websocket.InteractiveChannel;
011import org.xbib.elasticsearch.websocket.InteractiveController;
012import org.xbib.elasticsearch.websocket.InteractiveRequest;
013
014import java.io.IOException;
015
016import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
017
018/**
019 * Unsubscribe action. It removes a subscription.
020 */
021public class UnsubscribeAction extends BaseInteractiveHandler {
022
023    private final String TYPE = "unsubscribe";
024    private final String pubSubIndexName;
025
026    @Inject
027    public UnsubscribeAction(Settings settings,
028                             Client client,
029                             InteractiveController controller) {
030        super(settings, client);
031        this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings);
032        controller.registerHandler(TYPE, this);
033    }
034
035    @Override
036    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
037        String subscriberId = request.hasParam("subscriber") ? request.paramAsString("subscriber") : null;
038        if (subscriberId == null) {
039            try {
040                channel.sendResponse(TYPE, new IllegalArgumentException("no subscriber"));
041            } catch (IOException e) {
042                logger.error("error while sending failure response", e);
043            }
044        }
045        try {
046            client.prepareDelete(pubSubIndexName, SubscribeAction.TYPE, subscriberId)
047                    .execute(new ActionListener<DeleteResponse>() {
048                        @Override
049                        public void onResponse(DeleteResponse response) {
050                            try {
051                                XContentBuilder builder = jsonBuilder();
052                                builder.startObject().field("ok", true).field("id", response.getId()).endObject();
053                                channel.sendResponse(TYPE, builder);
054                            } catch (Exception e) {
055                                onFailure(e);
056                            }
057                        }
058
059                        @Override
060                        public void onFailure(Throwable e) {
061                            logger.error("error while processing unsubscribe request", e);
062                            try {
063                                channel.sendResponse(TYPE, e);
064                            } catch (IOException ex) {
065                                logger.error("error while sending error response", ex);
066                            }
067                        }
068                    });
069        } catch (Exception e) {
070            logger.error("exception while processing unsubscribe request", e);
071            try {
072                channel.sendResponse(TYPE, e);
073            } catch (IOException e1) {
074                logger.error("exception while sending exception response", e1);
075            }
076        }
077    }
078}