wake-up-neo.com

So senden Sie mehrere asynchrone Anforderungen an verschiedene Webdienste

Ich muss mehrere Anfragen an viele verschiedene Webdienste senden und die Ergebnisse erhalten. Das Problem ist, dass es so lange dauert, bis alle Anforderungen einzeln gesendet und verarbeitet werden.

Ich frage mich, wie ich alle Anfragen auf einmal senden und die Ergebnisse erhalten kann.

Wie der folgende Code zeigt, habe ich drei Hauptmethoden und jede hat ihre eigenen Untermethoden. Jede Untermethode sendet eine Anfrage an den zugehörigen Webdienst und empfängt die Ergebnisse. Um beispielsweise die Ergebnisse des Webdienstes 9 zu empfangen, muss ich warten, bis alle Webdienste von 1 bis 8 abgeschlossen sind. Das Senden dauert sehr lange alle Anfragen nacheinander und erhalten ihre Ergebnisse.

Wie unten gezeigt, sind keine der Methoden oder Untermethoden miteinander verwandt, so dass ich sie alle aufrufen und ihre Ergebnisse in beliebiger Reihenfolge erhalten kann. Wichtig ist nur, dass die Ergebnisse der einzelnen Untermethoden abgerufen und ihre Daten ausgefüllt werden zugehörige Listen.

private List<StudentsResults> studentsResults = new ArrayList();
private List<DoctorsResults> doctorsResults = new ArrayList();
private List<PatientsResults> patientsResults = new ArrayList();

main (){
    retrieveAllLists();
}

retrieveAllLists(){

     retrieveStudents();
     retrieveDoctors();
     retrievePatients();
}

retrieveStudents(){

    this.studentsResults = retrieveStdWS1();   //send request to Web Service 1 to receive its  list of students
    this.studentsResults = retrieveStdWS2();  //send request to Web Service 2 to receive its  list of students
    this.studentsResults = retrieveStdWS3(); //send request to Web Service 3 to receive its  list of students

}

retrieveDoctors(){

   this.doctorsResults = retrieveDocWS4();   //send request to Web Service 4 to receive its list of doctors
   this.doctorsResults = retrieveDocWS5();  //send request to Web Service 5 to receive its  list of doctors
   this.doctorsResults = retrieveDocWS6(); //send request to Web Service 6 to receive its  list of doctors

}

retrievePatients(){

   this.patientsResults = retrievePtWS7();   //send request to Web Service 7 to receive its list of patients
   this.patientsResults = retrievePtWS8();  //send request to Web Service 8 to receive its list of patients
   this.patientsResults = retrievePtWS9(); //send request to Web Service 9 to receive its list of patients

}
14
J888

Dies ist ein einfacher Ansatz der Fork-Join-Methode. Der Übersichtlichkeit halber können Sie jedoch eine beliebige Anzahl von Threads starten und die Ergebnisse später abrufen, sobald sie verfügbar sind, z. B. dieser Ansatz.

    ExecutorService pool = Executors.newFixedThreadPool(10);
    List<Callable<String>> tasks = new ArrayList<>();
    tasks.add(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep((new Random().nextInt(5000)) + 500);
            return "Hello world";
        }

    });
    List<Future<String>> results = pool.invokeAll(tasks);

    for (Future<String> future : results) {
        System.out.println(future.get());
    }
    pool.shutdown();

AKTUALISIERUNG ABGESCHLOSSEN:

Hier ist eine ausführliche, aber praktikable Lösung. Ich habe es ad hoc geschrieben und nicht kompiliert. Da die drei Listen unterschiedliche Typen haben und die WS-Methoden individuell sind, ist Nicht wirklich modular. Versuchen Sie jedoch, Ihre besten Programmierfähigkeiten einzusetzen und zu prüfen, ob Sie es etwas besser modularisieren können.

    ExecutorService pool = Executors.newFixedThreadPool(10);

    List<Callable<List<StudentsResults>>> stasks = new ArrayList<>();
    List<Callable<List<DoctorsResults>>> dtasks = new ArrayList<>();
    List<Callable<List<PatientsResults>>> ptasks = new ArrayList<>();

    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS1();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS2();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS3();
        }

    });

    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS4();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS5();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS6();
        }

    });

    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS7();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS8();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS9();
        }

    });

    List<Future<List<StudentsResults>>> sresults = pool.invokeAll(stasks);
    List<Future<List<DoctorsResults>>> dresults = pool.invokeAll(dtasks);
    List<Future<List<PatientsResults>>> presults = pool.invokeAll(ptasks);

    for (Future<List<StudentsResults>> future : sresults) {
       this.studentsResults.addAll(future.get());
    }
    for (Future<List<DoctorsResults>> future : dresults) {
       this.doctorsResults.addAll(future.get());
    }
    for (Future<List<PatientsResults>> future : presults) {
       this.patientsResults.addAll(future.get());
    }
    pool.shutdown();

