001package org.xbib.elasticsearch.rest.action.websocket; 002 003import org.elasticsearch.action.ActionListener; 004import org.elasticsearch.action.delete.DeleteResponse; 005import org.elasticsearch.client.Client; 006import org.elasticsearch.common.inject.Inject; 007import org.elasticsearch.common.settings.Settings; 008import org.elasticsearch.common.xcontent.XContentBuilder; 009import org.elasticsearch.rest.BaseRestHandler; 010import org.elasticsearch.rest.RestChannel; 011import org.elasticsearch.rest.RestController; 012import org.elasticsearch.rest.RestRequest; 013import org.xbib.elasticsearch.rest.XContentRestResponse; 014import org.xbib.elasticsearch.rest.XContentThrowableRestResponse; 015import org.xbib.elasticsearch.action.websocket.pubsub.PubSubIndexName; 016 017import java.io.IOException; 018 019import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; 020import static org.elasticsearch.rest.RestStatus.OK; 021import static org.xbib.elasticsearch.rest.RestXContentBuilder.restContentBuilder; 022 023/** 024 * Unsubscribe action for REST 025 */ 026public class RestUnsubscribeAction extends BaseRestHandler { 027 028 private final String pubSubIndexName; 029 030 @Inject 031 public RestUnsubscribeAction(Settings settings, Client client, 032 RestController restController) { 033 super(settings, client); 034 this.pubSubIndexName = PubSubIndexName.Conf.indexName(settings); 035 restController.registerHandler(RestRequest.Method.GET, "/_unsubscribe", this); 036 restController.registerHandler(RestRequest.Method.POST, "/_unsubscribe", this); 037 } 038 039 @Override 040 public void handleRequest(final RestRequest request, final RestChannel channel, Client client) { 041 String subscriberId = request.hasParam("subscriber") ? request.param("subscriber") : null; 042 if (subscriberId == null) { 043 try { 044 channel.sendResponse(new XContentThrowableRestResponse(request, new IllegalArgumentException("no subscriber"))); 045 } catch (IOException e) { 046 logger.error("error while sending failure response", e); 047 } 048 return; 049 } 050 try { 051 client.prepareDelete(pubSubIndexName, "subscribe", subscriberId) 052 .execute(new ActionListener<DeleteResponse>() { 053 @Override 054 public void onResponse(DeleteResponse response) { 055 try { 056 XContentBuilder builder = restContentBuilder(request); 057 builder.startObject().field("ok", true).field("id", response.getId()).endObject(); 058 channel.sendResponse(new XContentRestResponse(request, OK, builder)); 059 } catch (Exception e) { 060 onFailure(e); 061 } 062 } 063 064 @Override 065 public void onFailure(Throwable e) { 066 try { 067 logger.error("error while processing unsubscribe request", e); 068 channel.sendResponse(new XContentThrowableRestResponse(request, e)); 069 } catch (IOException e1) { 070 logger.error("error while sending error response", e1); 071 } 072 } 073 }); 074 } catch (Exception e) { 075 try { 076 XContentBuilder builder = restContentBuilder(request); 077 channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject())); 078 } catch (IOException e1) { 079 logger.error("exception while sending exception response", e1); 080 } 081 } 082 } 083 084}