001package org.xbib.elasticsearch.action.websocket.pubsub;
002
003import org.elasticsearch.client.Client;
004import org.elasticsearch.common.inject.Inject;
005import org.elasticsearch.common.settings.Settings;
006import org.jboss.netty.channel.Channel;
007import org.xbib.elasticsearch.websocket.BaseInteractiveHandler;
008import org.xbib.elasticsearch.websocket.InteractiveChannel;
009import org.xbib.elasticsearch.websocket.InteractiveController;
010import org.xbib.elasticsearch.websocket.InteractiveRequest;
011import org.xbib.elasticsearch.http.HttpServerTransport;
012import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse;
013
014import java.io.IOException;
015import java.util.Map;
016
017/**
018 * Forwarding a message to a destination.
019 */
020public class ForwardAction extends BaseInteractiveHandler {
021
022    private final String TYPE = "forward";
023    private final HttpServerTransport transport;
024
025    @Inject
026    public ForwardAction(Settings settings,
027                         Client client,
028                         HttpServerTransport transport,
029                         InteractiveController controller) {
030        super(settings, client);
031        this.transport = transport;
032        controller.registerHandler(TYPE, this);
033    }
034
035    @Override
036    public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) {
037        Map<String, Object> m = request.asMap();
038        Integer id = (Integer) m.get("channel");
039        Channel ch = transport.channel(id);
040        try {
041            if (ch != null) {
042                ch.write(new NettyInteractiveResponse("message", (Map<String, Object>) m.get("message")).response());
043                // don't send a success message back to the channel
044            } else {
045                // delivery failed, channel not present
046                channel.sendResponse(TYPE, new IOException("channel " + id + " gone"));
047            }
048        } catch (IOException ex) {
049            logger.error("error while delivering forward message {}: {}", m, ex);
050        }
051    }
052}