001/*
002 * Copyright (C) 2014 Jörg Prante
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.xbib.elasticsearch.plugin.jdbc.cron;
017
018import java.util.Date;
019import java.util.concurrent.CancellationException;
020import java.util.concurrent.Future;
021import java.util.concurrent.RejectedExecutionException;
022import java.util.concurrent.RejectedExecutionHandler;
023import java.util.concurrent.ScheduledThreadPoolExecutor;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.TimeUnit;
026
027/**
028 * Scheduled thread-pool executor implementation that leverages a CronExpression
029 * to calculate future execution times for scheduled tasks.
030 */
031public class CronThreadPoolExecutor extends ScheduledThreadPoolExecutor implements CronExecutorService {
032    /**
033     * Constructs a new CronThreadPoolExecutor.
034     *
035     * @param corePoolSize the pool size
036     */
037    public CronThreadPoolExecutor(int corePoolSize) {
038        super(corePoolSize);
039    }
040
041    /**
042     * Constructs a new CronThreadPoolExecutor.
043     *
044     * @param corePoolSize  the pool size
045     * @param threadFactory the thread factory
046     */
047    public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
048        super(corePoolSize, threadFactory);
049    }
050
051    /**
052     * Constructs a new CronThreadPoolExecutor.
053     *
054     * @param corePoolSize the pool size
055     * @param handler      the handler for rejected executions
056     */
057    public CronThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
058        super(corePoolSize, handler);
059    }
060
061    /**
062     * Constructs a new CronThreadPoolExecutor.
063     *
064     * @param corePoolSize  the pool size
065     * @param handler       the handler for rejecting executions
066     * @param threadFactory the thread factory
067     */
068    public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
069        super(corePoolSize, threadFactory, handler);
070    }
071
072    @Override
073    public Future<?> schedule(final Runnable task, final CronExpression expression) {
074        if (task == null) {
075            throw new NullPointerException();
076        }
077        this.setCorePoolSize(this.getCorePoolSize() + 1);
078        Runnable scheduleTask = new Runnable() {
079            /**
080             * @see Runnable#run()
081             */
082            @Override
083            public void run() {
084                Date now = new Date();
085                Date time = expression.getNextValidTimeAfter(now);
086                try {
087                    while (time != null) {
088                        CronThreadPoolExecutor.this.schedule(task, time.getTime() - now.getTime(), TimeUnit.MILLISECONDS);
089                        while (now.before(time)) {
090                            Thread.sleep(time.getTime() - now.getTime());
091                            now = new Date();
092                        }
093                        time = expression.getNextValidTimeAfter(now);
094                    }
095                } catch (RejectedExecutionException e) {
096                    //
097                } catch (CancellationException e) {
098                    //
099                } catch (InterruptedException e) {
100                    //
101                    Thread.currentThread().interrupt();
102                }
103            }
104        };
105        return this.submit(scheduleTask);
106    }
107}