/*
 * Copyright (C) 2014 Jörg Prante
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.xbib.elasticsearch.plugin.jdbc.feeder;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.metrics.MeterMetric;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.loader.JsonSettingsLoader;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.river.RiverName;
import org.xbib.elasticsearch.plugin.jdbc.RiverThread;
import org.xbib.elasticsearch.plugin.jdbc.classloader.uri.URIClassLoader;
import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
import org.xbib.elasticsearch.plugin.jdbc.client.transport.BulkTransportClient;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronExpression;
import org.xbib.elasticsearch.plugin.jdbc.cron.CronThreadPoolExecutor;
import org.xbib.elasticsearch.plugin.jdbc.state.RiverStatesMetaData;
import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader;
import org.xbib.elasticsearch.river.jdbc.RiverFlow;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
 * Standalone feeder for JDBC. Reads one or more river definitions from a JSON
 * source and executes them outside of an Elasticsearch node, connecting to the
 * cluster with a transport client.
 */
public class JDBCFeeder implements CommandLineInterpreter {

    private final static ESLogger logger = ESLoggerFactory.getLogger(JDBCFeeder.class.getSimpleName());

    /**
     * Register the metadata factory in Elasticsearch so that a
     * ClusterStateResponse carrying RiverStatesMetaData can be decoded
     */
    static {
        MetaData.registerFactory(RiverStatesMetaData.TYPE, RiverStatesMetaData.FACTORY);
    }

    protected Reader reader;

    protected Writer writer;

    protected PrintStream printStream;

    protected IngestFactory ingestFactory;

    /**
     * This ingest is the client for the river flow state operations
     */
    private Ingest ingest;

    private RiverFlow riverFlow;

    private List<Map<String, Object>> definitions;

    private ThreadPoolExecutor threadPoolExecutor;

    private volatile Thread feederThread;

    private volatile boolean closed;

    /**
     * Constructor for running this from the command line
     */
    public JDBCFeeder() {
    }

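    /**
     * Reads the feeder definition from a JSON source. The top-level "jdbc"
     * element may hold a single definition map or a list of definition maps.
     * A minimal sketch of such a source (values are illustrative only):
     * <pre>
     * { "jdbc" : { "strategy" : "simple" } }
     * </pre>
     *
     * @param reader the reader to read the JSON definition from
     * @return this feeder
     */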
    @SuppressWarnings("unchecked")
    @Override
    public JDBCFeeder readFrom(Reader reader) {
        this.reader = reader;
        try {
            Map<String, Object> map = XContentFactory.xContent(XContentType.JSON).createParser(reader).mapOrderedAndClose();
            Settings settings = settingsBuilder()
                    .put(new JsonSettingsLoader().load(jsonBuilder().map(map).string()))
                    .build();
            this.definitions = newLinkedList();
            Object pipeline = map.get("jdbc");
            if (pipeline instanceof Map) {
                definitions.add((Map<String, Object>) pipeline);
            }
            if (pipeline instanceof List) {
                definitions.addAll((List<Map<String, Object>>) pipeline);
            }
            // before running, create the river flow
            createRiverFlow(map, settings);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return this;
    }

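    /**
     * Creates the river flow. The "strategy" key in the specification selects
     * the river flow implementation via the river service loader; it defaults
     * to "simple".
     *
     * @param spec     the river specification
     * @param settings the river settings
     * @return the river flow
     * @throws IOException if creating the river flow fails
     */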
    protected RiverFlow createRiverFlow(Map<String, Object> spec, Settings settings) throws IOException {
        String strategy = XContentMapValues.nodeStringValue(spec.get("strategy"), "simple");
        this.riverFlow = RiverServiceLoader.newRiverFlow(strategy);
        logger.debug("strategy {}: river flow class {}, spec = {} settings = {}",
                strategy, riverFlow.getClass().getName(), spec, settings.getAsMap());
        this.ingestFactory = createIngestFactory(settings);
        // our private ingest, needed to provide a client to the river flow
        this.ingest = ingestFactory.create();
        riverFlow.setRiverName(new RiverName("jdbc", "feeder"))
                .setSettings(settings)
                .setClient(ingest.client())
                .setIngestFactory(ingestFactory)
                .setMetric(new MeterMetric(Executors.newScheduledThreadPool(1), TimeUnit.SECONDS))
                .setQueue(new ConcurrentLinkedDeque<Map<String, Object>>());
        return riverFlow;
    }

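    /**
     * Sets the writer for regular output.
     *
     * @param writer the writer
     * @return this feeder
     */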
    @Override
    public JDBCFeeder writeTo(Writer writer) {
        this.writer = writer;
        return this;
    }

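    /**
     * Sets the print stream for error output.
     *
     * @param printStream the print stream
     * @return this feeder
     */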
    @Override
    public JDBCFeeder errorsTo(PrintStream printStream) {
        this.printStream = printStream;
        return this;
    }

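    /**
     * Starts the feeder. The river thread is scheduled according to the
     * "schedule" or "interval" settings, and this method blocks until all
     * scheduled executions have completed, after which the ingest is shut down.
     *
     * @return this feeder
     * @throws Exception if no nodes are connected or an execution fails
     */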
    @Override
    public JDBCFeeder start() throws Exception {
        this.closed = false;
        if (ingest.getConnectedNodes().isEmpty()) {
            throw new IOException("no nodes connected, can't continue");
        }
        this.feederThread = new Thread(new RiverThread(riverFlow, definitions));
        List<Future<?>> futures = schedule(feederThread);
        // wait for all scheduled executions to finish
        for (Future<?> future : futures) {
            future.get();
        }
        ingest.shutdown();
        return this;
    }

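    /**
     * Schedules the given thread. Cron expressions in the "schedule" setting
     * take precedence; otherwise, a positive "interval" runs the thread at a
     * fixed rate, and without either, the thread is submitted once for
     * immediate execution.
     *
     * @param thread the thread to schedule
     * @return the futures of the scheduled executions
     */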
    private List<Future<?>> schedule(Thread thread) {
        Settings settings = riverFlow.getSettings();
        String[] schedule = settings.getAsArray("schedule");
        List<Future<?>> futures = newLinkedList();
        long seconds = settings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds();
        if (schedule != null && schedule.length > 0) {
            CronThreadPoolExecutor cronThreadPoolExecutor =
                    new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
            for (String cron : schedule) {
                futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
            }
            this.threadPoolExecutor = cronThreadPoolExecutor;
            logger.debug("scheduled feeder instance with cron expressions {}", Arrays.asList(schedule));
        } else if (seconds > 0L) {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                    new ScheduledThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
            futures.add(scheduledThreadPoolExecutor.scheduleAtFixedRate(thread, 0L, seconds, TimeUnit.SECONDS));
            logger.debug("scheduled feeder instance at fixed rate of {} seconds", seconds);
            this.threadPoolExecutor = scheduledThreadPoolExecutor;
        } else {
            this.threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            futures.add(threadPoolExecutor.submit(thread));
            logger.debug("started feeder instance");
        }
        return futures;
    }

    /**
     * Returns a thread that shuts down this feeder instance, suitable for
     * registration as a JVM shutdown hook (e.g. triggered by Ctrl-C).
     *
     * @return the shutdown thread
     */
    @Override
    public Thread shutdownHook() {
        return new Thread() {
            public void run() {
                try {
                    shutdown();
                } catch (Exception e) {
                    e.printStackTrace(printStream);
                }
            }
        };
    }

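    /**
     * Shuts this feeder down: stops the thread pool executor, interrupts the
     * feeder thread, shuts down the ingest, and closes all streams. Invoking
     * this method more than once has no further effect.
     *
     * @throws Exception if closing a resource fails
     */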
    @Override
    public synchronized void shutdown() throws Exception {
        if (closed) {
            return;
        }
        closed = true;
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdownNow();
            threadPoolExecutor = null;
        }
        if (feederThread != null) {
            feederThread.interrupt();
        }
        if (ingest != null && !ingest.isShutdown()) {
            ingest.shutdown();
        }
        // guard against streams that were never set before shutdown
        if (reader != null) {
            reader.close();
        }
        if (writer != null) {
            writer.close();
        }
        if (printStream != null) {
            printStream.close();
        }
    }

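    /**
     * Creates an ingest factory that builds bulk transport clients. Bulk
     * limits (actions, concurrency, volume, request wait, flush interval) and
     * the Elasticsearch connection parameters are taken from the given
     * settings, with defaults applied for anything missing.
     *
     * @param settings the feeder settings
     * @return a new ingest factory
     */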
    private IngestFactory createIngestFactory(final Settings settings) {
        return new IngestFactory() {
            @Override
            public Ingest create() {
                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
                Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                        Runtime.getRuntime().availableProcessors() * 2);
                ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
                TimeValue maxrequestwait = settings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
                TimeValue flushinterval = settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
                File home = new File(settings.get("home", "."));
                BulkTransportClient ingest = new BulkTransportClient();
                Settings clientSettings = ImmutableSettings.settingsBuilder()
                        .put("cluster.name", settings.get("elasticsearch.cluster", "elasticsearch"))
                        .put("host", settings.get("elasticsearch.host", "localhost"))
                        .put("port", settings.getAsInt("elasticsearch.port", 9300))
                        .put("sniff", settings.getAsBoolean("elasticsearch.sniff", false))
                        .put("name", "feeder") // prevents the lookup of names.txt, which we do not ship, and marks this node as "feeder". See also the module load skipping in JDBCRiverPlugin
                        .put("client.transport.ignore_cluster_name", true) // ignore the cluster name setting
                        .put("client.transport.ping_timeout", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(10))) // ping timeout
                        .put("client.transport.nodes_sampler_interval", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(5))) // for sniff sampling
                        .put("path.plugins", ".dontexist") // pointing to a non-existent folder disables the loading of site plugins
                        // adding our custom class loader is tricky, actions may not be registered with the ActionService
                        .classLoader(getClassLoader(getClass().getClassLoader(), home))
                        .build();
                ingest.maxActionsPerBulkRequest(maxbulkactions)
                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
                        .maxVolumePerBulkRequest(maxvolume)
                        .maxRequestWait(maxrequestwait)
                        .flushIngestInterval(flushinterval)
                        .newClient(clientSettings);
                return ingest;
            }
        };
    }

    /**
     * We have to add Elasticsearch to our classpath, but exclude all JVM plugins,
     * for starting our TransportClient.
     *
     * @param parent the parent class loader
     * @param home   ES_HOME
     * @return a custom class loader with our dependencies
     */
    private ClassLoader getClassLoader(ClassLoader parent, File home) {
        URIClassLoader classLoader = new URIClassLoader(parent);
        File[] libs = new File(home + "/lib").listFiles();
        if (libs != null) {
            for (File file : libs) {
                if (file.getName().toLowerCase().endsWith(".jar")) {
                    classLoader.addURI(file.toURI());
                }
            }
        }
        return classLoader;
    }

}