wake-up-neo.com

Wie erhalte ich den aktuellen Wert von RxJS Subject oder Observable?

Ich habe einen Angular 2-Dienst:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Alles funktioniert super Ich habe jedoch eine andere Komponente, die nicht abonniert werden muss. Sie muss lediglich den aktuellen Wert von isLoggedIn zu einem bestimmten Zeitpunkt abrufen. Wie kann ich das machen?

120
Baconbeastnz

Eine Subject oder Observable hat keinen aktuellen Wert. Wenn ein Wert ausgegeben wird, wird er an die Abonnenten übergeben und die Variable Observable wird damit ausgeführt.

Wenn Sie einen aktuellen Wert haben möchten, verwenden Sie BehaviorSubject, das genau zu diesem Zweck entwickelt wurde. BehaviorSubject behält den zuletzt ausgegebenen Wert und gibt ihn sofort an neue Abonnenten weiter.

Es hat auch eine Methode getValue(), um den aktuellen Wert abzurufen.

219

Die einzige Möglichkeit, sollte Werte "aus einem Observable/Subject" erhalten, ist mit subscribe!

Wenn Sie getValue() verwenden, machen Sie im deklarativen Paradigma etwas Wichtiges. Es ist als Fluchtluke vorhanden, aber 99,9% der Zeit, in der Sie NICHT getValue() verwenden sollten. Es gibt einige interessante Dinge, die getValue() tun wird: Es wird ein Fehler ausgegeben, wenn der Betreff nicht abonniert wurde Verhindern Sie, dass Sie einen Wert erhalten, wenn das Subjekt tot ist, weil es fehlerhaft ist usw. Aber es ist wieder eine Fluchtluke für seltene Umstände.

Es gibt verschiedene Möglichkeiten, den neuesten Wert aus einem Betreff oder Beobachtbaren "Rx-y" zu erhalten:

  1. Verwendung von BehaviorSubject: Aber abonniert es tatsächlich. Wenn Sie BehaviorSubject zum ersten Mal abonnieren, wird der vorherige Wert synchron gesendet, mit dem er empfangen wurde oder mit dem er initialisiert wurde.
  2. Verwenden einer ReplaySubject(N): Dadurch werden N-Werte zwischengespeichert und für neue Abonnenten wiedergegeben.
  3. A.withLatestFrom(B): Verwenden Sie diesen Operator, um den aktuellsten Wert aus beobachtbarer B zu erhalten, wenn beobachtbare A ausgegeben wird. Gibt Ihnen beide Werte in einem Array [a, b].
  4. A.combineLatest(B): Verwenden Sie diesen Operator, um bei jeder Ausgabe von A oder B die neuesten Werte von A und B abzurufen. Gibt Ihnen beide Werte in einem Array.
  5. shareReplay(): Erzeugt einen beobachtbaren Multicast über eine ReplaySubject, ermöglicht jedoch das Wiederholen des beobachtbaren Fehlers bei einem Fehler. (Im Grunde gibt es Ihnen dieses versprechende Cacheverhalten).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject) usw.: Andere Operatoren, die BehaviorSubject und ReplaySubject nutzen. Unterschiedliche Geschmacksrichtungen desselben; sie multicastieren im Wesentlichen die beobachtbare Quelle, indem alle Benachrichtigungen durch ein Subjekt geleitet werden. Sie müssen connect() aufrufen, um die Quelle mit dem Betreff zu abonnieren.
98
Ben Lesh

Ich hatte eine ähnliche Situation, in der verspätete Abonnenten das Subjekt abonnieren, nachdem ihr Wert eingetroffen ist. 

Ich fand ReplaySubject das ähnlich wie BehaviorSubject ist, funktioniert in diesem Fall wie ein Zauber ... Und hier ist ein Link zur besseren Erklärung: http://reactivex.io/rxjs/manual/overview.html# Wiederholungssubjekt

4
Kfir Erez

Ich habe das gleiche Problem in untergeordneten Komponenten festgestellt, bei denen es anfangs den aktuellen Wert des Betreffs hätte und dann den Betreff abonnieren musste, um die Änderungen zu überwachen. Ich pflege nur den aktuellen Wert im Dienst, so dass Komponenten für den Zugriff verfügbar sind, z. :

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Eine Komponente, die den aktuellen Wert benötigt, könnte daraufhin vom Dienst aus zugreifen, d. H.

sessionStorage.isLoggedIn

Nicht sicher, ob dies die richtige Übung ist :)

3
Molp Burnbright
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Den vollständigen Artikel zur Implementierung finden Sie hier. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

2

Eine ähnliche schauende Antwort wurde abgelehnt. Aber ich denke, ich kann es rechtfertigen, was ich hier für begrenzte Fälle vorschlage.


Es ist zwar wahr, dass ein Observable keinen aktuellen - Wert hat, aber häufig hat es einen sofort verfügbaren - Wert. Mit den Speichern von redux/flux/akita können Sie beispielsweise Daten von einem zentralen Speicher abrufen, basierend auf einer Anzahl von Observablen, und dieser Wert ist im Allgemeinen sofort verfügbar. 

Wenn dies der Fall ist, wird der Wert bei subscribe sofort wieder angezeigt.

Nehmen wir an, Sie haben einen Dienst angerufen und möchten nach Abschluss den neuesten Wert von etwas aus Ihrem Geschäft abrufen: das möglicherweise nicht ausgegeben wird:

Sie könnten versuchen, dies zu tun (und Sie sollten so viel wie möglich Dinge in Rohren halten): 

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

Das Problem dabei ist, dass es blockiert, bis das sekundäre Observable einen Wert abgibt, der möglicherweise niemals sein könnte.

Ich musste vor kurzem ein beobachtbares auswerten, nur wenn ein Wert sofort verfügbar war, und was noch wichtiger war, ich musste feststellen können, ob dies nicht der Fall war. Ich endete damit:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Beachten Sie, dass ich für alle oben genannten Dinge subscribe verwende, um den Wert zu erhalten (wie @Ben erläutert). Keine .value-Eigenschaft verwenden, auch wenn ich eine BehaviorSubject hatte.

1
Simon_Weaver

Obwohl es übertrieben klingen mag, ist dies nur eine weitere "mögliche" Lösung, um Observable type zu halten und Boilerplate zu reduzieren ...

Sie können immer einen extension getter erstellen, um den aktuellen Wert eines Observable zu erhalten.

Dazu müssen Sie die Observable<T>-Schnittstelle in einer global.d.ts-Typisierungsdeklarationsdatei erweitern. Implementieren Sie dann den extension getter in einer observable.extension.ts-Datei und fügen Sie Ihrer Anwendung schließlich Typisierungen und Erweiterungsdateien hinzu.

Auf diese StackOverflow-Antwort können Sie sich beziehen, um die Erweiterungen in Ihre Angular-Anwendung aufzunehmen.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });
0
j3ff