001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.action.plugin.jdbc.state.post; 017 018import org.elasticsearch.ElasticsearchException; 019import org.elasticsearch.action.ActionListener; 020import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; 021import org.elasticsearch.cluster.ClusterService; 022import org.elasticsearch.cluster.ClusterState; 023import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; 024import org.elasticsearch.cluster.block.ClusterBlockException; 025import org.elasticsearch.cluster.block.ClusterBlockLevel; 026import org.elasticsearch.common.collect.ImmutableList; 027import org.elasticsearch.common.inject.Inject; 028import org.elasticsearch.common.inject.Injector; 029import org.elasticsearch.common.settings.Settings; 030import org.elasticsearch.threadpool.ThreadPool; 031import org.elasticsearch.transport.TransportService; 032import org.xbib.elasticsearch.plugin.jdbc.state.RiverState; 033import org.xbib.elasticsearch.plugin.jdbc.state.RiverStateService; 034import org.xbib.elasticsearch.plugin.jdbc.state.RiverStatesMetaData; 035 036public class TransportPostRiverStateAction extends TransportMasterNodeOperationAction<PostRiverStateRequest, PostRiverStateResponse> { 037 038 private final Injector injector; 039 040 @Inject 041 public TransportPostRiverStateAction(Settings settings, ThreadPool threadPool, 042 ClusterService clusterService, TransportService transportService, 043 Injector injector) { 044 super(settings, PostRiverStateAction.NAME, transportService, clusterService, threadPool); 045 this.injector = injector; 046 } 047 048 @Override 049 protected String executor() { 050 return ThreadPool.Names.MANAGEMENT; 051 } 052 053 @Override 054 protected PostRiverStateRequest newRequest() { 055 return new PostRiverStateRequest(); 056 } 057 058 @Override 059 protected PostRiverStateResponse newResponse() { 060 return new PostRiverStateResponse(); 061 } 062 063 @Override 064 protected ClusterBlockException checkBlock(PostRiverStateRequest request, ClusterState state) { 065 return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, ""); 066 } 067 068 @Override 069 protected void masterOperation(PostRiverStateRequest request, ClusterState clusterState, 070 final ActionListener<PostRiverStateResponse> listener) 071 throws ElasticsearchException { 072 RiverStatesMetaData riverStatesMetaData = clusterState.metaData().custom(RiverStatesMetaData.TYPE); 073 074 RiverStateService riverStateService = injector.getInstance(RiverStateService.class); 075 ImmutableList<RiverState> riverStates = riverStatesMetaData.getRiverStates(request.getRiverName(), request.getRiverType()); 076 RiverState riverState = request.getRiverState(); 077 if (riverStates.isEmpty()) { 078 if (riverState == null) { 079 riverState = new RiverState(); 080 } 081 riverState.setName(request.getRiverName()); 082 riverState.setType(request.getRiverType()); 083 riverState.getMap().put("aborted", request.isAbort()); 084 riverState.getMap().put("suspended", request.isSuspend()); 085 } else { 086 // merge old and new, overwrite previous values only if set in request 087 riverState = riverStates.get(0); 088 if (request.getRiverState().getStarted() != null) { 089 riverState.setStarted(request.getRiverState().getStarted()); 090 } 091 if (request.getRiverState().getLastActiveBegin() != null) { 092 riverState.setLastActive(request.getRiverState().getLastActiveBegin(), request.getRiverState().getLastActiveEnd()); 093 } 094 if (request.getRiverState().getCounter() != null) { 095 riverState.setCounter(request.getRiverState().getCounter()); 096 } 097 if (request.getRiverState().getMap() != null && !request.getRiverState().getMap().isEmpty()) { 098 riverState.getMap().putAll(request.getRiverState().getMap()); 099 } 100 riverState.getMap().put("aborted", request.isAbort()); 101 riverState.getMap().put("suspended", request.isSuspend()); 102 } 103 riverStateService.postRiverState(new RiverStateService.RiverStateRequest("post_river_state[" + request.getRiverName() + "]", riverState) 104 .masterNodeTimeout(request.masterNodeTimeout()) 105 .ackTimeout(request.ackTimeout()), new ActionListener<ClusterStateUpdateResponse>() { 106 @Override 107 public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { 108 listener.onResponse(new PostRiverStateResponse(clusterStateUpdateResponse.isAcknowledged())); 109 } 110 111 @Override 112 public void onFailure(Throwable e) { 113 listener.onFailure(e); 114 } 115 }); 116 } 117 118}