001package conexp.fx.gui.task; 002 003/* 004 * #%L 005 * Concept Explorer FX 006 * %% 007 * Copyright (C) 2010 - 2023 Francesco Kriegel 008 * %% 009 * This program is free software: you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as 011 * published by the Free Software Foundation, either version 3 of the 012 * License, or (at your option) any later version. 013 * 014 * This program is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public 020 * License along with this program. If not, see 021 * <http://www.gnu.org/licenses/gpl-3.0.html>. 022 * #L% 023 */ 024 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.LinkedList; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032 033import conexp.fx.gui.ConExpFX; 034import conexp.fx.gui.dataset.Dataset; 035import conexp.fx.gui.dialog.ErrorDialog; 036import conexp.fx.gui.util.Platform2; 037import javafx.beans.binding.Bindings; 038import javafx.beans.binding.BooleanBinding; 039import javafx.beans.binding.DoubleBinding; 040import javafx.beans.property.Property; 041import javafx.beans.property.SimpleObjectProperty; 042import javafx.collections.FXCollections; 043import javafx.collections.ObservableList; 044import javafx.concurrent.WorkerStateEvent; 045import javafx.event.EventHandler; 046 047public class BlockingExecutor { 048 049 public final ExecutorService tpe = Executors.newWorkStealingPool(); 050 // new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1); 051// new ThreadPoolExecutor( 052// Runtime.getRuntime().availableProcessors() - 1, 053// Runtime.getRuntime().availableProcessors() - 1, 054// 1, 055// TimeUnit.SECONDS, 056// new LinkedBlockingQueue<Runnable>()); 057 public final BooleanBinding isIdleBinding; 058 public final DoubleBinding overallProgressBinding; 059 public final ObservableList<TimeTask<?>> scheduledTasks = 060 FXCollections.observableList(Collections.synchronizedList(new LinkedList<TimeTask<?>>())); 061 public final Property<TimeTask<?>> currentTaskProperty = new SimpleObjectProperty<TimeTask<?>>(); 062 private final Map<Dataset, DoubleBinding> datasetProgressBindings = new ConcurrentHashMap<>(); 063 064 public BlockingExecutor() { 065 overallProgressBinding = Bindings.createDoubleBinding(() -> { 066 synchronized (scheduledTasks) { 067 if (scheduledTasks.isEmpty()) 068 return 1d; 069 double progress = 0d; 070 for (TimeTask<?> task : scheduledTasks) { 071 if (task.isDone()) 072 progress += 1d; 073 else if (task.getProgress() > 0) 074 progress += Math.max(0, Math.min(1, task.getProgress())); 075 } 076 return progress / (double) scheduledTasks.size(); 077 } 078 } , scheduledTasks, currentTaskProperty); 079 isIdleBinding = Bindings.createBooleanBinding(() -> overallProgressBinding.get() == 1d, overallProgressBinding); 080 currentTaskProperty.addListener((__, ___, task) -> { 081 synchronized (scheduledTasks) { 082 if (task.isDone()) 083 next(); 084 else { 085 task.exceptionProperty().addListener( 086 (____, _____, exception) -> new ErrorDialog(ConExpFX.instance.primaryStage, exception).showAndWait()); 087 final EventHandler<WorkerStateEvent> x = ____ -> { 088// System.out.println("finished task " + task.n + " " + task.getTitle()); 089// System.out.println(""); 090 next(); 091 }; 092 task.setOnCancelled(x); 093 task.setOnFailed(x); 094 task.setOnSucceeded(x); 095// System.out.println("starting task " + task.n + " " + task.getTitle()); 096 if (task.onFXThread()) 097 tpe.submit(TimeTask.encapsulateTaskOnFXThread(task)); 098 else 099 tpe.submit(task); 100 } 101 } 102 }); 103 104 } 105 106 public final void execute(final TimeTask<?> task) { 107 Platform2.runOnFXThread(() -> { 108 synchronized (scheduledTasks) { 109 final boolean wasIdle = isIdleBinding.get(); 110 scheduledTasks.add(task); 111 Arrays.asList(task.progressProperty(), task.stateProperty()).forEach(p -> p.addListener((__, ___, ____) -> { 112 overallProgressBinding.invalidate(); 113 datasetProgressBindings.values().forEach(DoubleBinding::invalidate); 114 })); 115 if (wasIdle) 116 currentTaskProperty.setValue(task); 117 } 118 }); 119 } 120 121 private final void next() { 122 synchronized (scheduledTasks) { 123 if (!isIdleBinding.get()) { 124 try { 125 final TimeTask<?> nextTask = scheduledTasks.get(scheduledTasks.indexOf(currentTaskProperty.getValue()) + 1); 126 if (nextTask != null) 127 currentTaskProperty.setValue(nextTask); 128 } catch (IndexOutOfBoundsException e) { 129 System.err.println("index out of bounds, task not found."); 130 System.err.println(scheduledTasks.size() + " tasks"); 131 System.err.println(scheduledTasks.indexOf(currentTaskProperty.getValue())); 132 } 133 } 134 } 135 } 136 137 public final DoubleBinding datasetProgressBinding(final Dataset dataset) { 138 if (datasetProgressBindings.containsKey(dataset)) 139 return datasetProgressBindings.get(dataset); 140 final DoubleBinding datasetProgressBinding = Bindings.createDoubleBinding(() -> { 141 synchronized (scheduledTasks) { 142 if (scheduledTasks.isEmpty()) 143 return 1d; 144 double progress = 0d; 145 double tasks = 0d; 146 for (TimeTask<?> task : scheduledTasks) 147 if (task.getDataset() != null && task.getDataset().equals(dataset)) { 148 if (task.isDone()) 149 progress += 1d; 150 else if (task.getProgress() > 0) 151 progress += task.getProgress(); 152 tasks++; 153 } 154 if (tasks > 0) 155 return progress / tasks; 156 return 1d; 157 } 158 } , scheduledTasks, currentTaskProperty); 159 datasetProgressBindings.put(dataset, datasetProgressBinding); 160 return datasetProgressBinding; 161 } 162 163 public final void cancel(final Dataset dataset) { 164 synchronized (scheduledTasks) { 165 scheduledTasks.filtered(task -> task.getDataset() != null && task.getDataset().equals(dataset)).forEach( 166 task -> task.cancel()); 167 scheduledTasks.removeIf(task -> task.getDataset() != null && task.getDataset().equals(dataset)); 168 datasetProgressBindings.remove(dataset); 169 } 170 } 171 172}