Persistent Channel
System.Threading.Channels lets you organize data exchange between a producer and a consumer using the concept of a channel. The standard implementation provides in-memory bounded and unbounded channels. Keeping messages in memory can be problematic when the producer generates messages at a predictable, constant rate while the consumer's speed varies and depends on external factors. Messages then accumulate in the channel and may eventually cause an OutOfMemoryException. Another disadvantage of an in-memory channel is that messages cannot be recovered after a crash.
PersistentChannel is a reliable, persistent unbounded channel that can be recovered after an application crash and is not limited by RAM. However, it is slower than an in-memory channel.
The following example demonstrates concurrent work with the channel:
    using DotNext.Threading.Channels;
    using System.IO;
    using System.Runtime.Serialization;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    using Xunit;

    public static class PersistentChannelExample
    {
        // Writes 500 sequential values to the channel.
        private static async Task Produce(ChannelWriter<decimal> writer)
        {
            for (decimal i = 0M; i < 500M; i++)
                await writer.WriteAsync(i);
        }

        // Reads 500 values and checks that they arrive in the order they were written.
        private static async Task Consume(ChannelReader<decimal> reader)
        {
            for (decimal i = 0M; i < 500M; i++)
                Assert.Equal(i, await reader.ReadAsync());
        }

        public static async Task ProduceConsumeConcurrently()
        {
            using (var channel = new SerializationChannel<decimal>(new PersistentChannelOptions { SingleReader = true, SingleWriter = true }))
            {
                var consumer = Consume(channel.Reader);
                var producer = Produce(channel.Writer);
                await Task.WhenAll(consumer, producer);
            }
        }
    }

    sealed class SerializationChannel<T> : PersistentChannel<T, T>
    {
        private readonly IFormatter formatter;

        internal SerializationChannel(PersistentChannelOptions options) : base(options) => formatter = new BinaryFormatter();

        // BinaryFormatter is synchronous, so both overrides complete synchronously.
        protected override ValueTask<T> DeserializeAsync(Stream input, CancellationToken token) => new ValueTask<T>((T)formatter.Deserialize(input));

        protected override ValueTask SerializeAsync(T input, Stream output, CancellationToken token)
        {
            formatter.Serialize(output, input);
            return new ValueTask();
        }
    }
The PersistentChannel class is not aware of how to serialize and deserialize a message of type T to or from a stream. Therefore, it is an abstract class that requires two methods to be overridden:

- SerializeAsync, which serializes a message of type T to the stream asynchronously
- DeserializeAsync, which deserializes a message of type T from the stream asynchronously

The example shown previously demonstrates a basic implementation of the serialization mechanism using the .NET binary formatter. In a real-world application, it is better to use a fast binary serialization format such as Protobuf, or a custom implementation.
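As an illustration of what a custom implementation might look like, the following sketch encodes a decimal message as a fixed 16-byte record using only the base class library. DecimalCodec and its layout (four little-endian 32-bit words) are assumptions made for this example and are not part of the library.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: stores a decimal as four little-endian 32-bit words (16 bytes).
public static class DecimalCodec
{
    private const int Size = 4 * sizeof(int);

    public static async ValueTask SerializeAsync(decimal input, Stream output, CancellationToken token)
    {
        int[] bits = decimal.GetBits(input);
        var buffer = new byte[Size];
        for (int i = 0; i < bits.Length; i++)
            BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(i * sizeof(int)), bits[i]);
        await output.WriteAsync(buffer, 0, buffer.Length, token);
    }

    public static async ValueTask<decimal> DeserializeAsync(Stream input, CancellationToken token)
    {
        var buffer = new byte[Size];
        int offset = 0;
        // A single ReadAsync call may return fewer bytes than requested, so loop until the record is complete.
        while (offset < Size)
        {
            int read = await input.ReadAsync(buffer, offset, Size - offset, token);
            if (read == 0)
                throw new EndOfStreamException();
            offset += read;
        }

        var bits = new int[4];
        for (int i = 0; i < bits.Length; i++)
            bits[i] = BinaryPrimitives.ReadInt32LittleEndian(buffer.AsSpan(i * sizeof(int)));
        return new decimal(bits);
    }
}
```

In a class derived from PersistentChannel<decimal, decimal>, the SerializeAsync and DeserializeAsync overrides could delegate to these two helpers instead of a general-purpose formatter.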
Architecture
All messages generated by the producer are written to logical partitions. A partition is a file on disk that contains sequentially written messages. The maximum number of messages per partition is limited by a configuration parameter.
The producer and the consumer maintain their own file streams and cursors. A cursor is a data structure that describes the position of a message and its offset in the partition file. When the capacity of the partition file is exceeded, the cursor maintained on the producer side causes a new partition file to be created. When the cursor on the consumer side reaches the end of the partition file, it forces deletion of that file. As a result, a fully consumed partition is deleted automatically. This process is called file rotation, a technique widely used by logging frameworks.
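The rotation scheme described above can be modeled with a deliberately simplified sketch. RotatingLog is a hypothetical class written for illustration only. It assumes fixed-size 4-byte messages and reopens the partition file on every operation, whereas the real channel keeps its streams open, so it should not be read as the library's actual implementation.

```csharp
using System;
using System.IO;

// Simplified, hypothetical model of partition rotation with fixed-size (4-byte) messages.
public sealed class RotatingLog
{
    private readonly string directory;
    private readonly long partitionCapacity; // maximum number of messages per partition file
    private long writePosition;              // producer cursor: index of the next message to write
    private long readPosition;               // consumer cursor: index of the next message to read

    public RotatingLog(string directory, long partitionCapacity)
    {
        this.directory = directory;
        this.partitionCapacity = partitionCapacity;
        Directory.CreateDirectory(directory);
    }

    // Maps a message index to the partition file that stores it.
    public string PartitionPath(long position)
        => Path.Combine(directory, (position / partitionCapacity).ToString());

    public void Write(int message)
    {
        // When the previous partition is full, the cursor points to a file that
        // does not exist yet, and FileMode.Append creates it.
        using (var partition = new FileStream(PartitionPath(writePosition), FileMode.Append, FileAccess.Write))
            partition.Write(BitConverter.GetBytes(message), 0, sizeof(int));
        writePosition++;
    }

    public int Read()
    {
        if (readPosition >= writePosition)
            throw new InvalidOperationException("No unread messages.");

        string path = PartitionPath(readPosition);
        var buffer = new byte[sizeof(int)];
        using (var partition = new FileStream(path, FileMode.Open, FileAccess.Read))
        {
            // Offset of the message inside its partition file.
            partition.Seek((readPosition % partitionCapacity) * sizeof(int), SeekOrigin.Begin);
            if (partition.Read(buffer, 0, buffer.Length) != buffer.Length)
                throw new EndOfStreamException();
        }

        readPosition++;
        // The consumer cursor reached the end of the partition: the file is fully consumed.
        if (readPosition % partitionCapacity == 0)
            File.Delete(path);
        return BitConverter.ToInt32(buffer, 0);
    }
}
```

With a capacity of 2, writing five messages creates three partition files, and reading the first two messages deletes the first file while the other two remain on disk.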
Configuration
PersistentChannelOptions can be passed to the constructor of the persistent channel to configure its behavior and performance characteristics. The following table describes all available configuration parameters:
Parameter | Default Value | Description |
---|---|---|
BufferSize | 4096 | The size of the memory buffer used for internal I/O operations |
Location | Temp directory | The path to the directory in which the channel creates partition files |
PartitionCapacity | 1000 | The maximum number of messages that can be placed in a single partition file |
InitialPartitionSize | 0 | The initial size (in bytes) on disk of a newly created partition file. If this parameter is greater than 0, the created partition file is filled with zeroes. This approach helps to avoid fragmentation of the partition file and significantly improves I/O performance |
SingleWriter | false | true if there is only one producer of messages; false if the WriteAsync method can be called concurrently from different asynchronous flows. In the latter case the channel synchronizes multiple producers, which has runtime overhead |
SingleReader | false | true if there is only one consumer of messages; false if the ReadAsync method can be called concurrently from different asynchronous flows. In the latter case the channel synchronizes multiple consumers, which has runtime overhead |
The best performance can be achieved using the following configuration:

- SingleWriter=true
- SingleReader=true
- PartitionCapacity is not small
- InitialPartitionSize is greater than zero and enough to store the number of messages equal to PartitionCapacity

However, the configuration depends on use cases.
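A configuration along these lines might look like the sketch below. The property names come from the table above. The capacity, the estimated message size, and the directory name are illustrative assumptions rather than recommended values.

```csharp
using System.IO;
using DotNext.Threading.Channels;

public static class ChannelConfiguration
{
    private const int PartitionCapacity = 10_000;   // illustrative value, not a recommendation
    private const int EstimatedMessageSize = 64;    // assumption: average serialized message size in bytes

    public static PersistentChannelOptions CreateOptions() => new PersistentChannelOptions
    {
        SingleWriter = true,
        SingleReader = true,
        PartitionCapacity = PartitionCapacity,
        // Preallocate enough space for a full partition to reduce file fragmentation.
        InitialPartitionSize = PartitionCapacity * EstimatedMessageSize,
        // Hypothetical directory; by default the channel uses the temp directory.
        Location = Path.Combine(Path.GetTempPath(), "my-channel")
    };
}
```

Preallocation trades disk space for speed: each new partition occupies its full initial size immediately, even when it holds only a few messages.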
Diagnostics
The Throughput property of the PersistentChannel class measures the ratio between consumed and produced messages. The value is always in the range [0, 1]. The value 1 means that the consumer is reading messages at the same speed as the producer is writing new ones. If the throughput is less than 1, the disk space used for messages may grow.
The RemainingCount property provides the number of unread messages in the channel.
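One possible way to act on these two values is sketched below. ChannelHealth, its messages, and the backlogLimit threshold are hypothetical. The helper takes plain numbers so that it does not depend on the exact property types exposed by the channel.

```csharp
using System;

public static class ChannelHealth
{
    // throughput and remainingCount are expected to come from the channel's diagnostic
    // properties; backlogLimit is a hypothetical, application-defined threshold.
    public static string Describe(double throughput, long remainingCount, long backlogLimit)
    {
        if (throughput >= 1.0)
            return "consumer keeps up with producer";

        return remainingCount > backlogLimit
            ? "backlog exceeds limit: disk usage is likely to grow"
            : "consumer is lagging, backlog is within limit";
    }
}
```

A periodic monitoring task could pass the current values of the Throughput and RemainingCount properties to Describe and log the result.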