28 February 2013

Copying files or streams using asynchronous non-blocking I/O

The Context

I've been intensively working on BizTalk messaging solutions for almost two years now, and one of the operational feature of the platform we put in place is the capture of a copy of all incoming and outgoing messages.
For messages of a reasonable size (say 512 kB once compressed and base64-encoded), we're using the integrated Business Activity Monitoring (BAM) API of BizTalk, which is quite powerful and performs extremely well. The message data is then directly inserted by the BAM infrastructure in the appropriate database tables.
But when message size grows over that limit, we have to switch to another solution, as the data size that can be handled by the BAM is limited to 512kB. In that case we have a pipeline component, that writes the message data as a file on the local file system of the machine where the message is being received or sent. As a side note, this is implemented using the transactional file system support (TxF) and inspired by this post on Paolo Salvatori's blog.
Once the message copy is saved on the local file system, it cannot stay there forever though. BizTalk machines are meant to be 100% stateless, and are not backed up, nor is there plenty of disk space on these machines. Therefore, an agent running as a windows service is continuously moving the message copies towards a central location (share) on a SAN, where it will be accessible for consultation, and later be purged.

It was while I was designing the code for the agent that I realized that it should barely do anything else than I/O.  Since that agent is running in a demanding server environment, I wanted to avoid all unneeded resource consumption, and also maximize throughput. To meet that objective, and following the advice from Jeffrey Richter in his excellent book CLR via C# (3rd edition), I decided that all I/O operations should be done asynchronously in order to avoid blocking any worker thread. Indeed, blocking threads is bad for scalability and forces the thread pool to create more threads than necessary, which consumes more resources (memory, OS thread switching and scheduling, etc.).

The Code


So the whole idea is to avoid blocking any thread at any moment while moving data from one stream to another. Note that this excludes any ThreadPool.QueueUserWorkItem approach where the copy would simply be executed synchronously, but on another thread that would itself block. The solution has to be built around the use of the Begin/EndRead and Begin/EndWrite methods of the Stream class.

A few words of explanation about the AsyncStreamCopier class:

  • it is designed to process reads and writes asynchronously and concurrently. It does this by reusing a series of buffers in a cyclic way. A read operation is started when the next buffer in the cycle has already been written. 
  • a significant part of the code is dedicated to exception handling, and ensuring that any exception that occurs on an I/O thread does not go unnoticed by the caller. In case an exception occurs and is not consumed by the caller, it is thrown either in Dispose, or in the finalizer (process crash guaranteed...).
  • the class implements the APM (Asynchronous Programming Model), and exposes the two methods Begin/EndCopy, so its use should be straightforward: just construct a new instance, passing the source and target streams, and a few other options, then kick off the copy using BeginCopy providing your callback for when the operation finishes. After the copy, do not forget to Dispose() of the instance.
  • the most tricky parts of the code are in the EndRead and EndWrite methods. Both methods can launch the next read and/or write, depending on the current state of the buffers in the cyclic buffer array.
  • all the code uses user-mode constructs (Interlocked methods) for thread synchronization. The only place where a kernel-mode thread synchronization construct is used is the IAsyncResult implementation, to satisfy the APM.
So, here's the code, with a few (hopefully useful) embedded comments.
public class AsyncStreamCopier : IDisposable
{
    // size in bytes of the buffers in the buffer pool
    private const int DefaultBufferSize = 4096;
    private readonly int _bufferSize;
    // number of buffers in the pool
    private const int DefaultBufferCount = 4;
    private readonly int _bufferCount;

    // indexes of the next buffer to read into/write from
    private int _nextReadBuffer = -1;
    private int _nextWriteBuffer = -1;

    // the buffer pool, implemented as an array, and used in a cyclic way
    private readonly byte[][] _buffers;
    // sizes in bytes of the available (read) data in the buffers
    private readonly int[] _sizes;
    // the streams...
    private Stream _source;
    private Stream _target;
    private readonly bool _closeStreamsOnEnd;

