001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.river;
017
018import org.elasticsearch.client.Client;
019import org.elasticsearch.common.inject.Inject;
020import org.elasticsearch.common.logging.ESLogger;
021import org.elasticsearch.common.logging.ESLoggerFactory;
022import org.elasticsearch.common.metrics.MeterMetric;
023import org.elasticsearch.common.settings.Settings;
024import org.elasticsearch.common.settings.loader.JsonSettingsLoader;
025import org.elasticsearch.common.unit.ByteSizeValue;
026import org.elasticsearch.common.unit.TimeValue;
027import org.elasticsearch.common.util.concurrent.EsExecutors;
028import org.elasticsearch.common.xcontent.support.XContentMapValues;
029import org.elasticsearch.river.AbstractRiverComponent;
030import org.elasticsearch.river.RiverName;
031import org.elasticsearch.river.RiverSettings;
032import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateRequestBuilder;
033import org.xbib.elasticsearch.action.plugin.jdbc.state.get.GetRiverStateResponse;
034import org.xbib.elasticsearch.plugin.jdbc.RiverThread;
035import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
036import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
037import org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient;
038import org.xbib.elasticsearch.plugin.jdbc.cron.CronExpression;
039import org.xbib.elasticsearch.plugin.jdbc.cron.CronThreadPoolExecutor;
040import org.xbib.elasticsearch.plugin.jdbc.execute.RunnableRiver;
041import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
042import org.xbib.elasticsearch.plugin.jdbc.state.StatefulRiver;
043import org.xbib.elasticsearch.plugin.jdbc.util.RiverServiceLoader;
044import org.xbib.elasticsearch.river.jdbc.RiverFlow;
045
046import java.io.IOException;
047import java.util.Arrays;
048import java.util.List;
049import java.util.Map;
050import java.util.concurrent.ConcurrentLinkedQueue;
051import java.util.concurrent.Executors;
052import java.util.concurrent.Future;
053import java.util.concurrent.LinkedBlockingQueue;
054import java.util.concurrent.ScheduledThreadPoolExecutor;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.TimeUnit;
057
058import static org.elasticsearch.common.collect.Lists.newLinkedList;
059import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
060import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
061
062/**
063 * JDBC river
064 */
065public class JDBCRiver extends AbstractRiverComponent implements StatefulRiver, RunnableRiver {
066
067    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.JDBCRiver");
068
069    private Client client;
070
071    private RiverFlow riverFlow;
072
073    private List<Map<String, Object>> definitions;
074
075    private ThreadPoolExecutor threadPoolExecutor;
076
077    private volatile Thread riverThread;
078
079    private volatile boolean closed;
080
081    private String[] schedule;
082
083    private List<Future<?>> futures;
084
085    private Long interval;
086
087    @Inject
088    @SuppressWarnings({"unchecked"})
089    public JDBCRiver(RiverName riverName, RiverSettings riverSettings, Client client) {
090        super(riverName, riverSettings);
091        this.client = client;
092        if (!riverSettings.settings().containsKey("jdbc")) {
093            throw new IllegalArgumentException("no 'jdbc' settings in river settings?");
094        }
095        try {
096            Map<String, String> loadedSettings = new JsonSettingsLoader()
097                    .load(jsonBuilder().map(riverSettings.settings()).string());
098            Settings settings = settingsBuilder().put(loadedSettings).build();
099            String strategy = XContentMapValues.nodeStringValue(riverSettings.settings().get("strategy"), "simple");
100            this.schedule = settings.getAsArray("schedule");
101            this.interval = settings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds();
102            this.definitions = newLinkedList();
103            Object definition = riverSettings.settings().get("jdbc");
104            if (definition instanceof Map) {
105                Map<String, Object> map = (Map<String, Object>) definition;
106                definitions.add(map);
107                // legacy mode: check for "strategy", "schedule", "interval" in the "jdbc" definition part
108                if (map.containsKey("strategy")) {
109                    strategy = map.get("strategy").toString();
110                }
111                if (map.containsKey("schedule")) {
112                    this.schedule = settingsBuilder().put(new JsonSettingsLoader()
113                            .load(jsonBuilder().map(map).string())).build().getAsArray("schedule");
114                }
115                if (map.containsKey("interval")) {
116                    this.interval = XContentMapValues.nodeLongValue(map.get("interval"), 0L);
117                }
118            }
119            if (definition instanceof List) {
120                definitions.addAll((List<Map<String, Object>>) definition);
121            }
122            this.riverFlow = RiverServiceLoader.newRiverFlow(strategy);
123            logger.debug("strategy {}: river flow class {} found, settings = {}",
124                    strategy, riverFlow.getClass().getName(), settings.getAsMap());
125            riverFlow.setRiverName(riverName)
126                    .setSettings(settings)
127                    .setClient(client)
128                    .setIngestFactory(createIngestFactory(settings))
129                    .setMetric(new MeterMetric(Executors.newScheduledThreadPool(1), TimeUnit.SECONDS))
130                    .setQueue(new ConcurrentLinkedQueue<Map<String, Object>>());
131        } catch (IOException e) {
132            logger.error(e.getMessage(), e);
133        }
134    }
135
136    /**
137     * Called from Elasticsearch after recovery of indices when a node has initialized.
138     */
139    @Override
140    public void start() {
141        closed = false;
142        this.riverThread = EsExecutors.daemonThreadFactory(settings.globalSettings(),
143                "river(" + riverName().getType() + "/" + riverName().getName() + ")")
144                .newThread(new RiverThread(riverFlow, definitions));
145        this.futures = schedule(riverThread);
146        // we do not wait for futures here, instead, we return to Elasticsea
147    }
148
149    /**
150     * Called form Elasticsearch when river is deleted.
151     */
152    @Override
153    public void close() {
154        if (closed) {
155            return;
156        }
157        closed = true;
158        if (threadPoolExecutor != null) {
159            logger.debug("shutting down river thread scheduler");
160            threadPoolExecutor.shutdownNow();
161            threadPoolExecutor = null;
162        }
163        if (riverThread != null) {
164            logger.debug("interrupting river thread");
165            riverThread.interrupt();
166        }
167        logger.info("river closed [{}/{}]", riverName.getType(), riverName.getName());
168    }
169
170    /**
171     * Execute river once from execute API, no matter if schedule/interval is defined.
172     * If river is running, prevoius instance are interrupted and closed.
173     */
174    public void run() {
175        if (!closed) {
176            close();
177        }
178        this.riverThread = EsExecutors.daemonThreadFactory(settings.globalSettings(),
179                "river(" + riverName().getType() + "/" + riverName().getName() + ")")
180                .newThread(new RiverThread(riverFlow, definitions));
181        this.threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
182                new LinkedBlockingQueue<Runnable>());
183        futures.add(threadPoolExecutor.submit(riverThread));
184        logger.info("started river instance for single run");
185    }
186
187    private List<Future<?>> schedule(Thread thread) {
188        Settings settings = riverFlow.getSettings();
189        List<Future<?>> futures = newLinkedList();
190        if (schedule != null && schedule.length > 0) {
191            CronThreadPoolExecutor cronThreadPoolExecutor =
192                    new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
193            for (String cron : schedule) {
194                futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
195            }
196            this.threadPoolExecutor = cronThreadPoolExecutor;
197            logger.info("scheduled river instance with cron expressions {}", Arrays.asList(schedule));
198        } else if (interval > 0L) {
199            ScheduledThreadPoolExecutor scheduledthreadPoolExecutor = new ScheduledThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
200            futures.add(scheduledthreadPoolExecutor.scheduleAtFixedRate(thread, 0L, interval, TimeUnit.SECONDS));
201            this.threadPoolExecutor = scheduledthreadPoolExecutor;
202            logger.info("scheduled river instance at fixed rate of {} seconds", interval);
203        } else {
204            this.threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
205                    new LinkedBlockingQueue<Runnable>());
206            futures.add(threadPoolExecutor.submit(thread));
207            logger.info("started river instance for single run");
208        }
209        return futures;
210    }
211
212    @Override
213    public RiverState getRiverState() {
214        GetRiverStateRequestBuilder riverStateRequestBuilder = new GetRiverStateRequestBuilder(client.admin().cluster())
215                .setRiverName(riverName.getName())
216                .setRiverType(riverName.getType());
217        GetRiverStateResponse riverStateResponse = riverStateRequestBuilder.execute().actionGet();
218        return riverStateResponse.getRiverState();
219    }
220
221    private IngestFactory createIngestFactory(final Settings settings) {
222        return new IngestFactory() {
223            @Override
224            public Ingest create() {
225                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
226                Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
227                        Runtime.getRuntime().availableProcessors() * 2);
228                ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
229                TimeValue maxrequestwait = settings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
230                TimeValue flushinterval = settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
231                return new BulkNodeClient()
232                        .maxActionsPerBulkRequest(maxbulkactions)
233                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
234                        .maxRequestWait(maxrequestwait)
235                        .maxVolumePerBulkRequest(maxvolume)
236                        .flushIngestInterval(flushinterval)
237                        .newClient(client);
238            }
239        };
240    }
241}