wake-up-neo.com

Kann ich einen Stream in Java 8 duplizieren?

Manchmal möchte ich eine Reihe von Operationen für einen Stream ausführen und dann den resultierenden Stream auf zwei verschiedene Arten mit anderen Operationen verarbeiten.

Kann ich dies tun, ohne die gemeinsamen Anfangsoperationen zweimal angeben zu müssen?

Ich hoffe zum Beispiel, dass eine dup()-Methode wie die folgende existiert:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
46
necromancer

Es ist generell nicht möglich.

Wenn Sie einen Eingabestrom oder einen Eingabe-Iterator duplizieren möchten, haben Sie zwei Möglichkeiten:

A. Bewahren Sie alles in einer Sammlung auf, sagen Sie einen List<>

Angenommen, Sie duplizieren einen Stream in zwei Streams s1 und s2. Wenn Sie n1-Elemente in s1- und n2-Elementen mit s2 erweitert haben, müssen Sie |n2 - n1|-Elemente im Speicher behalten, um Schritt zu halten. Wenn Ihr Stream unendlich ist, gibt es möglicherweise keine obere Grenze für den erforderlichen Speicher.

Schauen Sie sich Pythons tee() an, um zu sehen, was es braucht:

Dieses Werkzeug benötigt möglicherweise erheblichen zusätzlichen Speicher (abhängig davon, wie viel temporäre Daten gespeichert werden müssen). Wenn ein Iterator die meisten oder alle Daten vor dem Start eines anderen Iterators verwendet, ist es im Allgemeinen schneller, list() anstelle von tee() zu verwenden.

B. Wenn möglich: Kopieren Sie den Status des Generators, der die Elemente erstellt

Damit diese Option funktioniert, benötigen Sie wahrscheinlich Zugriff auf die inneren Abläufe des Streams. Mit anderen Worten, der Generator - der Teil, der die Elemente erzeugt - sollte das Kopieren überhaupt unterstützen. [OP: Siehe diese großartige Antwort , als Beispiel, wie dies für das Beispiel in der Frage gemacht werden kann]

Es funktioniert nicht bei Eingaben des Benutzers, da Sie den Zustand der gesamten "Außenwelt" kopieren müssen. Javas Stream unterstützt das Kopieren nicht, da es so allgemein wie möglich sein soll, insbesondere für die Arbeit mit Dateien, Netzwerk, Tastatur, Sensoren, Zufälligkeit usw. [OP: Ein anderes Beispiel ist ein Stream, der einen Temperatursensor bei Bedarf liest. Es kann nicht kopiert werden, ohne eine Kopie der Messwerte zu speichern.]

Dies ist nicht nur in Java der Fall. Dies ist eine allgemeine Regel. Sie sehen, dass std::istream in C++ aus diesem Grund (und anderen) nur Verschiebesemantik unterstützt, nicht Kopiersemantik ("Kopierkonstruktor (gelöscht)"). 

26
Elazar

Es ist nicht möglich, einen Stream auf diese Weise zu duplizieren. Sie können die Code-Duplizierung jedoch vermeiden, indem Sie den allgemeinen Teil in eine Methode oder einen Lambda-Ausdruck verschieben.

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);
39
nosid

Es ist möglich, wenn Sie Elemente puffern, die Sie in einem Duplikat verwendet haben, aber noch nicht in dem anderen.

Wir haben eine duplicate()-Methode für Streams in jOOλ implementiert, einer Open Source-Bibliothek, die wir zur Verbesserung der Integrationstests für jOOQ erstellt haben. Im Wesentlichen können Sie einfach schreiben:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(Hinweis: Derzeit müssen wir den Stream einrahmen, da noch keine IntSeq implementiert wurde.)

Intern gibt es einen LinkedList-Puffer, der alle Werte speichert, die von einem Stream, aber nicht vom anderen Stream verbraucht wurden. Das ist wahrscheinlich so effizient wie es nur geht, wenn Ihre zwei Streams etwa gleich viel verbraucht werden.

So funktioniert der Algorithmus:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return Tuple(seq(new Duplicate()), seq(new Duplicate()));
}

