wake-up-neo.com

Warten auf eine Liste der Zukunft

Ich habe eine Methode, die eine List von Futures zurückgibt

List<Future<O>> futures = getFutures();

Jetzt möchte ich warten, bis entweder alle Futures erfolgreich verarbeitet wurden oder eine der Aufgaben, deren Ausgabe von einer Zukunft zurückgegeben wird, eine Ausnahme auslöst. Selbst wenn eine Aufgabe eine Ausnahme auslöst, hat es keinen Sinn, auf die anderen Futures zu warten.

Ein einfacher Ansatz wäre zu

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Aber das Problem hier ist, wenn zum Beispiel die 4. Zukunft eine Ausnahme auslöst, dann werde ich unnötig warten, bis die ersten 3 Futures verfügbar sind.

Wie löse ich das? Hilft der Countdown-Riegel in irgendeiner Weise? Ich kann Future isDone nicht verwenden, weil das Java-Dokument besagt

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
88
user93796

Sie können einen CompletionService verwenden, um die Futures zu erhalten, sobald sie bereit sind und wenn einer von ihnen eine Ausnahme auslöst, die Verarbeitung abbrechen. Etwas wie das:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean erros = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Ich denke, Sie können noch mehr verbessern, um noch ausgeführte Aufgaben abzubrechen, wenn einer von ihnen einen Fehler auslöst. 

EDIT: Ich habe hier ein umfassenderes Beispiel gefunden: http://blog.teamlazerbeez.com/2009/04/29/Java-completionservice/

97
dcernahoschi

Wenn Sie Java 8 verwenden, können Sie dies mit CompletableFuture und CompletableFuture.allOf einfacher tun, da der Callback erst angewendet wird, nachdem alle mitgelieferten CompletableFutures ausgeführt wurden.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(() -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
69
Andrejs

Sie können einen ExecutorCompletionService verwenden. Die Dokumentation enthält sogar ein Beispiel für Ihren genauen Anwendungsfall:

Angenommen, Sie möchten das erste Nicht-NULL-Ergebnis der Gruppe von Aufgaben verwenden, alle Ausnahmen ignorieren und alle anderen Aufgaben abbrechen, wenn die erste fertig ist:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

Hierbei ist zu beachten, dass ecs.take () die erste complete - Aufgabe erhält, nicht nur die zuerst übermittelte. Daher sollten Sie sie in der Reihenfolge bekommen, in der sie die Ausführung beenden (oder eine Ausnahme auslösen).

15
jmiserez

Verwenden Sie CompletableFuture in Java 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
4
sendon1982

Falls Sie eine Liste von CompletableFutures kombinieren möchten, können Sie dies tun:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

Weitere Informationen zu Future & CompletableFuture finden Sie unter den folgenden nützlichen Links:
1. Future: https://www.baeldung.com/Java-future
2. CompletableFuture: https://www.baeldung.com/Java-completablefuture
3. CompletableFuture: https://www.callicoder.com/Java-8-completablefuture-tutorial/

1
Bohao LI

vielleicht würde dies helfen (nichts würde durch einen rohen Thread ersetzt werden, yeah!) Ich empfehle, jeden Future-Typ mit einem getrennten Thread auszuführen (sie laufen parallel), dann signalisiert der Manager (Handler) Klasse).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Ich muss sagen, der obige Code würde fehlerhaft sein (nicht geprüft), aber ich hoffe, ich könnte die Lösung erklären. Bitte probieren Sie es aus.

0
user2511414

Wenn Sie Java 8 verwenden und CompletableFutures nicht bearbeiten möchten, habe ich ein Tool zum Abrufen von Ergebnissen für einen List<Future<T>> mit Streaming geschrieben. Der Schlüssel ist, dass es Ihnen verboten ist, map(Future::get) zu werfen, während es wirft.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<Java.util.stream.Collector.Characteristics> characteristics()
        {
            return Java.util.Collections.emptySet();
        }
    }

Dies erfordert eine AggregateException, die wie C # funktioniert

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

Diese Komponente verhält sich genau wie C # Task.WaitAll . Ich arbeite an einer Variante, die dasselbe tut wie CompletableFuture.allOf (entspricht Task.WhenAll)

Der Grund, warum ich das gemacht habe, ist, dass ich Spring's ListenableFuture verwende und nicht auf CompletableFuture portieren möchte, obwohl es eine Standardmethode ist

CompletionService nimmt Ihre Callables mit der .submit () -Methode und Sie können die berechneten Futures mit der .take () -Methode abrufen.

Sie dürfen nicht vergessen, dass Sie den ExecutorService beenden, indem Sie die .shutdown () -Methode aufrufen. Sie können diese Methode nur aufrufen, wenn Sie einen Verweis auf den Executor-Service gespeichert haben. Bewahren Sie also einen auf.

Beispielcode - Für eine feste Anzahl von Workitems, die parallel bearbeitet werden sollen:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Beispielcode - Für eine dynamische Anzahl von Workitems, die parallel bearbeitet werden sollen:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}
0
fl0w
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };
0
Mohamed.Abdo

Ich habe eine Utility-Klasse, die diese enthält:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Sobald Sie das haben, können Sie mithilfe eines statischen Imports einfach auf alle Futures wie folgt warten:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

sie können auch alle Ergebnisse wie folgt sammeln:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());
0
Brixomatic