001package org.xbib.elasticsearch.action.cluster.admin.websocket; 002 003import org.elasticsearch.ElasticsearchException; 004import org.elasticsearch.action.support.nodes.NodeOperationRequest; 005import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; 006import org.elasticsearch.cluster.ClusterName; 007import org.elasticsearch.cluster.ClusterService; 008import org.elasticsearch.common.inject.Inject; 009import org.elasticsearch.common.io.stream.StreamInput; 010import org.elasticsearch.common.io.stream.StreamOutput; 011import org.elasticsearch.common.settings.Settings; 012import org.elasticsearch.common.transport.InetSocketTransportAddress; 013import org.elasticsearch.discovery.Discovery; 014import org.elasticsearch.threadpool.ThreadPool; 015import org.elasticsearch.transport.TransportService; 016import org.xbib.elasticsearch.http.HttpServer; 017 018import java.io.IOException; 019import java.util.List; 020import java.util.concurrent.atomic.AtomicReferenceArray; 021 022import static org.elasticsearch.common.collect.Lists.newLinkedList; 023 024public class TransportWebsocketInfoAction extends TransportNodesOperationAction<WebsocketInfoRequest, WebsocketInfoResponse, TransportWebsocketInfoAction.TransportWebsocketInfoRequest, WebsocketInfo> { 025 026 private final Discovery discovery; 027 028 private final HttpServer httpServer; 029 030 @Inject 031 public TransportWebsocketInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, 032 ClusterService clusterService, TransportService transportService, 033 Discovery discovery, HttpServer httpServer) { 034 super(settings, WebsocketInfoAction.NAME, clusterName, threadPool, clusterService, transportService); 035 this.discovery = discovery; 036 this.httpServer = httpServer; 037 } 038 039 @Override 040 protected String executor() { 041 return ThreadPool.Names.MANAGEMENT; 042 } 043 044 @Override 045 protected WebsocketInfoResponse newResponse(WebsocketInfoRequest nodesInfoRequest, AtomicReferenceArray responses) { 046 final List<WebsocketInfo> nodesInfos = newLinkedList(); 047 for (int i = 0; i < responses.length(); i++) { 048 Object resp = responses.get(i); 049 if (resp instanceof WebsocketInfo) { 050 nodesInfos.add((WebsocketInfo) resp); 051 } 052 } 053 return new WebsocketInfoResponse(clusterName, nodesInfos); 054 } 055 056 @Override 057 protected WebsocketInfoRequest newRequest() { 058 return new WebsocketInfoRequest(); 059 } 060 061 @Override 062 protected TransportWebsocketInfoRequest newNodeRequest() { 063 return new TransportWebsocketInfoRequest(); 064 } 065 066 @Override 067 protected TransportWebsocketInfoRequest newNodeRequest(String nodeId, WebsocketInfoRequest request) { 068 return new TransportWebsocketInfoRequest(nodeId, request); 069 } 070 071 @Override 072 protected WebsocketInfo newNodeResponse() { 073 return new WebsocketInfo(); 074 } 075 076 @Override 077 protected WebsocketInfo nodeOperation(TransportWebsocketInfoRequest nodeRequest) throws ElasticsearchException { 078 return new WebsocketInfo(discovery.localNode(), (InetSocketTransportAddress)httpServer.address()); 079 } 080 081 @Override 082 protected boolean accumulateExceptions() { 083 return false; 084 } 085 086 static class TransportWebsocketInfoRequest extends NodeOperationRequest { 087 088 WebsocketInfoRequest request; 089 090 TransportWebsocketInfoRequest() { 091 } 092 093 TransportWebsocketInfoRequest(String nodeId, WebsocketInfoRequest request) { 094 super(request, nodeId); 095 this.request = request; 096 } 097 098 @Override 099 public void readFrom(StreamInput in) throws IOException { 100 super.readFrom(in); 101 request = new WebsocketInfoRequest(); 102 request.readFrom(in); 103 } 104 105 @Override 106 public void writeTo(StreamOutput out) throws IOException { 107 super.writeTo(out); 108 request.writeTo(out); 109 } 110 } 111}