001/* 002 * Copyright (C) 2014 Jörg Prante 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.xbib.elasticsearch.plugin.jdbc.state; 017 018import org.elasticsearch.ElasticsearchException; 019import org.elasticsearch.action.ActionListener; 020import org.elasticsearch.cluster.AckedClusterStateUpdateTask; 021import org.elasticsearch.cluster.ClusterChangedEvent; 022import org.elasticsearch.cluster.ClusterService; 023import org.elasticsearch.cluster.ClusterState; 024import org.elasticsearch.cluster.ClusterStateListener; 025import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; 026import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; 027import org.elasticsearch.cluster.metadata.MetaData; 028import org.elasticsearch.cluster.node.DiscoveryNode; 029import org.elasticsearch.common.collect.ImmutableMap; 030import org.elasticsearch.common.component.AbstractLifecycleComponent; 031import org.elasticsearch.common.inject.Inject; 032import org.elasticsearch.common.logging.ESLogger; 033import org.elasticsearch.common.logging.ESLoggerFactory; 034import org.elasticsearch.common.regex.Regex; 035import org.elasticsearch.common.settings.Settings; 036import org.elasticsearch.river.RiverName; 037 038import java.util.HashMap; 039import java.util.List; 040import java.util.Map; 041 042import static org.elasticsearch.common.collect.Lists.newLinkedList; 043import static org.elasticsearch.common.collect.Maps.newHashMap; 044 045/** 046 * The RiverStateService manages reading and writing of river states in the cluster state 047 */ 048public class RiverStateService extends AbstractLifecycleComponent<RiverStateService> implements ClusterStateListener { 049 050 private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.RiverStateService"); 051 052 private volatile ImmutableMap<RiverName, RiverState> riverStates = ImmutableMap.of(); 053 054 private final ClusterService clusterService; 055 056 @Inject 057 public RiverStateService(Settings settings, ClusterService clusterService) { 058 super(settings); 059 this.clusterService = clusterService; 060 clusterService.add(this); 061 } 062 063 @Override 064 protected void doStart() throws ElasticsearchException { 065 } 066 067 @Override 068 protected void doStop() throws ElasticsearchException { 069 } 070 071 @Override 072 protected void doClose() throws ElasticsearchException { 073 } 074 075 @Override 076 public void clusterChanged(ClusterChangedEvent event) { 077 try { 078 RiverStatesMetaData prev = event.previousState().getMetaData().custom(RiverStatesMetaData.TYPE); 079 RiverStatesMetaData curr = event.state().getMetaData().custom(RiverStatesMetaData.TYPE); 080 if (prev == null) { 081 if (curr != null) { 082 processRiverStatesMetadata(curr); 083 } 084 } else { 085 if (!prev.equals(curr)) { 086 processRiverStatesMetadata(curr); 087 } 088 } 089 } catch (Throwable t) { 090 logger.warn("failed to update river state", t); 091 } 092 } 093 094 private void processRiverStatesMetadata(RiverStatesMetaData riverStatesMetaData) { 095 Map<RiverName, RiverState> survivors = newHashMap(); 096 // first, remove river states that are no longer there 097 for (Map.Entry<RiverName, RiverState> entry : riverStates.entrySet()) { 098 if (riverStatesMetaData != null) { 099 if (!riverStatesMetaData.getRiverStates(entry.getKey().getName(), entry.getKey().getType()).isEmpty()) { 100 survivors.put(entry.getKey(), entry.getValue()); 101 } 102 } 103 } 104 ImmutableMap.Builder<RiverName, RiverState> builder = ImmutableMap.builder(); 105 if (riverStatesMetaData != null) { 106 for (RiverState newRiverState : riverStatesMetaData.getRiverStates()) { 107 if (newRiverState.getName() == null || newRiverState.getType() == null) { 108 continue; 109 } 110 RiverName riverName = new RiverName(newRiverState.getType(), newRiverState.getName()); 111 RiverState oldRiverState = survivors.get(riverName); 112 if (oldRiverState != null && oldRiverState.getType() != null) { 113 if (!oldRiverState.getType().equals(newRiverState.getType())) { 114 oldRiverState = newRiverState; 115 } 116 } else { 117 oldRiverState = newRiverState; 118 } 119 builder.put(riverName, oldRiverState); 120 } 121 122 } 123 this.riverStates = builder.build(); 124 } 125 126 /** 127 * Put a new river into river state management 128 * 129 * @param request a river state register request 130 * @param listener listener for cluster state update response 131 */ 132 public void putRiverState(final RiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { 133 clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) { 134 @Override 135 protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { 136 return new ClusterStateUpdateResponse(acknowledged); 137 } 138 139 @Override 140 public ClusterState execute(ClusterState currentState) { 141 // sanity check 142 if (request.riverState.getName() == null || request.riverState.getType() == null) { 143 logger.debug("put: no river name / type given"); 144 return currentState; 145 } 146 RiverName riverName = new RiverName(request.riverState.getType(), request.riverState.getName()); 147 RiverState previous = riverStates.get(riverName); 148 if (previous != null) { 149 logger.debug("put: previous state not null"); 150 return currentState; 151 } 152 Map<RiverName, RiverState> newRiverStates = newHashMap(); 153 newRiverStates.put(riverName, request.riverState); 154 riverStates = ImmutableMap.copyOf(newRiverStates); 155 MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); 156 RiverStatesMetaData riverStates = currentState.metaData().custom(RiverStatesMetaData.TYPE); 157 if (riverStates == null) { 158 logger.debug("put: first river state [{}/{}]", request.riverState.getType(), request.riverState.getName()); 159 riverStates = new RiverStatesMetaData(request.riverState); 160 } else { 161 boolean found = false; 162 List<RiverState> riverStateList = newLinkedList(); 163 for (RiverState riverState : riverStates.getRiverStates()) { 164 if (riverState != null 165 && request.riverState != null && riverState.getName() != null 166 && riverState.getName().equals(request.riverState.getName()) 167 && riverState.getType() != null 168 && riverState.getType().equals(request.riverState.getType())) { 169 found = true; 170 riverStateList.add(request.riverState); 171 } else { 172 riverStateList.add(riverState); 173 } 174 } 175 if (!found && request.riverState != null && request.riverState.getName() != null) { 176 logger.debug("put: another river state [{}/{}]", request.riverState.getType(), request.riverState.getName()); 177 riverStateList.add(request.riverState); 178 } else { 179 logger.debug("put: update existing river state [{}/{}]", request.riverState.getType(), request.riverState.getName()); 180 } 181 riverStates = new RiverStatesMetaData(riverStateList.toArray(new RiverState[riverStateList.size()])); 182 } 183 metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStates); 184 return ClusterState.builder(currentState).metaData(metadataBuilder).build(); 185 } 186 187 @Override 188 public void onFailure(String source, Throwable t) { 189 logger.warn("failed to create river state [{}/{}]", t, request.riverState.getType(), request.riverState.getName()); 190 super.onFailure(source, t); 191 } 192 193 @Override 194 public boolean mustAck(DiscoveryNode discoveryNode) { 195 return discoveryNode.masterNode(); 196 } 197 }); 198 } 199 200 /** 201 * Post a new river for river state management 202 * 203 * @param request a river state register request 204 * @param listener listener for cluster state update response 205 */ 206 public void postRiverState(final RiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { 207 clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) { 208 @Override 209 protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { 210 return new ClusterStateUpdateResponse(acknowledged); 211 } 212 213 @Override 214 public ClusterState execute(ClusterState currentState) { 215 // sanity check 216 if (request.riverState.getName() == null || request.riverState.getType() == null) { 217 logger.debug("post: no river name / type given"); 218 return currentState; 219 } 220 RiverName riverName = new RiverName(request.riverState.getType(), request.riverState.getName()); 221 Map<RiverName, RiverState> newRiverStates = newHashMap(); 222 newRiverStates.put(riverName, request.riverState); 223 riverStates = ImmutableMap.copyOf(newRiverStates); 224 MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); 225 RiverStatesMetaData riverStatesMetaData = currentState.metaData().custom(RiverStatesMetaData.TYPE); 226 if (riverStatesMetaData == null) { 227 logger.debug("post: first river state [{}/{}]", request.riverState.getType(), request.riverState.getName()); 228 riverStatesMetaData = new RiverStatesMetaData(request.riverState); 229 } else { 230 boolean found = false; 231 List<RiverState> riverStateList = newLinkedList(); 232 for (RiverState riverState : riverStatesMetaData.getRiverStates()) { 233 if (riverState.getName() != null && riverState.getName().equals(request.riverState.getName()) 234 && riverState.getType() != null && riverState.getType().equals(request.riverState.getType())) { 235 found = true; 236 if (riverState.getMap() == null) { 237 riverState.setMap(new HashMap<String, Object>()); 238 } 239 if (request.riverState != null) { 240 riverState.setStarted(request.riverState.getStarted()); 241 riverState.setLastActive(request.riverState.getLastActiveBegin(), request.riverState.getLastActiveEnd()); 242 if (request.riverState.getMap() != null) { 243 riverState.getMap().putAll(request.riverState.getMap()); 244 } 245 } 246 logger.debug("post: update river state = {}", riverState); 247 riverStateList.add(riverState); 248 } else { 249 riverStateList.add(riverState); 250 } 251 } 252 if (!found && request.riverState != null && request.riverState.getName() != null) { 253 logger.debug("post: another river state [{}/{}]", request.riverState.getType(), request.riverState.getName()); 254 riverStateList.add(request.riverState); 255 } 256 riverStatesMetaData = new RiverStatesMetaData(riverStateList.toArray(new RiverState[riverStateList.size()])); 257 } 258 metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStatesMetaData); 259 return ClusterState.builder(currentState).metaData(metadataBuilder).build(); 260 } 261 262 @Override 263 public void onFailure(String source, Throwable t) { 264 logger.warn("failed to post river state [{}/{}]", t, request.riverState.getType(), request.riverState.getName()); 265 super.onFailure(source, t); 266 } 267 268 @Override 269 public boolean mustAck(DiscoveryNode discoveryNode) { 270 return discoveryNode.masterNode(); 271 } 272 }); 273 } 274 275 276 /** 277 * Delete river from river state management 278 * 279 * @param request the unregister river state request 280 * @param listener listener for cluster state updates 281 */ 282 public void deleteRiverState(final DeleteRiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { 283 clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) { 284 @Override 285 protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { 286 return new ClusterStateUpdateResponse(acknowledged); 287 } 288 289 @Override 290 public ClusterState execute(ClusterState currentState) { 291 MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); 292 RiverStatesMetaData riverStatesMetaData = currentState.metaData().custom(RiverStatesMetaData.TYPE); 293 if (riverStatesMetaData != null && riverStatesMetaData.getRiverStates().size() > 0) { 294 List<RiverState> riverStates = newLinkedList(); 295 boolean changed = false; 296 for (RiverState riverState : riverStatesMetaData.getRiverStates()) { 297 if (riverState.getType().equals(request.type) && Regex.simpleMatch(request.name, riverState.getName())) { 298 logger.debug("delete: river state [{}]", riverState.getName()); 299 changed = true; 300 } else { 301 riverStates.add(riverState); 302 } 303 } 304 if (changed) { 305 riverStatesMetaData = new RiverStatesMetaData(riverStates.toArray(new RiverState[riverStates.size()])); 306 metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStatesMetaData); 307 return ClusterState.builder(currentState).metaData(metadataBuilder).build(); 308 } 309 } 310 throw new ElasticsearchException("unable to delete, river state missing: " + request.name); 311 } 312 313 @Override 314 public boolean mustAck(DiscoveryNode discoveryNode) { 315 return discoveryNode.masterNode(); 316 } 317 }); 318 } 319 320 public static class RiverStateRequest extends ClusterStateUpdateRequest<RiverStateRequest> { 321 322 final String cause; 323 324 RiverState riverState; 325 326 /** 327 * Constructs new register river request 328 * 329 * @param cause river registration cause 330 * @param riverState river state 331 */ 332 public RiverStateRequest(String cause, RiverState riverState) { 333 this.cause = cause; 334 this.riverState = riverState; 335 } 336 337 } 338 339 public static class DeleteRiverStateRequest extends ClusterStateUpdateRequest<DeleteRiverStateRequest> { 340 341 final String cause; 342 343 final String name; 344 345 final String type; 346 347 /** 348 * Creates a new delete river state request 349 * 350 * @param cause river delete cause 351 * @param name river name 352 */ 353 public DeleteRiverStateRequest(String cause, String name, String type) { 354 this.cause = cause; 355 this.name = name; 356 this.type = type; 357 } 358 } 359 360}