wake-up-neo.com

Erstellen eines Flusses von einem Schauspieler in Akka Streams

Es ist möglich, Quellen und Senken von Schauspielern mithilfe der Methoden Source.actorPublisher() und Sink.actorSubscriber() zu erstellen. Aber ist es möglich, aus dem Schauspieler eine Flow zu erstellen?

Konzeptionell scheint es keinen triftigen Grund zu geben, da es ActorPublisher und ActorSubscriber implementiert, aber das Flow-Objekt hat leider keine Methode, um dies zu tun. In this excellent blog post wurde dies in einer früheren Version von Akka Streams gemacht. Die Frage ist, ob dies auch in der neuesten (2.4.9) Version möglich ist.

21
Ori Popowski

Ich bin Teil des Akka-Teams und möchte diese Frage nutzen, um ein paar Dinge über die unformatierten Reactive Streams-Schnittstellen zu klären. Ich hoffe, Sie finden das nützlich.

Vor allem werden wir bald mehrere Posts im Akka-Team-Blog veröffentlichen, in denen es darum geht, benutzerdefinierte Phasen, einschließlich Flows, zu erstellen. Behalten Sie diese also im Auge.

ActorPublisher/ActorSubscriber nicht verwenden

Bitte benutzen Sie nicht ActorPublisher und ActorSubscriber. Sie sind zu niedrig und werden möglicherweise so implementiert, dass sie gegen die Reactive Streams-Spezifikation verstoßen. Sie sind ein Relikt der Vergangenheit und waren selbst dann nur "Power-User-Modus". Es gibt heutzutage wirklich keinen Grund, diese Klassen zu verwenden. Wir haben nie eine Möglichkeit bereitgestellt, einen Flow zu erstellen, da die Komplexität einfach explosiv ist, wenn sie als "unformatierte" Actor-API verfügbar gemacht wurde, die Sie implementieren und abrufen können alle Regeln korrekt implementiert .

Wenn Sie echte ReactiveStreams-Schnittstellen implementieren möchten, verwenden Sie bitte das TCK der Spezifikation , um sicherzustellen, dass Ihre Implementierung korrekt ist. Sie werden wahrscheinlich von einigen der komplexeren Eckfälle überrascht sein, die ein Flow (oder in der RS-Terminologie ein Processor behandeln muss).

Die meisten Operationen können erstellt werden, ohne dass ein niedriger Level erreicht wird

Viele Flows sollten Sie einfach erstellen können, indem Sie aus einem Flow[T] Erstellen und die erforderlichen Operationen hinzufügen, um nur ein Beispiel zu nennen:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

Welches ist eine wiederverwendbare Beschreibung des Flusses.

Da Sie nach dem Power-User-Modus fragen, ist dies der leistungsstärkste Operator für das DSL selbst: statefulFlatMapConcat. Die überwiegende Mehrheit der Operationen, die mit einfachen Stream-Elementen ausgeführt werden, kann mit diesem Befehl ausgedrückt werden: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T].

Wenn Sie Timer benötigen, können Sie Zip mit einem Source.timer Usw.

GraphStage ist the am einfachsten und am sichersten API zum Erstellen benutzerdefinierter Stufen

Stattdessen hat das Erstellen von Sources/Flows/Sinks eine eigene leistungsfähige und sichere API: die GraphStage. Bitte lesen Sie die Dokumentation zum Erstellen von benutzerdefinierten GraphStages (diese können Sink/Source/Flow oder sogar any beliebige Form sein). Es behandelt alle komplexen Reactive Streams-Regeln für Sie und gibt Ihnen gleichzeitig die volle Freiheit und Typensicherheit bei der Implementierung Ihrer Phasen (die ein Flow sein können).

Aus den Dokumenten stammt beispielsweise eine GraphStage-Implementierung des Operators filter(T => Boolean):

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) Push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

Es verarbeitet auch asynchrone Kanäle und ist standardmäßig schmelzbar.

Zusätzlich zu den Dokumenten erklären diese Blog-Beiträge ausführlich, warum diese API der heilige Gral ist, um benutzerdefinierte Phasen jeder Form zu erstellen:

Konrads Lösung demonstriert, wie eine benutzerdefinierte Bühne mit Actors erstellt wird, aber in den meisten Fällen halte ich das für etwas übertrieben. 

Normalerweise haben Sie einen Schauspieler, der auf Fragen antworten kann:

val actorRef : ActorRef = ???

type Input = ???
type Output = ???

val queryActor : Input => Future[Output] = 
  (actorRef ? _) andThen (_.mapTo[Output])

Dies kann leicht mit der grundlegenden Flow - Funktionalität verwendet werden, die die maximale Anzahl gleichzeitiger Anforderungen berücksichtigt:

val actorQueryFlow : Int => Flow[Input, Output, _] =
  (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)

Nun kann actorQueryFlow in jeden Stream integriert werden ...

Hier ist eine Lösung, die mithilfe einer Grafikstufe erstellt wird. Der Schauspieler muss alle Nachrichten bestätigen, um einen Gegendruck zu erzeugen. Der Akteur wird benachrichtigt, wenn der Stream fehlschlägt/abgeschlossen wird und der Stream fehlschlägt, wenn der Akteur beendet wird .. Dies kann nützlich sein, wenn Sie ask nicht verwenden möchten, z. wenn nicht jede Eingangsnachricht eine entsprechende Ausgangsnachricht hat. 

import akka.actor.{ActorRef, Status, Terminated}
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

object ActorRefBackpressureFlowStage {
  case object StreamInit
  case object StreamAck
  case object StreamCompleted
  case class StreamFailed(ex: Throwable)
  case class StreamElementIn[A](element: A)
  case class StreamElementOut[A](element: A)
}

/**
  * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
  * First element is always `StreamInit`, then stream is waiting for acknowledgement message
  * `ackMessage` from the given actor which means that it is ready to process
  * elements. It also requires `ackMessage` message after each stream element
  * to make backpressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages.
  *
  * The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will
  * be emitted downstream when there is demand.
  *
  * If the target actor terminates the stage will fail with a WatchedActorTerminatedException.
  * When the stream is completed successfully a `StreamCompleted` message
  * will be sent to the destination actor.
  * When the stream is completed with failure a `StreamFailed(ex)` message will be send to the destination actor.
  */
class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {

  import ActorRefBackpressureFlowStage._

  val in: Inlet[In] = Inlet("ActorFlowIn")
  val out: Outlet[Out] = Outlet("ActorFlowOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    private lazy val self = getStageActor {
      case (_, StreamAck) =>
        if(firstPullReceived) {
          if (!isClosed(in) && !hasBeenPulled(in)) {
            pull(in)
          }
        } else {
          pullOnFirstPullReceived = true
        }

      case (_, StreamElementOut(elemOut)) =>
        val elem = elemOut.asInstanceOf[Out]
        emit(out, elem)

      case (_, Terminated(targetRef)) =>
        failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))

      case (actorRef, unexpected) =>
        failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`."))
    }
    var firstPullReceived: Boolean = false
    var pullOnFirstPullReceived: Boolean = false

    override def preStart(): Unit = {
      //initialize stage actor and watch flow actor.
      self.watch(flowActor)
      tellFlowActor(StreamInit)
    }

    setHandler(in, new InHandler {

      override def onPush(): Unit = {
        val elementIn = grab(in)
        tellFlowActor(StreamElementIn(elementIn))
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        tellFlowActor(StreamFailed(ex))
        super.onUpstreamFailure(ex)
      }

      override def onUpstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onUpstreamFinish()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if(!firstPullReceived) {
          firstPullReceived = true
          if(pullOnFirstPullReceived) {
            if (!isClosed(in) && !hasBeenPulled(in)) {
              pull(in)
            }
          }
        }

      }

      override def onDownstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onDownstreamFinish()
      }
    })

    private def tellFlowActor(message: Any): Unit = {
      flowActor.tell(message, self.ref)
    }

  }

  override def shape: FlowShape[In, Out] = FlowShape(in, out)

}
1
Meeuw