001package org.xbib.elasticsearch.action.plugin.jdbc.run; 002 003import org.elasticsearch.ElasticsearchException; 004import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; 005import org.elasticsearch.action.support.nodes.NodeOperationRequest; 006import org.elasticsearch.action.support.nodes.NodeOperationResponse; 007import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; 008import org.elasticsearch.cluster.ClusterName; 009import org.elasticsearch.cluster.ClusterService; 010import org.elasticsearch.cluster.node.DiscoveryNode; 011import org.elasticsearch.common.collect.ImmutableMap; 012import org.elasticsearch.common.inject.Inject; 013import org.elasticsearch.common.inject.Injector; 014import org.elasticsearch.common.io.stream.StreamInput; 015import org.elasticsearch.common.io.stream.StreamOutput; 016import org.elasticsearch.common.settings.Settings; 017import org.elasticsearch.node.service.NodeService; 018import org.elasticsearch.river.River; 019import org.elasticsearch.river.RiverName; 020import org.elasticsearch.river.RiversService; 021import org.elasticsearch.threadpool.ThreadPool; 022import org.elasticsearch.transport.TransportService; 023import org.xbib.elasticsearch.plugin.jdbc.execute.RunnableRiver; 024 025import java.io.IOException; 026import java.lang.reflect.Field; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicReferenceArray; 029 030public class TransportRunRiverAction extends TransportNodesOperationAction<RunRiverRequest, RunRiverResponse, TransportRunRiverAction.NodeRiverExecuteRequest, TransportRunRiverAction.NodeRiverExecuteResponse> { 031 032 private final Injector injector; 033 034 @Inject 035 public TransportRunRiverAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, 036 ClusterService clusterService, TransportService transportService, 037 Injector injector) { 038 super(settings, RunRiverAction.NAME, clusterName, threadPool, clusterService, transportService); 039 this.injector = injector; 040 } 041 042 @Override 043 protected String executor() { 044 return ThreadPool.Names.GENERIC; 045 } 046 047 @Override 048 protected NodeRiverExecuteResponse nodeOperation(NodeRiverExecuteRequest request) throws ElasticsearchException { 049 RiversService riversService = injector.getInstance(RiversService.class); 050 NodeService nodeService = injector.getInstance(NodeService.class); 051 NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true); 052 for (Map.Entry<RiverName, River> entry : rivers(riversService).entrySet()) { 053 RiverName name = entry.getKey(); 054 if ((request.getRiverName() == null || name.getName().equals(request.getRiverName())) 055 && (request.getRiverType() == null || name.getType().equals(request.getRiverType())) 056 && entry.getValue() instanceof RunnableRiver) { 057 RunnableRiver river = (RunnableRiver) entry.getValue(); 058 river.run(); 059 return new NodeRiverExecuteResponse(nodeInfo.getNode()).setExecuted(true); 060 } 061 } 062 return new NodeRiverExecuteResponse(nodeInfo.getNode()).setExecuted(false); 063 } 064 065 @Override 066 protected RunRiverRequest newRequest() { 067 return new RunRiverRequest(); 068 } 069 070 @Override 071 protected RunRiverResponse newResponse(RunRiverRequest request, AtomicReferenceArray nodesResponses) { 072 boolean[] b = new boolean[nodesResponses.length()]; 073 for (int i = 0; i < nodesResponses.length(); i++) { 074 Object nodesResponse = nodesResponses.get(i); 075 if (nodesResponse instanceof NodeRiverExecuteResponse) { 076 NodeRiverExecuteResponse nodeRiverExecuteResponse = (NodeRiverExecuteResponse) nodesResponse; 077 b[i] = nodeRiverExecuteResponse.isExecuted(); 078 } 079 } 080 return new RunRiverResponse().setExecuted(b); 081 } 082 083 @Override 084 protected NodeRiverExecuteRequest newNodeRequest() { 085 return new NodeRiverExecuteRequest(); 086 } 087 088 @Override 089 protected NodeRiverExecuteRequest newNodeRequest(String nodeId, RunRiverRequest request) { 090 return new NodeRiverExecuteRequest(nodeId, request); 091 } 092 093 @Override 094 protected NodeRiverExecuteResponse newNodeResponse() { 095 return new NodeRiverExecuteResponse(); 096 } 097 098 @Override 099 protected boolean accumulateExceptions() { 100 return false; 101 } 102 103 @SuppressWarnings({"unchecked"}) 104 public static ImmutableMap<RiverName, River> rivers(RiversService riversService) { 105 try { 106 Field field = RiversService.class.getDeclaredField("rivers"); 107 if (field != null) { 108 field.setAccessible(true); 109 return (ImmutableMap<RiverName, River>) field.get(riversService); 110 } 111 } catch (Throwable e) { 112 // ignore 113 } 114 // if error, do not return anything 115 return ImmutableMap.of(); 116 } 117 118 class NodeRiverExecuteRequest extends NodeOperationRequest { 119 120 private String riverType; 121 122 private String riverName; 123 124 NodeRiverExecuteRequest() { 125 } 126 127 public NodeRiverExecuteRequest(String nodeId, RunRiverRequest request) { 128 super(request, nodeId); 129 this.riverName = request.getRiverName(); 130 this.riverType = request.getRiverType(); 131 } 132 133 public NodeRiverExecuteRequest setRiverType(String riverType) { 134 this.riverType = riverType; 135 return this; 136 } 137 138 public String getRiverType() { 139 return riverType; 140 } 141 142 public NodeRiverExecuteRequest setRiverName(String riverName) { 143 this.riverName = riverName; 144 return this; 145 } 146 147 public String getRiverName() { 148 return riverName; 149 } 150 151 @Override 152 public void readFrom(StreamInput in) throws IOException { 153 super.readFrom(in); 154 this.riverName = in.readString(); 155 this.riverType = in.readString(); 156 } 157 158 @Override 159 public void writeTo(StreamOutput out) throws IOException { 160 super.writeTo(out); 161 out.writeString(riverName); 162 out.writeString(riverType); 163 } 164 } 165 166 class NodeRiverExecuteResponse extends NodeOperationResponse { 167 168 private boolean executed; 169 170 NodeRiverExecuteResponse() { 171 } 172 173 public NodeRiverExecuteResponse(DiscoveryNode node) { 174 super(node); 175 } 176 177 public NodeRiverExecuteResponse setExecuted(boolean b) { 178 this.executed = b; 179 return this; 180 } 181 182 public boolean isExecuted() { 183 return executed; 184 } 185 186 @Override 187 public void readFrom(StreamInput in) throws IOException { 188 super.readFrom(in); 189 executed = in.readBoolean(); 190 } 191 192 @Override 193 public void writeTo(StreamOutput out) throws IOException { 194 super.writeTo(out); 195 out.writeBoolean(executed); 196 } 197 198 } 199}