mirror of
https://github.com/duplicati/duplicati.git
synced 2025-11-28 03:20:25 +08:00
997 lines
No EOL
53 KiB
C#
997 lines
No EOL
53 KiB
C#
// Copyright (C) 2025, The Duplicati Team
|
|
// https://duplicati.com, hello@duplicati.com
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
// copy of this software and associated documentation files (the "Software"),
|
|
// to deal in the Software without restriction, including without limitation
|
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
// and/or sell copies of the Software, and to permit persons to whom the
|
|
// Software is furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
// DEALINGS IN THE SOFTWARE.
|
|
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using CoCoL;
|
|
using Duplicati.Library.Common.IO;
|
|
using Duplicati.Library.Main.Database;
|
|
using Duplicati.Library.Utility;
|
|
|
|
#nullable enable
|
|
|
|
namespace Duplicati.Library.Main.Operation.Restore
|
|
{
|
|
|
|
/// <summary>
|
|
/// Process that handles each file that needs to be restored. It starts by
|
|
/// identifying the blocks that need to be restored. Then it verifies
|
|
/// which of the target file blocks are missing. Then it checks whether it
|
|
/// can use local blocks to restore the file. Finally, it restores the file
|
|
/// by downloading the missing blocks and writing them to the target file.
|
|
/// It also verifies the file hash and truncates the file if necessary.
|
|
/// If enabled, it also restores the metadata for the file.
|
|
/// </summary>
|
|
internal class FileProcessor
|
|
{
|
|
/// <summary>
|
|
/// The log tag for this class.
|
|
/// </summary>
|
|
private static readonly string LOGTAG = Logging.Log.LogTagFromType<FileProcessor>();
|
|
/// <summary>
/// The number of file processors that are still restoring files. It is set by the <see cref="RestoreHandler"/>.
/// </summary>
public static int file_processors_restoring_files;
/// <summary>
/// Lock that guards updates to <see cref="file_processors_restoring_files"/> and the
/// completion of <see cref="file_processor_continue"/>.
/// </summary>
public static object file_processor_continue_lock = new object();
/// <summary>
/// Completed once no file processor is restoring files any more; processors that reach
/// the folder metadata phase await it. Continuations are forced to run asynchronously,
/// because the source is completed while <see cref="file_processor_continue_lock"/> is
/// held and the waiting processors must not resume inline inside that lock.
/// </summary>
public static TaskCompletionSource file_processor_continue = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
/// <summary>
|
|
/// The current file processor ID. Used for debugging.
|
|
/// </summary>
|
|
private static int processor_id = -1;
|
|
|
|
/// <summary>
/// Runs the file processor process that restores the files that need to be restored.
/// For every file read from the input channel it verifies the blocks already present
/// in the target, optionally retargets the file, reuses local blocks when enabled,
/// requests the remaining blocks, writes them, verifies the file hash and finally
/// restores the metadata (unless metadata is skipped).
/// </summary>
/// <param name="channels">The named channels for the restore operation.</param>
/// <param name="db">The restore database, which is queried for blocks, corresponding volumes and metadata for the files.</param>
/// <param name="block_request">The channel to request blocks from the block manager.</param>
/// <param name="block_response">The channel to receive blocks from the block manager.</param>
/// <param name="options">The restore options.</param>
/// <param name="results">The restore results.</param>
/// <returns>The task representing the running file processor.</returns>
public static Task Run(Channels channels, LocalRestoreDatabase db, IChannel<BlockRequest> block_request, IChannel<Task<DataBlock>> block_response, Options options, RestoreResults results)
{
    return AutomationExtensions.RunTask(
        new
        {
            Input = channels.FilesToRestore.AsRead()
        },
        async self =>
        {
            // Builds a request for the same block as `source`, but with the given request type.
            static BlockRequest WithType(BlockRequest source, BlockRequestType type) =>
                new BlockRequest(
                    source.BlockID,
                    source.BlockOffset,
                    source.BlockHash,
                    source.BlockSize,
                    source.VolumeID,
                    type
                );

            Stopwatch? sw_file = options.InternalProfiling ? new() : null;
            Stopwatch? sw_block = options.InternalProfiling ? new() : null;
            Stopwatch? sw_meta = options.InternalProfiling ? new() : null;
            Stopwatch? sw_req = options.InternalProfiling ? new() : null;
            Stopwatch? sw_resp = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_verify_target = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_retarget = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_notify_local = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_verify_local = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_empty_file = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_hash = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_read = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_write = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_results = options.InternalProfiling ? new() : null;
            Stopwatch? sw_work_meta = options.InternalProfiling ? new() : null;

            // Indicates whether this FileProcessor has already left the "restoring files" phase.
            var decremented = false;
            // ID for this file processor, used for debugging.
            var my_id = Interlocked.Increment(ref processor_id);

            try
            {
                using var filehasher = HashFactory.CreateHasher(options.FileHashAlgorithm);
                using var blockhasher = HashFactory.CreateHasher(options.BlockHashAlgorithm);
                var buffer = new byte[options.Blocksize];

                while (true)
                {
                    // Get the next file to restore.
                    sw_file?.Start();
                    var file = await self.Input.ReadAsync().ConfigureAwait(false);
                    sw_file?.Stop();

                    Logging.Log.WriteExplicitMessage(LOGTAG, "FileRestored", null, "{0} Restoring file {1}", my_id, file.TargetPath);

                    if (file.BlocksetID == LocalDatabase.FOLDER_BLOCKSET_ID && !options.SkipMetadata)
                    {
                        // Check if there are other FileProcessor's still restoring files
                        if (!decremented)
                        {
                            // The check and the completion are done under the lock so they cannot
                            // race with the other processors that complete the same signal.
                            bool release_directly;
                            lock (file_processor_continue_lock)
                            {
                                release_directly = file_processors_restoring_files <= 0 && !file_processor_continue.Task.IsCompleted;
                                if (release_directly)
                                    file_processor_continue.TrySetResult();
                            }

                            if (!release_directly)
                                await RendezvousBeforeProcessingFolderMetadata()
                                    .ConfigureAwait(false);
                            decremented = true;
                        }

                        await RestoreMetadata(db, file, block_request, block_response, options, sw_meta, sw_work_meta, sw_req, sw_resp, results.TaskControl.ProgressToken)
                            .ConfigureAwait(false);
                        // RestoreMetadata may leave the work stopwatch running; stop it so
                        // waiting for the next file is not counted as metadata work.
                        sw_work_meta?.Stop();

                        continue;
                    }

                    // Get information about the blocks for the file
                    // TODO rather than keeping all of the blocks in memory, we could do a single pass over the blocks using a cursor, only keeping the relevant block requests in memory. Maybe even only a single block request at a time.
                    sw_block?.Start();
                    var blocks = await db
                        .GetBlocksFromFile(file.BlocksetID, results.TaskControl.ProgressToken)
                        .ToArrayAsync(results.TaskControl.ProgressToken)
                        .ConfigureAwait(false);
                    sw_block?.Stop();

                    sw_work_verify_target?.Start();
                    // Verify the target file blocks that may already exist.
                    List<BlockRequest> missing_blocks, verified_blocks;
                    try
                    {
                        (_, missing_blocks, verified_blocks) = await VerifyTargetBlocks(file, blocks, filehasher, blockhasher, buffer, options, results).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        Logging.Log.WriteErrorMessage(LOGTAG, "VerifyTargetBlocks", ex, "Error during checking the target file");
                        // Do not leave the stopwatch running while the next file is fetched.
                        sw_work_verify_target?.Stop();
                        continue;
                    }
                    long bytes_written = 0;
                    sw_work_verify_target?.Stop();
                    if (blocks.Length != missing_blocks.Count + verified_blocks.Count)
                    {
                        Logging.Log.WriteErrorMessage(LOGTAG, "BlockCountMismatch", null, $"Block count mismatch for {file.TargetPath} - expected: {blocks.Length}, actual: {missing_blocks.Count + verified_blocks.Count}");
                        continue;
                    }

                    sw_work_retarget?.Start();
                    // Check if the target file needs to be retargeted
                    if (missing_blocks.Count > 0 && !options.Overwrite && SystemIO.IO_OS.FileExists(file.TargetPath))
                    {
                        var new_name = GenerateNewName(file, db, filehasher);
                        if (SystemIO.IO_OS.FileExists(new_name))
                        {
                            // The file already exists, which it only does when it matches the target hash. So we can skip the file.
                            Logging.Log.WriteInformationMessage(LOGTAG, "FileAlreadyExists", "File {0} already exists and matches the target hash as a copy: {1}", file.TargetPath, new_name);
                            missing_blocks.Clear();
                        }
                        else
                        {
                            Logging.Log.WriteVerboseMessage(LOGTAG, "RetargetingFile", "Retargeting file {0} to {1}", file.TargetPath, new_name);
                            var new_file = new FileRequest(file.ID, file.OriginalPath, new_name, file.Hash, file.Length, file.BlocksetID);
                            if (options.UseLocalBlocks)
                            {
                                if (options.Dryrun)
                                {
                                    Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have copied {verified_blocks.Count} blocks ({verified_blocks.Count * options.Blocksize} bytes) from ""{file.TargetPath}"" to ""{new_file.TargetPath}""");
                                }
                                else
                                {
                                    try
                                    {
                                        await CopyOldTargetBlocksToNewTarget(file, new_file, buffer, verified_blocks).ConfigureAwait(false);
                                    }
                                    catch (Exception ex)
                                    {
                                        Logging.Log.WriteErrorMessage(LOGTAG, "CopyOldTargetToNew", ex, "Error when trying to copy {0} to {1}", file, new_file);
                                        // The results object is shared between processors; guard it like every other update.
                                        lock (results)
                                        {
                                            results.BrokenLocalFiles.Add(file.TargetPath);
                                        }
                                    }
                                }
                            }
                            else
                            {
                                // Nothing is copied to the new target, so every block has to be downloaded.
                                verified_blocks.Clear();
                                missing_blocks = [.. blocks.Select(x => { x.RequestType = BlockRequestType.Download; return x; })];
                            }
                            file = new_file;
                        }
                    }
                    sw_work_retarget?.Stop();

                    sw_work_notify_local?.Start();
                    // Notify to the cache that we use local blocks.
                    foreach (var block in verified_blocks)
                    {
                        sw_req?.Start();
                        await block_request.WriteAsync(WithType(block, BlockRequestType.CacheEvict)).ConfigureAwait(false);
                        sw_req?.Stop();
                    }
                    sw_work_notify_local?.Stop();

                    sw_work_verify_local?.Start();
                    if (missing_blocks.Count > 0 && options.UseLocalBlocks)
                    {
                        // Verify the local blocks at the original restore path that may be used to restore the file.
                        (bytes_written, missing_blocks) = await VerifyLocalBlocks(file, missing_blocks, blocks.Length, filehasher, blockhasher, buffer, options, results, block_request).ConfigureAwait(false);
                    }
                    sw_work_verify_local?.Stop();

                    bool empty_file_or_symlink = false;
                    if (file.BlocksetID != LocalDatabase.SYMLINK_BLOCKSET_ID && (blocks.Length == 0 || (blocks.Length == 1 && blocks[0].BlockSize == 0)))
                    {
                        sw_work_empty_file?.Start();
                        empty_file_or_symlink = true;
                        if (options.Dryrun)
                        {
                            Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have created empty file ""{file.TargetPath}""");
                        }
                        else
                        {
                            var foldername = SystemIO.IO_OS.PathGetDirectoryName(file.TargetPath);
                            if (!SystemIO.IO_OS.DirectoryExists(foldername))
                            {
                                SystemIO.IO_OS.DirectoryCreate(foldername);
                                Logging.Log.WriteWarningMessage(LOGTAG, "CreateMissingFolder", null, @$"Creating missing folder ""{foldername}"" for file ""{file.TargetPath}""");
                            }

                            // Create an empty file, or truncate to 0
                            using var fs = SystemIO.IO_OS.FileOpenWrite(file.TargetPath);
                            fs.SetLength(0);
                            if (missing_blocks.Count != 0)
                            {
                                await block_request.WriteAsync(WithType(blocks[0], BlockRequestType.CacheEvict)).ConfigureAwait(false);
                            }
                        }
                        sw_work_empty_file?.Stop();
                    }
                    else if (missing_blocks.Count > 0)
                    {
                        if (missing_blocks.Any(x => x.VolumeID < 0))
                        {
                            Logging.Log.WriteWarningMessage(LOGTAG, "NegativeVolumeID", null, $"{file.TargetPath} has a negative volume ID, skipping");
                            continue;
                        }

                        sw_work_hash?.Start();
                        filehasher.Initialize();
                        sw_work_hash?.Stop();

                        FileStream? fs = null;
                        try
                        {
                            // Open the target file
                            if (options.Dryrun)
                            {
                                // If dryrun, open the file for read only to verify the file hash.
                                if (File.Exists(file.TargetPath))
                                {
                                    fs = SystemIO.IO_OS.FileOpenRead(file.TargetPath);
                                }
                                else
                                {
                                    Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Tried opening ""{file.TargetPath}"" for reading, but it doesn't exist.");
                                }
                            }
                            else
                            {
                                var foldername = SystemIO.IO_OS.PathGetDirectoryName(file.TargetPath);
                                if (!SystemIO.IO_OS.DirectoryExists(foldername))
                                {
                                    SystemIO.IO_OS.DirectoryCreate(foldername);
                                    Logging.Log.WriteWarningMessage(LOGTAG, "CreateMissingFolder", null, @$"Creating missing folder ""{foldername}"" for file ""{file.TargetPath}""");
                                }
                                fs = SystemIO.IO_OS.FileOpenReadWrite(file.TargetPath);
                            }

                            sw_req?.Start();
                            // Burst the block requests to speed up the restore
                            int burst = (int)Math.Max(options.RestoreChannelBufferSize, 1);
                            int j = 0;
                            for (int i = 0; i < (int)Math.Min(missing_blocks.Count, burst); i++)
                            {
                                await block_request.WriteAsync(WithType(missing_blocks[i], BlockRequestType.Download)).ConfigureAwait(false);
                            }
                            sw_req?.Stop();

                            if (!options.Dryrun && options.RestorePreAllocate)
                            {
                                // Preallocate the file size to avoid fragmentation / help the operating system / filesystem.
                                fs!.SetLength(file.Length);
                            }

                            // `i` walks all blocks of the file, `j` walks the missing blocks.
                            for (int i = 0; i < blocks.Length; i++)
                            {
                                // Each response block is not verified against the block hash, as the processes downloading the blocks should do that.
                                if (j < missing_blocks.Count && missing_blocks[j].BlockOffset == i)
                                {
                                    // Read the block from the response and issue a new request, if more blocks are missing
                                    sw_resp?.Start();
                                    using var datablock = await (await block_response.ReadAsync().ConfigureAwait(false)).ConfigureAwait(false);
                                    if (datablock.Data == null)
                                    {
                                        // Index with `j`: `i` is an index into `blocks` and may be out of range here.
                                        throw new Exception($"Received null data block from request {missing_blocks[j].BlockID} for file {file.TargetPath}");
                                    }
                                    sw_resp?.Stop();

                                    sw_req?.Start();
                                    if (j < missing_blocks.Count - burst)
                                    {
                                        await block_request.WriteAsync(WithType(missing_blocks[j + burst], BlockRequestType.Download)).ConfigureAwait(false);
                                    }
                                    sw_req?.Stop();

                                    // Hash the block to verify the file hash
                                    sw_work_hash?.Start();
                                    filehasher.TransformBlock(datablock.Data, 0, (int)blocks[i].BlockSize, datablock.Data, 0);
                                    sw_work_hash?.Stop();
                                    if (options.Dryrun)
                                    {
                                        // Simulate writing the block
                                        fs?.Seek(fs.Position + blocks[i].BlockSize, SeekOrigin.Begin);
                                    }
                                    else
                                    {
                                        // Write the block to the file
                                        sw_work_write?.Start();
                                        await fs!.WriteAsync(datablock.Data.AsMemory(0, (int)blocks[i].BlockSize)).ConfigureAwait(false);
                                        sw_work_write?.Stop();
                                    }

                                    // Keep track of metrics
                                    bytes_written += blocks[i].BlockSize;
                                    j++;
                                    sw_req?.Start();
                                    await block_request.WriteAsync(WithType(blocks[i], BlockRequestType.CacheEvict)).ConfigureAwait(false);
                                    sw_req?.Stop();
                                }
                                else
                                {
                                    // Not a missing block, so read from the file
                                    if (fs != null)
                                    {
                                        sw_work_read?.Start();
                                        // Read only this block's size: a short final block must not pull in
                                        // trailing bytes of a target that is longer than the restored file.
                                        var read = await fs
                                            .ReadAsync(buffer, 0, (int)blocks[i].BlockSize)
                                            .ConfigureAwait(false);
                                        sw_work_read?.Stop();
                                        sw_work_hash?.Start();
                                        filehasher.TransformBlock(buffer, 0, read, buffer, 0);
                                        sw_work_hash?.Stop();
                                    }
                                }
                            }

                            if (options.Dryrun)
                            {
                                Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have restored {bytes_written} bytes of ""{file.TargetPath}""");
                            }

                            // Verify the file hash
                            sw_work_hash?.Start();
                            filehasher.TransformFinalBlock([], 0, 0);
                            sw_work_hash?.Stop();

                            sw_work?.Start();
                            if (filehasher.Hash != null && Convert.ToBase64String(filehasher.Hash) != file.Hash)
                            {
                                Logging.Log.WriteErrorMessage(LOGTAG, "FileHashMismatch", null, $"File hash mismatch for {file.TargetPath} - expected: {file.Hash}, actual: {Convert.ToBase64String(filehasher.Hash)}");
                                throw new Exception("File hash mismatch");
                            }

                            // Truncate the file if it is larger than the expected size.
                            if (fs?.Length > file.Length)
                            {
                                if (options.Dryrun)
                                {
                                    Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have truncated ""{file.TargetPath}"" from {fs.Length} to {file.Length}");
                                }
                                else
                                {
                                    fs.SetLength(file.Length);
                                }
                            }
                            sw_work?.Stop();
                        }
                        catch (Exception)
                        {
                            lock (results)
                            {
                                results.BrokenLocalFiles.Add(file.TargetPath);
                            }
                            block_request.Retire();
                            block_response.Retire();
                            throw;
                        }
                        finally
                        {
                            fs?.Dispose();
                        }
                    }

                    if (!options.SkipMetadata)
                    {
                        empty_file_or_symlink |= await RestoreMetadata(db, file, block_request, block_response, options, sw_meta, sw_work_meta, sw_req, sw_resp, results.TaskControl.ProgressToken).ConfigureAwait(false);
                        sw_work_meta?.Stop();
                    }

                    // TODO legacy restore doesn't count metadata restore as a restored file.

                    sw_work_results?.Start();
                    if (empty_file_or_symlink || bytes_written > 0)
                    {
                        // Keep track of the restored files and their sizes
                        lock (results)
                        {
                            results.RestoredFiles++;
                            results.SizeOfRestoredFiles += bytes_written;
                        }
                    }
                    sw_work_results?.Stop();

                    Logging.Log.WriteVerboseMessage(LOGTAG, "RestoredFile", $"{my_id} Restored file {{0}}", file.TargetPath);
                }
            }
            catch (RetiredException)
            {
                // A retired channel is how this loop ends; report timings and leave quietly.
                Logging.Log.WriteVerboseMessage(LOGTAG, "RetiredProcess", null, "File processor retired");

                if (options.InternalProfiling)
                {
                    Logging.Log.WriteProfilingMessage(LOGTAG, "InternalTimings", $"File: {sw_file!.ElapsedMilliseconds}ms, Block: {sw_block!.ElapsedMilliseconds}ms, Meta: {sw_meta!.ElapsedMilliseconds}ms, Req: {sw_req!.ElapsedMilliseconds}ms, Resp: {sw_resp!.ElapsedMilliseconds}ms, Work: {sw_work!.ElapsedMilliseconds}ms, VerifyTarget: {sw_work_verify_target!.ElapsedMilliseconds}ms, Retarget: {sw_work_retarget!.ElapsedMilliseconds}ms, NotifyLocal: {sw_work_notify_local!.ElapsedMilliseconds}ms, VerifyLocal: {sw_work_verify_local!.ElapsedMilliseconds}ms, EmptyFile: {sw_work_empty_file!.ElapsedMilliseconds}ms, Hash: {sw_work_hash!.ElapsedMilliseconds}ms, Read: {sw_work_read!.ElapsedMilliseconds}ms, Write: {sw_work_write!.ElapsedMilliseconds}ms, Results: {sw_work_results!.ElapsedMilliseconds}ms, Meta: {sw_work_meta?.ElapsedMilliseconds}ms");
                }
                return;
            }
            catch (Exception ex)
            {
                Logging.Log.WriteErrorMessage(LOGTAG, "FileProcessingError", ex, "Error during file processing");
                throw;
            }
            finally
            {
                // If this processor never reached the folder metadata phase, it still has to
                // be counted out so the processors waiting at the rendezvous are released.
                if (!decremented)
                    lock (file_processor_continue_lock)
                    {
                        file_processors_restoring_files--;
                        if (file_processors_restoring_files <= 0 && !file_processor_continue.Task.IsCompleted)
                            file_processor_continue.SetResult();
                    }

                block_request.Retire();
                block_response.Retire();
            }
        });
}
|
|
|
|
/// <summary>
/// Copies the blocks that were verified in the old target to the new target file.
/// Each block is copied to the same position it occupies in the old file.
/// </summary>
/// <param name="old_file">The old target file.</param>
/// <param name="new_file">The new target file.</param>
/// <param name="buffer">Scratch buffer for one block. Its length is used as the distance between
/// consecutive blocks, i.e. it is assumed to be sized to the configured block size (as the caller in
/// <see cref="Run"/> allocates it).</param>
/// <param name="verified_blocks">The blocks in the old file that were verified.</param>
/// <exception cref="EndOfStreamException">If the old file ends before a verified block could be read completely.</exception>
private static async Task CopyOldTargetBlocksToNewTarget(FileRequest old_file, FileRequest new_file, byte[] buffer, List<BlockRequest> verified_blocks)
{
    using var fs_old = SystemIO.IO_OS.FileOpenRead(old_file.TargetPath);
    using var fs_new = SystemIO.IO_OS.FileOpenWrite(new_file.TargetPath);

    foreach (var block in verified_blocks)
    {
        // BlockOffset is a block index, so the byte position is index * full block size.
        // Using the block's own size would misplace a final block that is shorter than the rest.
        var position = block.BlockOffset * (long)buffer.Length;
        var size = (int)block.BlockSize;

        fs_old.Seek(position, SeekOrigin.Begin);
        fs_new.Seek(position, SeekOrigin.Begin);

        // A single read may return fewer bytes than requested; keep reading until the block is complete.
        var filled = 0;
        while (filled < size)
        {
            var read = await fs_old.ReadAsync(buffer, filled, size - filled).ConfigureAwait(false);
            if (read == 0)
                throw new EndOfStreamException($"Unexpected end of file while copying block {block.BlockID} from {old_file.TargetPath}");
            filled += read;
        }

        // Write only the bytes that belong to this block, not the whole buffer.
        await fs_new.WriteAsync(buffer, 0, size).ConfigureAwait(false);
    }
}
|
|
|
|
/// <summary>
/// Generates a new name for the target file. It starts by appending
/// the restore time to the file name. If that name is also already
/// taken, it appends a number to the name. It keeps incrementing the
/// number until it finds a name that is not taken. If it encounters a
/// file with the same name and the correct hash, it will retarget the
/// filename to that file, assuming that the file is the correct one.
/// </summary>
/// <param name="request">The original target file.</param>
/// <param name="database">The restore database.</param>
/// <param name="filehasher">The file hasher used to verify whether any of the already existing candidate files matches the hash of the file to restore. It is re-initialized before each use.</param>
/// <returns>The new filename. If the file already exists, its file hash matches the target hash.</returns>
/// <exception cref="Exception">If every candidate name is taken by a file that does not match the target hash.</exception>
private static string GenerateNewName(FileRequest request, LocalRestoreDatabase database, System.Security.Cryptography.HashAlgorithm filehasher)
{
    var ext = SystemIO.IO_OS.PathGetExtension(request.TargetPath) ?? "";
    if (!string.IsNullOrEmpty(ext) && !ext.StartsWith(".", StringComparison.Ordinal))
        ext = "." + ext;

    // First we try with a simple date append, assuming that there are not many conflicts there
    var newname = SystemIO.IO_OS.PathChangeExtension(request.TargetPath, null) + "." + database.RestoreTime.ToLocalTime().ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);
    var tr = newname + ext;
    var c = 0;
    const int max_tries = 100;

    // Every candidate - including the last numbered one - is examined before giving up,
    // so a free or matching final candidate is used instead of being rejected.
    while (SystemIO.IO_OS.FileExists(tr))
    {
        try
        {
            // If we have a file with the correct name,
            // it is most likely the file we want
            filehasher.Initialize();

            string key;
            using (var file = SystemIO.IO_OS.FileOpenRead(tr))
                key = Convert.ToBase64String(filehasher.ComputeHash(file));

            if (key == request.Hash)
            {
                //TODO: Also needs metadata check to make correct decision.
                // We stick to the policy to restore metadata in place, if data ok. So, metadata block may be restored.
                return tr;
            }
        }
        catch (Exception ex)
        {
            // An unreadable candidate is treated as a conflict; move on to the next name.
            Logging.Log.WriteWarningMessage(LOGTAG, "FailedToReadRestoreTarget", ex, "Failed to read candidate restore target {0}", tr);
        }

        if (c >= max_tries)
        {
            Logging.Log.WriteErrorMessage(LOGTAG, "TooManyConflicts", null, "Too many conflicts when trying to find a new name for the target file");
            throw new Exception("Too many conflicts when trying to find a new name for the target file");
        }

        tr = newname + " (" + (c++).ToString(System.Globalization.CultureInfo.InvariantCulture) + ")" + ext;
    }

    // The candidate does not exist yet, so it is free to use.
    return tr;
}
|
|
|
|
/// <summary>
/// Counts the calling FileProcessor out of the set of processors that are still
/// restoring files and, unless it was the last one, waits until the shared
/// continuation signal has been completed.
/// </summary>
/// <returns>An awaitable task that completes once all of the FileProcessor's have rendezvoused.</returns>
private static async Task RendezvousBeforeProcessingFolderMetadata()
{
    bool others_still_restoring;
    lock (file_processor_continue_lock)
    {
        // Count this processor out; the last one to leave completes the signal.
        var remaining = --file_processors_restoring_files;
        if (remaining <= 0 && !file_processor_continue.Task.IsCompleted)
            file_processor_continue.SetResult();

        others_still_restoring = remaining > 0;
    }

    if (!others_still_restoring)
        return;

    // The wait happens outside the lock so the other processors can check in.
    await file_processor_continue.Task.ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Restores the metadata for a file. The metadata blocks are requested one at a
/// time from the block manager, collected in memory and then applied to the
/// target path through <see cref="RestoreHandler.ApplyMetadata"/>.
/// </summary>
/// <param name="db">The restore database, which is queried for the metadata blocks of the file.</param>
/// <param name="file">The file to restore.</param>
/// <param name="block_request">The channel to request blocks from the block manager.</param>
/// <param name="block_response">The channel to receive blocks from the block manager.</param>
/// <param name="options">The restore options.</param>
/// <param name="sw_meta">The stopwatch for internal profiling of the metadata processing.</param>
/// <param name="sw_work">The stopwatch for internal profiling of the general processing. It is stopped when this method returns.</param>
/// <param name="sw_req">The stopwatch for internal profiling of the block requests.</param>
/// <param name="sw_resp">The stopwatch for internal profiling of the block responses.</param>
/// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
/// <returns>An awaitable `Task`, which returns the value reported by <see cref="RestoreHandler.ApplyMetadata"/>.</returns>
private static async Task<bool> RestoreMetadata(LocalRestoreDatabase db, FileRequest file, IChannel<BlockRequest> block_request, IChannel<Task<DataBlock>> block_response, Options options, Stopwatch? sw_meta, Stopwatch? sw_work, Stopwatch? sw_req, Stopwatch? sw_resp, CancellationToken cancellationToken)
{
    sw_meta?.Start();
    // Since each FileProcessor should have its own connection, it's ok
    // to keep the lock associated with the read transaction for a long
    // time, as the other processes should still be able to read.
    // Therefore, we don't need to read the entire result set into
    // memory.
    var blocks = db.GetMetadataBlocksFromFile(file.ID, cancellationToken);
    sw_meta?.Stop();

    using var ms = new MemoryStream();

    await foreach (var block in blocks.ConfigureAwait(false))
    {
        sw_work?.Stop();
        sw_req?.Start();
        await block_request.WriteAsync(
            new BlockRequest(
                block.BlockID,
                block.BlockOffset,
                block.BlockHash,
                block.BlockSize,
                block.VolumeID,
                BlockRequestType.Download
            )
        ).ConfigureAwait(false);
        sw_req?.Stop();

        sw_resp?.Start();
        using var datablock = await (await block_response.ReadAsync().ConfigureAwait(false)).ConfigureAwait(false);
        if (datablock.Data == null)
        {
            throw new Exception($"Received null data block from request {block.BlockID} when restoring metadata for file {file.TargetPath}");
        }
        sw_resp?.Stop();

        sw_work?.Start();
        ms.Write(datablock.Data, 0, (int)block.BlockSize);
        sw_work?.Stop();

        // Tell the block manager that this block is no longer needed by this processor.
        sw_req?.Start();
        await block_request.WriteAsync(
            new BlockRequest(
                block.BlockID,
                block.BlockOffset,
                block.BlockHash,
                block.BlockSize,
                block.VolumeID,
                BlockRequestType.CacheEvict
            )
        ).ConfigureAwait(false);
        sw_req?.Stop();

        sw_work?.Start();
    }

    // Time the application of the metadata as work even when there were no blocks,
    // and never hand a running stopwatch back to the caller.
    sw_work?.Start();
    try
    {
        ms.Seek(0, SeekOrigin.Begin);

        return RestoreHandler.ApplyMetadata(file.TargetPath, ms, options.RestorePermissions, options.RestoreSymlinkMetadata, options.Dryrun);
    }
    finally
    {
        sw_work?.Stop();
    }
}
|
|
|
|
/// <summary>
/// Checks which blocks of the file are already present, with the correct content,
/// in a target file that may already exist. If every block is present and the file
/// hash matches, a target that is longer than expected is truncated (or, for a
/// dry run, the truncation is only reported).
/// </summary>
/// <param name="file">The target file.</param>
/// <param name="blocks">The metadata about the blocks that make up the file.</param>
/// <param name="filehasher">A hasher for the file.</param>
/// <param name="blockhasher">A hasher for a data block.</param>
/// <param name="buffer">A buffer to read and write data blocks.</param>
/// <param name="options">The Duplicati configuration options.</param>
/// <param name="results">The restoration results.</param>
/// <returns>An awaitable `Task`, which returns the number of bytes that were verified, the blocks that are missing and the blocks that were verified.</returns>
private static async Task<(long, List<BlockRequest>, List<BlockRequest>)> VerifyTargetBlocks(FileRequest file, BlockRequest[] blocks, System.Security.Cryptography.HashAlgorithm filehasher, System.Security.Cryptography.HashAlgorithm blockhasher, byte[] buffer, Options options, RestoreResults results)
{
    long verified_bytes = 0;
    var absent = new List<BlockRequest>();
    var present = new List<BlockRequest>();

    // Without a target file there is nothing to verify: everything is missing.
    if (!File.Exists(file.TargetPath))
    {
        absent.AddRange(blocks);
        return (verified_bytes, absent, present);
    }

    filehasher.Initialize();
    try
    {
        using var target = SystemIO.IO_OS.FileOpenRead(file.TargetPath);
        for (var index = 0; index < blocks.Length; index++)
        {
            var expected = blocks[index];
            var count = await target.ReadAsync(buffer, 0, (int)expected.BlockSize).ConfigureAwait(false);

            if (count != expected.BlockSize)
            {
                // The block could not be read in full, so it is missing.
                absent.Add(expected);
                if (target.Position == target.Length)
                {
                    // End of the file reached - all remaining blocks are missing as well.
                    absent.AddRange(blocks.Skip(index + 1));
                    break;
                }
                continue;
            }

            // The block is present; feed it to the file hash and compare its own hash.
            filehasher.TransformBlock(buffer, 0, count, buffer, 0);
            var actual_hash = Convert.ToBase64String(blockhasher.ComputeHash(buffer, 0, count));
            if (actual_hash != expected.BlockHash)
            {
                absent.Add(expected);
                continue;
            }

            verified_bytes += count;
            present.Add(expected);
        }
    }
    catch (Exception)
    {
        lock (results)
        {
            results.BrokenLocalFiles.Add(file.TargetPath);
        }
        throw;
    }

    // The file hash is only meaningful when every single block was verified.
    if (absent.Count != 0)
        return (verified_bytes, absent, present);

    filehasher.TransformFinalBlock([], 0, 0);
    if (filehasher.Hash == null || Convert.ToBase64String(filehasher.Hash) != file.Hash)
        return (verified_bytes, absent, present);

    // The content matches; cut off anything beyond the expected size.
    FileInfo info = new(file.TargetPath);
    if (file.Length >= info.Length)
        return (verified_bytes, absent, present);

    if (options.Dryrun)
    {
        Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have truncated ""{file.TargetPath}"" from {info.Length} to {file.Length}");
    }
    else
    {
        // Reopen file with write permission
        info.IsReadOnly = false; // The metadata handler will revert this back later.
        using var writable = SystemIO.IO_OS.FileOpenWrite(file.TargetPath);
        writable.SetLength(file.Length);
    }

    return (verified_bytes, absent, present);
}
|
|
|
|
/// <summary>
/// Verifies the local blocks at the original restore path that may be used to restore the file.
/// Every requested block whose data at the original path matches its expected hash is copied
/// into the target file (skipped in dry-run mode) and a `CacheEvict` request for it is written
/// to <paramref name="block_request"/>. If no blocks remain missing afterwards, the hash of the
/// whole file is checked and the target is truncated to the expected length if it is too long.
/// </summary>
/// <param name="file">The file to restore. Contains both the target and original paths.</param>
/// <param name="blocks">The collection of blocks that are currently missing. Expected to be ordered by block offset, as the blocks are matched sequentially.</param>
/// <param name="total_blocks">The total number of blocks for the file.</param>
/// <param name="filehasher">A hasher for the file.</param>
/// <param name="blockhasher">A hasher for the data block.</param>
/// <param name="buffer">A buffer to read and write data blocks.</param>
/// <param name="options">The Duplicati configuration options.</param>
/// <param name="results">The restoration results.</param>
/// <param name="block_request">The channel to request blocks from the block manager. Used to inform the block manager which blocks are already present.</param>
/// <returns>An awaitable `Task`, which returns the number of bytes restored from the original file, and the collection of data blocks that are still missing.</returns>
private static async Task<(long, List<BlockRequest>)> VerifyLocalBlocks(FileRequest file, List<BlockRequest> blocks, long total_blocks, System.Security.Cryptography.HashAlgorithm filehasher, System.Security.Cryptography.HashAlgorithm blockhasher, byte[] buffer, Options options, RestoreResults results, IChannel<BlockRequest> block_request)
{
    if (file.TargetPath == file.OriginalPath)
    {
        // The original file is the same as the target file, so no new blocks can be used.
        return (0, blocks);
    }

    if (!File.Exists(file.OriginalPath))
    {
        // The original file is no longer present, so all blocks are missing.
        return (0, blocks);
    }

    List<BlockRequest> missing_blocks = [];
    List<BlockRequest> verified_blocks = [];

    filehasher.Initialize();

    // Open both files, as the target file is still being read to produce the overall file hash, if all the blocks are present across both the target and original files.
    using var f_original = SystemIO.IO_OS.FileOpenRead(file.OriginalPath);
    using var f_target = options.Dryrun ?
        (SystemIO.IO_OS.FileExists(file.TargetPath) ?
            SystemIO.IO_OS.FileOpenRead(file.TargetPath) :
            null) :
        SystemIO.IO_OS.FileOpenReadWrite(file.TargetPath);
    long bytes_written = 0;

    // Index into `blocks` of the next missing block to look for.
    int j = 0;
    for (long i = 0; i < total_blocks; i++)
    {
        int read = 0;
        if (j < blocks.Count && blocks[j].BlockOffset == i)
        {
            // The current block is a missing block - try to fetch it from the original file.
            var block = blocks[j];
            try
            {
                f_original.Seek(i * options.Blocksize, SeekOrigin.Begin);
                read = await f_original.ReadAsync(buffer, 0, (int)block.BlockSize).ConfigureAwait(false);
            }
            catch (Exception)
            {
                lock (results)
                {
                    results.BrokenLocalFiles.Add(file.TargetPath);
                }
                throw;
            }

            if (read == block.BlockSize)
            {
                var blockhash = Convert.ToBase64String(blockhasher.ComputeHash(buffer, 0, read));
                if (blockhash != block.BlockHash)
                {
                    // The data at the original path has changed, so the block is still missing.
                    missing_blocks.Add(block);
                }
                else
                {
                    // In dry-run mode the target is opened read-only (or not at all), so never write to it.
                    if (!options.Dryrun && f_target != null)
                    {
                        try
                        {
                            f_target.Seek(block.BlockOffset * options.Blocksize, SeekOrigin.Begin);
                            await f_target
                                .WriteAsync(buffer, 0, read)
                                .ConfigureAwait(false);
                        }
                        catch (Exception)
                        {
                            lock (results)
                            {
                                results.BrokenLocalFiles.Add(file.TargetPath);
                            }
                            throw;
                        }
                    }

                    bytes_written += read;

                    // The block no longer needs to be fetched, so let the block manager drop it.
                    await block_request.WriteAsync(
                        new BlockRequest(
                            block.BlockID,
                            block.BlockOffset,
                            block.BlockHash,
                            block.BlockSize,
                            block.VolumeID,
                            BlockRequestType.CacheEvict
                        )
                    ).ConfigureAwait(false);
                    verified_blocks.Add(block);
                }
            }
            else
            {
                // Short read: the block is not fully available in the original file.
                missing_blocks.Add(block);
                if (f_original.Position == f_original.Length)
                {
                    // End of the original file reached, so none of the remaining blocks can be there.
                    missing_blocks.AddRange(blocks.Skip(j + 1));
                    break;
                }
            }

            j++;
        }
        else
        {
            // The current block is not a missing block - read from the target file.
            if (f_target != null)
            {
                // Never read past the expected end of the file. The final block is usually
                // shorter than the block size, and the target may be longer than expected
                // (it is truncated below); hashing the excess bytes would make the file
                // hash check fail for a file that only needs truncation.
                long remaining = file.Length - i * options.Blocksize;
                int to_read = (int)Math.Max(0L, Math.Min(remaining, (long)options.Blocksize));

                try
                {
                    f_target.Seek(i * options.Blocksize, SeekOrigin.Begin);
                    read = await f_target
                        .ReadAsync(buffer, 0, to_read)
                        .ConfigureAwait(false);
                }
                catch (Exception)
                {
                    lock (results)
                    {
                        results.BrokenLocalFiles.Add(file.TargetPath);
                    }
                    throw;
                }
            }
        }

        // Feed whatever was read into the running file hash; it is only consulted if no blocks are missing.
        filehasher.TransformBlock(buffer, 0, read, buffer, 0);
    }

    if (missing_blocks.Count == 0)
    {
        // No more blocks are missing, so check the file hash.
        filehasher.TransformFinalBlock([], 0, 0);
        if (filehasher.Hash != null && Convert.ToBase64String(filehasher.Hash) == file.Hash)
        {
            // Truncate the file if it is larger than the expected size.
            if (file.Length < f_target?.Length)
            {
                if (options.Dryrun)
                {
                    Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have truncated ""{file.TargetPath}"" from {f_target.Length} to {file.Length}");
                }
                else
                {
                    try
                    {
                        f_target.SetLength(file.Length);
                    }
                    catch (Exception)
                    {
                        lock (results)
                        {
                            results.BrokenLocalFiles.Add(file.TargetPath);
                        }
                        throw;
                    }
                }
            }
        }
        else
        {
            Logging.Log.WriteErrorMessage(LOGTAG, "FileHashMismatch", null, $"File hash mismatch for {file.TargetPath} - expected: {file.Hash}, actual: {Convert.ToBase64String(filehasher.Hash ?? [0])}");
            lock (results)
            {
                results.BrokenLocalFiles.Add(file.TargetPath);
            }
        }
    }

    if (options.Dryrun)
    {
        Logging.Log.WriteDryrunMessage(LOGTAG, "DryrunRestore", @$"Would have restored {verified_blocks.Count} blocks ({bytes_written} bytes) from ""{file.OriginalPath}"" to ""{file.TargetPath}""");
    }

    return (bytes_written, missing_blocks);
}
|
|
}
|
|
|
|
} |