X Tutup
using Npgsql.Util; using System; using System.IO; using System.Threading; using System.Threading.Tasks; namespace Npgsql; /// /// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server. /// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream must be wrapped inside a database transaction. /// public sealed class NpgsqlLargeObjectStream : Stream { readonly NpgsqlLargeObjectManager _manager; readonly int _fd; long _pos; readonly bool _writeable; bool _disposed; internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, int fd, bool writeable) { _manager = manager; _fd = fd; _pos = 0; _writeable = writeable; } void CheckDisposed() { if (_disposed) throw new InvalidOperationException("Object disposed"); } /// /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB. /// This property returns true whether the PostgreSQL version is >= 9.3. /// public bool Has64BitSupport => _manager.Connection.PostgreSqlVersion.IsGreaterOrEqual(9, 3); /// /// Reads count bytes from the large object. The only case when fewer bytes are read is when end of stream is reached. /// /// The buffer where read data should be stored. /// The offset in the buffer where the first byte should be read. /// The maximum number of bytes that should be read. /// How many bytes actually read, or 0 if end of file was already reached. public override int Read(byte[] buffer, int offset, int count) => Read(async: false, buffer, offset, count).GetAwaiter().GetResult(); /// /// Reads count bytes from the large object. The only case when fewer bytes are read is when end of stream is reached. /// /// The buffer where read data should be stored. /// The offset in the buffer where the first byte should be read. /// The maximum number of bytes that should be read. /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// How many bytes actually read, or 0 if end of file was already reached. public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => Read(async: true, buffer, offset, count, cancellationToken); async Task Read(bool async, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset)); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); CheckDisposed(); var chunkCount = Math.Min(count, _manager.MaxTransferBlockSize); var read = 0; while (read < count) { var bytesRead = await _manager.ExecuteFunctionGetBytes( async, "loread", buffer, offset + read, count - read, cancellationToken, _fd, chunkCount).ConfigureAwait(false); _pos += bytesRead; read += bytesRead; if (bytesRead < chunkCount) { return read; } } return read; } /// /// Writes count bytes to the large object. /// /// The buffer to write data from. /// The offset in the buffer at which to begin copying bytes. /// The number of bytes to write. public override void Write(byte[] buffer, int offset, int count) => Write(async: false, buffer, offset, count).GetAwaiter().GetResult(); /// /// Writes count bytes to the large object. /// /// The buffer to write data from. /// The offset in the buffer at which to begin copying bytes. /// The number of bytes to write. /// /// An optional token to cancel the asynchronous operation. The default value is . /// public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => Write(async: true, buffer, offset, count, cancellationToken); async Task Write(bool async, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset)); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); CheckDisposed(); if (!_writeable) throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions"); var totalWritten = 0; while (totalWritten < count) { var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize); var bytesWritten = await _manager.ExecuteFunction(async, "lowrite", cancellationToken, _fd, new ArraySegment(buffer, offset + totalWritten, chunkSize)).ConfigureAwait(false); totalWritten += bytesWritten; if (bytesWritten != chunkSize) throw new InvalidOperationException($"Internal Npgsql bug, please report"); _pos += bytesWritten; } } /// /// CanTimeout always returns false. /// public override bool CanTimeout => false; /// /// CanRead always returns true, unless the stream has been closed. /// public override bool CanRead => !_disposed; /// /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed. /// public override bool CanWrite => _writeable && !_disposed; /// /// CanSeek always returns true, unless the stream has been closed. /// public override bool CanSeek => !_disposed; /// /// Returns the current position in the stream. Getting the current position does not need a round-trip to the server, however setting the current position does. /// public override long Position { get { CheckDisposed(); return _pos; } set => Seek(value, SeekOrigin.Begin); } /// /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again. /// public override long Length => GetLength(false).GetAwaiter().GetResult(); /// /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again. /// /// /// An optional token to cancel the asynchronous operation. The default value is . /// public Task GetLengthAsync(CancellationToken cancellationToken = default) => GetLength(async: true); async Task GetLength(bool async) { CheckDisposed(); var old = _pos; var retval = await Seek(async, 0, SeekOrigin.End).ConfigureAwait(false); if (retval != old) await Seek(async, old, SeekOrigin.Begin).ConfigureAwait(false); return retval; } /// /// Seeks in the stream to the specified position. This requires a round-trip to the backend. /// /// A byte offset relative to the origin parameter. /// A value of type SeekOrigin indicating the reference point used to obtain the new position. /// public override long Seek(long offset, SeekOrigin origin) => Seek(async: false, offset, origin).GetAwaiter().GetResult(); /// /// Seeks in the stream to the specified position. This requires a round-trip to the backend. /// /// A byte offset relative to the origin parameter. /// A value of type SeekOrigin indicating the reference point used to obtain the new position. /// /// An optional token to cancel the asynchronous operation. The default value is . /// public Task SeekAsync(long offset, SeekOrigin origin, CancellationToken cancellationToken = default) => Seek(async: true, offset, origin, cancellationToken); async Task Seek(bool async, long offset, SeekOrigin origin, CancellationToken cancellationToken = default) { if (origin < SeekOrigin.Begin || origin > SeekOrigin.End) throw new ArgumentException("Invalid origin"); if (!Has64BitSupport && offset != (int)offset) throw new ArgumentOutOfRangeException(nameof(offset), "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); CheckDisposed(); return _manager.Has64BitSupport ? _pos = await _manager.ExecuteFunction(async, "lo_lseek64", cancellationToken, _fd, offset, (int)origin).ConfigureAwait(false) : _pos = await _manager.ExecuteFunction(async, "lo_lseek", cancellationToken, _fd, (int)offset, (int)origin).ConfigureAwait(false); } /// /// Does nothing. /// public override void Flush() {} /// /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes. /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32. /// /// Number of bytes to either truncate or enlarge the large object. public override void SetLength(long value) => SetLength(async: false, value).GetAwaiter().GetResult(); /// /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes. /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32. /// /// Number of bytes to either truncate or enlarge the large object. /// /// An optional token to cancel the asynchronous operation. The default value is . /// public Task SetLength(long value, CancellationToken cancellationToken) => SetLength(async: true, value, cancellationToken); async Task SetLength(bool async, long value, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); if (value < 0) throw new ArgumentOutOfRangeException(nameof(value)); if (!Has64BitSupport && value != (int)value) throw new ArgumentOutOfRangeException(nameof(value), "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); CheckDisposed(); if (!_writeable) throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions"); if (_manager.Has64BitSupport) await _manager.ExecuteFunction(async, "lo_truncate64", cancellationToken, _fd, value).ConfigureAwait(false); else await _manager.ExecuteFunction(async, "lo_truncate", cancellationToken, _fd, (int)value).ConfigureAwait(false); } /// /// Releases resources at the backend allocated for this stream. /// public override void Close() { if (!_disposed) { _manager.ExecuteFunction(async: false, "lo_close", CancellationToken.None, _fd).GetAwaiter().GetResult(); _disposed = true; } } /// /// Releases resources at the backend allocated for this stream, iff disposing is true. /// /// Whether to release resources allocated at the backend. protected override void Dispose(bool disposing) { if (disposing) { Close(); } } }
X Tutup