001package org.xbib.elasticsearch.action.websocket.pubsub; 002 003import org.elasticsearch.client.Client; 004import org.elasticsearch.common.inject.Inject; 005import org.elasticsearch.common.settings.Settings; 006import org.jboss.netty.channel.Channel; 007import org.xbib.elasticsearch.websocket.BaseInteractiveHandler; 008import org.xbib.elasticsearch.websocket.InteractiveChannel; 009import org.xbib.elasticsearch.websocket.InteractiveController; 010import org.xbib.elasticsearch.websocket.InteractiveRequest; 011import org.xbib.elasticsearch.http.HttpServerTransport; 012import org.xbib.elasticsearch.http.netty.NettyInteractiveResponse; 013 014import java.io.IOException; 015import java.util.Map; 016 017/** 018 * Forwarding a message to a destination. 019 */ 020public class ForwardAction extends BaseInteractiveHandler { 021 022 private final String TYPE = "forward"; 023 private final HttpServerTransport transport; 024 025 @Inject 026 public ForwardAction(Settings settings, 027 Client client, 028 HttpServerTransport transport, 029 InteractiveController controller) { 030 super(settings, client); 031 this.transport = transport; 032 controller.registerHandler(TYPE, this); 033 } 034 035 @Override 036 public void handleRequest(final InteractiveRequest request, final InteractiveChannel channel) { 037 Map<String, Object> m = request.asMap(); 038 Integer id = (Integer) m.get("channel"); 039 Channel ch = transport.channel(id); 040 try { 041 if (ch != null) { 042 ch.write(new NettyInteractiveResponse("message", (Map<String, Object>) m.get("message")).response()); 043 // don't send a success message back to the channel 044 } else { 045 // delivery failed, channel not present 046 channel.sendResponse(TYPE, new IOException("channel " + id + " gone")); 047 } 048 } catch (IOException ex) { 049 logger.error("error while delivering forward message {}: {}", m, ex); 050 } 051 } 052}