This blog post was written while preparing my book about .NET, which will be announced in a few weeks. If you want to be informed about its publication and receive auxiliary materials, feel free to subscribe to my newsletter. Many thanks to Stephen Toub, who helped by reviewing this code.
Async programming is becoming more and more popular. While it is very convenient to use, from a performance perspective there are scenarios where regular Task-returning async methods have one serious drawback: they need to allocate a new Task to represent the operation (and its result). Such a heap-allocated Task is unavoidable on the truly asynchronous path of execution because async continuations are not guaranteed to execute on the same thread, so the state of the operation must live on the heap, not on the stack.
However, there are cases where async operations may complete synchronously (because some condition is met right away). It would be nice to avoid heap-allocating a Task in such a case, created just to pass the result of an operation. The ValueTask type was introduced in .NET Core 2.0 exactly for this purpose (together with the corresponding AsyncValueTaskMethodBuilder handling the underlying state machine). Initially, it was a struct designed as a discriminated union, which could take one of two possible values:
- a ready-to-use result (if the operation completed successfully and synchronously)
- a normal Task which may be awaited (if the operation becomes truly asynchronous)
In other words, ValueTask helps in handling the synchronous path of an async method. Because it is a struct (which will be allocated on the stack or enregistered into CPU registers), the synchronous result of the operation can be returned without heap allocations. Only on the asynchronous path will a new Task eventually be heap-allocated by the underlying machinery:
```csharp
public ValueTask<string> ReadFileAsync(string filename)
{
    // Happy, non-allocating synchronous path
    if (!File.Exists(filename))
        return new ValueTask<string>(string.Empty);
    // Wrapping a regular Task in case of the asynchronous path
    return new ValueTask<string>(File.ReadAllTextAsync(filename));
}
```
But what if we could avoid heap-allocating a Task even on the asynchronous path? This may be useful in very, very high-performance code, where we want to avoid async-related allocations altogether. As already said, something on the heap must represent our async operation because there is no thread affinity. But why not use heap-allocated, pooled objects for that, reused between successive async operations?
Indeed, since .NET Core 2.1, ValueTask can also wrap an object implementing the IValueTaskSource interface. Such an object can be pooled and reused to minimize allocations. It represents our operation, and the underlying AsyncValueTaskMethodBuilder is aware of it, calling the appropriate methods of the IValueTaskSource interface. Here is how the previous example looks with the help of the custom IValueTaskSource implementation described in this post:
```csharp
public ValueTask<string> ReadFileAsync(string filename)
{
    if (!File.Exists(filename))
        return new ValueTask<string>("!");
    var cachedOp = pool.Rent();
    return cachedOp.RunAsync(filename, pool);
}

private ObjectPool<FileReadingPooledValueTaskSource> pool =
    new ObjectPool<FileReadingPooledValueTaskSource>(
        () => new FileReadingPooledValueTaskSource(), 10);
```
Thanks to pooling, even on the asynchronous path there will be no allocations (at least most of the time, if our pool is used efficiently). We can await such a method in the regular way and the underlying machinery will take care of it.
Note. If you would like to hear about Task, ValueTask and IValueTaskSource again, in similar but different words, please have a look at the great post Task, Async Await, ValueTask, IValueTaskSource and how to keep your sanity in modern .NET world by Szymon Kulec.
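One consequence of pooling is worth illustrating: because the backing object is recycled once its result has been read, a ValueTask should be awaited exactly once. The sketch below is not taken from the repository; `ValueTaskConsumption`, `GetTextAsync` and `SlowPathAsync` are hypothetical stand-ins for the post's `ReadFileAsync`. It shows the direct await and the `AsTask()` conversion for the case when the result is needed more than once.

```csharp
using System;
using System.Threading.Tasks;

public static class ValueTaskConsumption
{
    // Hypothetical stand-in for ReadFileAsync: completes synchronously
    // for an empty name and asynchronously otherwise.
    public static ValueTask<string> GetTextAsync(string name)
    {
        if (string.IsNullOrEmpty(name))
            return new ValueTask<string>(string.Empty);
        return new ValueTask<string>(SlowPathAsync(name));
    }

    private static async Task<string> SlowPathAsync(string name)
    {
        await Task.Delay(10);
        return "content of " + name;
    }

    // Preferred: await the ValueTask directly, exactly once.
    public static async Task<string> ConsumeOnceAsync(string name)
    {
        return await GetTextAsync(name);
    }

    // If the result is needed more than once, convert to a Task first
    // (this allocates, but a Task may safely be awaited repeatedly).
    public static async Task<int> ConsumeTwiceAsync(string name)
    {
        Task<string> task = GetTextAsync(name).AsTask();
        string first = await task;
        string second = await task;
        return first.Length + second.Length;
    }
}

public static class Program
{
    public static async Task Main()
    {
        Console.WriteLine(await ValueTaskConsumption.ConsumeOnceAsync("a.txt"));
        Console.WriteLine(await ValueTaskConsumption.ConsumeTwiceAsync("a.txt"));
    }
}
```

Awaiting the same ValueTask twice is exactly the kind of misuse that the token check described later in this post tries to catch.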
Implementation details
Although the rationale behind IValueTaskSource seems clear, as does its usage presented above, implementing it is not trivial. When implementing the IValueTaskSource interface, we must implement the following three methods:
* GetResult – called only once, when the async state machine needs to obtain the result of the operation
* GetStatus – called by the async state machine to check the status of the operation
* OnCompleted – called by the async state machine when the wrapping ValueTask has been awaited. Here we should store the continuation to be called when the operation completes (but if it has already completed, we should invoke the continuation right away)
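To make the shape of this contract concrete, here is a minimal sketch that is not the implementation described in this post. It assumes .NET Core 3.0 or later, where the framework ships the `ManualResetValueTaskSourceCore<TResult>` helper struct, and simply forwards the three interface methods to it. The class name `MinimalValueTaskSource` and the `AsValueTask`, `Complete`, `Fail` and `Reset` members are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical minimal source: forwards the three interface methods
// to the framework-provided ManualResetValueTaskSourceCore<T> helper.
public sealed class MinimalValueTaskSource : IValueTaskSource<string>
{
    // Mutable struct, so the field must not be readonly.
    private ManualResetValueTaskSourceCore<string> core;

    // Hands out a ValueTask bound to the current version (token).
    public ValueTask<string> AsValueTask() => new ValueTask<string>(this, core.Version);

    // Completion side: called by whoever performs the actual work.
    public void Complete(string value) => core.SetResult(value);
    public void Fail(Exception error) => core.SetException(error);

    // Prepares the instance for reuse; this also bumps the version.
    public void Reset() => core.Reset();

    public string GetResult(short token) => core.GetResult(token);

    public ValueTaskSourceStatus GetStatus(short token) => core.GetStatus(token);

    public void OnCompleted(Action<object> continuation, object state, short token,
        ValueTaskSourceOnCompletedFlags flags)
        => core.OnCompleted(continuation, state, token, flags);
}

public static class Program
{
    public static void Main()
    {
        var source = new MinimalValueTaskSource();
        ValueTask<string> pending = source.AsValueTask();
        Console.WriteLine(pending.IsCompleted);            // status is still Pending
        source.Complete("done");
        Console.WriteLine(pending.GetAwaiter().GetResult());
    }
}
```

The helper hides the continuation bookkeeping, context capturing and token validation, which are exactly the parts that the hand-written implementation in the rest of this post spells out.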
Seems easy, right? Read on to see if it really is! All source code described here is available in my PooledValueTaskSource repository on GitHub. There are quite a lot of comments in the code, but this post explains the most relevant parts as well. Do not be surprised by the many diagnostic Console.Write calls in this code either: they illustrate the internal workings of this class in the prepared example program (also available in the repository).
In my custom implementation, I use object pooling based on an ObjectPool class, which is a slightly refactored (mostly renamed) version of the internal class taken from the Roslyn source code. I've omitted it here for brevity as it is not that relevant. From our perspective, it simply has the obvious Rent and Return methods, period.
My implementation is also mostly based on code from AwaitableSocketAsyncEventArgs in System.Net.Sockets.Socket and AsyncIOOperation in the ASP.NET IIS Integration code. What I've tried to do is to provide a Minimum Viable Product that is correct and working (stripping as much as possible from the mentioned code).
My custom IValueTaskSource represents an operation that returns a string read from the provided file. Obviously, one would probably like to introduce a more generic class with a generic result type and the action provided as a lambda expression. However, to not clutter the example too much, I've decided to prepare it for this "hardcoded", specific scenario. Feel free to contribute more generic versions!
Let's start with the fields that FileReadingPooledValueTaskSource contains:
```csharp
public class FileReadingPooledValueTaskSource : IValueTaskSource<string>
{
    private static readonly Action<object> CallbackCompleted =
        _ => { Debug.Assert(false, "Should not be invoked"); };

    private Action<object> continuation;
    private string result;
    private Exception exception;
    private short token;
    private object state;
    private ObjectPool<FileReadingPooledValueTaskSource> pool;
    private ExecutionContext executionContext;
    private object scheduler;
    ...
```
The most important fields of FileReadingPooledValueTaskSource include:
- Action<object> continuation – it represents a continuation to be executed when our operation ends
- string result – it keeps the result of our operation (in case of success)
- Exception exception – it keeps an Exception instance that happened during executing our operation (in case of failure)
- short token – the current token value given to a ValueTask and then verified against the value it passes back to us. This is not meant to be a completely reliable mechanism, it doesn't require additional synchronization, etc. It's purely a best-effort attempt to catch misuse, including awaiting a value task twice or awaiting it after the underlying object has already been reused by someone else
- object state – state internally used by asynchronous machinery
- static readonly Action<object> CallbackCompleted – sentinel object used to indicate that the operation has completed prior to OnCompleted being called
Let's now look at the implementation of each IValueTaskSource method. GetResult is quite easy: it will be called only once by the underlying state machine, after we have reported that our operation has completed (through the GetStatus method explained soon). Thus, we need to reset the object state (to make it reusable), return the object to the pool and return the result (or throw an exception in case of failure):
```csharp
public string GetResult(short token)
{
    Console.WriteLine("GetResult");
    if (token != this.token)
        ThrowMultipleContinuations();
    var exception = this.exception;
    var result = ResetAndReleaseOperation();
    if (exception != null)
    {
        throw exception;
    }
    return result;
}
```
GetStatus is called by the state machine to check the current status of our operation. In my case, I assume it has completed once result is no longer null. Depending on the exception field, it has then either succeeded or faulted:
```csharp
public ValueTaskSourceStatus GetStatus(short token)
{
    if (token != this.token)
        ThrowMultipleContinuations();
    Console.Write("GetStatus:");
    if (result == null)
    {
        Console.WriteLine("pending");
        return ValueTaskSourceStatus.Pending;
    }
    Console.WriteLine("completed: succeeded or faulted");
    // No stored exception means success; otherwise the operation faulted
    return exception == null
        ? ValueTaskSourceStatus.Succeeded
        : ValueTaskSourceStatus.Faulted;
}
```
The most complex part is the OnCompleted method implementation. It is called by the underlying state machine when the wrapping ValueTask is awaited. Two scenarios may happen here and both must be handled:
- if an operation has not yet completed – we store the provided continuation to be executed once the operation is completed
- if an operation has already completed – in such a case our internal continuation should already be set to the CallbackCompleted value. If so, we simply invoke the continuation here
Please note how much code is dedicated to properly capturing the context of the continuation (with respect to the provided ValueTaskSourceOnCompletedFlags):
```csharp
public void OnCompleted(Action<object> continuation, object state, short token,
    ValueTaskSourceOnCompletedFlags flags)
{
    Console.WriteLine("." + token);
    if (token != this.token)
        ThrowMultipleContinuations();

    if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
    {
        this.executionContext = ExecutionContext.Capture();
    }

    if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
    {
        SynchronizationContext sc = SynchronizationContext.Current;
        if (sc != null && sc.GetType() != typeof(SynchronizationContext))
        {
            this.scheduler = sc;
        }
        else
        {
            TaskScheduler ts = TaskScheduler.Current;
            if (ts != TaskScheduler.Default)
            {
                this.scheduler = ts;
            }
        }
    }

    // Remember current state
    this.state = state;
    // Remember continuation to be executed on completed (if not already completed, in case of which
    // continuation will be set to CallbackCompleted)
    var previousContinuation = Interlocked.CompareExchange(ref this.continuation, continuation, null);
    if (previousContinuation != null)
    {
        if (!ReferenceEquals(previousContinuation, CallbackCompleted))
            ThrowMultipleContinuations();

        // Lost the race condition and the operation has now already completed.
        // We need to invoke the continuation, but it must be asynchronously to
        // avoid a stack dive. However, since all of the queueing mechanisms flow
        // ExecutionContext, and since we're still in the same context where we
        // captured it, we can just ignore the one we captured.
        executionContext = null;
        this.state = null; // we have the state in "state"; no need for the one in UserToken
        InvokeContinuation(continuation, state, forceAsync: true);
    }
}

private void InvokeContinuation(Action<object> continuation, object state, bool forceAsync)
{
    if (continuation == null)
        return;

    object scheduler = this.scheduler;
    this.scheduler = null;
    if (scheduler != null)
    {
        if (scheduler is SynchronizationContext sc)
        {
            sc.Post(s =>
            {
                var t = (Tuple<Action<object>, object>)s;
                t.Item1(t.Item2);
            }, Tuple.Create(continuation, state));
        }
        else
        {
            Debug.Assert(scheduler is TaskScheduler, $"Expected TaskScheduler, got {scheduler}");
            Task.Factory.StartNew(continuation, state, CancellationToken.None,
                TaskCreationOptions.DenyChildAttach, (TaskScheduler)scheduler);
        }
    }
    else if (forceAsync)
    {
        ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true);
    }
    else
    {
        continuation(state);
    }
}
```
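The heart of this method is the handshake on the continuation field, so it may help to see it in isolation. The following is a simplified sketch with all context handling removed; `CompletionHandshake`, `Register` and `Complete` are hypothetical names, and unlike the real code it invokes a late-registered callback synchronously instead of queueing it.

```csharp
using System;
using System.Threading;

// Simplified sketch of the sentinel handshake: whichever side arrives
// second (registration or completion) is the one that runs the callback.
public sealed class CompletionHandshake
{
    // Sentinel meaning "completed before any callback was registered".
    private static readonly Action Completed = () => { };

    private Action continuation;

    // Awaiting side (the role OnCompleted plays in the post).
    public void Register(Action callback)
    {
        Action previous = Interlocked.CompareExchange(ref continuation, callback, null);
        if (previous == null)
            return; // stored; the completing side will invoke it later
        if (!ReferenceEquals(previous, Completed))
            throw new InvalidOperationException("Multiple continuations are not supported.");
        // Already completed. The post's code queues this call instead,
        // to avoid a stack dive; the sketch runs it inline for brevity.
        callback();
    }

    // Completing side (the role of the completion callback in the post).
    public void Complete()
    {
        Action previous = Interlocked.CompareExchange(ref continuation, Completed, null);
        // null: nobody registered yet, the sentinel is now in place.
        previous?.Invoke();
    }
}

public static class Program
{
    public static void Main()
    {
        int calls = 0;

        var awaitedFirst = new CompletionHandshake();
        awaitedFirst.Register(() => calls++);   // stored, not yet invoked
        awaitedFirst.Complete();                // invokes the stored callback

        var completedFirst = new CompletionHandshake();
        completedFirst.Complete();              // leaves the sentinel behind
        completedFirst.Register(() => calls++); // sees the sentinel, runs at once

        Console.WriteLine(calls);
    }
}
```

Because both sides use a compare-and-exchange against null, exactly one of them observes the other's value, so the callback runs once regardless of which side wins the race.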
This concludes implementing the IValueTaskSource methods, but we need to add two more crucial pieces to this puzzle: a method that starts our operation and a method that is called when an operation completes asynchronously.
The first one, which I simply named RunAsync (called in our example at the beginning of the article), is responsible for executing the main work:
```csharp
public ValueTask<string> RunAsync(string filename, ObjectPool<FileReadingPooledValueTaskSource> pool)
{
    Debug.Assert(Volatile.Read(ref continuation) == null,
        $"Expected null continuation to indicate reserved for use");
    this.pool = pool;

    // Start async op
    var isCompleted = FireAsyncWorkWithSyncReturnPossible(filename);
    if (!isCompleted)
    {
        // Operation not yet completed. Return ValueTask wrapping us.
        Console.WriteLine("Asynchronous path.");
        return new ValueTask<string>(this, token);
    }

    // OMG so happy, we catch up! Just return ValueTask wrapping the result (returning ourselves to the pool).
    Console.WriteLine("Synchronous path.");
    var result = ResetAndReleaseOperation();
    return new ValueTask<string>(result);
}

private bool FireAsyncWorkWithSyncReturnPossible(string filename)
{
    if (filename == @"c:\dummy.txt")
    {
        // Simulate sync path
        this.result = filename;
        return true;
    }

    // Simulate some low-level, unmanaged, asynchronous work. This normally:
    // - would call an OS-level API with callback registered
    // - after some time, registered callback would be triggered (with NotifyAsyncWorkCompletion call inside)
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(1000);
        var data = File.ReadAllText(filename);
        NotifyAsyncWorkCompletion(data);
    });
    return false;
}
```
I've implemented here a simulation of an operation that may complete either immediately (synchronously) or asynchronously. On the synchronous path, the returned ValueTask simply carries the result, so we avoid allocations again. On the asynchronous path, the key is to return a ValueTask that wraps… ourselves. It may then be awaited, while the asynchronous processing we have started (simulated by thread pool work in our case) is in progress.
When the asynchronous operation finishes, the NotifyAsyncWorkCompletion method will be called (remember: in a real-world scenario this would be some callback registered in asynchronous IO or another low-level API). The responsibility of this method is simple:
- it stores result and/or exception
- if the operation is not yet awaited (in such a case this.continuation will be null) – it only sets continuation to CallbackCompleted. The continuation will be executed in the OnCompleted method when the ValueTask is awaited
- if the operation is already awaited (in such case this.continuation contains awaited continuation) – it executes continuation in the appropriate context (which again is quite a complex process)
```csharp
private void NotifyAsyncWorkCompletion(string data, Exception exception = null)
{
    this.result = data;
    this.exception = exception;

    // Mark operation as completed
    var previousContinuation = Interlocked.CompareExchange(ref this.continuation, CallbackCompleted, null);
    if (previousContinuation != null)
    {
        // Async work completed, continue with... continuation
        ExecutionContext ec = executionContext;
        if (ec == null)
        {
            InvokeContinuation(previousContinuation, this.state, forceAsync: false);
        }
        else
        {
            // This case should be relatively rare, as the async Task/ValueTask method builders
            // use the awaiter's UnsafeOnCompleted, so this will only happen with code that
            // explicitly uses the awaiter's OnCompleted instead.
            executionContext = null;
            ExecutionContext.Run(ec, runState =>
            {
                var t = (Tuple<FileReadingPooledValueTaskSource, Action<object>, object>)runState;
                t.Item1.InvokeContinuation(t.Item2, t.Item3, forceAsync: false);
            }, Tuple.Create(this, previousContinuation, this.state));
        }
    }
}
```
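The ExecutionContext.Run branch exists so that ambient data captured in OnCompleted (for example AsyncLocal<T> values) is visible when the continuation runs on whatever thread signals completion. The sketch below is an illustration only, not code from the repository; `ExecutionContextFlowDemo`, `Ambient` and `Run` are hypothetical names. It reads an AsyncLocal value on a thread pool thread, first without and then with the captured context.

```csharp
using System;
using System.Threading;

public static class ExecutionContextFlowDemo
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Returns the ambient value seen on a pool thread without and with
    // the captured ExecutionContext being restored.
    public static (string Unflowed, string Flowed) Run()
    {
        Ambient.Value = "request-42";
        // Same call OnCompleted makes when FlowExecutionContext is requested.
        ExecutionContext captured = ExecutionContext.Capture();

        string unflowed = null;
        string flowed = null;
        using (var done = new ManualResetEventSlim())
        {
            // UnsafeQueueUserWorkItem deliberately does NOT flow the context,
            // similar to a raw OS-level completion callback.
            ThreadPool.UnsafeQueueUserWorkItem(state =>
            {
                unflowed = Ambient.Value;
                // Same call NotifyAsyncWorkCompletion makes before invoking
                // the continuation.
                ExecutionContext.Run(captured, s => flowed = Ambient.Value, null);
                done.Set();
            }, null);
            done.Wait();
        }
        return (unflowed, flowed);
    }
}

public static class Program
{
    public static void Main()
    {
        var seen = ExecutionContextFlowDemo.Run();
        Console.WriteLine(seen.Unflowed ?? "(null)");
        Console.WriteLine(seen.Flowed);
    }
}
```

Without the restored context the ambient value is missing on the worker thread, which is why the source has to carry the captured context along with the continuation.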
The above-mentioned clearing and returning to the pool is implemented in ResetAndReleaseOperation (yes I know, SRP is dying here, refactor!). The only field we cannot clear is token, which is solely dedicated to detecting incorrect reuse of those objects:
```csharp
protected virtual string ResetAndReleaseOperation()
{
    string result = this.result;
    this.token++;
    this.result = null;
    this.exception = null;
    this.state = null;
    this.continuation = null;
    this.pool.Return(this);
    return result;
}
```
And… that is all the code necessary to avoid heap allocations for async operations!
Remarks
- I do not claim that my code in its current form is ideal. Quite the opposite, I still expect it to be far from ideal! It serves as an illustration and a base for further development. Please feel invited to comment and to contribute to making it better!
- The current repository is oversimplified. Due to the work on the book, I do not have time to reorganize it properly (especially to include comprehensive unit tests). Again, feel free to contribute!