    // number of buffers that are ready to be written
    private int _buffersToWrite;
    // flag indicating that there is still a read operation to be scheduled
    // (source end of stream not reached)
    private volatile bool _moreDataToRead;
    // the result of the whole operation, returned by BeginCopy()
    private AsyncResult _asyncResult;
    // any exception that occurs during an async operation
    // stored here for rethrow
    private IOException _exception;

    public AsyncStreamCopier(Stream source,
                             Stream target,
                             bool closeStreamsOnEnd, 
                             int bufferSize = DefaultBufferSize, 
                             int bufferCount = DefaultBufferCount)
    {
        if (source == null)
            throw new ArgumentNullException("source");
        if (target == null)
            throw new ArgumentNullException("target");
        if (!source.CanRead)
            throw new ArgumentException("Cannot copy from a non-readable stream.");
        if (!target.CanWrite)
            throw new ArgumentException("Cannot copy to a non-writable stream.");
        _source = source;
        _target = target;
        _moreDataToRead = true;
        _closeStreamsOnEnd = closeStreamsOnEnd;
        _bufferSize = bufferSize;
        _bufferCount = bufferCount;
        _buffers = new byte[_bufferCount][];
        _sizes = new int[_bufferCount];
    }

    ~AsyncStreamCopier()
    {
        // ensure any exception cannot be ignored
        ThrowExceptionIfNeeded();
    }

    public void Dispose()
    {
        if (_asyncResult != null)
            _asyncResult.Dispose();
        if (_closeStreamsOnEnd)
        {
            if (_source != null)
            {
                _source.Dispose();
                _source = null;
            }
            if (_target != null)
            {
                _target.Dispose();
                _target = null;
            }
        }
        GC.SuppressFinalize(this);
        ThrowExceptionIfNeeded();
    }

    public IAsyncResult BeginCopy(AsyncCallback callback, object state)
    {
        // avoid concurrent start of the copy on separate threads
        if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)
            throw new InvalidOperationException("A copy operation has already been started on this object.");
        // allocate buffers
        for (int i = 0; i < _bufferCount; i++)
            _buffers[i] = new byte[_bufferSize];

