using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql
{
/// <summary>
/// Large object manager. This class can be used to store very large files in a PostgreSQL database.
/// </summary>
public class NpgsqlLargeObjectManager
{
// Mode flags combined into the second argument of the backend's lo_open call
// (InvRead alone in OpenRead, InvRead | InvWrite in OpenReadWrite).
// The values correspond to PostgreSQL's INV_WRITE / INV_READ constants.
const int InvWrite = 0x00020000;
const int InvRead = 0x00040000;
// Connection on which every command created by this manager is executed.
internal NpgsqlConnection Connection { get; }
/// <summary>
/// The largest chunk size (in bytes) read and write operations will read/write each roundtrip to the network. Default 4 MB.
/// </summary>
public int MaxTransferBlockSize { get; set; }
/// <summary>
/// Initializes a large object manager bound to the given connection. The constructor only stores
/// the connection; it has to be open by the time a remote operation is performed.
/// </summary>
/// <param name="connection">The connection all large-object commands will be executed on.</param>
public NpgsqlLargeObjectManager(NpgsqlConnection connection)
{
    // Default upper bound for a single read/write roundtrip: 4 MB.
    const int defaultTransferBlockSize = 4 * 1024 * 1024;

    MaxTransferBlockSize = defaultTransferBlockSize;
    Connection = connection;
}
/// <summary>
/// Invokes a backend function as a stored-procedure command on <see cref="Connection"/> and
/// returns its scalar result.
/// </summary>
/// <typeparam name="T">Type the scalar result is cast to.</typeparam>
/// <param name="function">Name of the backend function to call.</param>
/// <param name="async">When true the command is executed asynchronously; otherwise it is executed synchronously.</param>
/// <param name="arguments">Argument values, added as positional parameters in the given order.</param>
/// <returns>The scalar result of the command, cast to <typeparamref name="T"/>.</returns>
internal async Task<T> ExecuteFunction<T>(string function, bool async, params object[] arguments)
{
    // The command text is already supplied through the constructor, so only the
    // command type needs to be configured here.
    using var command = new NpgsqlCommand(function, Connection)
    {
        CommandType = CommandType.StoredProcedure
    };

    foreach (var argument in arguments)
    {
        command.Parameters.Add(new NpgsqlParameter { Value = argument });
    }

    return (T)(async ? await command.ExecuteScalarAsync() : command.ExecuteScalar());
}
/// <summary>
/// Invokes a backend function that returns a byte array and copies the returned bytes into
/// <paramref name="buffer"/>.
/// </summary>
/// <param name="function">Name of the backend function to call.</param>
/// <param name="buffer">Destination buffer for the returned bytes.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which copying starts.</param>
/// <param name="len">Maximum number of bytes to copy.</param>
/// <param name="async">When true the command and the row read are awaited; otherwise they run synchronously.</param>
/// <param name="arguments">Argument values, added as positional parameters in the given order.</param>
/// <returns>The value reported by the reader's GetBytes call, i.e. the number of bytes copied.</returns>
internal async Task<int> ExecuteFunctionGetBytes(string function, byte[] buffer, int offset, int len, bool async, params object[] arguments)
{
    using var command = new NpgsqlCommand(function, Connection)
    {
        CommandType = CommandType.StoredProcedure
    };

    foreach (var argument in arguments)
    {
        command.Parameters.Add(new NpgsqlParameter { Value = argument });
    }

    // Sequential access lets the column be streamed into the caller's buffer.
    using var reader = async
        ? await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)
        : command.ExecuteReader(CommandBehavior.SequentialAccess);

    // Advance to the single result row before reading column 0.
    if (async)
    {
        await reader.ReadAsync();
    }
    else
    {
        reader.Read();
    }

    return (int)reader.GetBytes(0, 0, buffer, offset, len);
}
/// <summary>
/// Creates a new, empty large object in the database. If an oid is requested that is already in use,
/// a <see cref="PostgresException"/> is thrown.
/// </summary>
/// <param name="preferredOid">A preferred oid, or 0 to have one assigned automatically.</param>
/// <returns>The oid of the large object that was created.</returns>
/// <exception cref="PostgresException">If the requested oid is already in use.</exception>
public uint Create(uint preferredOid = 0)
{
    // Run the shared implementation in its synchronous mode and unwrap the result.
    var creation = Create(preferredOid, false);
    return creation.GetAwaiter().GetResult();
}
/// <summary>
/// Create an empty large object in the database. If an oid is specified but is already in use, a PostgresException will be thrown.
/// </summary>
/// <remarks>
/// The cancellation token is only checked before the operation starts; it is not passed on to the
/// underlying command.
/// </remarks>
/// <param name="preferredOid">A preferred oid, or specify 0 if one should be automatically assigned</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns>The oid for the large object created</returns>
/// <exception cref="PostgresException">If an oid is already in use</exception>
public Task<uint> CreateAsync(uint preferredOid, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<uint>(cancellationToken);

    // Start the operation inside the same scope the other public async entry points
    // (OpenReadAsync / OpenReadWriteAsync) use.
    // NOTE(review): presumably this keeps internal awaits off the caller's SynchronizationContext - confirm.
    using (NoSynchronizationContextScope.Enter())
        return Create(preferredOid, true);
}
/// <summary>
/// Shared implementation behind the public create methods: calls the backend function
/// <c>lo_create</c> with the requested oid.
/// </summary>
/// <param name="preferredOid">The requested oid, passed to the backend as a 32-bit integer.</param>
/// <param name="async">When true the call is executed asynchronously; otherwise synchronously.</param>
/// <returns>The oid returned by <c>lo_create</c>.</returns>
Task<uint> Create(uint preferredOid, bool async)
    => ExecuteFunction<uint>("lo_create", async, (int)preferredOid);
/// <summary>
/// Opens a large object on the backend for reading and returns a stream that controls the remote object.
/// When the object is opened with read permissions only, the backend takes a transaction snapshot;
/// reads from the stream reflect the contents as of that snapshot.
/// This call, and every operation on the returned stream, must take place inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public NpgsqlLargeObjectStream OpenRead(uint oid)
{
    // Run the shared implementation in its synchronous mode and unwrap the result.
    var opening = OpenRead(oid, false);
    return opening.GetAwaiter().GetResult();
}
/// <summary>
/// Opens a large object on the backend, returning a stream controlling this remote object.
/// A transaction snapshot is taken by the backend when the object is opened with only read permissions.
/// When reading from this object, the contents reflect the time when the snapshot was taken.
/// Note that this method, as well as operations on the stream, must be wrapped inside a transaction.
/// </summary>
/// <remarks>
/// The cancellation token is only checked before the operation starts; it is not passed on to the
/// underlying command.
/// </remarks>
/// <param name="oid">Oid of the object</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public Task<NpgsqlLargeObjectStream> OpenReadAsync(uint oid, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<NpgsqlLargeObjectStream>(cancellationToken);

    using (NoSynchronizationContextScope.Enter())
        return OpenRead(oid, true);
}
/// <summary>
/// Shared implementation behind the public read-only open methods: calls the backend function
/// <c>lo_open</c> with the read flag and wraps the returned descriptor in a stream.
/// </summary>
/// <param name="oid">Oid of the object, passed to the backend as a 32-bit integer.</param>
/// <param name="async">When true the call is executed asynchronously; otherwise synchronously.</param>
/// <returns>A stream built from this manager and the descriptor returned by <c>lo_open</c>.</returns>
async Task<NpgsqlLargeObjectStream> OpenRead(uint oid, bool async)
{
    var fd = await ExecuteFunction<int>("lo_open", async, (int)oid, InvRead);

    // NOTE(review): the last argument is false here and true in OpenReadWrite -
    // presumably the stream's "writeable" flag; confirm against NpgsqlLargeObjectStream.
    return new NpgsqlLargeObjectStream(this, fd, false);
}
/// <summary>
/// Opens a large object on the backend for reading and writing and returns a stream that controls
/// the remote object.
/// This call, and every operation on the returned stream, must take place inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public NpgsqlLargeObjectStream OpenReadWrite(uint oid)
{
    // Run the shared implementation in its synchronous mode and unwrap the result.
    var opening = OpenReadWrite(oid, false);
    return opening.GetAwaiter().GetResult();
}
/// <summary>
/// Opens a large object on the backend, returning a stream controlling this remote object.
/// Note that this method, as well as operations on the stream, must be wrapped inside a transaction.
/// </summary>
/// <remarks>
/// The cancellation token is only checked before the operation starts; it is not passed on to the
/// underlying command.
/// </remarks>
/// <param name="oid">Oid of the object</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public Task<NpgsqlLargeObjectStream> OpenReadWriteAsync(uint oid, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<NpgsqlLargeObjectStream>(cancellationToken);

    using (NoSynchronizationContextScope.Enter())
        return OpenReadWrite(oid, true);
}
/// <summary>
/// Shared implementation behind the public read/write open methods: calls the backend function
/// <c>lo_open</c> with both the read and write flags and wraps the returned descriptor in a stream.
/// </summary>
/// <param name="oid">Oid of the object, passed to the backend as a 32-bit integer.</param>
/// <param name="async">When true the call is executed asynchronously; otherwise synchronously.</param>
/// <returns>A stream built from this manager and the descriptor returned by <c>lo_open</c>.</returns>
async Task<NpgsqlLargeObjectStream> OpenReadWrite(uint oid, bool async)
{
    var fd = await ExecuteFunction<int>("lo_open", async, (int)oid, InvRead | InvWrite);

    // NOTE(review): the last argument is true here and false in OpenRead -
    // presumably the stream's "writeable" flag; confirm against NpgsqlLargeObjectStream.
    return new NpgsqlLargeObjectStream(this, fd, true);
}
///
/// Deletes a large object on the backend.
///
/// Oid of the object to delete
public void Unlink(uint oid)
=> ExecuteFunction