mirror of
https://github.com/duplicati/duplicati.git
synced 2025-11-28 11:30:24 +08:00
372 lines
17 KiB
C#
372 lines
17 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Duplicati.Library.Interface;
|
|
using Duplicati.Library.Utility;
|
|
|
|
namespace RemoteSynchronization
|
|
{
|
|
/// <summary>
|
|
/// A lightweight backend manager that handles remote synchronization operations.
|
|
/// This class is designed to manage backend operations such as Get, Put, Delete, Rename, and List asynchronously.
|
|
/// It supports retrying operations with a specified delay and can automatically create folders if they do not exist.
|
|
/// The class implements IDisposable to ensure proper resource management.
|
|
/// It uses a streaming backend for efficient file operations and handles exceptions gracefully to allow for retries and recovery.
|
|
/// A retry incurs a new instantiation of the backend.
|
|
/// It is designed to mimic the behavior of Duplicati.Library.Main.Backend.BackendManager.
|
|
/// </summary>
|
|
/// <param name="backendUrl">The backend URL string.</param>
|
|
/// <param name="options">A dictionary of options to pass to the backend.</param>
|
|
/// <param name="maxRetries">The maximum number of retries for failed operations.</param>
|
|
/// <param name="retryDelay">The delay between retries, in milliseconds.</param>
|
|
/// <param name="autoCreateFolders">Whether to automatically create folders if they do not exist.</param>
|
|
/// <param name="retryWithExponentialBackoff">Whether to use exponential backoff for retries.</param>
|
|
public class LightWeightBackendManager(string backendUrl, Dictionary<string, string> options, int maxRetries = 3, int retryDelay = 1000, bool autoCreateFolders = false, bool retryWithExponentialBackoff = false) : IDisposable
|
|
{
|
|
/// <summary>The tag attached to every log message written by this class.</summary>
private static readonly string LOGTAG = Duplicati.Library.Logging.Log.LogTagFromType<Program>();

/// <summary>The active backend instance, or null when no backend is currently instantiated.</summary>
public IBackend? _backend = null;

/// <summary>Set once a download (or rename) has completed; checked before folders are auto-created.</summary>
private bool _anyDownloaded = false;

/// <summary>Set once an upload (or rename) has completed; checked before folders are auto-created.</summary>
private bool _anyUploaded = false;

/// <summary>The backend URL handed to the backend loader on every instantiation.</summary>
private readonly string _backendUrl = backendUrl;

/// <summary>The upper bound on the number of attempts made for a single operation.</summary>
private readonly int _maxRetries = maxRetries;

/// <summary>The options handed to the backend loader on every instantiation.</summary>
private readonly Dictionary<string, string> _options = options;

/// <summary>The base delay between retries, in milliseconds. Never modified after construction.</summary>
private readonly int _retryDelay = retryDelay;

/// <summary>The delay used for the next retry, in milliseconds; grows when exponential backoff is enabled.</summary>
private int _currentRetryDelay = retryDelay;

/// <summary>The active backend viewed as a streaming backend, or null when no backend is instantiated.</summary>
private IStreamingBackend? _streamingBackend = null;
|
|
|
|
/// <summary>
/// Removes a single file from the remote backend.
/// The call is routed through the retry helper, so a failed attempt is repeated
/// against a freshly instantiated backend.
/// </summary>
/// <param name="remotename">The name of the remote file to delete.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous delete operation.</returns>
public Task DeleteAsync(string remotename, CancellationToken token)
{
    // Reads the backend field on every attempt, so retries use the new instance.
    Task DeleteOnce() => _streamingBackend!.DeleteAsync(remotename, token);

    return RetryWithDelay($"Delete {remotename}", DeleteOnce, stream: null, resetStream: false, token: token);
}
|
|
|
|
/// <summary>
/// Gets the display name reported by the backend.
/// Reading this property instantiates the backend when none is active yet.
/// </summary>
/// <returns>The display name of the backend.</returns>
public string DisplayName
{
    get
    {
        // Make sure a backend exists before asking it for its name.
        Instantiate();

        IStreamingBackend active = _streamingBackend!;
        return active.DisplayName;
    }
}
|
|
|
|
/// <summary>
/// Disposes the current backend instance and clears the references to it.
/// Any exception thrown while disposing is logged and not propagated.
/// After this call a new backend is created by the next call to Instantiate().
/// </summary>
public void Dispose()
{
    // Detach the references first so that a throwing Dispose cannot leave a
    // disposed instance behind for Instantiate() to hand out again.
    var backend = _backend;
    var streamingBackend = _streamingBackend;
    _backend = null;
    _streamingBackend = null;

    try
    {
        streamingBackend?.Dispose();

        // Both fields are assigned from the same object in Instantiate();
        // only dispose the backend separately if it is a distinct instance.
        if (!ReferenceEquals(backend, streamingBackend))
        {
            backend?.Dispose();
        }
    }
    catch (Exception ex)
    {
        Duplicati.Library.Logging.Log.WriteErrorMessage(LOGTAG, "rsync", ex, "Error during Dispose", null);
    }
}
|
|
|
|
/// <summary>
/// Downloads a remote file into the supplied stream.
/// The download goes through the retry helper with stream reset enabled, so a
/// seekable target stream is rewound and truncated before each new attempt.
/// </summary>
/// <param name="remotename">The name of the remote file to get.</param>
/// <param name="stream">The stream to write the file to.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous get operation.</returns>
public Task GetAsync(string remotename, Stream stream, CancellationToken token)
{
    async Task DownloadOnce()
    {
        await _streamingBackend!.GetAsync(remotename, stream, token).ConfigureAwait(false);

        // Only reached when the download completed without throwing.
        _anyDownloaded = true;
    }

    return RetryWithDelay($"Get {remotename}", DownloadOnce, stream: stream, resetStream: true, token: token);
}
|
|
|
|
/// <summary>
/// Instantiates the backend if it has not been instantiated yet.
/// If a backend is already active, the method returns immediately.
/// The loaded backend must implement IStreamingBackend and report that it
/// supports streaming; otherwise it is disposed and an exception is thrown.
/// The backend fields are only assigned once the backend has been accepted, so a
/// rejected instance is never left behind in either field.
/// </summary>
/// <exception cref="InvalidOperationException">If the loaded backend does not support streaming operations.</exception>
private void Instantiate()
{
    if (_backend != null)
    {
        // If we already have a backend, we can just return.
        return;
    }

    var candidate = Duplicati.Library.DynamicLoader.BackendLoader.GetBackend(_backendUrl, _options);

    if (candidate is IStreamingBackend { SupportsStreaming: true } streaming)
    {
        _backend = candidate;
        _streamingBackend = streaming;
        return;
    }

    // NOTE(review): whether GetBackend can return null is not visible here;
    // the null-conditional is defensive so that case fails with the exception
    // below rather than a NullReferenceException.
    candidate?.Dispose();
    throw new InvalidOperationException("Backend does not support streaming operations.");
}
|
|
|
|
/// <summary>
/// Retrieves the complete file listing from the remote backend.
/// The listing is fully materialized inside the retried action, so a failed
/// attempt never leaks a partial result to the caller.
/// </summary>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A list of the file entries on the remote backend.</returns>
public async Task<List<IFileEntry>> ListAsync(CancellationToken token)
{
    // TODO It would be more graceful if this method returned an
    // IAsyncEnumerable instead, capturing failures along the way,
    // followed by retrying / resuming the listing from where it
    // crashed. Current "workaround" is to build the entire list before
    // returning it.
    List<IFileEntry> result = [];

    async Task ListOnce()
    {
        // Replaces the result wholesale on every attempt.
        result = await _streamingBackend!.ListAsync(token).ToListAsync().ConfigureAwait(false);
    }

    await RetryWithDelay("List", ListOnce, stream: null, resetStream: false, token: token).ConfigureAwait(false);

    return result;
}
|
|
|
|
/// <summary>
/// Uploads the contents of the supplied stream to the remote backend.
/// The upload goes through the retry helper without stream truncation, so a
/// seekable source stream is only rewound before each new attempt.
/// </summary>
/// <param name="remotename">The name of the remote file to put.</param>
/// <param name="stream">The stream containing the file data to put.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous put operation.</returns>
public Task PutAsync(string remotename, Stream stream, CancellationToken token)
{
    async Task UploadOnce()
    {
        await _streamingBackend!.PutAsync(remotename, stream, token).ConfigureAwait(false);

        // Only reached when the upload completed without throwing.
        _anyUploaded = true;
    }

    return RetryWithDelay($"Put {remotename}", UploadOnce, stream: stream, resetStream: false, token: token);
}
|
|
|
|
/// <summary>
/// Renames a file in the remote backend.
/// If the backend implements IRenameEnabledBackend, its RenameAsync method is used.
/// Otherwise the file is downloaded into memory, uploaded under the new name, and
/// the old file is deleted.
/// The backend is looked up again on every attempt, because a failed attempt
/// disposes the current backend and the retry runs against a new instance.
/// </summary>
/// <param name="oldname">The current name of the remote file.</param>
/// <param name="newname">The new name for the remote file.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous rename operation.</returns>
/// <exception cref="InvalidOperationException">If the backend supports neither renaming nor streaming.</exception>
public Task RenameAsync(string oldname, string newname, CancellationToken token)
{
    Instantiate();

    // Fail synchronously, before any retry, when no rename strategy exists.
    if (_backend is not IRenameEnabledBackend && _backend is not IStreamingBackend)
    {
        throw new InvalidOperationException("Backend does not support renaming.");
    }

    return RetryWithDelay(
        $"Rename {oldname} to {newname}",
        async () =>
        {
            // Check for native rename support first: every backend accepted by
            // Instantiate() is a streaming backend, so testing for streaming
            // first would make the native rename path unreachable.
            if (_backend is IRenameEnabledBackend renamer)
            {
                await renamer.RenameAsync(oldname, newname, token).ConfigureAwait(false);
            }
            else if (_backend is IStreamingBackend streaming)
            {
                // Download the file, upload it under the new name, and delete the old one.
                using var buffer = new MemoryStream();
                await streaming.GetAsync(oldname, buffer, token).ConfigureAwait(false);
                buffer.Seek(0, SeekOrigin.Begin);
                await streaming.PutAsync(newname, buffer, token).ConfigureAwait(false);
                await streaming.DeleteAsync(oldname, token).ConfigureAwait(false);
            }
            else
            {
                throw new InvalidOperationException("Backend does not support renaming.");
            }

            _anyUploaded = true;
            _anyDownloaded = true;
        },
        null,
        false,
        token
    );
}
|
|
|
|
/// <summary>
/// Runs an operation, retrying it when it fails.
/// Each attempt first makes sure a backend is instantiated. When the action throws,
/// the error is logged, the current backend is disposed, a seekable stream is rewound
/// (and truncated when <paramref name="resetStream"/> is set), and recovery is attempted;
/// the recovery step is also where the retry delay is awaited.
/// A cancellation requested through <paramref name="token"/> is not retried: the
/// same cleanup is performed and the cancellation exception is rethrown.
/// </summary>
/// <param name="operationName">The name of the operation being retried, used for logging.</param>
/// <param name="action">The action to perform.</param>
/// <param name="stream">The stream to use for the operation. Used when resetting the stream during recovery.</param>
/// <param name="resetStream">Whether to truncate the stream if the operation fails.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">
/// If every attempt failed; the last failure is available as the inner exception.
/// </exception>
private async Task RetryWithDelay(string operationName, Func<Task> action, Stream? stream, bool resetStream, CancellationToken token)
{
    int instantiations = 0;
    Exception? lastError = null;
    _currentRetryDelay = _retryDelay; // Reset the current retry delay to the initial value

    do
    {
        // Creates a new backend if the previous attempt disposed it.
        // This is outside the try block, so an instantiation failure
        // propagates to the caller without being retried.
        Instantiate();
        instantiations++;

        try
        {
            await action().ConfigureAwait(false);
            return; // Exit the loop if the action succeeds.
        }
        catch (Exception ex)
        {
            // A requested cancellation is not a failure worth logging or retrying.
            var cancelled = ex is OperationCanceledException && token.IsCancellationRequested;
            if (!cancelled)
            {
                Duplicati.Library.Logging.Log.WriteErrorMessage(LOGTAG, "rsync", ex, "Error during operation: {0}", operationName);
            }

            Dispose(); // Dispose current backend and streaming backend.

            // Reset the stream, as it's in a potentially faulty state.
            if (stream != null && stream.CanSeek)
            {
                stream.Seek(0, SeekOrigin.Begin);
                if (resetStream)
                {
                    stream.SetLength(0);
                }
            }

            if (cancelled)
            {
                throw;
            }

            lastError = ex;

            // Try to see if we can recover from the error.
            await TryRecoverFromException(ex, token).ConfigureAwait(false);
        }
    } while (instantiations < _maxRetries);

    // If we reach here, it means all retries failed. Keep the last failure as
    // the inner exception so callers can see why the operation failed.
    throw new InvalidOperationException($"Operation '{operationName}' failed after {instantiations} attempts.", lastError);
}
|
|
|
|
/// <summary>
/// Attempts to create the folder in the remote backend.
/// A single attempt is made; a failure is logged and reported through the return
/// value instead of being thrown.
/// </summary>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation. The task result is true if the folder was created successfully, false otherwise.</returns>
private async Task<bool> TryCreateFolder(CancellationToken token)
{
    // This deliberately does not go through RetryWithDelay. Failures are caught
    // here, so that helper would never retry, but it would reset
    // _currentRetryDelay and thereby cancel the exponential backoff of the
    // operation that is currently being recovered.
    Instantiate();

    try
    {
        await _backend!.CreateFolderAsync(token).ConfigureAwait(false);
        return true; // Folder creation succeeded
    }
    catch (Exception ex)
    {
        // Best effort: report the failure to the caller, but leave a trace in the log.
        Duplicati.Library.Logging.Log.WriteErrorMessage(LOGTAG, "rsync", ex, "Error during operation: {0}", "CreateFolder");
        return false; // Folder creation failed
    }
}
|
|
|
|
/// <summary>
/// Attempts to recover from an exception that occurred during a backend operation.
/// If the exception chain contains a DNS resolution failure, the backend is
/// re-instantiated and the DNS names it reports are resolved again (best effort).
/// If the chain contains a folder missing exception, auto-creation of folders is
/// enabled, and nothing has been uploaded or downloaded yet, folder creation is attempted.
/// If the folder was not created and a retry delay is configured, the method waits
/// for the current retry delay.
/// With exponential backoff enabled the delay is then doubled, capped at int.MaxValue.
/// </summary>
/// <param name="ex">The exception that occurred during the operation.</param>
/// <param name="token">A cancellation token to cancel the operation.</param>
/// <returns>A task representing the asynchronous recovery operation.</returns>
private async Task TryRecoverFromException(Exception ex, CancellationToken token)
{
    // Copied from Duplicati.Library.Main.Backend.BackendManager.Handler.

    // Refresh DNS name if we fail to connect in order to prevent issues with incorrect DNS entries.
    var dnsFailure = ExceptionExtensions.FlattenException(ex)
        .Any(x =>
            (x is System.Net.WebException wex && wex.Status == System.Net.WebExceptionStatus.NameResolutionFailure)
            ||
            (x is System.Net.Sockets.SocketException sockEx && sockEx.SocketErrorCode == System.Net.Sockets.SocketError.HostNotFound)
        );

    if (dnsFailure)
    {
        try
        {
            Instantiate();

            foreach (var name in await _backend!.GetDNSNamesAsync(token).ConfigureAwait(false) ?? [])
            {
                if (!string.IsNullOrWhiteSpace(name))
                {
                    // Resolve asynchronously so the calling thread is not blocked on DNS.
                    await System.Net.Dns.GetHostEntryAsync(name, token).ConfigureAwait(false);
                }
            }
        }
        catch
        {
            // Deliberately ignored: the DNS refresh is best effort and must not
            // replace the original failure that is being recovered from.
        }
    }

    var recovered = false;

    // Check if this was a folder missing exception and we are allowed to autocreate folders
    if (!(_anyDownloaded || _anyUploaded) && autoCreateFolders && ExceptionExtensions.FlattenException(ex).Any(x => x is FolderMissingException))
    {
        if (await TryCreateFolder(token).ConfigureAwait(false))
        {
            recovered = true;
        }
    }

    // Finally, if we did not recover, wait the specified delay before retrying.
    if (!recovered && _retryDelay > 0)
    {
        await Task.Delay(_currentRetryDelay, token).ConfigureAwait(false);

        if (retryWithExponentialBackoff)
        {
            // Double the delay, but cap it: an overflowing shift would turn the
            // delay negative, which Task.Delay rejects.
            _currentRetryDelay = _currentRetryDelay > int.MaxValue / 2
                ? int.MaxValue
                : _currentRetryDelay * 2;
        }
    }
}
|
|
|
|
}
|
|
}
|