Jede Callable gibt eine Ergebnisliste zurück und wird in einem eigenen Thread aufgerufen.
Wenn Sie die Future.get()-Methode aufrufen, erhalten Sie das Ergebnis wieder auf den Haupt-Thread.
Das Ergebnis istNICHTverfügbar, bis die Callable abgeschlossen ist. Daher gibt es keine Parallelitätsprobleme.

24

Ich führe nur zum Spaß zwei Arbeitsbeispiele an. Die erste zeigt die alte Schulmethode vor Java 1.5. Der zweite Abschnitt zeigt eine viel sauberere Art und Weise, mit den in Java 1.5 verfügbaren Tools:

import Java.util.ArrayList;

public class ThreadingExample
{
    private ArrayList <MyThread> myThreads;

    public static class MyRunnable implements Runnable
    {
        private String data;

        public String getData()
        {
            return data;
        }

        public void setData(String data)
        {
            this.data = data;
        }

        @Override
        public void run()
        {
        }
    }

    public static class MyThread extends Thread
    {
        private MyRunnable myRunnable;

        MyThread(MyRunnable runnable)
        {
            super(runnable);
            setMyRunnable(runnable);
        }

        /**
         * @return the myRunnable
         */
        public MyRunnable getMyRunnable()
        {
            return myRunnable;
        }

        /**
         * @param myRunnable the myRunnable to set
         */
        public void setMyRunnable(MyRunnable myRunnable)
        {
            this.myRunnable = myRunnable;
        }
    }

    public ThreadingExample()
    {
        myThreads = new ArrayList <MyThread> ();
    }

    public ArrayList <String> retrieveMyData ()
    {
        ArrayList <String> allmyData = new ArrayList <String> ();

        if (isComplete() == false)
        {
            // Sadly we aren't done
            return (null);
        }

        for (MyThread myThread : myThreads)
        {
            allmyData.add(myThread.getMyRunnable().getData());
        }

        return (allmyData);
    }

    private boolean isComplete()
    {
        boolean complete = true;

        // wait for all of them to finish
        for (MyThread x : myThreads)
        {
            if (x.isAlive())
            {
                complete = false;
                break;
            }
        }
        return (complete);
    }

    public void kickOffQueries()
    {
        myThreads.clear();

        MyThread a = new MyThread(new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list A");
            }
        });
        myThreads.add(a);

        MyThread b = new MyThread (new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list B");
            }
        });
        myThreads.add(b);

        for (MyThread x : myThreads)
        {
            x.start();
        }

        boolean done = false;

        while (done == false)
        {
            if (isComplete())
            {
                done = true;
            }
            else
            {
                // Sleep for 10 milliseconds
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String [] args)
    {
        ThreadingExample example = new ThreadingExample();
        example.kickOffQueries();

        ArrayList <String> data = example.retrieveMyData();
        if (data != null)
        {
            for (String s : data)
            {
                System.out.println (s);
            }
        }
    }
}

Dies ist die wesentlich einfachere Arbeitsversion:

import Java.util.HashSet;
import Java.util.List;
import Java.util.Set;
import Java.util.concurrent.Callable;
import Java.util.concurrent.ExecutionException;
import Java.util.concurrent.ExecutorService;
import Java.util.concurrent.Executors;
import Java.util.concurrent.Future;

public class ThreadingExample
{

    public static void main(String [] args)
    {
        ExecutorService service = Executors.newCachedThreadPool();
        Set <Callable<String>> callables = new HashSet <Callable<String>> ();

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service A, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service B, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service C, and put its results here";
            }
        });

        try
        {
            List<Future<String>> futures = service.invokeAll(callables);
            for (Future<String> future : futures)
            {
                System.out.println (future.get());
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
    }
}
3
D-Klotz

Sie können Ihre jax-ws-Implementierung bitten, asynchrone Bindungen für den Webdienst zu generieren.

Das hat zwei Vorteile, die ich sehen kann:

  1. Wie in Asynchrone Webservice-Aufrufe mit JAX-WS besprochen: Wsimport-Unterstützung für Asynchronität verwenden oder eigene Rollen? , jax-ws generiert für Sie einen getesteten (und möglicherweise schickeren) Code. Sie müssen den ExecutorService nicht selbst instanziieren. Also weniger Arbeit für Sie! (aber auch weniger Kontrolle über die Threading-Implementierungsdetails)
  2. Die generierten Bindungen enthalten eine Methode, bei der Sie einen Callback-Handler angeben, der möglicherweise besser zu Ihren Anforderungen passt als get(), indem Sie alle Antwortlisten im Thread retrieveAllLists() aufrufen. Es ermöglicht die Fehlerbehandlung pro Dienstanruf und verarbeitet die Ergebnisse parallel. Dies ist Nizza, wenn die Verarbeitung nicht trivial ist.

Ein Beispiel für die Metro finden Sie auf der Metro-Site . Beachten Sie den Inhalt der benutzerdefinierten Bindungsdatei custom-client.xml :

