
Persistent Channel

System.Threading.Channels allows you to organize data exchange between a producer and a consumer using the concept of a channel. The standard implementation provides in-memory bounded and unbounded channels. Storing messages in memory may be problematic when the producer generates messages at a predictable, constant rate but the consumer's speed varies and depends on external factors. In that case, messages accumulate in the channel, which may eventually cause an OutOfMemoryException. Another disadvantage of an in-memory channel is that messages cannot be recovered after a crash.
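The accumulation problem can be observed with the standard System.Threading.Channels API alone (no DotNext involved). In the sketch below, a producer writes 10,000 messages (an arbitrary number) into an unbounded channel while nobody reads, so every message stays on the managed heap:

```csharp
using System;
using System.Threading.Channels;

// Unbounded in-memory channel: the backlog is limited only by available RAM
var channel = Channel.CreateUnbounded<decimal>();

// A producer that is much faster than the (absent) consumer
for (decimal i = 0M; i < 10_000M; i++)
    channel.Writer.TryWrite(i); // always succeeds while the channel is not completed

// Unbounded channels support counting, so the backlog is observable
Console.WriteLine(channel.Reader.Count); // 10000
```

A bounded channel would cap memory usage instead, but only by making the producer wait or by dropping messages; a persistent channel keeps the backlog on disk.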

PersistentChannel is a reliable, persistent, unbounded channel that can be recovered after an application crash and is not limited by the amount of RAM. However, it is slower than an in-memory channel.

The following example demonstrates a producer and a consumer working with the channel concurrently:

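using DotNext.Threading.Channels;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Xunit;

public sealed class PersistentChannelTests
{
  // Writes 500 sequential values to the channel
  private static async Task Produce(ChannelWriter<decimal> writer)
  {
    for (decimal i = 0M; i < 500M; i++)
      await writer.WriteAsync(i);
  }

  // Reads 500 values and checks that they arrive in the order they were written
  private static async Task Consume(ChannelReader<decimal> reader)
  {
    for (decimal i = 0M; i < 500M; i++)
      Assert.Equal(i, await reader.ReadAsync());
  }

  [Fact]
  public async Task ProduceConsumeConcurrently()
  {
    // Exactly one producer and one consumer, so both sides can skip synchronization
    using (var channel = new SerializationChannel<decimal>(new PersistentChannelOptions { SingleReader = true, SingleWriter = true }))
    {
      var consumer = Consume(channel.Reader);
      var producer = Produce(channel.Writer);
      await Task.WhenAll(consumer, producer);
    }
  }
}

// Stores messages using BinaryFormatter for brevity. BinaryFormatter is obsolete since .NET 5
// and unavailable by default in newer runtimes, so prefer another serializer in production code.
sealed class SerializationChannel<T> : PersistentChannel<T, T>
{
  private readonly IFormatter formatter;

  internal SerializationChannel(PersistentChannelOptions options) : base(options) => formatter = new BinaryFormatter();

  // Reads a single message from the stream
  protected override ValueTask<T> DeserializeAsync(Stream input, CancellationToken token) => new ValueTask<T>((T)formatter.Deserialize(input));

  // Writes a single message to the stream; the formatter is synchronous,
  // so an already completed ValueTask is returned
  protected override ValueTask SerializeAsync(T input, Stream output, CancellationToken token)
  {
    formatter.Serialize(output, input);
    return new ValueTask();
  }
}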

The PersistentChannel class is not aware of how messages of type T are serialized to and deserialized from a stream. Therefore, it is an abstract class, and a derived class must override two methods:

  • SerializeAsync, which serializes a message of type T to the stream asynchronously
  • DeserializeAsync, which deserializes a message of type T from the stream asynchronously

The previous example demonstrates a basic implementation of the serialization mechanism using the .NET binary formatter. In a real-world application, it is better to use a fast binary serialization format such as Protobuf, or a custom implementation.
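As an illustration of a custom implementation, a decimal can be stored as its fixed-size binary form using BinaryWriter and BinaryReader from System.IO. The sketch shows only two hypothetical stream-level helpers (WriteDecimal and ReadDecimal are illustrative names, not DotNext API); bodies like these could back SerializeAsync and DeserializeAsync of a channel carrying decimal messages. It assumes the stream belongs to the caller, so the helpers leave it open.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: stores a decimal as its 16-byte binary form,
// without the type metadata that BinaryFormatter adds
static void WriteDecimal(decimal value, Stream output)
{
    // leaveOpen: true, so disposing the writer does not close the caller's stream
    using var writer = new BinaryWriter(output, Encoding.UTF8, leaveOpen: true);
    writer.Write(value); // always emits exactly 16 bytes
}

// Hypothetical helper: reads back a decimal written by WriteDecimal
static decimal ReadDecimal(Stream input)
{
    using var reader = new BinaryReader(input, Encoding.UTF8, leaveOpen: true);
    return reader.ReadDecimal();
}
```

Because this format carries no type information, every message payload has the same size, which also makes the on-disk size of a partition easy to estimate.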

Architecture

All messages generated by the producer are written to logical partitions. A partition is a file on disk that contains sequentially written messages. The maximum number of messages per partition is limited by the PartitionCapacity configuration parameter.
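Because each partition holds at most PartitionCapacity messages, the location of a message follows from its sequence number by integer division. The sketch below assumes that messages are numbered from zero and that partitions are filled strictly in order; Locate is a hypothetical helper that illustrates the arithmetic, not code taken from DotNext.

```csharp
using System;

// Hypothetical helper: maps the zero-based sequence number of a message
// to its partition number and its slot inside that partition
static (long Partition, long Slot) Locate(long messageNumber, long partitionCapacity)
    => (messageNumber / partitionCapacity, messageNumber % partitionCapacity);

// With the default PartitionCapacity of 1000, message number 2500
// is the 501st message of partition number 2 (the third partition file)
var (partition, slot) = Locate(2500, 1000);
Console.WriteLine($"partition {partition}, slot {slot}"); // partition 2, slot 500
```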

The producer and the consumer maintain their own file streams and cursors. A cursor is a data structure that describes the position of a message and its offset within the partition file. When the capacity of a partition file is exceeded, the producer-side cursor causes a new partition file to be created. When the consumer-side cursor reaches the end of a partition file, the file is deleted. As a result, fully consumed partitions are deleted automatically. This process is called file rotation and is widely used by logging frameworks.
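The cursor and rotation logic described above can be modeled in a few dozen lines. This is a deliberately simplified, single-threaded sketch and not the DotNext implementation: the class name RotatingQueue, the file naming scheme (one file per partition, named after its number) and the length-prefixed string records are assumptions made for illustration. The writer derives the target file from its message counter, so a new partition file appears as soon as the previous one is full; the reader keeps a cursor made of a message counter and a byte offset, and deletes a partition after consuming its last message.

```csharp
#nullable enable
using System;
using System.IO;

// Hypothetical single-threaded model of partition rotation (not the DotNext implementation)
sealed class RotatingQueue
{
    private readonly string location;
    private readonly long partitionCapacity;
    private long writeCount;  // producer cursor: number of messages written so far
    private long readCount;   // consumer cursor: number of messages read so far
    private long readOffset;  // consumer cursor: byte offset of the next record in its partition

    public RotatingQueue(string location, long partitionCapacity)
    {
        this.location = location;
        this.partitionCapacity = partitionCapacity;
        Directory.CreateDirectory(location);
    }

    // Partition file that stores the message with the given zero-based number
    private string PartitionPath(long messageNumber)
        => Path.Combine(location, (messageNumber / partitionCapacity).ToString());

    public void Write(string message)
    {
        // FileMode.Append creates the file if it does not exist yet,
        // so crossing a partition boundary starts a new partition file
        using var stream = new FileStream(PartitionPath(writeCount), FileMode.Append, FileAccess.Write);
        using var writer = new BinaryWriter(stream);
        writer.Write(message); // length-prefixed UTF-8 string
        writeCount++;
    }

    public bool TryRead(out string? message)
    {
        if (readCount == writeCount)
        {
            message = null; // nothing left to consume
            return false;
        }

        var path = PartitionPath(readCount);
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read))
        using (var reader = new BinaryReader(stream))
        {
            stream.Position = readOffset;
            message = reader.ReadString();
            readOffset = stream.Position;
        }

        readCount++;
        if (readCount % partitionCapacity == 0)
        {
            // The last message of this partition has been consumed: delete the file
            File.Delete(path);
            readOffset = 0;
        }

        return true;
    }
}
```

Reopening the file on every call keeps the model short; keeping the streams open between calls, as the producer and consumer described above do, avoids that per-message cost.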

Configuration

PersistentChannelOptions can be passed to the constructor of a persistent channel to configure its behavior and performance characteristics. The following table describes all available configuration parameters:

| Parameter | Default value | Description |
|---|---|---|
| BufferSize | 4096 | The size, in bytes, of the memory buffer used for internal I/O operations |
| Location | Temp directory | The path to the directory in which the channel creates partition files |
| PartitionCapacity | 1000 | The maximum number of messages that can be placed in a single partition file |
| InitialPartitionSize | 0 | The initial size, in bytes, of a newly created partition file on disk. If this parameter is greater than 0, the created partition file is filled with zeroes. This approach helps to avoid fragmentation of the partition file and significantly improves I/O performance |
| SingleWriter | false | true if there is only one producer of messages; false if the WriteAsync method can be called concurrently from different asynchronous flows, which requires synchronization of multiple producers and therefore adds runtime overhead |
| SingleReader | false | true if there is only one consumer of messages; false if the ReadAsync method can be called concurrently from different asynchronous flows, which requires synchronization of multiple consumers and therefore adds runtime overhead |

The best performance can be achieved with the following configuration:

  • SingleWriter=true
  • SingleReader=true
  • PartitionCapacity is not too small, so that partition files are not created and deleted too often
  • InitialPartitionSize is greater than zero and large enough to store PartitionCapacity messages

However, the optimal configuration depends on the use case.
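For example, a channel with exactly one producer and one consumer might be configured as follows. The property names come from the table above; the concrete numbers are illustrative assumptions, not recommendations. In particular, the 64-byte upper bound per serialized message is an assumed estimate that must be measured for the actual serializer, and the directory name is hypothetical.

```csharp
using System.IO;
using DotNext.Threading.Channels;

// Illustrative values; tune them for the actual workload
const int partitionCapacity = 10_000;
const int estimatedMessageSize = 64; // assumed upper bound of one serialized message, in bytes

var options = new PersistentChannelOptions
{
    SingleWriter = true, // only one producer calls WriteAsync
    SingleReader = true, // only one consumer calls ReadAsync
    PartitionCapacity = partitionCapacity,
    // Preallocate each partition file so that a full partition fits without growing the file
    InitialPartitionSize = partitionCapacity * estimatedMessageSize,
    Location = Path.Combine(Path.GetTempPath(), "decimal-channel"), // hypothetical directory
};
```

The product of PartitionCapacity and the per-message estimate (640,000 bytes here) is also roughly the disk space a single unread partition occupies.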

Diagnostics

The Throughput property of the PersistentChannel class allows you to measure the ratio between consumed and produced messages. The value is always in the range [0, 1]. The value 1 means that the consumer reads messages at the same speed as the producer writes new ones. If the throughput is less than 1, the disk space used for messages may grow.

The RemainingCount property provides the number of unread messages in the channel.
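Both metrics can be related to two cumulative counters. The following sketch is a hypothetical model (the RemainingCount and Throughput helpers below are not DotNext API), assuming that throughput is the number of consumed messages divided by the number of produced ones and that the remaining count is their difference. How the real class reports a channel that has produced nothing yet is not covered here; the model simply returns 1 in that case.

```csharp
using System;

// Hypothetical model: messages written but not read yet
static long RemainingCount(long produced, long consumed) => produced - consumed;

// Hypothetical model: ratio of consumed to produced messages; 1 means the consumer keeps up.
// Assumption: a channel that has produced nothing is reported as fully caught up.
static double Throughput(long produced, long consumed)
    => produced == 0 ? 1D : (double)consumed / produced;
```

For instance, 1000 produced and 750 consumed messages give a throughput of 0.75 and 250 remaining messages, a backlog that keeps occupying disk space until the consumer catches up.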
