package org.xbib.elasticsearch.river.jdbc.strategy.column;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.river.RiverName;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;
import org.xbib.elasticsearch.plugin.jdbc.client.Ingest;
import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory;
import org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient;
import org.xbib.elasticsearch.plugin.jdbc.state.RiverState;
import org.xbib.elasticsearch.river.jdbc.RiverSource;
import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverMouth;
import org.xbib.elasticsearch.river.jdbc.strategy.mock.MockRiverSource;

import java.io.IOException;
import java.util.Map;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

/**
 * Tests for {@link ColumnRiverFlow}.
 * <p>
 * Verifies that a flow execution records a {@code LAST_RUN_TIME} timestamp in
 * the river state, and that a previously stored {@code LAST_RUN_TIME} is
 * visible to the {@link RiverSource} while {@code fetch()} runs.
 */
public class ColumnRiverFlowTests extends AbstractColumnRiverTest {

    @Override
    public ColumnRiverSource newRiverSource() {
        return new ColumnRiverSource();
    }

    @Override
    public ColumnRiverContext newRiverContext() {
        return new ColumnRiverContext();
    }

    /**
     * Executing the flow must write a non-null LAST_RUN_TIME entry into the
     * river state of the context it ran on.
     *
     * @param riverResource name of the river settings resource to load
     * @throws Exception if settings loading or flow execution fails
     */
    @Test
    @Parameters({"river-existedWhereClause"})
    public void testWriteLastRiverRunTime(String riverResource) throws Exception {
        createRiverContext(new MockRiverSource() {
            @Override
            public void fetch() {
                // no-op: this test only checks the state written by the flow itself
            }
        }, riverResource);
        ColumnRiverContext riverContext = executeFlow(riverResource);
        assertNotNull(riverContext.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME));
    }

    /**
     * A LAST_RUN_TIME stored in the river state before execution must be
     * readable from inside the source's {@code fetch()} callback.
     *
     * @param riverResource name of the river settings resource to load
     * @throws Exception if settings loading or flow execution fails
     */
    @Test
    @Parameters({"river-existedWhereClause"})
    public void testReadLastRiverRunTime(String riverResource) throws Exception {
        final DateTime lastRunAt = new DateTime(new DateTime().getMillis() - 600);
        createRiverContext(new MockRiverSource() {
            @Override
            public void fetch() {
                // reads the state off the inherited test context seeded below
                DateTime readlastRunAt = (DateTime) context.getRiverState().getMap().get(ColumnRiverFlow.LAST_RUN_TIME);
                assertNotNull(readlastRunAt);
                assertEquals(readlastRunAt, lastRunAt);
            }
        }, riverResource);
        // seed the inherited context's river state with a known timestamp
        RiverState riverState = new RiverState();
        context.setRiverState(riverState);
        context.getRiverState().getMap().put(ColumnRiverFlow.LAST_RUN_TIME, lastRunAt);
        executeFlow(riverResource);
    }

    /**
     * Builds settings and a fresh {@link ColumnRiverContext} from the given
     * river resource, then runs a {@link ColumnRiverFlow} over it.
     * Shared by both tests (was previously duplicated inline).
     *
     * @param riverResource name of the river settings resource to load
     * @return the context the flow executed on, for state inspection
     * @throws Exception if settings loading or flow execution fails
     */
    @SuppressWarnings("unchecked")
    private ColumnRiverContext executeFlow(String riverResource) throws Exception {
        Map<String, Object> settingsMap = riverSettings(riverResource).settings();
        Settings settings = settingsBuilder().put(settingsMap).build();
        ColumnRiverContext riverContext = new ColumnRiverContext();
        Map<String, Object> def = (Map<String, Object>) settingsMap.get("jdbc");
        riverContext.setDefinition(def);
        ColumnRiverFlow flow = new ColumnRiverFlow();
        flow.setRiverName(new RiverName("jdbc", "column"))
                .setSettings(settings)
                .setClient(client("1"))
                .setIngestFactory(createIngestFactory(settings, client("1")))
                .execute(riverContext);
        return riverContext;
    }

    /**
     * Configures the inherited test context with a mock mouth and the given
     * source, and enables column escaping.
     *
     * @param riverSource   the (usually mock) source to install
     * @param riverResource NOTE(review): currently unused here — the resource
     *                      is loaded separately by the tests; confirm whether
     *                      it was meant to feed the context
     * @throws IOException declared for signature compatibility
     */
    private void createRiverContext(RiverSource riverSource, String riverResource) throws IOException {
        context.columnEscape(true)
                .setRiverMouth(new MockRiverMouth())
                .setRiverSource(riverSource);
    }

    /**
     * Builds an {@link IngestFactory} producing a {@link BulkNodeClient}
     * tuned from the given settings (bulk size, concurrency, volume, waits).
     *
     * @param settings river settings providing the bulk tuning keys
     * @param client   the Elasticsearch client the bulk client wraps
     * @return a factory creating configured bulk node clients
     */
    private IngestFactory createIngestFactory(final Settings settings, final Client client) {
        return new IngestFactory() {
            @Override
            public Ingest create() {
                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
                // default concurrency scales with available cores
                Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                        Runtime.getRuntime().availableProcessors() * 2);
                ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
                TimeValue maxrequestwait = settings.getAsTime("max_request_wait", TimeValue.timeValueSeconds(60));
                TimeValue flushinterval = settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
                return new BulkNodeClient()
                        .maxActionsPerBulkRequest(maxbulkactions)
                        .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
                        .maxRequestWait(maxrequestwait)
                        .maxVolumePerBulkRequest(maxvolume)
                        .flushIngestInterval(flushinterval)
                        .newClient(client);
            }
        };
    }

}