<bindings ...>    
    <bindings node="wsdl:definitions">
        <enableAsyncMapping>true</enableAsyncMapping>
    </bindings>    
</bindings>

Wenn Sie diese Bindungsdatei für wsimport angeben, wird ein Client generiert, der ein Objekt zurückgibt, das javax.xml.ws.Response<T> implementiert. Response erweitert die Future-Schnittstelle, die andere Benutzer auch beim Rolling Ihrer eigenen Implementierung vorschlagen.

Wenn Sie also auf die Rückrufe verzichten, wird der Code ähnlich aussehen wie die anderen Antworten:

public void retrieveAllLists() throws ExecutionException{
    // first fire all requests
    Response<List<StudentsResults>> students1 = ws1.getStudents();
    Response<List<StudentsResults>> students2 = ws2.getStudents();
    Response<List<StudentsResults>> students3 = ws3.getStudents();

    Response<List<DoctorsResults>> doctors1 = ws4.getDoctors();
    Response<List<DoctorsResults>> doctors2 = ws5.getDoctors();
    Response<List<DoctorsResults>> doctors3 = ws6.getDoctors();

    Response<List<PatientsResults>> patients1 = ws7.getPatients();
    Response<List<PatientsResults>> patients2 = ws8.getPatients();
    Response<List<PatientsResults>> patients3 = ws9.getPatients();

    // then await and collect all the responses
    studentsResults.addAll(students1.get());
    studentsResults.addAll(students2.get());
    studentsResults.addAll(students3.get());

    doctorsResults.addAll(doctors1.get());
    doctorsResults.addAll(doctors2.get());
    doctorsResults.addAll(doctors3.get());

    patientsResults.addAll(patients1.get());
    patientsResults.addAll(patients2.get());
    patientsResults.addAll(patients3.get());
}

Wenn Sie Callback-Handler erstellen, z

private class StudentsCallbackHandler 
            implements AsyncHandler<Response<List<StudentsResults>>> {
    public void handleResponse(List<StudentsResults> response) {
        try {
            studentsResults.addAll(response.get());
        } catch (ExecutionException e) {
            errors.add(new CustomError("Failed to retrieve Students.", e.getCause()));
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        }
    }
}

sie können sie wie folgt verwenden:

public void retrieveAllLists() {
    List<Future<?>> responses = new ArrayList<Future<?>>();
    // fire all requests, specifying callback handlers
    responses.add(ws1.getStudents(new StudentsCallbackHandler()));
    responses.add(ws2.getStudents(new StudentsCallbackHandler()));
    responses.add(ws3.getStudents(new StudentsCallbackHandler()));

    ...

    // await completion 
    for( Future<?> response: responses ) {
        response.get();
    }

    // or do some other work, and poll response.isDone()
}

Beachten Sie, dass die studentResults-Sammlung jetzt threadsicher sein muss, da die Ergebnisse gleichzeitig hinzugefügt werden!

1
flup

Wenn Sie das Problem betrachten, müssen Sie Ihre Anwendung in über 10 verschiedene Webservices integrieren. Machen Sie alle Aufrufe asynchron. Dies ist mit Apache Camel problemlos möglich. Es ist ein herausragendes Framework für die Unternehmensintegration und unterstützt auch die asynchrone Verarbeitung. Sie können die CXF-Komponente zum Aufrufen von Webservices und die Routing-Engine zum Aufruf und zur Verarbeitung von Ergebnissen verwenden. Beachten Sie die folgende page hinsichtlich der asynchronen Routing-Fähigkeit von Camel. Sie haben auch ein komplettes Beispiel bereitgestellt, das Webservices async mit CXF aufruft. Es ist unter maven repo verfügbar. Siehe auch die folgende page für weitere Details.

1
Hussain Pirosha

Sie können das folgende Paradigma in Betracht ziehen, in dem Sie Arbeit (seriell) erstellen, die eigentliche Arbeit wird jedoch parallel ausgeführt. Eine Möglichkeit, dies zu tun, besteht darin, 1) eine Warteschlange von Arbeitselementen erstellen zu lassen; 2) Erstellen Sie ein "doWork" -Objekt, das die Warteschlange für die Arbeit abfragt. 3) "main" einige Anzahl von "DoWork" -Threads starten (kann dieselbe Anzahl wie die Anzahl verschiedener Dienste oder eine kleinere Anzahl sein); lassen Sie die "doWork" -Objekte ihre Ergebnisse zu einer Objektliste hinzufügen (was auch immer das Konstrukt Vector, list ...).

Jedes "DoWork" -Objekt markiert das Warteschlangenelement als abgeschlossen, fügt alle Ergebnisse in den übergebenen Container ein und prüft, ob neue Arbeit vorhanden ist. 

Natürlich möchten Sie sehen, wie gut Sie Ihr Klassenmodell erstellen können. Wenn sich die Webservices für die Analyse sehr unterscheiden, können Sie ein Interface erstellen, das jede Ihrer "retrieveinfo" -Klassen zu implementieren verspricht.

0
ErstwhileIII