001package org.xbib.elasticsearch.river.jdbc.strategy.column;
002
003import org.elasticsearch.client.Client;
004import org.elasticsearch.common.joda.time.DateTime;
005import org.elasticsearch.common.settings.Settings;
006import org.elasticsearch.common.unit.ByteSizeValue;
007import org.elasticsearch.common.unit.TimeValue;
008import org.elasticsearch.river.RiverName;
009import org.testng.annotations.Parameters;
010import org.testng.annotations.Test;
011import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
012import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
013import org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient;
014import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
015import org.xbib.elasticsearch.river.jdbc.RiverSource;
016import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth;
017import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverSource;
018
019import java.io.IOException;
020import java.util.Map;
021
022import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
023
024public class ColumnRiverFlowTests extends AbstractColumnRiverTest {
025
026    @Override
027    public ColumnRiverSource newRiverSource() {
028        return new ColumnRiverSource();
029    }
030
031    @Override
032    public ColumnRiverContext newRiverContext() {
033        return new ColumnRiverContext();
034    }
035
036    @Test
037    @Parameters({"river-existedWhereClause"})
038    public void testWriteLastRiverRunTime(String riverResource) throws Exception {
039        createRiverContext(new MockRiverSource() {
040            @Override
041            public void fetch() {
042            }
043        }, riverResource);
044        Map<String, Object> settingsMap = riverSettings(riverResource).settings();
045        Settings settings = settingsBuilder().put(settingsMap).build();
046        ColumnRiverContext riverContext = new ColumnRiverContext();
047        Map<String, Object> def = (Map<String, Object>) settingsMap.get("jdbc");
048        riverContext.setDefinition(def);
049        ColumnRiverFlow flow = new ColumnRiverFlow();
050        flow.setRiverName(new RiverName("jdbc", "column"))
051                .setSettings(settings)
052                .setClient(client("1"))
053                .setIngestFactory(createIngestFactory(settings, client("1")))
054                .execute(riverContext);
055        assertNotNull(riverContext.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME));
056    }
057
058    @Test
059    @Parameters({"river-existedWhereClause"})
060    public void testReadLastRiverRunTime(String riverResource) throws Exception {
061        final DateTime lastRunAt = new DateTime(new DateTime().getMillis() - 600);
062        createRiverContext(new MockRiverSource() {
063            @Override
064            public void fetch() {
065                DateTime readlastRunAt = (DateTime) context.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME);
066                assertNotNull(readlastRunAt);
067                assertEquals(readlastRunAt, lastRunAt);
068
069            }
070        }, riverResource);
071        RiverState riverState = new RiverState();
072        context.setRiverState(riverState);
073        context.getRiverState().getMap().put(ColumnRiverFlow.LAST_RUN_TIME, lastRunAt);
074        Map<String, Object> settingsMap = riverSettings(riverResource).settings();
075        Settings settings = settingsBuilder().put(settingsMap).build();
076        ColumnRiverContext riverContext = new ColumnRiverContext();
077        Map<String, Object> def = (Map<String, Object>) settingsMap.get("jdbc");
078        riverContext.setDefinition(def);
079        ColumnRiverFlow flow = new ColumnRiverFlow();
080        flow.setRiverName(new RiverName("jdbc", "column"))
081                .setSettings(settings)
082                .setClient(client("1"))
083                .setIngestFactory(createIngestFactory(settings, client("1")))
084                .execute(riverContext);
085    }
086
087    private void createRiverContext(RiverSource riverSource, String riverResource) throws IOException {
088        context.columnEscape(true)
089             .setRiverMouth(new MockRiverMouth())
090             .setRiverSource(riverSource);
091    }
092
093    private IngestFactory createIngestFactory(final Settings settings, final Client client) {
094        return new IngestFactory() {
095            @Override
096            public Ingest create() {
097                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
098                Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
099                        Runtime.getRuntime().availableProcessors() * 2);
100                ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
101                TimeValue maxrequestwait = settings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
102                TimeValue flushinterval = settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
103                return new BulkNodeClient()
104                        .maxActionsPerBulkRequest(maxbulkactions)
105                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
106                        .maxRequestWait(maxrequestwait)
107                        .maxVolumePerBulkRequest(maxvolume)
108                        .flushIngestInterval(flushinterval)
109                        .newClient(client);
110            }
111        };
112    }
113
114}