#region License
// The PostgreSQL License
//
// Copyright (C) 2017 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql
{
/// <summary>
/// Large object manager. This class can be used to store very large files in a PostgreSQL database.
/// </summary>
public class NpgsqlLargeObjectManager
{
// Mode flags passed as the second argument of the backend's lo_open function
// (see OpenRead and OpenReadWrite).
const int INV_WRITE = 0x00020000;
const int INV_READ = 0x00040000;
// Connection on which every large object function call is executed.
internal readonly NpgsqlConnection _connection;
/// <summary>
/// The largest chunk size (in bytes) read and write operations will read/write each roundtrip to the network. Default 4 MB.
/// </summary>
public int MaxTransferBlockSize { get; set; }
/// <summary>
/// Creates an NpgsqlLargeObjectManager bound to the given connection.
/// The connection must be opened before any remote operation is performed.
/// </summary>
/// <param name="connection">Connection on which large object operations are executed.</param>
public NpgsqlLargeObjectManager(NpgsqlConnection connection)
{
    const int defaultTransferBlockSize = 4 * 1024 * 1024; // 4 MB

    MaxTransferBlockSize = defaultTransferBlockSize;
    _connection = connection;
}
/// <summary>
/// Executes a backend function as a stored-procedure call and returns its scalar result.
/// </summary>
/// <typeparam name="T">Type the scalar result is cast to.</typeparam>
/// <param name="function">Name of the function to invoke.</param>
/// <param name="async">True to execute asynchronously, false to execute synchronously.</param>
/// <param name="arguments">Positional arguments; each one is sent as an unnamed parameter.</param>
/// <returns>The scalar result of the call, cast to <typeparamref name="T"/>.</returns>
internal async Task<T> ExecuteFunction<T>(string function, bool async, params object[] arguments)
{
    // The constructor already sets CommandText to the function name, so it is not assigned again.
    using (var command = new NpgsqlCommand(function, _connection))
    {
        command.CommandType = CommandType.StoredProcedure;
        foreach (var argument in arguments)
            command.Parameters.Add(new NpgsqlParameter { Value = argument });
        return (T)(async ? await command.ExecuteScalarAsync() : command.ExecuteScalar());
    }
}
/// <summary>
/// Executes a backend function that returns a byte array and copies the result into a caller-supplied buffer.
/// </summary>
/// <param name="function">Name of the function to invoke.</param>
/// <param name="buffer">Destination buffer for the returned bytes.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which copying starts.</param>
/// <param name="len">Maximum number of bytes to copy.</param>
/// <param name="async">True to execute asynchronously, false to execute synchronously.</param>
/// <param name="arguments">Positional arguments; each one is sent as an unnamed parameter.</param>
/// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
internal async Task<int> ExecuteFunctionGetBytes(string function, byte[] buffer, int offset, int len, bool async, params object[] arguments)
{
    using (var command = new NpgsqlCommand(function, _connection))
    {
        command.CommandType = CommandType.StoredProcedure;
        foreach (var argument in arguments)
            command.Parameters.Add(new NpgsqlParameter { Value = argument });
        using (var reader = async ? await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess) : command.ExecuteReader(CommandBehavior.SequentialAccess))
        {
            // Advance to the first row; the bytes are read from its first column.
            if (async)
                await reader.ReadAsync();
            else
                reader.Read();
            return (int)reader.GetBytes(0, 0, buffer, offset, len);
        }
    }
}
/// <summary>
/// Creates an empty large object in the database. If an oid is specified but is already in use, a PostgresException will be thrown.
/// </summary>
/// <param name="preferredOid">A preferred oid, or 0 if one should be automatically assigned</param>
/// <returns>The oid of the large object that was created</returns>
/// <exception cref="PostgresException">If the requested oid is already in use</exception>
public uint Create(uint preferredOid = 0)
{
    // Run the shared implementation in synchronous mode and unwrap its result.
    var pending = Create(preferredOid, false);
    return pending.GetAwaiter().GetResult();
}
/// <summary>
/// Create an empty large object in the database. If an oid is specified but is already in use, a PostgresException will be thrown.
/// </summary>
/// <param name="preferredOid">A preferred oid, or specify 0 if one should be automatically assigned</param>
/// <param name="cancellationToken">Cancellation token. It is checked once, before the operation starts.</param>
/// <returns>The oid for the large object created</returns>
/// <exception cref="PostgresException">If an oid is already in use</exception>
public Task<uint> CreateAsync(uint preferredOid, CancellationToken cancellationToken)
    => SynchronizationContextSwitcher.NoContext(async () =>
    {
        // Same pattern as OpenReadAsync / OpenReadWriteAsync: honor the token up front
        // and run the operation without a captured synchronization context.
        cancellationToken.ThrowIfCancellationRequested();
        return await Create(preferredOid, true);
    });
/// <summary>
/// Shared sync/async implementation of Create: calls the backend's lo_create function.
/// </summary>
/// <param name="preferredOid">A preferred oid, or 0 if one should be automatically assigned</param>
/// <param name="async">True to execute asynchronously, false to execute synchronously.</param>
/// <returns>The oid returned by lo_create</returns>
Task<uint> Create(uint preferredOid, bool async)
    => ExecuteFunction<uint>("lo_create", async, (int)preferredOid);
/// <summary>
/// Opens a large object on the backend for reading and returns a stream controlling the remote object.
/// The backend takes a transaction snapshot when the object is opened with only read permissions,
/// so the contents read from the stream reflect the time at which the snapshot was taken.
/// This call, and all operations on the returned stream, must be wrapped inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public NpgsqlLargeObjectStream OpenRead(uint oid)
{
    // Run the shared implementation in synchronous mode and unwrap its result.
    var pending = OpenRead(oid, false);
    return pending.GetAwaiter().GetResult();
}
/// <summary>
/// Opens a large object on the backend, returning a stream controlling this remote object.
/// A transaction snapshot is taken by the backend when the object is opened with only read permissions.
/// When reading from this object, the contents reflect the time when the snapshot was taken.
/// Note that this method, as well as operations on the stream, must be wrapped inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <param name="cancellationToken">Cancellation token. It is checked once, before the operation starts.</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public Task<NpgsqlLargeObjectStream> OpenReadAsync(uint oid, CancellationToken cancellationToken)
    => SynchronizationContextSwitcher.NoContext(async () =>
    {
        cancellationToken.ThrowIfCancellationRequested();
        return await OpenRead(oid, true);
    });
/// <summary>
/// Shared sync/async implementation of OpenRead: calls the backend's lo_open function with
/// the INV_READ mode flag and wraps the returned descriptor in a stream.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <param name="async">True to execute asynchronously, false to execute synchronously.</param>
/// <returns>An NpgsqlLargeObjectStream over the opened descriptor</returns>
async Task<NpgsqlLargeObjectStream> OpenRead(uint oid, bool async)
{
    var fd = await ExecuteFunction<int>("lo_open", async, (int)oid, INV_READ);
    // NOTE(review): the last constructor argument is false here and true in OpenReadWrite;
    // presumably it marks the stream as writeable - confirm against NpgsqlLargeObjectStream.
    return new NpgsqlLargeObjectStream(this, oid, fd, false);
}
/// <summary>
/// Opens a large object on the backend for reading and writing and returns a stream controlling the remote object.
/// This call, and all operations on the returned stream, must be wrapped inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public NpgsqlLargeObjectStream OpenReadWrite(uint oid)
{
    // Run the shared implementation in synchronous mode and unwrap its result.
    var pending = OpenReadWrite(oid, false);
    return pending.GetAwaiter().GetResult();
}
/// <summary>
/// Opens a large object on the backend, returning a stream controlling this remote object.
/// Note that this method, as well as operations on the stream, must be wrapped inside a transaction.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <param name="cancellationToken">Cancellation token. It is checked once, before the operation starts.</param>
/// <returns>An NpgsqlLargeObjectStream</returns>
public Task<NpgsqlLargeObjectStream> OpenReadWriteAsync(uint oid, CancellationToken cancellationToken)
    => SynchronizationContextSwitcher.NoContext(async () =>
    {
        cancellationToken.ThrowIfCancellationRequested();
        return await OpenReadWrite(oid, true);
    });
/// <summary>
/// Shared sync/async implementation of OpenReadWrite: calls the backend's lo_open function with
/// both the INV_READ and INV_WRITE mode flags and wraps the returned descriptor in a stream.
/// </summary>
/// <param name="oid">Oid of the object</param>
/// <param name="async">True to execute asynchronously, false to execute synchronously.</param>
/// <returns>An NpgsqlLargeObjectStream over the opened descriptor</returns>
async Task<NpgsqlLargeObjectStream> OpenReadWrite(uint oid, bool async)
{
    var fd = await ExecuteFunction<int>("lo_open", async, (int)oid, INV_READ | INV_WRITE);
    // NOTE(review): the last constructor argument is true here and false in OpenRead;
    // presumably it marks the stream as writeable - confirm against NpgsqlLargeObjectStream.
    return new NpgsqlLargeObjectStream(this, oid, fd, true);
}
///
/// Deletes a large object on the backend.
///
/// Oid of the object to delete
public void Unlink(uint oid)
=> ExecuteFunction