using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql
{
/// <summary>
/// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server.
/// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream must be wrapped inside a database transaction.
/// </summary>
public sealed class NpgsqlLargeObjectStream : Stream
{
    readonly NpgsqlLargeObjectManager _manager;

    // Descriptor handed to the constructor; passed as the first argument of every server-side call below.
    readonly int _fd;

    // Client-side copy of the stream position, updated after every read, write and seek.
    long _pos;

    readonly bool _writeable;
    bool _disposed;

    internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, int fd, bool writeable)
    {
        _manager = manager;
        _fd = fd;
        _pos = 0;
        _writeable = writeable;
    }

    /// <summary>
    /// Throws if this stream has already been closed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    void CheckDisposed()
    {
        // ObjectDisposedException derives from InvalidOperationException, so callers
        // catching the latter keep working.
        if (_disposed)
            throw new ObjectDisposedException(nameof(NpgsqlLargeObjectStream), "Object disposed");
    }

    /// <summary>
    /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB.
    /// This property returns true if the PostgreSQL version is >= 9.3.
    /// </summary>
    public bool Has64BitSupport => _manager._connection.PostgreSqlVersion >= new Version(9, 3);

    /// <summary>
    /// Reads <paramref name="count"/> bytes from the large object. The only case when fewer bytes are read is when end of stream is reached.
    /// </summary>
    /// <param name="buffer">The buffer where read data should be stored.</param>
    /// <param name="offset">The offset in the buffer where the first byte should be read.</param>
    /// <param name="count">The maximum number of bytes that should be read.</param>
    /// <returns>How many bytes actually read, or 0 if end of file was already reached.</returns>
    public override int Read(byte[] buffer, int offset, int count)
        => Read(buffer, offset, count, false).GetAwaiter().GetResult();

    /// <summary>
    /// Reads <paramref name="count"/> bytes from the large object. The only case when fewer bytes are read is when end of stream is reached.
    /// </summary>
    /// <param name="buffer">The buffer where read data should be stored.</param>
    /// <param name="offset">The offset in the buffer where the first byte should be read.</param>
    /// <param name="count">The maximum number of bytes that should be read.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    /// <returns>How many bytes actually read, or 0 if end of file was already reached.</returns>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<int>(cancellationToken);
        using (NoSynchronizationContextScope.Enter())
            return Read(buffer, offset, count, true);
    }

    /// <summary>
    /// Shared implementation of <see cref="Read(byte[], int, int)"/> and <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.
    /// Validates the arguments, then issues "loread" calls in chunks of at most the manager's
    /// MaxTransferBlockSize until <paramref name="count"/> bytes were read or a short read occurs.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range described by <paramref name="offset"/> and <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
    async Task<int> Read(byte[] buffer, int offset, int count, bool async)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count)
            throw new ArgumentException("Invalid offset or count for this buffer");

        CheckDisposed();

        var read = 0;
        while (read < count)
        {
            // Recompute the request size on every iteration so the final, partial chunk never
            // asks for more bytes than are still wanted. Requesting a fixed chunk size here
            // could let the server-side position move further than the bytes stored in buffer.
            // NOTE(review): assumes the trailing argument is the length passed to "loread" - confirm
            // against NpgsqlLargeObjectManager.ExecuteFunctionGetBytes.
            var chunkSize = Math.Min(count - read, _manager.MaxTransferBlockSize);
            var bytesRead = await _manager.ExecuteFunctionGetBytes("loread", buffer, offset + read, count - read, async, _fd, chunkSize);
            _pos += bytesRead;
            read += bytesRead;

            // A short read is treated as end of stream.
            if (bytesRead < chunkSize)
                return read;
        }

        return read;
    }

    /// <summary>
    /// Writes <paramref name="count"/> bytes to the large object.
    /// </summary>
    /// <param name="buffer">The buffer to write data from.</param>
    /// <param name="offset">The offset in the buffer at which to begin copying bytes.</param>
    /// <param name="count">The number of bytes to write.</param>
    public override void Write(byte[] buffer, int offset, int count)
        => Write(buffer, offset, count, false).GetAwaiter().GetResult();

    /// <summary>
    /// Writes <paramref name="count"/> bytes to the large object.
    /// </summary>
    /// <param name="buffer">The buffer to write data from.</param>
    /// <param name="offset">The offset in the buffer at which to begin copying bytes.</param>
    /// <param name="count">The number of bytes to write.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled(cancellationToken);
        using (NoSynchronizationContextScope.Enter())
            return Write(buffer, offset, count, true);
    }

    /// <summary>
    /// Shared implementation of <see cref="Write(byte[], int, int)"/> and <see cref="WriteAsync(byte[], int, int, CancellationToken)"/>.
    /// Validates the arguments, then issues "lowrite" calls in chunks of at most the manager's MaxTransferBlockSize.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range described by <paramref name="offset"/> and <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
    /// <exception cref="NotSupportedException">The stream was opened without write permissions.</exception>
    /// <exception cref="InvalidOperationException">The server reported a different number of written bytes than requested.</exception>
    async Task Write(byte[] buffer, int offset, int count, bool async)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count)
            throw new ArgumentException("Invalid offset or count for this buffer");

        CheckDisposed();

        if (!_writeable)
            throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions");

        var totalWritten = 0;
        while (totalWritten < count)
        {
            var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize);
            var bytesWritten = await _manager.ExecuteFunction<int>("lowrite", async, _fd, new ArraySegment<byte>(buffer, offset + totalWritten, chunkSize));
            totalWritten += bytesWritten;

            // The position is only advanced for fully written chunks.
            if (bytesWritten != chunkSize)
                throw new InvalidOperationException("Internal Npgsql bug, please report");

            _pos += bytesWritten;
        }
    }

    /// <summary>
    /// CanTimeout always returns false.
    /// </summary>
    public override bool CanTimeout => false;

    /// <summary>
    /// CanRead always returns true, unless the stream has been closed.
    /// </summary>
    public override bool CanRead => !_disposed;

    /// <summary>
    /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed.
    /// </summary>
    public override bool CanWrite => _writeable && !_disposed;

    /// <summary>
    /// CanSeek always returns true, unless the stream has been closed.
    /// </summary>
    public override bool CanSeek => !_disposed;

    /// <summary>
    /// Returns the current position in the stream. Getting the current position does not need a round-trip to the server, however setting the current position does.
    /// </summary>
    public override long Position
    {
        get
        {
            CheckDisposed();
            return _pos;
        }
        set => Seek(value, SeekOrigin.Begin);
    }

    /// <summary>
    /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again.
    /// </summary>
    public override long Length => GetLength(false).GetAwaiter().GetResult();

    /// <summary>
    /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again.
    /// </summary>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    /// <returns>The length of the large object in bytes.</returns>
    public Task<long> GetLengthAsync(CancellationToken cancellationToken = default)
    {
        // Same up-front cancellation check as the other async entry points of this class.
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<long>(cancellationToken);
        using (NoSynchronizationContextScope.Enter())
            return GetLength(true);
    }

    /// <summary>
    /// Shared implementation of <see cref="Length"/> and <see cref="GetLengthAsync(CancellationToken)"/>.
    /// </summary>
    async Task<long> GetLength(bool async)
    {
        CheckDisposed();

        var old = _pos;
        var retval = await Seek(0, SeekOrigin.End, async);

        // Seek back only when the previous position was not already the end of the stream.
        if (retval != old)
            await Seek(old, SeekOrigin.Begin, async);

        return retval;
    }

    /// <summary>
    /// Seeks in the stream to the specified position. This requires a round-trip to the backend.
    /// </summary>
    /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
    /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
    /// <returns>The new position within the stream.</returns>
    public override long Seek(long offset, SeekOrigin origin)
        => Seek(offset, origin, false).GetAwaiter().GetResult();

    /// <summary>
    /// Seeks in the stream to the specified position. This requires a round-trip to the backend.
    /// </summary>
    /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
    /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    /// <returns>The new position within the stream.</returns>
    public Task<long> SeekAsync(long offset, SeekOrigin origin, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<long>(cancellationToken);
        using (NoSynchronizationContextScope.Enter())
            return Seek(offset, origin, true);
    }

    /// <summary>
    /// Shared implementation of <see cref="Seek(long, SeekOrigin)"/> and <see cref="SeekAsync(long, SeekOrigin, CancellationToken)"/>.
    /// Calls "lo_lseek64" when the manager reports 64-bit support and "lo_lseek" otherwise, and stores the result as the new position.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="origin"/> is not a defined <see cref="SeekOrigin"/> value.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> does not fit in 32 bits and the server is older than 9.3.</exception>
    async Task<long> Seek(long offset, SeekOrigin origin, bool async)
    {
        if (origin < SeekOrigin.Begin || origin > SeekOrigin.End)
            throw new ArgumentException("Invalid origin", nameof(origin));
        if (!Has64BitSupport && offset != (long)(int)offset)
            throw new ArgumentOutOfRangeException(nameof(offset), "offset must fit in 32 bits for PostgreSQL versions older than 9.3");

        CheckDisposed();

        if (_manager.Has64BitSupport)
            return _pos = await _manager.ExecuteFunction<long>("lo_lseek64", async, _fd, offset, (int)origin);
        else
            return _pos = await _manager.ExecuteFunction<int>("lo_lseek", async, _fd, (int)offset, (int)origin);
    }

    /// <summary>
    /// Does nothing.
    /// </summary>
    public override void Flush() {}

    /// <summary>
    /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes.
    /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32.
    /// </summary>
    /// <param name="value">Number of bytes to either truncate or enlarge the large object.</param>
    public override void SetLength(long value)
        => SetLength(value, false).GetAwaiter().GetResult();

    /// <summary>
    /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes.
    /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32.
    /// </summary>
    /// <param name="value">Number of bytes to either truncate or enlarge the large object.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public Task SetLength(long value, CancellationToken cancellationToken)
    {
        // Report cancellation through the returned task, like ReadAsync/WriteAsync/SeekAsync do.
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled(cancellationToken);
        using (NoSynchronizationContextScope.Enter())
            return SetLength(value, true);
    }

    /// <summary>
    /// Shared implementation of the synchronous and asynchronous SetLength overloads.
    /// Calls "lo_truncate64" when the manager reports 64-bit support and "lo_truncate" otherwise.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative, or does not fit in 32 bits and the server is older than 9.3.</exception>
    /// <exception cref="NotSupportedException">The stream was opened without write permissions.</exception>
    async Task SetLength(long value, bool async)
    {
        if (value < 0)
            throw new ArgumentOutOfRangeException(nameof(value));
        if (!Has64BitSupport && value != (long)(int)value)
            throw new ArgumentOutOfRangeException(nameof(value), "offset must fit in 32 bits for PostgreSQL versions older than 9.3");

        CheckDisposed();

        if (!_writeable)
            throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions");

        if (_manager.Has64BitSupport)
            await _manager.ExecuteFunction<int>("lo_truncate64", async, _fd, value);
        else
            await _manager.ExecuteFunction<int>("lo_truncate", async, _fd, (int)value);
    }

    /// <summary>
    /// Releases resources at the backend allocated for this stream.
    /// </summary>
    public override void Close()
    {
        if (!_disposed)
        {
            // The stream is marked as closed only after "lo_close" completed without throwing.
            _manager.ExecuteFunction<int>("lo_close", false, _fd).GetAwaiter().GetResult();
            _disposed = true;
        }
    }

    /// <summary>
    /// Releases resources at the backend allocated for this stream, if and only if <paramref name="disposing"/> is true.
    /// </summary>
    /// <param name="disposing">Whether to release resources allocated at the backend.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            Close();
        }
    }
}
}