Mehr Quellcode hier

Mit jOOλ können Sie einen kompletten Einzeiler wie folgt schreiben:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);
6
Lukas Eder

Sie können die Streamgenerierung auch in separate Methoden/Funktionen verschieben, die diesen Stream zurückgeben, und ihn zweimal aufrufen.

4
Tomasz Górka

Entweder,

  • Verschieben Sie die Initialisierung in eine Methode und rufen Sie die Methode einfach erneut auf

Dies hat den Vorteil, dass Sie das, was Sie tun, explizit beschreiben kann, und auch für unendliche Streams.

  • Sammeln Sie den Stream und streamen Sie ihn erneut

In deinem Beispiel:

final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();

Dann

final IntStream s = IntStream.of(arr);
3

Update: Dieses nicht funktioniert. Siehe nachstehende Erklärung nach dem Text der ursprünglichen Antwort.

Wie dumm von mir. Alles was ich tun muss ist:

Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10

Erklärung, warum das nicht funktioniert:

Wenn Sie es verschlüsseln und versuchen, beide Streams zu sammeln, wird der erste Stream gut gesammelt, der zweite Stream wird jedoch die Ausnahme auslösen: Java.lang.IllegalStateException: stream has already been operated upon or closed.

Ströme sind zustandsbehaftete Objekte (die übrigens nicht zurückgesetzt oder zurückgespult werden können). Man kann sich sie als Iteratoren vorstellen, die wiederum wie Zeiger sind. stream14 und stream10 können also als Verweise auf denselben Zeiger betrachtet werden. Wenn Sie den ersten Stream vollständig verbrauchen, wird der Zeiger "über das Ende hinaus" geführt. Der Versuch, den zweiten Stream zu verbrauchen, ist wie der Versuch, auf einen Zeiger zuzugreifen, der bereits "am Ende vorbei ist", was natürlich eine illegale Operation ist.

Wie die akzeptierte Antwort zeigt, muss der Code zum Erstellen des Streams zweimal ausgeführt werden, er kann jedoch in ein Supplier-Lambda oder ein ähnliches Konstrukt unterteilt werden.

Vollständiger Testcode: Speichern in Foo.Java, dann javac Foo.Java, dann Java Foo

import Java.util.stream.IntStream;

public class Foo {
  public static void main (String [] args) {
    IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
    IntStream s1 = s.filter(n -> n % 5 == 0);
    s1.forEach(n -> System.out.println(n));
    IntStream s2 = s.filter(n -> n % 7 == 0);
    s2.forEach(n -> System.out.println(n));
  }
}

Ausgabe:

$ javac Foo.Java
$ Java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" Java.lang.IllegalStateException: stream has already been operated upon or closed
    at Java.util.stream.AbstractPipeline.<init>(AbstractPipeline.Java:203)
    at Java.util.stream.IntPipeline.<init>(IntPipeline.Java:91)
    at Java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.Java:592)
    at Java.util.stream.IntPipeline$9.<init>(IntPipeline.Java:332)
    at Java.util.stream.IntPipeline.filter(IntPipeline.Java:331)
    at Foo.main(Foo.Java:8)
2
necromancer

Wenn Sie nicht unbegrenzte Streams verwenden, haben Sie direkten Zugriff auf die Quelle:

@Test
public void testName() throws Exception {
    List<Integer> integers = Arrays.asList(1, 2, 4, 5, 6, 7, 8, 9, 10);
    Stream<Integer> stream1 = integers.stream();
    Stream<Integer> stream2 = integers.stream();

    stream1.forEach(System.out::println);
    stream2.forEach(System.out::println);
}

druckt

1 2 4 5 6 7 8 9 10

1 2 4 5 6 7 8 9 10

Für Ihren Fall:

Stream originalStream = IntStream.range(1, 100).filter(n -> n % 2 == 0)

List<Integer> listOf = originalStream.collect(Collectors.toList())

Stream stream14 = listOf.stream().filter(n -> n % 7 == 0);
Stream stream10 = listOf.stream().filter(n -> n % 5 == 0);

Lies die Antwort einer anderen Person für die Leistung usw.;)

0
Blundell