001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.rest.action.river.jdbc;
017
018import org.elasticsearch.client.Client;
019import org.elasticsearch.common.inject.Inject;
020import org.elasticsearch.common.settings.Settings;
021import org.elasticsearch.common.xcontent.XContentHelper;
022import org.elasticsearch.rest.BaseRestHandler;
023import org.elasticsearch.rest.BytesRestResponse;
024import org.elasticsearch.rest.RestChannel;
025import org.elasticsearch.rest.RestController;
026import org.elasticsearch.rest.RestHandler;
027import org.elasticsearch.rest.RestRequest;
028import org.elasticsearch.rest.RestStatus;
029import org.elasticsearch.rest.action.support.RestToXContentListener;
030import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateAction;
031import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateRequest;
032import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateResponse;
033import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction;
034import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest;
035import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
036import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateAction;
037import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateRequest;
038import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateResponse;
039import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
040
041import java.io.IOException;
042
043public class RestRiverStateAction extends BaseRestHandler {
044
045    private final Client client;
046
047    @Inject
048    public RestRiverStateAction(Settings settings, Client client, RestController controller) {
049        super(settings, client);
050        this.client = client;
051
052        controller.registerHandler(RestRequest.Method.GET,
053                "/_river/jdbc/{rivername}/_state", new Get());
054        controller.registerHandler(RestRequest.Method.POST,
055                "/_river/jdbc/{rivername}/_state", new Post(false, false, false));
056        controller.registerHandler(RestRequest.Method.DELETE,
057                "/_river/jdbc/{rivername}/_state", new Delete());
058
059        controller.registerHandler(RestRequest.Method.POST,
060                "/_river/jdbc/{rivername}/_abort", new Post(true, false, false));
061        controller.registerHandler(RestRequest.Method.POST,
062                "/_river/jdbc/{rivername}/_suspend", new Post(false, true, false));
063        controller.registerHandler(RestRequest.Method.POST,
064                "/_river/jdbc/{rivername}/_resume", new Post(false, false, true));
065    }
066
067    @Override
068    protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
069        channel.sendResponse(new BytesRestResponse(RestStatus.NOT_IMPLEMENTED));
070    }
071
072    class Get implements RestHandler {
073
074        @Override
075        public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
076            try {
077                String riverName = request.param("rivername");
078                String riverType = "jdbc";
079                GetRiverStateRequest riverStateRequest = new GetRiverStateRequest();
080                riverStateRequest.setRiverName(riverName).setRiverType(riverType);
081                client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest,
082                        new RestToXContentListener<GetRiverStateResponse>(channel));
083            } catch (Throwable t) {
084                logger.error(t.getMessage(), t);
085                try {
086                    channel.sendResponse(new BytesRestResponse(channel, t));
087                } catch (IOException e) {
088                    channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR));
089                }
090            }
091        }
092    }
093
094    class Post implements RestHandler {
095
096        boolean abort;
097        boolean suspend;
098        boolean resume;
099
100        Post(boolean abort, boolean suspend, boolean resume) {
101            this.abort = abort;
102            this.resume = resume;
103            this.suspend = suspend;
104        }
105
106        @Override
107        public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
108            try {
109                String riverName = request.param("rivername");
110                String riverType = "jdbc";
111                PostRiverStateRequest postRiverStateRequest = new PostRiverStateRequest();
112                postRiverStateRequest.setRiverName(riverName).setRiverType(riverType);
113                if (request.hasContent()) {
114                    RiverState riverState = new RiverState().setName(riverName).setType(riverType);
115                    riverState.setMap(XContentHelper.convertToMap(request.content(), true).v2());
116                    postRiverStateRequest.setRiverState(riverState);
117                }
118                if (abort) {
119                    postRiverStateRequest.setAbort();
120                }
121                if (suspend) {
122                    postRiverStateRequest.setSuspend();
123                }
124                if (resume) {
125                    postRiverStateRequest.setResume();
126                }
127                client.admin().cluster().execute(PostRiverStateAction.INSTANCE, postRiverStateRequest,
128                        new RestToXContentListener<PostRiverStateResponse>(channel));
129            } catch (Throwable t) {
130                logger.error(t.getMessage(), t);
131                try {
132                    channel.sendResponse(new BytesRestResponse(channel, t));
133                } catch (IOException e) {
134                    channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR));
135                }
136            }
137        }
138    }
139
140    class Delete implements RestHandler {
141
142        @Override
143        public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
144            try {
145                String riverName = request.param("rivername");
146                String riverType = "jdbc";
147                DeleteRiverStateRequest riverStateRequest = new DeleteRiverStateRequest();
148                riverStateRequest.setRiverName(riverName).setRiverType(riverType);
149                client.admin().cluster().execute(DeleteRiverStateAction.INSTANCE, riverStateRequest,
150                        new RestToXContentListener<DeleteRiverStateResponse>(channel));
151            } catch (Throwable t) {
152                logger.error(t.getMessage(), t);
153                try {
154                    channel.sendResponse(new BytesRestResponse(channel, t));
155                } catch (IOException e) {
156                    channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR));
157                }
158            }
159        }
160    }
161}