001package org.xbib.elasticsearch.action.plugin.jdbc.run;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
005import org.elasticsearch.action.support.nodes.NodeOperationRequest;
006import org.elasticsearch.action.support.nodes.NodeOperationResponse;
007import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
008import org.elasticsearch.cluster.ClusterName;
009import org.elasticsearch.cluster.ClusterService;
010import org.elasticsearch.cluster.node.DiscoveryNode;
011import org.elasticsearch.common.collect.ImmutableMap;
012import org.elasticsearch.common.inject.Inject;
013import org.elasticsearch.common.inject.Injector;
014import org.elasticsearch.common.io.stream.StreamInput;
015import org.elasticsearch.common.io.stream.StreamOutput;
016import org.elasticsearch.common.settings.Settings;
017import org.elasticsearch.node.service.NodeService;
018import org.elasticsearch.river.River;
019import org.elasticsearch.river.RiverName;
020import org.elasticsearch.river.RiversService;
021import org.elasticsearch.threadpool.ThreadPool;
022import org.elasticsearch.transport.TransportService;
023import org.xbib.elasticsearch.plugin.jdbc.execute.RunnableRiver;
024
025import java.io.IOException;
026import java.lang.reflect.Field;
027import java.util.Map;
028import java.util.concurrent.atomic.AtomicReferenceArray;
029
030public class TransportRunRiverAction extends TransportNodesOperationAction<RunRiverRequest, RunRiverResponse, TransportRunRiverAction.NodeRiverExecuteRequest, TransportRunRiverAction.NodeRiverExecuteResponse> {
031
032    private final Injector injector;
033
034    @Inject
035    public TransportRunRiverAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
036                                   ClusterService clusterService, TransportService transportService,
037                                   Injector injector) {
038        super(settings, RunRiverAction.NAME, clusterName, threadPool, clusterService, transportService);
039        this.injector = injector;
040    }
041
042    @Override
043    protected String executor() {
044        return ThreadPool.Names.GENERIC;
045    }
046
047    @Override
048    protected NodeRiverExecuteResponse nodeOperation(NodeRiverExecuteRequest request) throws ElasticsearchException {
049        RiversService riversService = injector.getInstance(RiversService.class);
050        NodeService nodeService = injector.getInstance(NodeService.class);
051        NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true);
052        for (Map.Entry<RiverName, River> entry : rivers(riversService).entrySet()) {
053            RiverName name = entry.getKey();
054            if ((request.getRiverName() == null || name.getName().equals(request.getRiverName()))
055                    && (request.getRiverType() == null || name.getType().equals(request.getRiverType()))
056                    && entry.getValue() instanceof RunnableRiver) {
057                RunnableRiver river = (RunnableRiver) entry.getValue();
058                river.run();
059                return new NodeRiverExecuteResponse(nodeInfo.getNode()).setExecuted(true);
060            }
061        }
062        return new NodeRiverExecuteResponse(nodeInfo.getNode()).setExecuted(false);
063    }
064
065    @Override
066    protected RunRiverRequest newRequest() {
067        return new RunRiverRequest();
068    }
069
070    @Override
071    protected RunRiverResponse newResponse(RunRiverRequest request, AtomicReferenceArray nodesResponses) {
072        boolean[] b = new boolean[nodesResponses.length()];
073        for (int i = 0; i < nodesResponses.length(); i++) {
074            Object nodesResponse = nodesResponses.get(i);
075            if (nodesResponse instanceof NodeRiverExecuteResponse) {
076                NodeRiverExecuteResponse nodeRiverExecuteResponse = (NodeRiverExecuteResponse) nodesResponse;
077                b[i] = nodeRiverExecuteResponse.isExecuted();
078            }
079        }
080        return new RunRiverResponse().setExecuted(b);
081    }
082
083    @Override
084    protected NodeRiverExecuteRequest newNodeRequest() {
085        return new NodeRiverExecuteRequest();
086    }
087
088    @Override
089    protected NodeRiverExecuteRequest newNodeRequest(String nodeId, RunRiverRequest request) {
090        return new NodeRiverExecuteRequest(nodeId, request);
091    }
092
093    @Override
094    protected NodeRiverExecuteResponse newNodeResponse() {
095        return new NodeRiverExecuteResponse();
096    }
097
098    @Override
099    protected boolean accumulateExceptions() {
100        return false;
101    }
102
103    @SuppressWarnings({"unchecked"})
104    public static ImmutableMap<RiverName, River> rivers(RiversService riversService) {
105        try {
106            Field field = RiversService.class.getDeclaredField("rivers");
107            if (field != null) {
108                field.setAccessible(true);
109                return (ImmutableMap<RiverName, River>) field.get(riversService);
110            }
111        } catch (Throwable e) {
112            // ignore
113        }
114        // if error, do not return anything
115        return ImmutableMap.of();
116    }
117
118    class NodeRiverExecuteRequest extends NodeOperationRequest {
119
120        private String riverType;
121
122        private String riverName;
123
124        NodeRiverExecuteRequest() {
125        }
126
127        public NodeRiverExecuteRequest(String nodeId, RunRiverRequest request) {
128            super(request, nodeId);
129            this.riverName = request.getRiverName();
130            this.riverType = request.getRiverType();
131        }
132
133        public NodeRiverExecuteRequest setRiverType(String riverType) {
134            this.riverType = riverType;
135            return this;
136        }
137
138        public String getRiverType() {
139            return riverType;
140        }
141
142        public NodeRiverExecuteRequest setRiverName(String riverName) {
143            this.riverName = riverName;
144            return this;
145        }
146
147        public String getRiverName() {
148            return riverName;
149        }
150
151        @Override
152        public void readFrom(StreamInput in) throws IOException {
153            super.readFrom(in);
154            this.riverName = in.readString();
155            this.riverType = in.readString();
156        }
157
158        @Override
159        public void writeTo(StreamOutput out) throws IOException {
160            super.writeTo(out);
161            out.writeString(riverName);
162            out.writeString(riverType);
163        }
164    }
165
166    class NodeRiverExecuteResponse extends NodeOperationResponse {
167
168        private boolean executed;
169
170        NodeRiverExecuteResponse() {
171        }
172
173        public NodeRiverExecuteResponse(DiscoveryNode node) {
174            super(node);
175        }
176
177        public NodeRiverExecuteResponse setExecuted(boolean b) {
178            this.executed = b;
179            return this;
180        }
181
182        public boolean isExecuted() {
183            return executed;
184        }
185
186        @Override
187        public void readFrom(StreamInput in) throws IOException {
188            super.readFrom(in);
189            executed = in.readBoolean();
190        }
191
192        @Override
193        public void writeTo(StreamOutput out) throws IOException {
194            super.writeTo(out);
195            out.writeBoolean(executed);
196        }
197
198    }
199}