001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.river.jdbc.strategy.simple; 017 018import org.elasticsearch.action.delete.DeleteRequest; 019import org.elasticsearch.action.index.IndexRequest; 020import org.elasticsearch.client.Requests; 021import org.elasticsearch.common.Strings; 022import org.elasticsearch.common.collect.ImmutableSet; 023import org.elasticsearch.common.joda.time.DateTime; 024import org.elasticsearch.common.joda.time.format.DateTimeFormat; 025import org.elasticsearch.common.logging.ESLogger; 026import org.elasticsearch.common.logging.ESLoggerFactory; 027import org.elasticsearch.common.settings.Settings; 028import org.elasticsearch.common.unit.TimeValue; 029import org.elasticsearch.index.VersionType; 030import org.xbib.elasticsearch.plugin.jdbc.client.Ingest; 031import org.xbib.elasticsearch.plugin.jdbc.client.IngestFactory; 032import org.xbib.elasticsearch.plugin.jdbc.client.Metric; 033import org.xbib.elasticsearch.plugin.jdbc.util.ControlKeys; 034import org.xbib.elasticsearch.plugin.jdbc.util.IndexableObject; 035import org.xbib.elasticsearch.river.jdbc.RiverMouth; 036 037import java.io.IOException; 038import java.util.Map; 039 040/** 041 * Simple river mouth implementation. This implementation uses bulk processing, 042 * index name housekeeping (with replica/refresh), and metrics. It understands 043 * _version, _routing, _timestamp, _parent, and _ttl metadata. 044 */ 045public class SimpleRiverMouth implements RiverMouth<SimpleRiverContext> { 046 047 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.SimpleRiverMouth"); 048 049 protected SimpleRiverContext context; 050 051 protected IngestFactory ingestFactory; 052 053 protected Ingest ingest; 054 055 protected Metric metric; 056 057 protected Settings indexSettings; 058 059 protected Map<String, String> indexMappings; 060 061 protected String index; 062 063 protected String type; 064 065 protected String id; 066 067 protected volatile boolean suspended = false; 068 069 @Override 070 public String strategy() { 071 return "simple"; 072 } 073 074 @Override 075 public SimpleRiverMouth newInstance() { 076 return new SimpleRiverMouth(); 077 } 078 079 @Override 080 public SimpleRiverMouth setRiverContext(SimpleRiverContext context) { 081 this.context = context; 082 return this; 083 } 084 085 @Override 086 public SimpleRiverMouth setIngestFactory(IngestFactory ingestFactory) { 087 this.ingestFactory = ingestFactory; 088 this.ingest = ingestFactory.create(); 089 this.metric = ingest.getMetric(); 090 return this; 091 } 092 093 @Override 094 public Metric getMetric() { 095 return metric; 096 } 097 098 @Override 099 public synchronized void beforeFetch() throws IOException { 100 if (ingest == null || ingest.isShutdown()) { 101 ingest = ingestFactory.create(); 102 } 103 if (!ingest.client().admin().indices().prepareExists(index).execute().actionGet().isExists()) { 104 logger.info("creating index {} with settings {} and mappings {}", 105 index, indexSettings != null ? indexSettings.getAsMap() : "{}", indexMappings); 106 ingest.newIndex(index, indexSettings, indexMappings); 107 } 108 ingest.startBulk(index); 109 } 110 111 @Override 112 public synchronized void afterFetch() throws IOException { 113 if (ingest == null || ingest.isShutdown()) { 114 ingest = ingestFactory.create(); 115 } 116 flush(); 117 ingest.stopBulk(index); 118 ingest.refresh(index); 119 if (metric.indices() != null && !metric.indices().isEmpty()) { 120 for (String index : ImmutableSet.copyOf(metric.indices())) { 121 logger.info("stopping bulk mode for index {} and refreshing...", index); 122 ingest.stopBulk(index); 123 ingest.refresh(index); 124 } 125 } 126 if (!ingest.isShutdown()) { 127 ingest.shutdown(); 128 } 129 } 130 131 @Override 132 public synchronized void shutdown() { 133 try { 134 flush(); 135 } catch (IOException e) { 136 logger.error(e.getMessage(), e); 137 } 138 if (ingest != null && !ingest.isShutdown()) { 139 // shut down ingest and release ingest resources 140 ingest.shutdown(); 141 } 142 } 143 144 @Override 145 public SimpleRiverMouth setIndexSettings(Settings indexSettings) { 146 this.indexSettings = indexSettings; 147 return this; 148 } 149 150 @Override 151 public SimpleRiverMouth setTypeMapping(Map<String, String> typeMapping) { 152 this.indexMappings = typeMapping; 153 return this; 154 } 155 156 @Override 157 public SimpleRiverMouth setIndex(String index) { 158 this.index = index.contains("'") ? DateTimeFormat.forPattern(index).print(new DateTime()) : index; 159 return this; 160 } 161 162 @Override 163 public String getIndex() { 164 return index; 165 } 166 167 @Override 168 public SimpleRiverMouth setType(String type) { 169 this.type = type; 170 return this; 171 } 172 173 @Override 174 public String getType() { 175 return type; 176 } 177 178 @Override 179 public SimpleRiverMouth setId(String id) { 180 this.id = id; 181 return this; 182 } 183 184 @Override 185 public String getId() { 186 return id; 187 } 188 189 @Override 190 public void index(IndexableObject object, boolean create) throws IOException { 191 try { 192 while (suspended) { 193 Thread.sleep(1000L); 194 } 195 } catch (InterruptedException e) { 196 Thread.currentThread().interrupt(); 197 logger.warn("interrupted"); 198 } 199 if (Strings.hasLength(object.index())) { 200 setIndex(object.index()); 201 } 202 if (Strings.hasLength(object.type())) { 203 setType(object.type()); 204 } 205 if (Strings.hasLength(object.id())) { 206 setId(object.id()); 207 } 208 IndexRequest request = Requests.indexRequest(this.index) 209 .type(this.type) 210 .id(getId()) 211 .source(object.build()); 212 if (object.meta(ControlKeys._version.name()) != null) { 213 request.versionType(VersionType.EXTERNAL) 214 .version(Long.parseLong(object.meta(ControlKeys._version.name()))); 215 } 216 if (object.meta(ControlKeys._routing.name()) != null) { 217 request.routing(object.meta(ControlKeys._routing.name())); 218 } 219 if (object.meta(ControlKeys._parent.name()) != null) { 220 request.parent(object.meta(ControlKeys._parent.name())); 221 } 222 if (object.meta(ControlKeys._timestamp.name()) != null) { 223 request.timestamp(object.meta(ControlKeys._timestamp.name())); 224 } 225 if (object.meta(ControlKeys._ttl.name()) != null) { 226 request.ttl(Long.parseLong(object.meta(ControlKeys._ttl.name()))); 227 } 228 if (logger.isTraceEnabled()) { 229 logger.trace("adding bulk index action {}", request.source().toUtf8()); 230 } 231 if (ingest != null) { 232 ingest.bulkIndex(request); 233 } 234 } 235 236 @Override 237 public void delete(IndexableObject object) { 238 try { 239 while (suspended) { 240 Thread.sleep(1000L); 241 } 242 } catch (InterruptedException e) { 243 Thread.currentThread().interrupt(); 244 logger.warn("interrupted"); 245 } 246 if (Strings.hasLength(object.index())) { 247 this.index = object.index(); 248 } 249 if (Strings.hasLength(object.type())) { 250 this.type = object.type(); 251 } 252 if (Strings.hasLength(object.id())) { 253 setId(object.id()); 254 } 255 if (getId() == null) { 256 return; // skip if no doc is specified to delete 257 } 258 DeleteRequest request = Requests.deleteRequest(this.index).type(this.type).id(getId()); 259 if (object.meta(ControlKeys._version.name()) != null) { 260 request.versionType(VersionType.EXTERNAL) 261 .version(Long.parseLong(object.meta(ControlKeys._version.name()))); 262 } 263 if (object.meta(ControlKeys._routing.name()) != null) { 264 request.routing(object.meta(ControlKeys._routing.name())); 265 } 266 if (object.meta(ControlKeys._parent.name()) != null) { 267 request.parent(object.meta(ControlKeys._parent.name())); 268 } 269 if (logger.isTraceEnabled()) { 270 logger.trace("adding bulk delete action {}/{}/{}", request.index(), request.type(), request.id()); 271 } 272 if (ingest != null) { 273 ingest.bulkDelete(request); 274 } 275 } 276 277 @Override 278 public void flush() throws IOException { 279 if (ingest != null) { 280 ingest.flushIngest(); 281 // wait for all outstanding bulk requests before continue with river 282 try { 283 ingest.waitForResponses(TimeValue.timeValueSeconds(60)); 284 } catch (InterruptedException e) { 285 logger.warn("interrupted while waiting for responses"); 286 Thread.currentThread().interrupt(); 287 } 288 } 289 } 290 291 @Override 292 public void suspend() { 293 if (ingest != null) { 294 this.suspended = true; 295 ingest.suspend(); 296 } 297 } 298 299 @Override 300 public void resume() { 301 if (ingest != null) { 302 this.suspended = false; 303 ingest.resume(); 304 } 305 } 306 307}