001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.state;
017
018import org.elasticsearch.ElasticsearchException;
019import org.elasticsearch.action.ActionListener;
020import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
021import org.elasticsearch.cluster.ClusterChangedEvent;
022import org.elasticsearch.cluster.ClusterService;
023import org.elasticsearch.cluster.ClusterState;
024import org.elasticsearch.cluster.ClusterStateListener;
025import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
026import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
027import org.elasticsearch.cluster.metadata.MetaData;
028import org.elasticsearch.cluster.node.DiscoveryNode;
029import org.elasticsearch.common.collect.ImmutableMap;
030import org.elasticsearch.common.component.AbstractLifecycleComponent;
031import org.elasticsearch.common.inject.Inject;
032import org.elasticsearch.common.logging.ESLogger;
033import org.elasticsearch.common.logging.ESLoggerFactory;
034import org.elasticsearch.common.regex.Regex;
035import org.elasticsearch.common.settings.Settings;
036import org.elasticsearch.river.RiverName;
037
038import java.util.HashMap;
039import java.util.List;
040import java.util.Map;
041
042import static org.elasticsearch.common.collect.Lists.newLinkedList;
043import static org.elasticsearch.common.collect.Maps.newHashMap;
044
045/**
046 * The RiverStateService manages reading and writing of river states in the cluster state
047 */
048public class RiverStateService extends AbstractLifecycleComponent<RiverStateService> implements ClusterStateListener {
049
050    private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.RiverStateService");
051
052    private volatile ImmutableMap<RiverName, RiverState> riverStates = ImmutableMap.of();
053
054    private final ClusterService clusterService;
055
056    @Inject
057    public RiverStateService(Settings settings, ClusterService clusterService) {
058        super(settings);
059        this.clusterService = clusterService;
060        clusterService.add(this);
061    }
062
063    @Override
064    protected void doStart() throws ElasticsearchException {
065    }
066
067    @Override
068    protected void doStop() throws ElasticsearchException {
069    }
070
071    @Override
072    protected void doClose() throws ElasticsearchException {
073    }
074
075    @Override
076    public void clusterChanged(ClusterChangedEvent event) {
077        try {
078            RiverStatesMetaData prev = event.previousState().getMetaData().custom(RiverStatesMetaData.TYPE);
079            RiverStatesMetaData curr = event.state().getMetaData().custom(RiverStatesMetaData.TYPE);
080            if (prev == null) {
081                if (curr != null) {
082                    processRiverStatesMetadata(curr);
083                }
084            } else {
085                if (!prev.equals(curr)) {
086                    processRiverStatesMetadata(curr);
087                }
088            }
089        } catch (Throwable t) {
090            logger.warn("failed to update river state", t);
091        }
092    }
093
094    private void processRiverStatesMetadata(RiverStatesMetaData riverStatesMetaData) {
095        Map<RiverName, RiverState> survivors = newHashMap();
096        // first, remove river states that are no longer there
097        for (Map.Entry<RiverName, RiverState> entry : riverStates.entrySet()) {
098            if (riverStatesMetaData != null) {
099                if (!riverStatesMetaData.getRiverStates(entry.getKey().getName(), entry.getKey().getType()).isEmpty()) {
100                    survivors.put(entry.getKey(), entry.getValue());
101                }
102            }
103        }
104        ImmutableMap.Builder<RiverName, RiverState> builder = ImmutableMap.builder();
105        if (riverStatesMetaData != null) {
106            for (RiverState newRiverState : riverStatesMetaData.getRiverStates()) {
107                if (newRiverState.getName() == null || newRiverState.getType() == null) {
108                    continue;
109                }
110                RiverName riverName = new RiverName(newRiverState.getType(), newRiverState.getName());
111                RiverState oldRiverState = survivors.get(riverName);
112                if (oldRiverState != null && oldRiverState.getType() != null) {
113                    if (!oldRiverState.getType().equals(newRiverState.getType())) {
114                        oldRiverState = newRiverState;
115                    }
116                } else {
117                    oldRiverState = newRiverState;
118                }
119                builder.put(riverName, oldRiverState);
120            }
121
122        }
123        this.riverStates = builder.build();
124    }
125
126    /**
127     * Put a new river into river state management
128     *
129     * @param request  a river state register request
130     * @param listener listener for cluster state update response
131     */
132    public void putRiverState(final RiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
133        clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
134            @Override
135            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
136                return new ClusterStateUpdateResponse(acknowledged);
137            }
138
139            @Override
140            public ClusterState execute(ClusterState currentState) {
141                // sanity check
142                if (request.riverState.getName() == null || request.riverState.getType() == null) {
143                    logger.debug("put: no river name / type given");
144                    return currentState;
145                }
146                RiverName riverName = new RiverName(request.riverState.getType(), request.riverState.getName());
147                RiverState previous = riverStates.get(riverName);
148                if (previous != null) {
149                    logger.debug("put: previous state not null");
150                    return currentState;
151                }
152                Map<RiverName, RiverState> newRiverStates = newHashMap();
153                newRiverStates.put(riverName, request.riverState);
154                riverStates = ImmutableMap.copyOf(newRiverStates);
155                MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
156                RiverStatesMetaData riverStates = currentState.metaData().custom(RiverStatesMetaData.TYPE);
157                if (riverStates == null) {
158                    logger.debug("put: first river state [{}/{}]", request.riverState.getType(), request.riverState.getName());
159                    riverStates = new RiverStatesMetaData(request.riverState);
160                } else {
161                    boolean found = false;
162                    List<RiverState> riverStateList = newLinkedList();
163                    for (RiverState riverState : riverStates.getRiverStates()) {
164                        if (riverState != null
165                                && request.riverState != null && riverState.getName() != null
166                                && riverState.getName().equals(request.riverState.getName())
167                                && riverState.getType() != null
168                                && riverState.getType().equals(request.riverState.getType())) {
169                            found = true;
170                            riverStateList.add(request.riverState);
171                        } else {
172                            riverStateList.add(riverState);
173                        }
174                    }
175                    if (!found && request.riverState != null && request.riverState.getName() != null) {
176                        logger.debug("put: another river state [{}/{}]", request.riverState.getType(), request.riverState.getName());
177                        riverStateList.add(request.riverState);
178                    } else {
179                        logger.debug("put: update existing river state [{}/{}]", request.riverState.getType(), request.riverState.getName());
180                    }
181                    riverStates = new RiverStatesMetaData(riverStateList.toArray(new RiverState[riverStateList.size()]));
182                }
183                metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStates);
184                return ClusterState.builder(currentState).metaData(metadataBuilder).build();
185            }
186
187            @Override
188            public void onFailure(String source, Throwable t) {
189                logger.warn("failed to create river state [{}/{}]", t, request.riverState.getType(), request.riverState.getName());
190                super.onFailure(source, t);
191            }
192
193            @Override
194            public boolean mustAck(DiscoveryNode discoveryNode) {
195                return discoveryNode.masterNode();
196            }
197        });
198    }
199
200    /**
201     * Post a new river for river state management
202     *
203     * @param request  a river state register request
204     * @param listener listener for cluster state update response
205     */
206    public void postRiverState(final RiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
207        clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
208            @Override
209            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
210                return new ClusterStateUpdateResponse(acknowledged);
211            }
212
213            @Override
214            public ClusterState execute(ClusterState currentState) {
215                // sanity check
216                if (request.riverState.getName() == null || request.riverState.getType() == null) {
217                    logger.debug("post: no river name / type given");
218                    return currentState;
219                }
220                RiverName riverName = new RiverName(request.riverState.getType(), request.riverState.getName());
221                Map<RiverName, RiverState> newRiverStates = newHashMap();
222                newRiverStates.put(riverName, request.riverState);
223                riverStates = ImmutableMap.copyOf(newRiverStates);
224                MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
225                RiverStatesMetaData riverStatesMetaData = currentState.metaData().custom(RiverStatesMetaData.TYPE);
226                if (riverStatesMetaData == null) {
227                    logger.debug("post: first river state [{}/{}]", request.riverState.getType(), request.riverState.getName());
228                    riverStatesMetaData = new RiverStatesMetaData(request.riverState);
229                } else {
230                    boolean found = false;
231                    List<RiverState> riverStateList = newLinkedList();
232                    for (RiverState riverState : riverStatesMetaData.getRiverStates()) {
233                        if (riverState.getName() != null && riverState.getName().equals(request.riverState.getName())
234                                && riverState.getType() != null && riverState.getType().equals(request.riverState.getType())) {
235                            found = true;
236                            if (riverState.getMap() == null) {
237                                riverState.setMap(new HashMap<String, Object>());
238                            }
239                            if (request.riverState != null) {
240                                riverState.setStarted(request.riverState.getStarted());
241                                riverState.setLastActive(request.riverState.getLastActiveBegin(), request.riverState.getLastActiveEnd());
242                                if (request.riverState.getMap() != null) {
243                                    riverState.getMap().putAll(request.riverState.getMap());
244                                }
245                            }
246                            logger.debug("post: update river state = {}", riverState);
247                            riverStateList.add(riverState);
248                        } else {
249                            riverStateList.add(riverState);
250                        }
251                    }
252                    if (!found && request.riverState != null && request.riverState.getName() != null) {
253                        logger.debug("post: another river state [{}/{}]", request.riverState.getType(), request.riverState.getName());
254                        riverStateList.add(request.riverState);
255                    }
256                    riverStatesMetaData = new RiverStatesMetaData(riverStateList.toArray(new RiverState[riverStateList.size()]));
257                }
258                metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStatesMetaData);
259                return ClusterState.builder(currentState).metaData(metadataBuilder).build();
260            }
261
262            @Override
263            public void onFailure(String source, Throwable t) {
264                logger.warn("failed to post river state [{}/{}]", t, request.riverState.getType(), request.riverState.getName());
265                super.onFailure(source, t);
266            }
267
268            @Override
269            public boolean mustAck(DiscoveryNode discoveryNode) {
270                return discoveryNode.masterNode();
271            }
272        });
273    }
274
275
276    /**
277     * Delete river from river state management
278     *
279     * @param request  the unregister river state request
280     * @param listener listener for cluster state updates
281     */
282    public void deleteRiverState(final DeleteRiverStateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
283        clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
284            @Override
285            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
286                return new ClusterStateUpdateResponse(acknowledged);
287            }
288
289            @Override
290            public ClusterState execute(ClusterState currentState) {
291                MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
292                RiverStatesMetaData riverStatesMetaData = currentState.metaData().custom(RiverStatesMetaData.TYPE);
293                if (riverStatesMetaData != null && riverStatesMetaData.getRiverStates().size() > 0) {
294                    List<RiverState> riverStates = newLinkedList();
295                    boolean changed = false;
296                    for (RiverState riverState : riverStatesMetaData.getRiverStates()) {
297                        if (riverState.getType().equals(request.type) && Regex.simpleMatch(request.name, riverState.getName())) {
298                            logger.debug("delete: river state [{}]", riverState.getName());
299                            changed = true;
300                        } else {
301                            riverStates.add(riverState);
302                        }
303                    }
304                    if (changed) {
305                        riverStatesMetaData = new RiverStatesMetaData(riverStates.toArray(new RiverState[riverStates.size()]));
306                        metadataBuilder.putCustom(RiverStatesMetaData.TYPE, riverStatesMetaData);
307                        return ClusterState.builder(currentState).metaData(metadataBuilder).build();
308                    }
309                }
310                throw new ElasticsearchException("unable to delete, river state missing: " + request.name);
311            }
312
313            @Override
314            public boolean mustAck(DiscoveryNode discoveryNode) {
315                return discoveryNode.masterNode();
316            }
317        });
318    }
319
320    public static class RiverStateRequest extends ClusterStateUpdateRequest<RiverStateRequest> {
321
322        final String cause;
323
324        RiverState riverState;
325
326        /**
327         * Constructs new register river request
328         *
329         * @param cause      river registration cause
330         * @param riverState river state
331         */
332        public RiverStateRequest(String cause, RiverState riverState) {
333            this.cause = cause;
334            this.riverState = riverState;
335        }
336
337    }
338
339    public static class DeleteRiverStateRequest extends ClusterStateUpdateRequest<DeleteRiverStateRequest> {
340
341        final String cause;
342
343        final String name;
344
345        final String type;
346
347        /**
348         * Creates a new delete river state request
349         *
350         * @param cause river delete cause
351         * @param name  river name
352         */
353        public DeleteRiverStateRequest(String cause, String name, String type) {
354            this.cause = cause;
355            this.name = name;
356            this.type = type;
357        }
358    }
359
360}