X Tutup
using System; using System.Data; using System.Threading; using System.Threading.Tasks; namespace Npgsql { /// /// Large object manager. This class can be used to store very large files in a PostgreSQL database. /// public class NpgsqlLargeObjectManager { const int INV_WRITE = 0x00020000; const int INV_READ = 0x00040000; internal readonly NpgsqlConnection _connection; /// /// The largest chunk size (in bytes) read and write operations will read/write each roundtrip to the network. Default 4 MB. /// public int MaxTransferBlockSize { get; set; } /// /// Creates an NpgsqlLargeObjectManager for this connection. The connection must be opened to perform remote operations. /// /// public NpgsqlLargeObjectManager(NpgsqlConnection connection) { _connection = connection; MaxTransferBlockSize = 4 * 1024 * 1024; // 4MB } /// /// Execute a function /// internal async Task ExecuteFunction(string function, bool async, params object[] arguments) { using (var command = new NpgsqlCommand(function, _connection)) { command.CommandType = CommandType.StoredProcedure; command.CommandText = function; foreach (var argument in arguments) command.Parameters.Add(new NpgsqlParameter { Value = argument }); return (T)(async ? await command.ExecuteScalarAsync() : command.ExecuteScalar()); } } /// /// Execute a function that returns a byte array /// /// internal async Task ExecuteFunctionGetBytes(string function, byte[] buffer, int offset, int len, bool async, params object[] arguments) { using (var command = new NpgsqlCommand(function, _connection)) { command.CommandType = CommandType.StoredProcedure; foreach (var argument in arguments) command.Parameters.Add(new NpgsqlParameter { Value = argument }); using (var reader = async ? await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess) : command.ExecuteReader(CommandBehavior.SequentialAccess)) { if (async) await reader.ReadAsync(); else reader.Read(); return (int)reader.GetBytes(0, 0, buffer, offset, len); } } } /// /// Create an empty large object in the database. If an oid is specified but is already in use, an PostgresException will be thrown. /// /// A preferred oid, or specify 0 if one should be automatically assigned /// The oid for the large object created /// If an oid is already in use public uint Create(uint preferredOid = 0) => Create(preferredOid, false).GetAwaiter().GetResult(); // Review unused parameters /// /// Create an empty large object in the database. If an oid is specified but is already in use, an PostgresException will be thrown. /// /// A preferred oid, or specify 0 if one should be automatically assigned /// Cancellation token. /// The oid for the large object created /// If an oid is already in use #pragma warning disable CA1801 public Task CreateAsync(uint preferredOid, CancellationToken cancellationToken) #pragma warning restore CA1801 // Review unused parameters => Create(preferredOid, true); Task Create(uint preferredOid, bool async) => ExecuteFunction("lo_create", async, (int)preferredOid); /// /// Opens a large object on the backend, returning a stream controlling this remote object. /// A transaction snapshot is taken by the backend when the object is opened with only read permissions. /// When reading from this object, the contents reflects the time when the snapshot was taken. /// Note that this method, as well as operations on the stream must be wrapped inside a transaction. /// /// Oid of the object /// An NpgsqlLargeObjectStream public NpgsqlLargeObjectStream OpenRead(uint oid) => OpenRead(oid, false).GetAwaiter().GetResult(); /// /// Opens a large object on the backend, returning a stream controlling this remote object. /// A transaction snapshot is taken by the backend when the object is opened with only read permissions. /// When reading from this object, the contents reflects the time when the snapshot was taken. /// Note that this method, as well as operations on the stream must be wrapped inside a transaction. /// /// Oid of the object /// Cancellation token. /// An NpgsqlLargeObjectStream public Task OpenReadAsync(uint oid, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); using (NoSynchronizationContextScope.Enter()) return OpenRead(oid, true); } async Task OpenRead(uint oid, bool async) { var fd = await ExecuteFunction("lo_open", async, (int)oid, INV_READ); return new NpgsqlLargeObjectStream(this, fd, false); } /// /// Opens a large object on the backend, returning a stream controlling this remote object. /// Note that this method, as well as operations on the stream must be wrapped inside a transaction. /// /// Oid of the object /// An NpgsqlLargeObjectStream public NpgsqlLargeObjectStream OpenReadWrite(uint oid) => OpenReadWrite(oid, false).GetAwaiter().GetResult(); /// /// Opens a large object on the backend, returning a stream controlling this remote object. /// Note that this method, as well as operations on the stream must be wrapped inside a transaction. /// /// Oid of the object /// Cancellation token. /// An NpgsqlLargeObjectStream public Task OpenReadWriteAsync(uint oid, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); using (NoSynchronizationContextScope.Enter()) return OpenReadWrite(oid, true); } async Task OpenReadWrite(uint oid, bool async) { var fd = await ExecuteFunction("lo_open", async, (int)oid, INV_READ | INV_WRITE); return new NpgsqlLargeObjectStream(this, fd, true); } /// /// Deletes a large object on the backend. /// /// Oid of the object to delete public void Unlink(uint oid) => ExecuteFunction("lo_unlink", false, (int)oid).GetAwaiter().GetResult(); /// /// Deletes a large object on the backend. /// /// Oid of the object to delete /// Cancellation token. public Task UnlinkAsync(uint oid, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); using (NoSynchronizationContextScope.Enter()) return ExecuteFunction("lo_unlink", true, (int)oid); } /// /// Exports a large object stored in the database to a file on the backend. This requires superuser permissions. /// /// Oid of the object to export /// Path to write the file on the backend public void ExportRemote(uint oid, string path) => ExecuteFunction("lo_export", false, (int)oid, path).GetAwaiter().GetResult(); /// /// Exports a large object stored in the database to a file on the backend. This requires superuser permissions. /// /// Oid of the object to export /// Path to write the file on the backend /// Cancellation token. public Task ExportRemoteAsync(uint oid, string path, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); using (NoSynchronizationContextScope.Enter()) return ExecuteFunction("lo_export", true, (int)oid, path); } /// /// Imports a large object to be stored as a large object in the database from a file stored on the backend. This requires superuser permissions. /// /// Path to read the file on the backend /// A preferred oid, or specify 0 if one should be automatically assigned public void ImportRemote(string path, uint oid = 0) => ExecuteFunction("lo_import", false, path, (int)oid).GetAwaiter().GetResult(); /// /// Imports a large object to be stored as a large object in the database from a file stored on the backend. This requires superuser permissions. /// /// Path to read the file on the backend /// A preferred oid, or specify 0 if one should be automatically assigned /// Cancellation token. public Task ImportRemoteAsync(string path, uint oid, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); using (NoSynchronizationContextScope.Enter()) return ExecuteFunction("lo_import", true, path, (int)oid); } /// /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB. /// This property returns true whether the PostgreSQL version is >= 9.3. /// public bool Has64BitSupport => _connection.PostgreSqlVersion >= new Version(9, 3); /* internal enum Function : uint { lo_open = 952, lo_close = 953, loread = 954, lowrite = 955, lo_lseek = 956, lo_lseek64 = 3170, // Since PostgreSQL 9.3 lo_creat = 957, lo_create = 715, lo_tell = 958, lo_tell64 = 3171, // Since PostgreSQL 9.3 lo_truncate = 1004, lo_truncate64 = 3172, // Since PostgreSQL 9.3 // These four are available since PostgreSQL 9.4 lo_from_bytea = 3457, lo_get = 3458, lo_get_fragment = 3459, lo_put = 3460, lo_unlink = 964, lo_import = 764, lo_import_with_oid = 767, lo_export = 765, } */ } }
X Tutup