001package conexp.fx.gui.task;
002
003/*
004 * #%L
005 * Concept Explorer FX
006 * %%
007 * Copyright (C) 2010 - 2023 Francesco Kriegel
008 * %%
009 * This program is free software: you can redistribute it and/or modify
010 * it under the terms of the GNU General Public License as
011 * published by the Free Software Foundation, either version 3 of the
012 * License, or (at your option) any later version.
013 * 
014 * This program is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017 * GNU General Public License for more details.
018 * 
019 * You should have received a copy of the GNU General Public
020 * License along with this program.  If not, see
021 * <http://www.gnu.org/licenses/gpl-3.0.html>.
022 * #L%
023 */
024
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.LinkedList;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032
033import conexp.fx.gui.ConExpFX;
034import conexp.fx.gui.dataset.Dataset;
035import conexp.fx.gui.dialog.ErrorDialog;
036import conexp.fx.gui.util.Platform2;
037import javafx.beans.binding.Bindings;
038import javafx.beans.binding.BooleanBinding;
039import javafx.beans.binding.DoubleBinding;
040import javafx.beans.property.Property;
041import javafx.beans.property.SimpleObjectProperty;
042import javafx.collections.FXCollections;
043import javafx.collections.ObservableList;
044import javafx.concurrent.WorkerStateEvent;
045import javafx.event.EventHandler;
046
047public class BlockingExecutor {
048
049  public final ExecutorService              tpe                     = Executors.newWorkStealingPool();
050  // new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
051//      new ThreadPoolExecutor(
052//          Runtime.getRuntime().availableProcessors() - 1,
053//          Runtime.getRuntime().availableProcessors() - 1,
054//          1,
055//          TimeUnit.SECONDS,
056//          new LinkedBlockingQueue<Runnable>());
057  public final BooleanBinding               isIdleBinding;
058  public final DoubleBinding                overallProgressBinding;
059  public final ObservableList<TimeTask<?>>  scheduledTasks          =
060      FXCollections.observableList(Collections.synchronizedList(new LinkedList<TimeTask<?>>()));
061  public final Property<TimeTask<?>>        currentTaskProperty     = new SimpleObjectProperty<TimeTask<?>>();
062  private final Map<Dataset, DoubleBinding> datasetProgressBindings = new ConcurrentHashMap<>();
063
064  public BlockingExecutor() {
065    overallProgressBinding = Bindings.createDoubleBinding(() -> {
066      synchronized (scheduledTasks) {
067        if (scheduledTasks.isEmpty())
068          return 1d;
069        double progress = 0d;
070        for (TimeTask<?> task : scheduledTasks) {
071          if (task.isDone())
072            progress += 1d;
073          else if (task.getProgress() > 0)
074            progress += Math.max(0, Math.min(1, task.getProgress()));
075        }
076        return progress / (double) scheduledTasks.size();
077      }
078    } , scheduledTasks, currentTaskProperty);
079    isIdleBinding = Bindings.createBooleanBinding(() -> overallProgressBinding.get() == 1d, overallProgressBinding);
080    currentTaskProperty.addListener((__, ___, task) -> {
081      synchronized (scheduledTasks) {
082        if (task.isDone())
083          next();
084        else {
085          task.exceptionProperty().addListener(
086              (____, _____, exception) -> new ErrorDialog(ConExpFX.instance.primaryStage, exception).showAndWait());
087          final EventHandler<WorkerStateEvent> x = ____ -> {
088//            System.out.println("finished task " + task.n + " " + task.getTitle());
089//            System.out.println("");
090            next();
091          };
092          task.setOnCancelled(x);
093          task.setOnFailed(x);
094          task.setOnSucceeded(x);
095//          System.out.println("starting task " + task.n + " " + task.getTitle());
096          if (task.onFXThread())
097            tpe.submit(TimeTask.encapsulateTaskOnFXThread(task));
098          else
099            tpe.submit(task);
100        }
101      }
102    });
103
104  }
105
106  public final void execute(final TimeTask<?> task) {
107    Platform2.runOnFXThread(() -> {
108      synchronized (scheduledTasks) {
109        final boolean wasIdle = isIdleBinding.get();
110        scheduledTasks.add(task);
111        Arrays.asList(task.progressProperty(), task.stateProperty()).forEach(p -> p.addListener((__, ___, ____) -> {
112          overallProgressBinding.invalidate();
113          datasetProgressBindings.values().forEach(DoubleBinding::invalidate);
114        }));
115        if (wasIdle)
116          currentTaskProperty.setValue(task);
117      }
118    });
119  }
120
121  private final void next() {
122    synchronized (scheduledTasks) {
123      if (!isIdleBinding.get()) {
124        try {
125          final TimeTask<?> nextTask = scheduledTasks.get(scheduledTasks.indexOf(currentTaskProperty.getValue()) + 1);
126          if (nextTask != null)
127            currentTaskProperty.setValue(nextTask);
128        } catch (IndexOutOfBoundsException e) {
129          System.err.println("index out of bounds, task not found.");
130          System.err.println(scheduledTasks.size() + " tasks");
131          System.err.println(scheduledTasks.indexOf(currentTaskProperty.getValue()));
132        }
133      }
134    }
135  }
136
137  public final DoubleBinding datasetProgressBinding(final Dataset dataset) {
138    if (datasetProgressBindings.containsKey(dataset))
139      return datasetProgressBindings.get(dataset);
140    final DoubleBinding datasetProgressBinding = Bindings.createDoubleBinding(() -> {
141      synchronized (scheduledTasks) {
142        if (scheduledTasks.isEmpty())
143          return 1d;
144        double progress = 0d;
145        double tasks = 0d;
146        for (TimeTask<?> task : scheduledTasks)
147          if (task.getDataset() != null && task.getDataset().equals(dataset)) {
148            if (task.isDone())
149              progress += 1d;
150            else if (task.getProgress() > 0)
151              progress += task.getProgress();
152            tasks++;
153          }
154        if (tasks > 0)
155          return progress / tasks;
156        return 1d;
157      }
158    } , scheduledTasks, currentTaskProperty);
159    datasetProgressBindings.put(dataset, datasetProgressBinding);
160    return datasetProgressBinding;
161  }
162
163  public final void cancel(final Dataset dataset) {
164    synchronized (scheduledTasks) {
165      scheduledTasks.filtered(task -> task.getDataset() != null && task.getDataset().equals(dataset)).forEach(
166          task -> task.cancel());
167      scheduledTasks.removeIf(task -> task.getDataset() != null && task.getDataset().equals(dataset));
168      datasetProgressBindings.remove(dataset);
169    }
170  }
171
172}