001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.action.plugin.jdbc.state.post;
017
018import org.elasticsearch.ElasticsearchException;
019import org.elasticsearch.action.ActionListener;
020import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
021import org.elasticsearch.cluster.ClusterService;
022import org.elasticsearch.cluster.ClusterState;
023import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
024import org.elasticsearch.cluster.block.ClusterBlockException;
025import org.elasticsearch.cluster.block.ClusterBlockLevel;
026import org.elasticsearch.common.collect.ImmutableList;
027import org.elasticsearch.common.inject.Inject;
028import org.elasticsearch.common.inject.Injector;
029import org.elasticsearch.common.settings.Settings;
030import org.elasticsearch.threadpool.ThreadPool;
031import org.elasticsearch.transport.TransportService;
032import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
033import org.xbib.elasticsearch.plugin.jdbc.state.RiverStateService;
034import org.xbib.elasticsearch.plugin.jdbc.state.RiverStatesMetaData;
035
036public class TransportPostRiverStateAction extends TransportMasterNodeOperationAction<PostRiverStateRequest, PostRiverStateResponse> {
037
038    private final Injector injector;
039
040    @Inject
041    public TransportPostRiverStateAction(Settings settings, ThreadPool threadPool,
042                                         ClusterService clusterService, TransportService transportService,
043                                         Injector injector) {
044        super(settings, PostRiverStateAction.NAME, transportService, clusterService, threadPool);
045        this.injector = injector;
046    }
047
048    @Override
049    protected String executor() {
050        return ThreadPool.Names.MANAGEMENT;
051    }
052
053    @Override
054    protected PostRiverStateRequest newRequest() {
055        return new PostRiverStateRequest();
056    }
057
058    @Override
059    protected PostRiverStateResponse newResponse() {
060        return new PostRiverStateResponse();
061    }
062
063    @Override
064    protected ClusterBlockException checkBlock(PostRiverStateRequest request, ClusterState state) {
065        return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, "");
066    }
067
068    @Override
069    protected void masterOperation(PostRiverStateRequest request, ClusterState clusterState,
070                                   final ActionListener<PostRiverStateResponse> listener)
071            throws ElasticsearchException {
072        RiverStatesMetaData riverStatesMetaData = clusterState.metaData().custom(RiverStatesMetaData.TYPE);
073
074        RiverStateService riverStateService = injector.getInstance(RiverStateService.class);
075        ImmutableList<RiverState> riverStates = riverStatesMetaData.getRiverStates(request.getRiverName(), request.getRiverType());
076        RiverState riverState = request.getRiverState();
077        if (riverStates.isEmpty()) {
078            if (riverState == null) {
079                riverState = new RiverState();
080            }
081            riverState.setName(request.getRiverName());
082            riverState.setType(request.getRiverType());
083            riverState.getMap().put("aborted", request.isAbort());
084            riverState.getMap().put("suspended", request.isSuspend());
085        } else {
086            // merge old and new, overwrite previous values only if set in request
087            riverState = riverStates.get(0);
088            if (request.getRiverState().getStarted() != null) {
089                riverState.setStarted(request.getRiverState().getStarted());
090            }
091            if (request.getRiverState().getLastActiveBegin() != null) {
092                riverState.setLastActive(request.getRiverState().getLastActiveBegin(), request.getRiverState().getLastActiveEnd());
093            }
094            if (request.getRiverState().getCounter() != null) {
095                riverState.setCounter(request.getRiverState().getCounter());
096            }
097            if (request.getRiverState().getMap() != null && !request.getRiverState().getMap().isEmpty()) {
098                riverState.getMap().putAll(request.getRiverState().getMap());
099            }
100            riverState.getMap().put("aborted", request.isAbort());
101            riverState.getMap().put("suspended", request.isSuspend());
102        }
103        riverStateService.postRiverState(new RiverStateService.RiverStateRequest("post_river_state[" + request.getRiverName() + "]", riverState)
104                .masterNodeTimeout(request.masterNodeTimeout())
105                .ackTimeout(request.ackTimeout()), new ActionListener<ClusterStateUpdateResponse>() {
106            @Override
107            public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
108                listener.onResponse(new PostRiverStateResponse(clusterStateUpdateResponse.isAcknowledged()));
109            }
110
111            @Override
112            public void onFailure(Throwable e) {
113                listener.onFailure(e);
114            }
115        });
116    }
117
118}