001package org.xbib.elasticsearch.action.websocket.pubsub; 002 003import org.elasticsearch.action.ActionListener; 004import org.elasticsearch.action.delete.DeleteResponse; 005import org.elasticsearch.client.Client; 006import org.elasticsearch.common.inject.Inject; 007import org.elasticsearch.common.settings.Settings; 008import org.elasticsearch.common.xcontent.XContentBuilder; 009import org.xbib.elasticsearch.websocket.BaseInteractiveHandler; 010import org.xbib.elasticsearch.websocket.InteractiveChannel; 011import org.xbib.elasticsearch.websocket.InteractiveController; 012import org.xbib.elasticsearch.websocket.InteractiveRequest; 013 014import java.io.IOException; 015 016import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 017 018/** 019 * Unsubscribe action. It removes a subscription. 020 */ 021public class UnsubscribeAction extends BaseInteractiveHandler { 022 023 private final String TYPE = "unsubscribe"; 024 private final String pubSubIndexName; 025 026 @Inject 027 public UnsubscribeAction(Settings settings, 028 Client client, 029 InteractiveController controller) { 030 super(settings, client); 031 this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings); 032 controller.registerHandler(TYPE, this); 033 } 034 035 @Override 036 public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) { 037 String subscriberId = request.hasParam("subscriber") ? request.paramAsString("subscriber") : null; 038 if (subscriberId == null) { 039 try { 040 channel.sendResponse(TYPE, new IllegalArgumentException("no subscriber")); 041 } catch (IOException e) { 042 logger.error("error while sending failure response", e); 043 } 044 } 045 try { 046 client.prepareDelete(pubSubIndexName, SubscribeAction.TYPE, subscriberId) 047 .execute(new ActionListener<DeleteResponse>() { 048 @Override 049 public void onResponse(DeleteResponse response) { 050 try { 051 XContentBuilder builder = jsonBuilder(); 052 builder.startObject().field("ok", true).field("id", response.getId()).endObject(); 053 channel.sendResponse(TYPE, builder); 054 } catch (Exception e) { 055 onFailure(e); 056 } 057 } 058 059 @Override 060 public void onFailure(Throwable e) { 061 logger.error("error while processing unsubscribe request", e); 062 try { 063 channel.sendResponse(TYPE, e); 064 } catch (IOException ex) { 065 logger.error("error while sending error response", ex); 066 } 067 } 068 }); 069 } catch (Exception e) { 070 logger.error("exception while processing unsubscribe request", e); 071 try { 072 channel.sendResponse(TYPE, e); 073 } catch (IOException e1) { 074 logger.error("exception while sending exception response", e1); 075 } 076 } 077 } 078}