001package org.xbib.elasticsearch.action.cluster.admin.websocket;
002
003import org.elasticsearch.ElasticsearchException;
004import org.elasticsearch.action.support.nodes.NodeOperationRequest;
005import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
006import org.elasticsearch.cluster.ClusterName;
007import org.elasticsearch.cluster.ClusterService;
008import org.elasticsearch.common.inject.Inject;
009import org.elasticsearch.common.io.stream.StreamInput;
010import org.elasticsearch.common.io.stream.StreamOutput;
011import org.elasticsearch.common.settings.Settings;
012import org.elasticsearch.common.transport.InetSocketTransportAddress;
013import org.elasticsearch.discovery.Discovery;
014import org.elasticsearch.threadpool.ThreadPool;
015import org.elasticsearch.transport.TransportService;
016import org.xbib.elasticsearch.http.HttpServer;
017
018import java.io.IOException;
019import java.util.List;
020import java.util.concurrent.atomic.AtomicReferenceArray;
021
022import static org.elasticsearch.common.collect.Lists.newLinkedList;
023
024public class TransportWebsocketInfoAction extends TransportNodesOperationAction<WebsocketInfoRequest, WebsocketInfoResponse, TransportWebsocketInfoAction.TransportWebsocketInfoRequest, WebsocketInfo> {
025
026    private final Discovery discovery;
027
028    private final HttpServer httpServer;
029
030    @Inject
031    public TransportWebsocketInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
032                                        ClusterService clusterService, TransportService transportService,
033                                        Discovery discovery, HttpServer httpServer) {
034        super(settings, WebsocketInfoAction.NAME, clusterName, threadPool, clusterService, transportService);
035        this.discovery = discovery;
036        this.httpServer = httpServer;
037    }
038
039    @Override
040    protected String executor() {
041        return ThreadPool.Names.MANAGEMENT;
042    }
043
044    @Override
045    protected WebsocketInfoResponse newResponse(WebsocketInfoRequest nodesInfoRequest, AtomicReferenceArray responses) {
046        final List<WebsocketInfo> nodesInfos = newLinkedList();
047        for (int i = 0; i < responses.length(); i++) {
048            Object resp = responses.get(i);
049            if (resp instanceof WebsocketInfo) {
050                nodesInfos.add((WebsocketInfo) resp);
051            }
052        }
053        return new WebsocketInfoResponse(clusterName, nodesInfos);
054    }
055
056    @Override
057    protected WebsocketInfoRequest newRequest() {
058        return new WebsocketInfoRequest();
059    }
060
061    @Override
062    protected TransportWebsocketInfoRequest newNodeRequest() {
063        return new TransportWebsocketInfoRequest();
064    }
065
066    @Override
067    protected TransportWebsocketInfoRequest newNodeRequest(String nodeId, WebsocketInfoRequest request) {
068        return new TransportWebsocketInfoRequest(nodeId, request);
069    }
070
071    @Override
072    protected WebsocketInfo newNodeResponse() {
073        return new WebsocketInfo();
074    }
075
076    @Override
077    protected WebsocketInfo nodeOperation(TransportWebsocketInfoRequest nodeRequest) throws ElasticsearchException {
078        return new WebsocketInfo(discovery.localNode(), (InetSocketTransportAddress)httpServer.address());
079    }
080
081    @Override
082    protected boolean accumulateExceptions() {
083        return false;
084    }
085
086    static class TransportWebsocketInfoRequest extends NodeOperationRequest {
087
088        WebsocketInfoRequest request;
089
090        TransportWebsocketInfoRequest() {
091        }
092
093        TransportWebsocketInfoRequest(String nodeId, WebsocketInfoRequest request) {
094            super(request, nodeId);
095            this.request = request;
096        }
097
098        @Override
099        public void readFrom(StreamInput in) throws IOException {
100            super.readFrom(in);
101            request = new WebsocketInfoRequest();
102            request.readFrom(in);
103        }
104
105        @Override
106        public void writeTo(StreamOutput out) throws IOException {
107            super.writeTo(out);
108            request.writeTo(out);
109        }
110    }
111}