// Copyright (C) 2025, The Duplicati Team
// https://duplicati.com, hello@duplicati.com
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CoCoL;
using Duplicati.Library.Main.Database;
using Microsoft.Extensions.Caching.Memory;

#nullable enable

namespace Duplicati.Library.Main.Operation.Restore
{

    /// <summary>
    /// Process that manages the block requests and responses to/from the
    /// `FileProcessor` process by caching the blocks.
    /// </summary>
    internal class BlockManager
    {
        /// <summary>
        /// The log tag for this class.
        /// </summary>
        private static readonly string LOGTAG = Logging.Log.LogTagFromType<BlockManager>();

        /// <summary>
        /// Dictionary for data blocks that are being cached. Whenever a block
        /// is requested, it checks if it is in the cache. If it is not, it
        /// will request the block from the corresponding volume. The requester
        /// will be given a `Task` that will be completed when the block is
        /// available. The cache will also keep track of how many times a block
        /// is requested, and only remove it from the cache when all requests
        /// have been fulfilled. This is to ensure that blocks are not
        /// evicted prematurely, while keeping the memory usage low. The
        /// dictionary also keeps track of how many readers are accessing it,
        /// and will retire the volume request channel when the last reader is
        /// done. Once disposed, the dictionary will clean up the database
        /// table that was used to keep track of the block counts.
        /// </summary>
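        /// <remarks>
        /// The sketch below is illustrative only (simplified, single-threaded, and with hypothetical
        /// variable names); in the real pipeline these calls are spread across the `FileProcessor`, the
        /// volume consumer, and the block handlers started by <see cref="Run"/>.
        /// </remarks>
        /// <example>
        /// <code>
        /// var cache = await SleepableDictionary.CreateAsync(db, volumeRequestChannel, options, readers: 1, token);
        /// var pending = cache.Get(request);        // issues a volume request if the block is not cached
        /// cache.Set(request.BlockID, dataBlock);   // the volume consumer delivers the block and wakes the waiter
        /// var block = await pending;               // completes once the block is available
        /// await cache.CheckCounts(request);        // decrements the block and volume usage counts
        /// cache.Retire();                          // the last reader retires the volume request channel
        /// </code>
        /// </example>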
        internal class SleepableDictionary : IDisposable
        {
            /// <summary>
            /// Channel for submitting block requests from a volume.
            /// </summary>
            private readonly IWriteChannel<object> m_volume_request;
            /// <summary>
            /// The dictionary holding the cached blocks.
            /// </summary>
            private readonly MemoryCache m_block_cache;
            /// <summary>
            /// Secondary counter for the number of blocks in the cache. It's faster than the MemoryCache.Count property.
            /// </summary>
            private int m_block_cache_count;
            /// <summary>
            /// The maximum number of blocks to hold in the cache. If the number of blocks exceeds this value, the cache will be compacted. It is a cached value of <see cref="Options.RestoreCacheMax"/>.
            /// </summary>
            private long m_block_cache_max;
            /// <summary>
            /// Flag indicating if the cache is currently being compacted. Used to avoid triggering multiple compactions as they are expensive.
            /// Multiple triggers can occur as the eviction callback is launched in a task, so the count can be above the max for a short while.
            /// </summary>
            private int m_block_cache_compacting = 0;
            /// <summary>
            /// The dictionary holding the `Task` for each block request in flight.
            /// </summary>
            private readonly ConcurrentDictionary<long, (int Count, TaskCompletionSource<DataBlock> Task)> m_waiters = [];
            /// <summary>
            /// The number of readers accessing this dictionary. Used during shutdown / cleanup.
            /// </summary>
            private int m_readers = 0;
            /// <summary>
            /// Internal stopwatch for profiling the cache eviction.
            /// </summary>
            private readonly Stopwatch? sw_cacheevict;
            /// <summary>
            /// Internal stopwatch for profiling the `CheckCounts` method.
            /// </summary>
            private readonly Stopwatch? sw_checkcounts;
            /// <summary>
            /// Internal stopwatch for profiling setting up the waiters.
            /// </summary>
            private readonly Stopwatch? sw_get_wait;
            /// <summary>
            /// Internal stopwatch for profiling writing the block request to the volume request channel.
            /// </summary>
            private readonly Stopwatch? sw_get_write;
            /// <summary>
            /// Internal stopwatch for profiling setting a block in the cache.
            /// </summary>
            private readonly Stopwatch? sw_set_set;
            /// <summary>
            /// Internal stopwatch for profiling getting a waiter to notify that the block is available.
            /// </summary>
            private readonly Stopwatch? sw_set_wake_get;
            /// <summary>
            /// Internal stopwatch for profiling waking up the waiting request.
            /// </summary>
            private readonly Stopwatch? sw_set_wake_set;

            /// <summary>
            /// Dictionary for keeping track of how many times each block is requested. Used to determine when a block is no longer needed.
            /// </summary>
            private readonly Dictionary<long, long> m_blockcount = new();
            /// <summary>
            /// Lock for the block count dictionary.
            /// </summary>
            private readonly object m_blockcount_lock = new();
            /// <summary>
            /// Dictionary for keeping track of how many times each volume is requested. Used to determine when a volume is no longer needed.
            /// </summary>
            private readonly Dictionary<long, long> m_volumecount = new();
            /// <summary>
            /// The options for the restore.
            /// </summary>
            private readonly Options m_options;
            /// <summary>
            /// The cache eviction options. Used for registering a callback when a block is evicted from the cache.
            /// </summary>
            private readonly MemoryCacheEntryOptions m_entry_options = new();
            /// <summary>
            /// Flag indicating if the dictionary has been retired. Used to avoid multiple retire attempts.
            /// </summary>
            private bool m_retired = false;

            /// <summary>
            /// Initializes a new instance of the <see cref="SleepableDictionary"/> class.
            /// </summary>
            /// <param name="volume_request">Channel for submitting block requests from a volume.</param>
            /// <param name="options">The restore options.</param>
            /// <param name="readers">Number of readers accessing this dictionary. Used during shutdown / cleanup.</param>
            private SleepableDictionary(IWriteChannel<object> volume_request, Options options, int readers)
            {
                m_options = options;
                m_volume_request = volume_request;
                var cache_options = new MemoryCacheOptions();
                m_block_cache = new MemoryCache(cache_options);
                m_block_cache_max = options.RestoreCacheMax;
                m_block_cache_count = 0;
                m_entry_options.RegisterPostEvictionCallback((key, value, reason, state) =>
                {
                    if (value is DataBlock dataBlock)
                    {
                        dataBlock.Dispose();
                    }
                    else if (value is null)
                    {
                        Logging.Log.WriteWarningMessage(LOGTAG, "CacheEvictCallback", null, "Evicted block {0} from cache, but the value was null", key);
                    }
                    else
                    {
                        Logging.Log.WriteWarningMessage(LOGTAG, "CacheEvictCallback", null, "Evicted block {0} from cache, but the value was of unexpected type {1}", key, value.GetType().FullName);
                    }
                    Interlocked.Decrement(ref m_block_cache_count);
                    Logging.Log.WriteExplicitMessage(LOGTAG, "CacheEvictCallback", "Evicted block {0} from cache", key);
                    if (reason is EvictionReason.Capacity)
                        Interlocked.Exchange(ref m_block_cache_compacting, 0);
                });
                m_readers = readers;
                sw_cacheevict = options.InternalProfiling ? new() : null;
                sw_checkcounts = options.InternalProfiling ? new() : null;
                sw_get_wait = options.InternalProfiling ? new() : null;
                sw_get_write = options.InternalProfiling ? new() : null;
                sw_set_set = options.InternalProfiling ? new() : null;
                sw_set_wake_get = options.InternalProfiling ? new() : null;
                sw_set_wake_set = options.InternalProfiling ? new() : null;
            }

            /// <summary>
            /// Asynchronously creates a new instance of the <see cref="SleepableDictionary"/> class.
            /// This method initializes the block and volume counts based on the data in the database.
            /// </summary>
            /// <param name="db">The database holding information about how many of each block this restore requires.</param>
            /// <param name="volume_request">CoCoL channel for submitting block requests from a volume.</param>
            /// <param name="options">The restore options.</param>
            /// <param name="readers">The number of readers accessing this dictionary. Used during shutdown / cleanup.</param>
            /// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
            /// <returns>A task that when awaited returns a new instance of the <see cref="SleepableDictionary"/> class.</returns>
            public static async Task<SleepableDictionary> CreateAsync(LocalRestoreDatabase db, IWriteChannel<object> volume_request, Options options, int readers, CancellationToken cancellationToken)
            {
                var sd = new SleepableDictionary(volume_request, options, readers);

                await foreach (var (block_id, volume_id) in db.GetBlocksAndVolumeIDs(options.SkipMetadata, cancellationToken).ConfigureAwait(false))
                {
                    var bc = sd.m_blockcount.TryGetValue(block_id, out var c);
                    sd.m_blockcount[block_id] = bc ? c + 1 : 1;
                    var vc = sd.m_volumecount.TryGetValue(volume_id, out var v);
                    sd.m_volumecount[volume_id] = vc ? v + 1 : 1;
                }

                return sd;
            }

            /// <summary>
            /// Decrements the block counters for the block and volume, and checks
            /// if the block is still needed. If the block is no longer needed, it
            /// will be removed from the cache. If the volume is no longer needed,
            /// the VolumeDownloader will be notified.
            /// </summary>
            /// <param name="blockRequest">The block request to check.</param>
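            /// <remarks>
            /// A small illustration of the counting scheme (hypothetical numbers): if block 7 is needed by
            /// three file regions, its count starts at 3 and each call decrements it; the call that brings
            /// it to zero removes the block from the cache. The volume count works the same way, except
            /// that reaching zero re-tags the request as <see cref="BlockRequestType.CacheEvict"/> and
            /// forwards it on the volume request channel so the downloaded volume can be dropped.
            /// </remarks>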
            public async Task CheckCounts(BlockRequest blockRequest)
            {
                long error_block_id = -1;
                long error_volume_id = -1;
                var emit_evict = false;

                Logging.Log.WriteExplicitMessage(LOGTAG, "CheckCounts", "Trying to acquire m_blockcount_lock for block {0}", blockRequest.BlockID);
                lock (m_blockcount_lock)
                {
                    sw_checkcounts?.Start();

                    var block_count = m_blockcount.TryGetValue(blockRequest.BlockID, out var c) ? c - 1 : 0;

                    if (block_count > 0)
                    {
                        m_blockcount[blockRequest.BlockID] = block_count;
                    }
                    else if (block_count == 0)
                    {
                        // Evict the block from the cache and check if the volume is no longer needed.
                        m_blockcount.Remove(blockRequest.BlockID);
                        m_block_cache.Remove(blockRequest.BlockID);
                    }
                    else // block_count < 0
                    {
                        error_block_id = blockRequest.BlockID;
                    }

                    var vol_count = m_volumecount.TryGetValue(blockRequest.VolumeID, out var vc) ? vc - 1 : 0;
                    if (vol_count > 0)
                    {
                        m_volumecount[blockRequest.VolumeID] = vol_count;
                    }
                    else if (vol_count == 0)
                    {
                        m_volumecount.Remove(blockRequest.VolumeID);
                        blockRequest.RequestType = BlockRequestType.CacheEvict;
                        emit_evict = true;
                    }
                    else // vol_count < 0
                    {
                        error_volume_id = blockRequest.VolumeID;
                    }
                    sw_checkcounts?.Stop();
                }
                Logging.Log.WriteExplicitMessage(LOGTAG, "CheckCounts", "Released m_blockcount_lock for block {0}", blockRequest.BlockID);

                // Notify the `VolumeManager` that it should evict the volume.
                if (emit_evict)
                    await m_volume_request.WriteAsync(blockRequest).ConfigureAwait(false);

                if (error_block_id != -1)
                {
                    Logging.Log.WriteWarningMessage(LOGTAG, "BlockCountError", null, "Block {0} has a count below 0", blockRequest.BlockID);
                }

                if (error_volume_id != -1)
                {
                    Logging.Log.WriteWarningMessage(LOGTAG, "VolumeCountError", null, "Volume {0} has a count below 0", blockRequest.VolumeID);
                }
            }

            /// <summary>
            /// Get a block from the cache. If the block is not in the cache,
            /// it will request the block from the volume and return a `Task`
            /// that will be completed when the block is available.
            /// </summary>
            /// <param name="block_request">The requested block.</param>
            /// <returns>A `Task` holding the data block.</returns>
            public async Task<DataBlock> Get(BlockRequest block_request)
            {
                Logging.Log.WriteExplicitMessage(LOGTAG, "BlockCacheGet", "Getting block {0} from cache", block_request.BlockID);

                // Check if the block is already in the cache, and return it if it is.
                if (m_block_cache.TryGetValue(block_request.BlockID, out DataBlock? value))
                {
                    if (value is null)
                        throw new InvalidOperationException($"Block {block_request.BlockID} was in the cache, but the value was null");

                    value.Reference();

                    // If the block was evicted in between the TryGetValue and the Reference call,
                    // we need to request it again.
                    if (value.Data is not null)
                        return value;
                }

                // If the block is not in the cache, request it from the volume.
                sw_get_wait?.Start();
                var tcs = new TaskCompletionSource<DataBlock>(TaskCreationOptions.RunContinuationsAsynchronously);
                var (_, new_tcs) = m_waiters.AddOrUpdate(block_request.BlockID, (1, tcs), (key, old) => (old.Count + 1, old.Task));
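                // AddOrUpdate either registers this TCS as the shared waiter task for the block or bumps
                // the waiter count on the existing entry; getting our own TCS back means we won the race
                // and are responsible for issuing the volume request below.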
                if (tcs == new_tcs)
                {
                    sw_get_wait?.Stop();
                    Logging.Log.WriteExplicitMessage(LOGTAG, "BlockCacheGet", "Requesting block {0} from volume {1}", block_request.BlockID, block_request.VolumeID);

                    // We are the first to request this block
                    sw_get_write?.Start();
                    await m_volume_request.WriteAsync(block_request).ConfigureAwait(false);
                    sw_get_write?.Stop();

                    // Add a timeout monitor
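                    // The threshold is a heuristic: twice the longest processing time observed so far by
                    // the volume consumer (tracked in DeadlockTimer.MaxProcessingTime), so a slow but
                    // progressing restore should not trigger the warning.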
                    var timeout = TimeSpan.FromMilliseconds(DeadlockTimer.MaxProcessingTime * 2);
                    using var tcs1 = new CancellationTokenSource();
                    var t = await Task.WhenAny(
                        Task.Delay(timeout, tcs1.Token),
                        new_tcs.Task
                    ).ConfigureAwait(false);
                    if (t != new_tcs.Task)
                        Logging.Log.WriteWarningMessage(LOGTAG, "BlockRequestTimeout", null, "Block request for block {0} has been in flight for over {1} milliseconds. This may be a deadlock.", block_request.BlockID, timeout.TotalMilliseconds);
                }
                else
                {
                    sw_get_wait?.Stop();
                    Logging.Log.WriteExplicitMessage(LOGTAG, "BlockCacheGet", "Block {0} is already being requested, waiting for it to be available", block_request.BlockID);
                }

                return await new_tcs.Task.ConfigureAwait(false);
            }

            /// <summary>
            /// Set a block in the cache. If the block is already in the cache,
            /// it will be replaced. If the block is not in the cache, it will
            /// be added. If the block is no longer needed, it will be removed
            /// from the cache. If the block is requested while it is being
            /// removed, the requester will be given a `Task` that will be
            /// completed when the block is available.
            /// </summary>
            /// <param name="blockID">The ID of the block to set in the cache.</param>
            /// <param name="value">The data block holding the block data.</param>
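            /// <remarks>
            /// The method takes ownership of <paramref name="value"/>: the block is handed to any
            /// registered waiters (with its reference count raised accordingly) and is then either kept
            /// in the cache until it is evicted, or disposed here.
            /// </remarks>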
            public void Set(long blockID, DataBlock value)
            {
                Logging.Log.WriteExplicitMessage(LOGTAG, "BlockCacheSet", "Setting block {0} in cache", blockID);

                sw_set_wake_get?.Start();
                var waiters_exist = m_waiters.TryRemove(blockID, out var entry);
                sw_set_wake_get?.Stop();

                sw_set_set?.Start();
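                // If the registered waiters account for every remaining use of this block, caching it
                // would only waste memory; a configured cache size of zero likewise disables caching.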
                bool fulfills = waiters_exist && entry.Count >= m_blockcount[blockID];
                bool added_to_cache = !fulfills && m_block_cache_max > 0;
                if (added_to_cache)
                {
                    m_block_cache.Set(blockID, value, m_entry_options);
                    Interlocked.Increment(ref m_block_cache_count);
                }
                sw_set_set?.Stop();

                // Notify any waiters that the block is available.
                sw_set_wake_set?.Start();
                if (waiters_exist)
                {
                    value.Reference(entry.Count);
                    entry.Task.SetResult(value);
                }
                sw_set_wake_set?.Stop();

                sw_cacheevict?.Start();
                if (added_to_cache)
                {
                    if (m_block_cache_count > m_block_cache_max && Interlocked.CompareExchange(ref m_block_cache_compacting, 1, 0) == 0)
                        m_block_cache.Compact(m_options.RestoreCacheEvict);
                }
                else
                    // This method is responsible for the disposal
                    value.Dispose();
                sw_cacheevict?.Stop();
            }

            /// <summary>
            /// Retire the dictionary. This will decrement the number of readers
            /// accessing the dictionary, and if there are no more readers, it
            /// will retire the volume request channel, effectively shutting
            /// down the restore process network.
            /// </summary>
            public void Retire()
            {
                if (!m_retired && Interlocked.Decrement(ref m_readers) <= 0)
                {
                    m_volume_request.Retire();
                    m_retired = true;
                }
            }

            /// <summary>
            /// Clean up the dictionary. This will remove the database table
            /// that was used to keep track of the block counts.
            /// </summary>
            public void Dispose()
            {
                // Verify that the tables are empty
                var blockcount = m_blockcount.Sum(x => x.Value);
                var volumecount = m_volumecount.Sum(x => x.Value);

                if (blockcount != 0)
                {
                    var blocks = m_blockcount
                        .Where(x => x.Value != 0)
                        .Take(10)
                        .Select(x => x.Key);
                    var blockids = string.Join(", ", blocks);
                    Logging.Log.WriteErrorMessage(LOGTAG, "BlockCountError", null, $"Block count in SleepableDictionary's block table is not zero: {blockcount}{Environment.NewLine}First 10 blocks: {blockids}");
                }

                if (volumecount != 0)
                {
                    var vols = m_volumecount
                        .Where(x => x.Value != 0)
                        .Take(10)
                        .Select(x => x.Key);
                    var volids = string.Join(", ", vols);
                    Logging.Log.WriteErrorMessage(LOGTAG, "VolumeCountError", null, $"Volume count in SleepableDictionary's volume table is not zero: {volumecount}{Environment.NewLine}First 10 volumes: {volids}");
                }

                if (m_block_cache.Count > 0)
                {
                    Logging.Log.WriteErrorMessage(LOGTAG, "BlockCacheMismatch", null, $"Internal Block cache is not empty: {m_block_cache.Count}");
                    Logging.Log.WriteErrorMessage(LOGTAG, "BlockCacheMismatch", null, $"First 10 block counts in cache ({m_blockcount.Count}): {string.Join(", ", m_blockcount.Take(10).Select(x => x.Value))}");
                }

                if (m_options.InternalProfiling)
                {
                    Logging.Log.WriteProfilingMessage(LOGTAG, "InternalTimings", $"Sleepable dictionary - CheckCounts: {sw_checkcounts!.ElapsedMilliseconds}ms, Get wait: {sw_get_wait!.ElapsedMilliseconds}ms, Get write: {sw_get_write!.ElapsedMilliseconds}ms, Set set: {sw_set_set!.ElapsedMilliseconds}ms, Set wake get: {sw_set_wake_get!.ElapsedMilliseconds}ms, Set wake set: {sw_set_wake_set!.ElapsedMilliseconds}ms, Cache evict: {sw_cacheevict!.ElapsedMilliseconds}ms");
                }

                if (!m_retired)
                {
                    Logging.Log.WriteErrorMessage(LOGTAG, "NotRetired", null, "SleepableDictionary was disposed without having retired channels.");
                    m_retired = true;
                    m_volume_request.Retire();
                }
            }

            /// <summary>
            /// Cancel all pending requests. This will set an exception on all
            /// pending requests, effectively cancelling them.
            /// </summary>
            public void CancelAll()
            {
                foreach (var (_, tcs) in m_waiters.Values)
                {
                    tcs.SetException(new RetiredException("Request waiter"));
                }
            }
        }

        /// <summary>
        /// Run the block manager process. This will create a cache for the
        /// blocks, and start two tasks: one for reading blocks from the input
        /// channel (data blocks from the volumes) and storing them in the
        /// cache, and one for reading block requests from the `FileProcessor`,
        /// accessing the cache for the blocks, and writing the resulting
        /// blocks back to the `FileProcessor`.
        /// </summary>
        /// <param name="channels">The named channels for the restore operation.</param>
        /// <param name="db">The database holding information about how many of each block this restore requires.</param>
        /// <param name="fp_requests">The channels for reading block requests from the `FileProcessor`.</param>
        /// <param name="fp_responses">The channels for writing block responses back to the `FileProcessor`.</param>
        /// <param name="options">The restore options.</param>
        /// <param name="results">The results of the restore operation.</param>
        /// <returns>A task that completes when the block manager process has finished.</returns>
        public static Task Run(Channels channels, LocalRestoreDatabase db, IChannel<BlockRequest>[] fp_requests, IChannel<Task<DataBlock>>[] fp_responses, Options options, RestoreResults results)
        {
            return AutomationExtensions.RunTask(
                new
                {
                    Input = channels.DecompressedBlock.AsRead(),
                    Ack = channels.DecompressionAck.AsWrite(),
                    Output = channels.VolumeRequest.AsWrite()
                },
                async self =>
                {
                    // Create a cache for the blocks.
                    using SleepableDictionary cache =
                        await SleepableDictionary.CreateAsync(db, self.Output, options, fp_requests.Length, results.TaskControl.ProgressToken)
                        .ConfigureAwait(false);

                    // The volume consumer will read blocks from the input channel (data blocks from the volumes) and store them in the cache.
                    var volume_consumer = Task.Run(async () =>
                    {
                        Stopwatch? sw_read = options.InternalProfiling ? new() : null;
                        Stopwatch? sw_set = options.InternalProfiling ? new() : null;
                        Stopwatch? sw_deadlock = options.InternalProfiling ? new() : null;
                        try
                        {
                            while (true)
                            {
                                Logging.Log.WriteExplicitMessage(LOGTAG, "VolumeConsumer", null, "Waiting for block request from volume");
                                sw_read?.Start();
                                var (block_request, data) = await self.Input.ReadAsync().ConfigureAwait(false);
                                sw_read?.Stop();

                                Logging.Log.WriteExplicitMessage(LOGTAG, "VolumeConsumer", null, "Received data for block {0} from volume {1}", block_request.BlockID, block_request.VolumeID);
                                sw_set?.Start();
                                cache.Set(block_request.BlockID, data);
                                sw_set?.Stop();

                                Logging.Log.WriteExplicitMessage(LOGTAG, "VolumeConsumer", null, "Updating deadlock timer for block {0} from volume {1}", block_request.BlockID, block_request.VolumeID);
                                sw_deadlock?.Start();
                                DeadlockTimer.MaxProcessingTime = Math.Max(DeadlockTimer.MaxProcessingTime, (int)(DateTime.Now - block_request.TimestampMilliseconds).TotalMilliseconds);
                                sw_deadlock?.Stop();
                            }
                        }
                        catch (RetiredException)
                        {
                            Logging.Log.WriteVerboseMessage(LOGTAG, "RetiredProcess", null, "BlockManager Volume consumer retired");

                            if (options.InternalProfiling)
                            {
                                Logging.Log.WriteProfilingMessage(LOGTAG, "InternalTimings", $"Volume consumer - Read: {sw_read!.ElapsedMilliseconds}ms, Set: {sw_set!.ElapsedMilliseconds}ms, Deadlock timer: {sw_deadlock!.ElapsedMilliseconds}ms");
                            }

                            // Cancel any remaining readers - although there shouldn't be any.
                            cache.CancelAll();
                        }
                        catch (Exception ex)
                        {
                            Logging.Log.WriteErrorMessage(LOGTAG, "VolumeConsumerError", ex, "Error in volume consumer");
                            self.Input.Retire();

                            // Cancel any remaining readers - although there shouldn't be any.
                            cache.CancelAll();
                            cache.Retire();
                        }
                    });

                    // The block handlers will read block requests from the `FileProcessor`, access the cache for the blocks, and write the resulting blocks to the `FileProcessor`.
                    var block_handlers = fp_requests.Zip(fp_responses, (req, res) => Task.Run(async () =>
                    {
                        Stopwatch? sw_req = options.InternalProfiling ? new() : null;
                        Stopwatch? sw_resp = options.InternalProfiling ? new() : null;
                        Stopwatch? sw_cache = options.InternalProfiling ? new() : null;
                        Stopwatch? sw_get = options.InternalProfiling ? new() : null;
                        try
                        {
                            while (true)
                            {
                                sw_req?.Start();
                                var block_request = await req.ReadAsync().ConfigureAwait(false);
                                sw_req?.Stop();
                                Logging.Log.WriteExplicitMessage(LOGTAG, "BlockHandler", null, "Received block request: {0}", block_request.RequestType);
                                switch (block_request.RequestType)
                                {
                                    case BlockRequestType.Download:
                                        sw_get?.Start();
                                        var datatask = cache.Get(block_request);
                                        sw_get?.Stop();
                                        Logging.Log.WriteExplicitMessage(LOGTAG, "BlockHandler", null, "Retrieved data for block {0} and volume {1}", block_request.BlockID, block_request.VolumeID);

                                        sw_resp?.Start();
                                        await res.WriteAsync(datatask).ConfigureAwait(false);
                                        sw_resp?.Stop();
                                        Logging.Log.WriteExplicitMessage(LOGTAG, "BlockHandler", null, "Passed data for block {0} and volume {1} to FileProcessor", block_request.BlockID, block_request.VolumeID);
                                        break;
                                    case BlockRequestType.CacheEvict:
                                        sw_cache?.Start();
                                        // Target file already had the block.
                                        await cache.CheckCounts(block_request).ConfigureAwait(false);
                                        sw_cache?.Stop();

                                        Logging.Log.WriteExplicitMessage(LOGTAG, "BlockHandler", null, "Decremented counts for block {0} and volume {1}", block_request.BlockID, block_request.VolumeID);
                                        break;
                                    default:
                                        throw new InvalidOperationException($"Unexpected block request type: {block_request.RequestType}");
                                }
                            }
                        }
                        catch (RetiredException)
                        {
                            Logging.Log.WriteVerboseMessage(LOGTAG, "RetiredProcess", null, "BlockManager Block handler retired");

                            if (options.InternalProfiling)
                            {
                                Logging.Log.WriteProfilingMessage(LOGTAG, "InternalTimings", $"Block handler - Req: {sw_req!.ElapsedMilliseconds}ms, Resp: {sw_resp!.ElapsedMilliseconds}ms, Cache: {sw_cache!.ElapsedMilliseconds}ms, Get: {sw_get!.ElapsedMilliseconds}ms");
                            }

                            cache.Retire();
                        }
                        catch (Exception ex)
                        {
                            Logging.Log.WriteErrorMessage(LOGTAG, "BlockHandlerError", ex, "Error in block handler");
                            req.Retire();
                            res.Retire();
                            cache.Retire();
                        }
                    })).ToArray();

                    await Task.WhenAll([volume_consumer, .. block_handlers]).ConfigureAwait(false);
                });
        }
    }

}