wake-up-neo.com

erwartbare Task-basierte Warteschlange

Ich frage mich, ob es eine Implementierung/einen Wrapper für ConcurrentQueue gibt, ähnlich wie BlockingCollection , wobei das Entnehmen aus der Auflistung nicht blockiert, sondern asynchron ist und ein asynchrones Warten verursacht, bis ein Element eingefügt wird die Schlange.

Ich habe mir meine eigene Implementierung ausgedacht, aber es scheint nicht wie erwartet zu funktionieren. Ich frage mich, ob ich etwas neu erfinde, das bereits existiert.

Hier ist meine Implementierung:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
28
spender

Ich kenne keine schlüssellose Lösung, aber Sie können einen Blick auf die neue Dataflow-Bibliothek werfen, die Teil des Async-CTP ist. Ein einfacher BufferBlock<T> sollte ausreichen, z.

BufferBlock<int> buffer = new BufferBlock<int>();

Produktion und Verbrauch lassen sich am einfachsten über Erweiterungsmethoden für die Datenflussblocktypen durchführen.

Die Produktion ist so einfach wie:

buffer.Post(13);

und Verbrauch ist async-ready:

int item = await buffer.ReceiveAsync();

Ich empfehle Ihnen, wenn möglich Dataflow zu verwenden. einen solchen Puffer sowohl effizient als auch korrekt zu machen, ist schwieriger als er zunächst erscheint.

44
Stephen Cleary

Einfacher Ansatz mit C # 8.0 IAsyncEnumerable und Datenflussbibliothek

// Instatiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

Mit einer Implementierung von AsyncQueue wie folgt:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }

    }
}
8
Bruno Zell

Meine Versuchung (es wird ein Ereignis ausgelöst, wenn ein "Versprechen" erstellt wird, und es kann von einem externen Hersteller verwendet werden, um zu wissen, wann mehr Artikel hergestellt werden sollen):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
2
André Bires

Es kann für Ihren Anwendungsfall ein Overkill sein (angesichts der Lernkurve), aber Reactive Extentions bietet Ihnen all den Klebstoff, den Sie für eine asynchrone Komposition benötigen könnten.

Sie abonnieren im Wesentlichen Änderungen, und diese werden Ihnen zur Verfügung gestellt, sobald sie verfügbar sind, und Sie können das System die Änderungen in einem separaten Thread pushen lassen.

1
Morten Mertner

Hier ist die Implementierung, die ich gerade verwende.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

Es funktioniert gut genug, aber queueSyncLock ist ziemlich umstritten, da ich die CancellationToken sehr oft benutze, um einige der wartenden Aufgaben abzubrechen. Dies führt natürlich zu einer wesentlich geringeren Blockierung, die ich mit einer BlockingCollection sehen würde, aber ... 

Ich frage mich, ob es ein glatteres und gesperrungsfreies Mittel gibt, um dasselbe Ziel zu erreichen

0
spender

Check out https://github.com/somdoron/AsyncCollection , Sie können beide asynchron aus der Warteschlange entfernen und C # 8.0 IAsyncEnumerable verwenden.

Die API ist der BlockingCollection sehr ähnlich.

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();

        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

Mit IAsyncEnumeable:

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();
0
somdoron