        // we pass false to BeginRead() to avoid completing the async result
        // immediately which would result in invoking the callback
        // when the method fails synchronously
        BeginRead(false);
        // throw exception synchronously if there is one
        ThrowExceptionIfNeeded();
        return _asyncResult;
    }

    public void EndCopy(IAsyncResult ar)
    {
        if (ar != _asyncResult)
            throw new InvalidOperationException("Invalid IAsyncResult object.");

        if (!_asyncResult.IsCompleted)
            _asyncResult.AsyncWaitHandle.WaitOne();

        if (_closeStreamsOnEnd)
        {
            _source.Close();
            _source = null;
            _target.Close();
            _target = null;
        }

        ThrowExceptionIfNeeded();
    }

    /// <summary>
    /// Here we'll throw a pending exception if there is one, 
    /// and remove it from our instance, so we know it has been consumed.
    /// </summary>
    private void ThrowExceptionIfNeeded()
    {
        if (_exception != null)
        {
            var exception = _exception;
            _exception = null;
            throw exception;
        }
    }

    private void BeginRead(bool completeOnError = true)
    {
        if (!_moreDataToRead)
        {
            return;
        }
        if (_asyncResult.IsCompleted)
            return;
        int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;
        try
        {
            _source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);
        }
        catch (IOException exception)
        {
            _exception = exception;
            if (completeOnError)
                _asyncResult.Complete(false);
        }
    }

    private void BeginWrite()
    {
        if (_asyncResult.IsCompleted)
            return;
        // this method can actually be called concurrently!!
        // indeed, let's say we call a BeginWrite, and the thread gets interrupted 
        // just after making the IO request.
        // At that moment, the thread is still in the method. And then the IO request
        // ends (extremely fast io, or caching...), EndWrite gets called
        // on another thread, and calls BeginWrite again! There we have it!
        // That is the reason why an Interlocked is needed here.
        int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;
        try
        {
            _target.BeginWrite(_buffers[bufferIndex], 0, _sizes[bufferIndex], EndWrite, null);
        }
        catch (IOException exception)
        {
            _exception = exception;
            _asyncResult.Complete(false);
        }
    }

    private void EndRead(IAsyncResult ar)
    {
        try
        {
            int read = _source.EndRead(ar);
            _moreDataToRead = read > 0;
            var bufferIndex = (int) ar.AsyncState;
            _sizes[bufferIndex] = read;
        }
        catch (IOException exception)
        {
            _exception = exception;
            _asyncResult.Complete(false);
            return;
        }

        if (_moreDataToRead)
        {
            int usedBuffers = Interlocked.Increment(ref _buffersToWrite);
            // if we incremented from zero to one, then it means we just 
            // added the single buffer to write, so a writer could not 
            // be busy, and we have to schedule one.
            if (usedBuffers == 1)
                BeginWrite();
            // test if there is at least a free buffer, and schedule
            // a read, as we have read some data
            if (usedBuffers < _bufferCount)
                BeginRead();
        }
        else
        {
            // we did not add a buffer, because no data was read, and 
            // there is no buffer left to write so this is the end...
            if (Thread.VolatileRead(ref _buffersToWrite) == 0)
            {
                _asyncResult.Complete(false);
            }
        }
    }

    private void EndWrite(IAsyncResult ar)
    {
        try
        {
            _target.EndWrite(ar);
        }
        catch (IOException exception)
        {
            _exception = exception;
            _asyncResult.Complete(false);
            return;
        }

        int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);
        // no reader could be active if all buffers were full of data waiting to be written
        bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;
        // note that it is possible that both a reader and
        // a writer see the end of the copy and call Complete
        // on the _asyncResult object. That race condition is handled by
        // Complete that ensures it is only executed fully once.
        if (!_moreDataToRead)
        {
            // at this point we know no reader can schedule a read or write
            if (Thread.VolatileRead(ref _buffersToWrite) == 0)
            {
                // nothing left to write, so it is the end
                _asyncResult.Complete(false);
                return;
            }
        }
        else
            // here, we know we have something left to read, 
            // so schedule a read if no read is busy
            if (noReaderIsBusy)
                BeginRead();

        // also schedule a write if we are sure we did not write the last buffer
        // note that if buffersLeftToWrite is zero and a reader has put another
        // buffer to write between the time we decremented _buffersToWrite 
        // and now, that reader will also schedule another write,
        // as it will increment _buffersToWrite from zero to one
        if (buffersLeftToWrite > 0)
            BeginWrite();
    }
}

Some usage notes


A simple usage example looks like the following:

private AsyncStreamCopier _copier;
public void DoIt()
{
    var source = new FileStream(@"D:\Temp\SomeBigFile.dat", 
                    FileMode.Open, 
                    FileAccess.Read, 
                    FileShare.None, 
                    8192, 
                    FileOptions.Asynchronous);
    var target = new FileStream(...);

    _copier = new AsyncStreamCopier(source, target, false, 32768, 3);
    _copier.BeginCopy(CopyFinished, null);
}

private void CopyFinished(IAsyncResult ar)
{
    try
    {
        _copier.EndCopy(ar);
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
}

As you can see, the code is relatively simple. Worth noticing, is that FileOptions.Asynchronous must be specified to deal with FileStreams asynchronously. As a general rule, the source and target streams must of course support asynchronous operations. You can play with both different buffer sizes, and different numbers of buffers, and see what leads to the best performance/shortest copy time. A degenerate case is when bufferCount = 1. In that case, concurrent reads and writes are disabled, as there is only a single buffer to read into and write from. That is (intuitively) probably the configuration to use when both streams use the same underlying physical device (same disk controller, same network card...), as otherwise both operations will compete for the device. In the context described here above, I could safely use concurrency for reads and writes, as the source was a file on a local disk, and the destination was located on a network share.
As a final note, you can find in Stephen Toub's .NET Matters column another (simpler) implementation for dealing with streams asynchronously. The main difference with the solution described here above is that concurrent reads and writes are not supported by Stephen's code. By the way, I'm curious if the code would be significantly simpler using the new C# 5 async support.

You can download the code here.