001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.action.plugin.jdbc.state.put; 017 018import org.elasticsearch.ElasticsearchException; 019import org.elasticsearch.action.ActionListener; 020import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; 021import org.elasticsearch.cluster.ClusterService; 022import org.elasticsearch.cluster.ClusterState; 023import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; 024import org.elasticsearch.cluster.block.ClusterBlockException; 025import org.elasticsearch.cluster.block.ClusterBlockLevel; 026import org.elasticsearch.common.inject.Inject; 027import org.elasticsearch.common.inject.Injector; 028import org.elasticsearch.common.settings.Settings; 029import org.elasticsearch.threadpool.ThreadPool; 030import org.elasticsearch.transport.TransportService; 031import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 032import org.xbib.elasticsearch.plugin.jdbc.state.RiverStateService; 033 034public class TransportPutRiverStateAction extends TransportMasterNodeOperationAction<PutRiverStateRequest, PutRiverStateResponse> { 035 036 private final Injector injector; 037 038 @Inject 039 public TransportPutRiverStateAction(Settings settings, ThreadPool threadPool, 040 ClusterService clusterService, TransportService transportService, 041 Injector injector) { 042 super(settings, PutRiverStateAction.NAME, transportService, clusterService, threadPool); 043 this.injector = injector; 044 } 045 046 @Override 047 protected String executor() { 048 return ThreadPool.Names.MANAGEMENT; 049 } 050 051 @Override 052 protected PutRiverStateRequest newRequest() { 053 return new PutRiverStateRequest(); 054 } 055 056 @Override 057 protected PutRiverStateResponse newResponse() { 058 return new PutRiverStateResponse(); 059 } 060 061 @Override 062 protected ClusterBlockException checkBlock(PutRiverStateRequest request, ClusterState state) { 063 return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, ""); 064 } 065 066 @Override 067 protected void masterOperation(PutRiverStateRequest request, ClusterState state, final ActionListener<PutRiverStateResponse> listener) throws ElasticsearchException { 068 RiverStateService riverStateService = injector.getInstance(RiverStateService.class); 069 RiverState riverState = request.getRiverState(); 070 if (riverState == null) { 071 riverState = new RiverState(); 072 } 073 riverState.setName(request.getRiverName()); 074 riverState.setType(request.getRiverType()); 075 riverStateService.putRiverState(new RiverStateService.RiverStateRequest("put_river_state[" + request.getRiverName() + "]", riverState) 076 .masterNodeTimeout(request.masterNodeTimeout()) 077 .ackTimeout(request.ackTimeout()), new ActionListener<ClusterStateUpdateResponse>() { 078 @Override 079 public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { 080 listener.onResponse(new PutRiverStateResponse(clusterStateUpdateResponse.isAcknowledged())); 081 } 082 083 @Override 084 public void onFailure(Throwable e) { 085 listener.onFailure(e); 086 } 087 }); 088 } 089 090}