Ich muss eine Anzahl von Aufgaben 4 gleichzeitig ausführen, etwa so:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
Wie kann ich benachrichtigt werden, wenn alle abgeschlossen sind? Im Moment kann ich mir nichts Besseres vorstellen, als einen globalen Taskzähler zu setzen und ihn am Ende jeder Task zu verringern. Dann überwach diesen Zähler in einer Endlosschleife, um 0 zu werden. oder eine Liste von Futures abrufen und in Endlosschleifen für alle isDone überwachen. Was sind bessere Lösungen ohne Endlosschleifen?
Vielen Dank.
Grundsätzlich rufen Sie auf einem ExecutorService
shutdown()
und dann awaitTermination()
auf:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
Verwenden Sie ein CountDownLatch :
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
und innerhalb Ihrer Aufgabe (in try/finally einschließen)
latch.countDown();
ExecutorService.invokeAll()
erledigt das für Sie.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
Sie können auch Listen von Futures verwenden:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
wenn Sie dann an allen beitreten möchten, entspricht dies im Wesentlichen dem Beitreten an jedem (mit dem zusätzlichen Vorteil, dass Ausnahmen von untergeordneten Threads erneut an den Haupt-Thread ausgelöst werden):
for(Future f: this.futures) { f.get(); }
Grundsätzlich besteht der Trick darin, .get () für jeden Future einzeln aufzurufen, anstatt in einer Endlosschleife isDone () on (all oder each) aufzurufen. Sie werden also garantiert durch diesen Block "weitergehen", sobald der letzte Thread beendet ist. Die Einschränkung besteht darin, dass, da der Aufruf von .get () erneut Ausnahmen auslöst, wenn einer der Threads abstirbt, Sie dies möglicherweise vor Abschluss der anderen Threads auslösen würden [um dies zu vermeiden, könnten Sie einen catch ExecutionException
um den get call]. Die andere Einschränkung ist, dass sie einen Verweis auf alle Threads enthält. Wenn sie also Thread-lokale Variablen haben, werden sie erst gesammelt, nachdem Sie diesen Block passiert haben (obwohl Sie dies möglicherweise umgehen können, wenn es zu einem Problem wird, indem Sie es entfernen Zukunft ist aus der ArrayList). Wenn Sie wissen möchten, welche Zukunft "zuerst beendet" wird, können Sie etwas wie https://stackoverflow.com/a/31885029/3245 verwenden
In Java8 können Sie dies mit CompletableFuture tun:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
Nur meine zwei Cent. Um das Erfordernis von CountDownLatch
zu überwinden, die Anzahl der Aufgaben im Voraus zu kennen, können Sie es auf die alte Art und Weise tun, indem Sie ein einfaches Semaphore
verwenden.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
Rufe in deiner Aufgabe einfach s.release()
auf, als würdest du latch.countDown();
Die CyclicBarrier -Klasse in Java 5 und höher ist für diese Art von Dingen ausgelegt.
Ein bisschen zu spät zum Spiel, aber der Vollständigkeit halber ...
Anstatt auf das Ende aller Aufgaben zu warten, können Sie nach dem Hollywood-Prinzip denken: "Ruf mich nicht an, ich rufe dich an" - wenn ich fertig bin. Ich denke, der resultierende Code ist eleganter ...
Guave bietet einige interessante Werkzeuge, um dies zu erreichen.
Ein Beispiel ::
Wrappen Sie einen ExecutorService in einen ListeningExecutorService ::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Senden Sie eine Sammlung von Callables zur Ausführung:
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
Nun das Wesentliche:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Fügen Sie der ListenableFuture einen Rückruf hinzu, mit dem Sie benachrichtigt werden können, wenn alle Zukünfte abgeschlossen sind:
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
Dies bietet auch den Vorteil, dass Sie nach Abschluss der Verarbeitung alle Ergebnisse an einem Ort sammeln können ...
Weitere Informationen hier
Folgen Sie einem der folgenden Ansätze.
submit
am ExecutorService
zurückgegeben wurden, und überprüfen Sie den Status mit dem blockierenden Aufruf get()
am Future
Objekt wie von Kiran
vorgeschlageninvokeAll()
für ExecutorServiceshutdown, awaitTermination, shutdownNow
APIs von ThreadPoolExecutor in der richtigen ReihenfolgeVerwandte SE-Fragen:
hier sind zwei Optionen, nur ein bisschen verwirren, welche am besten zu gehen ist.
Option 1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
Option 2:
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
Hier setzen future.get (); Beim Versuch zu fangen ist eine gute Idee, oder?
Sie können Ihre Aufgaben in ein anderes ausführbares Programm einbinden, das Benachrichtigungen sendet:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
Nur um hier andere Alternativen vorzusehen, verwenden Sie Riegel/Barrieren. Sie können die Teilergebnisse auch mit CompletionService abrufen, bis alle beendet sind.
Von Java Nebenläufigkeit in der Praxis: "Wenn Sie eine Reihe von Berechnungen an einen Executor senden müssen und deren Ergebnisse abrufen möchten, sobald sie verfügbar sind, können Sie die mit jeder Aufgabe und verknüpfte Zukunft beibehalten Abruf zum Abschluss durch wiederholtes Aufrufen von get mit einer Zeitüberschreitung von 0. Dies ist möglich, aber langwierig . Zum Glück gibt es eine Besserer Weg : ein Abschlussdienst. "
Hier die Umsetzung
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
Ich habe gerade ein Beispielprogramm geschrieben, das Ihr Problem löst. Es wurde keine präzise Implementierung angegeben, daher werde ich eine hinzufügen. Sie können zwar executor.shutdown()
und executor.awaitTermination()
verwenden, dies ist jedoch keine bewährte Methode, da die von verschiedenen Threads benötigte Zeit nicht vorhersehbar wäre.
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Dies ist meine Lösung, die auf dem Tipp "AdamSkywalker" basiert und funktioniert
package frss.main;
import Java.util.ArrayList;
import Java.util.List;
import Java.util.concurrent.CompletableFuture;
import Java.util.concurrent.ExecutorService;
import Java.util.concurrent.Executors;
public class TestHilos {
void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
System.out.println("FIN DEL PROCESO DE HILOS");
}
private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();
Hilo01 task1 = new Hilo01();
tasks.add(task1);
Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}
private class Hilo01 extends Thread {
@Override
public void run() {
System.out.println("HILO 1");
}
}
private class Hilo02 extends Thread {
@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}
}
public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}
Sie könnten diesen Code verwenden:
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
Ich habe das folgende Arbeitsbeispiel erstellt. Die Idee ist, eine Möglichkeit zu haben, einen Pool von Aufgaben (ich verwende als Beispiel eine Warteschlange) mit vielen Threads zu verarbeiten (programmatisch bestimmt durch die Anzahl der Aufgaben/den Schwellenwert) und zu warten, bis alle Threads abgeschlossen sind, um mit einer anderen Verarbeitung fortzufahren.
import Java.util.PriorityQueue;
import Java.util.Queue;
import Java.util.concurrent.CountDownLatch;
import Java.util.concurrent.ExecutorService;
import Java.util.concurrent.Executors;
/** Testing CountDownLatch and ExecutorService to manage scenario where
* multiple Threads work together to complete tasks from a single
* resource provider, so the processing can be faster. */
public class ThreadCountDown {
private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();
public static void main(String[] args) {
// Create a queue with "Tasks"
int numberOfTasks = 2000;
while(numberOfTasks-- > 0) {
tasks.add(numberOfTasks);
}
// Initiate Processing of Tasks
ThreadCountDown main = new ThreadCountDown();
main.process(tasks);
}
/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
threadsCountdown = new CountDownLatch(numberOfThreads);
ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);
//Initialize each Thread
while(numberOfThreads-- > 0) {
System.out.println("Initializing Thread: "+numberOfThreads);
threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
}
try {
//Shutdown the Executor, so it cannot receive more Threads.
threadExecutor.shutdown();
threadsCountdown.await();
System.out.println("ALL THREADS COMPLETED!");
//continue With Some Other Process Here
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
int threshold = 100;
int threads = size / threshold;
if( size > (threads*threshold) ){
threads++;
}
return threads;
}
/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
return tasks.poll();
}
/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {
private String threadName;
protected MyThread(String threadName) {
super();
this.threadName = threadName;
}
@Override
public void run() {
Integer task;
try{
//Check in the Task pool if anything pending to process
while( (task = getTask()) != null ){
processTask(task);
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
/*Reduce count when no more tasks to process. Eventually all
Threads will end-up here, reducing the count to 0, allowing
the flow to continue after threadsCountdown.await(); */
threadsCountdown.countDown();
}
}
private void processTask(Integer task){
try{
System.out.println(this.threadName+" is Working on Task: "+ task);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
}
Ich hoffe es hilft!
Java 8 - Wir können Stream-API verwenden, um Streams zu verarbeiten. Bitte beachten Sie das folgende Snippet
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
sie sollten die Methode executorService.shutdown()
und executorService.awaitTermination
verwenden.
Ein Beispiel wie folgt:
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
Also poste ich meine Antwort von der verknüpften Frage hier, falls jemand einen einfacheren Weg dies zu tun wünscht
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] = CompletableFuture.runAsync(runner, executor);
}
CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
Sie können Ihre eigene Unterklasse von ExecutorCompletionService verwenden, um taskExecutor
umzubrechen, und Ihre eigene Implementierung von BlockingQueue , um benachrichtigt zu werden, wenn jede Aufgabe abgeschlossen ist, und um einen beliebigen Rückruf oder eine andere durchzuführen Aktion, die Sie wünschen, wenn die Anzahl der erledigten Aufgaben Ihr gewünschtes Ziel erreicht.
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// doSomething();
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
Wenn doSomething()
andere Ausnahmen auslöst, wird latch.countDown()
anscheinend nicht ausgeführt. Was soll ich also tun?