001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.action.plugin.jdbc.state.put;
017
018import org.elasticsearch.ElasticsearchException;
019import org.elasticsearch.action.ActionListener;
020import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
021import org.elasticsearch.cluster.ClusterService;
022import org.elasticsearch.cluster.ClusterState;
023import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
024import org.elasticsearch.cluster.block.ClusterBlockException;
025import org.elasticsearch.cluster.block.ClusterBlockLevel;
026import org.elasticsearch.common.inject.Inject;
027import org.elasticsearch.common.inject.Injector;
028import org.elasticsearch.common.settings.Settings;
029import org.elasticsearch.threadpool.ThreadPool;
030import org.elasticsearch.transport.TransportService;
031import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
032import org.xbib.elasticsearch.plugin.jdbc.state.RiverStateService;
033
034public class TransportPutRiverStateAction extends TransportMasterNodeOperationAction<PutRiverStateRequest, PutRiverStateResponse> {
035
036    private final Injector injector;
037
038    @Inject
039    public TransportPutRiverStateAction(Settings settings, ThreadPool threadPool,
040                                        ClusterService clusterService, TransportService transportService,
041                                        Injector injector) {
042        super(settings, PutRiverStateAction.NAME, transportService, clusterService, threadPool);
043        this.injector = injector;
044    }
045
046    @Override
047    protected String executor() {
048        return ThreadPool.Names.MANAGEMENT;
049    }
050
051    @Override
052    protected PutRiverStateRequest newRequest() {
053        return new PutRiverStateRequest();
054    }
055
056    @Override
057    protected PutRiverStateResponse newResponse() {
058        return new PutRiverStateResponse();
059    }
060
061    @Override
062    protected ClusterBlockException checkBlock(PutRiverStateRequest request, ClusterState state) {
063        return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, "");
064    }
065
066    @Override
067    protected void masterOperation(PutRiverStateRequest request, ClusterState state, final ActionListener<PutRiverStateResponse> listener) throws ElasticsearchException {
068        RiverStateService riverStateService = injector.getInstance(RiverStateService.class);
069        RiverState riverState = request.getRiverState();
070        if (riverState == null) {
071            riverState = new RiverState();
072        }
073        riverState.setName(request.getRiverName());
074        riverState.setType(request.getRiverType());
075        riverStateService.putRiverState(new RiverStateService.RiverStateRequest("put_river_state[" + request.getRiverName() + "]", riverState)
076                .masterNodeTimeout(request.masterNodeTimeout())
077                .ackTimeout(request.ackTimeout()), new ActionListener<ClusterStateUpdateResponse>() {
078            @Override
079            public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
080                listener.onResponse(new PutRiverStateResponse(clusterStateUpdateResponse.isAcknowledged()));
081            }
082
083            @Override
084            public void onFailure(Throwable e) {
085                listener.onFailure(e);
086            }
087        });
088    }
089
090}