001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.rest.action.river.jdbc; 017 018import org.elasticsearch.client.Client; 019import org.elasticsearch.common.inject.Inject; 020import org.elasticsearch.common.settings.Settings; 021import org.elasticsearch.common.xcontent.XContentHelper; 022import org.elasticsearch.rest.BaseRestHandler; 023import org.elasticsearch.rest.BytesRestResponse; 024import org.elasticsearch.rest.RestChannel; 025import org.elasticsearch.rest.RestController; 026import org.elasticsearch.rest.RestHandler; 027import org.elasticsearch.rest.RestRequest; 028import org.elasticsearch.rest.RestStatus; 029import org.elasticsearch.rest.action.support.RestToXContentListener; 030import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateAction; 031import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateRequest; 032import org.xbib.elasticsearch.action.plugin.jdbc.state.delete.DeleteRiverStateResponse; 033import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateAction; 034import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequest; 035import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse; 036import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateAction; 037import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateRequest; 038import org.xbib.elasticsearch.action.plugin.jdbc.state.post.PostRiverStateResponse; 039import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 040 041import java.io.IOException; 042 043public class RestRiverStateAction extends BaseRestHandler { 044 045 private final Client client; 046 047 @Inject 048 public RestRiverStateAction(Settings settings, Client client, RestController controller) { 049 super(settings, client); 050 this.client = client; 051 052 controller.registerHandler(RestRequest.Method.GET, 053 "/_river/jdbc/{rivername}/_state", new Get()); 054 controller.registerHandler(RestRequest.Method.POST, 055 "/_river/jdbc/{rivername}/_state", new Post(false, false, false)); 056 controller.registerHandler(RestRequest.Method.DELETE, 057 "/_river/jdbc/{rivername}/_state", new Delete()); 058 059 controller.registerHandler(RestRequest.Method.POST, 060 "/_river/jdbc/{rivername}/_abort", new Post(true, false, false)); 061 controller.registerHandler(RestRequest.Method.POST, 062 "/_river/jdbc/{rivername}/_suspend", new Post(false, true, false)); 063 controller.registerHandler(RestRequest.Method.POST, 064 "/_river/jdbc/{rivername}/_resume", new Post(false, false, true)); 065 } 066 067 @Override 068 protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { 069 channel.sendResponse(new BytesRestResponse(RestStatus.NOT_IMPLEMENTED)); 070 } 071 072 class Get implements RestHandler { 073 074 @Override 075 public void handleRequest(RestRequest request, RestChannel channel) throws Exception { 076 try { 077 String riverName = request.param("rivername"); 078 String riverType = "jdbc"; 079 GetRiverStateRequest riverStateRequest = new GetRiverStateRequest(); 080 riverStateRequest.setRiverName(riverName).setRiverType(riverType); 081 client.admin().cluster().execute(GetRiverStateAction.INSTANCE, riverStateRequest, 082 new RestToXContentListener<GetRiverStateResponse>(channel)); 083 } catch (Throwable t) { 084 logger.error(t.getMessage(), t); 085 try { 086 channel.sendResponse(new BytesRestResponse(channel, t)); 087 } catch (IOException e) { 088 channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR)); 089 } 090 } 091 } 092 } 093 094 class Post implements RestHandler { 095 096 boolean abort; 097 boolean suspend; 098 boolean resume; 099 100 Post(boolean abort, boolean suspend, boolean resume) { 101 this.abort = abort; 102 this.resume = resume; 103 this.suspend = suspend; 104 } 105 106 @Override 107 public void handleRequest(RestRequest request, RestChannel channel) throws Exception { 108 try { 109 String riverName = request.param("rivername"); 110 String riverType = "jdbc"; 111 PostRiverStateRequest postRiverStateRequest = new PostRiverStateRequest(); 112 postRiverStateRequest.setRiverName(riverName).setRiverType(riverType); 113 if (request.hasContent()) { 114 RiverState riverState = new RiverState().setName(riverName).setType(riverType); 115 riverState.setMap(XContentHelper.convertToMap(request.content(), true).v2()); 116 postRiverStateRequest.setRiverState(riverState); 117 } 118 if (abort) { 119 postRiverStateRequest.setAbort(); 120 } 121 if (suspend) { 122 postRiverStateRequest.setSuspend(); 123 } 124 if (resume) { 125 postRiverStateRequest.setResume(); 126 } 127 client.admin().cluster().execute(PostRiverStateAction.INSTANCE, postRiverStateRequest, 128 new RestToXContentListener<PostRiverStateResponse>(channel)); 129 } catch (Throwable t) { 130 logger.error(t.getMessage(), t); 131 try { 132 channel.sendResponse(new BytesRestResponse(channel, t)); 133 } catch (IOException e) { 134 channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR)); 135 } 136 } 137 } 138 } 139 140 class Delete implements RestHandler { 141 142 @Override 143 public void handleRequest(RestRequest request, RestChannel channel) throws Exception { 144 try { 145 String riverName = request.param("rivername"); 146 String riverType = "jdbc"; 147 DeleteRiverStateRequest riverStateRequest = new DeleteRiverStateRequest(); 148 riverStateRequest.setRiverName(riverName).setRiverType(riverType); 149 client.admin().cluster().execute(DeleteRiverStateAction.INSTANCE, riverStateRequest, 150 new RestToXContentListener<DeleteRiverStateResponse>(channel)); 151 } catch (Throwable t) { 152 logger.error(t.getMessage(), t); 153 try { 154 channel.sendResponse(new BytesRestResponse(channel, t)); 155 } catch (IOException e) { 156 channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR)); 157 } 158 } 159 } 160 } 161}