X Tutup
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Npgsql
{
    /// <summary>
    /// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server.
    /// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream
    /// must be wrapped inside a database transaction.
    /// </summary>
    public sealed class NpgsqlLargeObjectStream : Stream
    {
        readonly NpgsqlLargeObjectManager _manager;

        // Descriptor passed to every server-side large object function call made by this stream.
        readonly int _fd;

        // Client-side copy of the stream position; updated after every read, write and seek.
        long _pos;

        readonly bool _writeable;
        bool _disposed;

        internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, int fd, bool writeable)
        {
            _manager = manager;
            _fd = fd;
            _pos = 0;
            _writeable = writeable;
        }

        /// <summary>
        /// Throws if this stream has already been closed.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
        void CheckDisposed()
        {
            // ObjectDisposedException derives from InvalidOperationException, so callers that
            // catch InvalidOperationException keep working.
            if (_disposed)
                throw new ObjectDisposedException(nameof(NpgsqlLargeObjectStream));
        }

        /// <summary>
        /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB.
        /// This property returns true if the PostgreSQL version is >= 9.3.
        /// </summary>
        public bool Has64BitSupport => _manager.Connection.PostgreSqlVersion >= new Version(9, 3);

        /// <summary>
        /// Reads <paramref name="count"/> bytes from the large object.
        /// The only case when fewer bytes are read is when end of stream is reached.
        /// </summary>
        /// <param name="buffer">The buffer where read data should be stored.</param>
        /// <param name="offset">The offset in the buffer where the first byte should be read.</param>
        /// <param name="count">The maximum number of bytes that should be read.</param>
        /// <returns>How many bytes actually read, or 0 if end of file was already reached.</returns>
        public override int Read(byte[] buffer, int offset, int count)
            => Read(buffer, offset, count, false).GetAwaiter().GetResult();

        /// <summary>
        /// Reads <paramref name="count"/> bytes from the large object.
        /// The only case when fewer bytes are read is when end of stream is reached.
        /// </summary>
        /// <param name="buffer">The buffer where read data should be stored.</param>
        /// <param name="offset">The offset in the buffer where the first byte should be read.</param>
        /// <param name="count">The maximum number of bytes that should be read.</param>
        /// <param name="cancellationToken">
        /// The token to monitor for cancellation requests. It is only checked before the operation starts.
        /// </param>
        /// <returns>How many bytes actually read, or 0 if end of file was already reached.</returns>
        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCanceled<int>(cancellationToken);
            using (NoSynchronizationContextScope.Enter())
                return Read(buffer, offset, count, true);
        }

        /// <summary>
        /// Shared implementation of <see cref="Read(byte[], int, int)"/> and
        /// <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.
        /// Data is fetched in chunks of at most the manager's MaxTransferBlockSize bytes.
        /// </summary>
        async Task<int> Read(byte[] buffer, int offset, int count, bool async)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (buffer.Length - offset < count)
                throw new ArgumentException("Invalid offset or count for this buffer");

            CheckDisposed();

            var read = 0;

            while (read < count)
            {
                // Size every request to what still fits in the caller's buffer. Asking the server for
                // more than the remaining space could move the server-side position past the bytes
                // actually stored in the buffer, leaving _pos out of sync with the server.
                var chunkCount = Math.Min(count - read, _manager.MaxTransferBlockSize);

                var bytesRead = await _manager.ExecuteFunctionGetBytes(
                    "loread", buffer, offset + read, count - read, async, _fd, chunkCount);
                _pos += bytesRead;
                read += bytesRead;

                // A short chunk means the end of the large object was reached.
                if (bytesRead < chunkCount)
                    return read;
            }

            return read;
        }

        /// <summary>
        /// Writes <paramref name="count"/> bytes to the large object.
        /// </summary>
        /// <param name="buffer">The buffer to write data from.</param>
        /// <param name="offset">The offset in the buffer at which to begin copying bytes.</param>
        /// <param name="count">The number of bytes to write.</param>
        public override void Write(byte[] buffer, int offset, int count)
            => Write(buffer, offset, count, false).GetAwaiter().GetResult();

        /// <summary>
        /// Writes <paramref name="count"/> bytes to the large object.
        /// </summary>
        /// <param name="buffer">The buffer to write data from.</param>
        /// <param name="offset">The offset in the buffer at which to begin copying bytes.</param>
        /// <param name="count">The number of bytes to write.</param>
        /// <param name="cancellationToken">
        /// The token to monitor for cancellation requests. It is only checked before the operation starts.
        /// </param>
        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCanceled(cancellationToken);
            using (NoSynchronizationContextScope.Enter())
                return Write(buffer, offset, count, true);
        }

        /// <summary>
        /// Shared implementation of <see cref="Write(byte[], int, int)"/> and
        /// <see cref="WriteAsync(byte[], int, int, CancellationToken)"/>.
        /// Data is sent in chunks of at most the manager's MaxTransferBlockSize bytes.
        /// </summary>
        async Task Write(byte[] buffer, int offset, int count, bool async)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (offset < 0)
                throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (buffer.Length - offset < count)
                throw new ArgumentException("Invalid offset or count for this buffer");

            CheckDisposed();

            if (!_writeable)
                throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions");

            var totalWritten = 0;

            while (totalWritten < count)
            {
                var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize);
                var bytesWritten = await _manager.ExecuteFunction<int>(
                    "lowrite", async, _fd, new ArraySegment<byte>(buffer, offset + totalWritten, chunkSize));
                totalWritten += bytesWritten;

                // The server is expected to accept the whole chunk; anything else is treated as a driver bug.
                if (bytesWritten != chunkSize)
                    throw new InvalidOperationException("Internal Npgsql bug, please report");

                _pos += bytesWritten;
            }
        }

        /// <summary>
        /// CanTimeout always returns false.
        /// </summary>
        public override bool CanTimeout => false;

        /// <summary>
        /// CanRead always returns true, unless the stream has been closed.
        /// </summary>
        public override bool CanRead => !_disposed;

        /// <summary>
        /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed.
        /// </summary>
        public override bool CanWrite => _writeable && !_disposed;

        /// <summary>
        /// CanSeek always returns true, unless the stream has been closed.
        /// </summary>
        public override bool CanSeek => !_disposed;

        /// <summary>
        /// Returns the current position in the stream. Getting the current position does not need a
        /// round-trip to the server, however setting the current position does.
        /// </summary>
        public override long Position
        {
            get
            {
                CheckDisposed();
                return _pos;
            }
            set => Seek(value, SeekOrigin.Begin);
        }

        /// <summary>
        /// Gets the length of the large object. This internally seeks to the end of the stream to
        /// retrieve the length, and then back again.
        /// </summary>
        public override long Length => GetLength(false).GetAwaiter().GetResult();

        /// <summary>
        /// Gets the length of the large object. This internally seeks to the end of the stream to
        /// retrieve the length, and then back again.
        /// </summary>
        /// <param name="cancellationToken">
        /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.
        /// It is only checked before the operation starts.
        /// </param>
        public Task<long> GetLengthAsync(CancellationToken cancellationToken = default)
        {
            // Honor an already-canceled token, consistent with ReadAsync/WriteAsync/SeekAsync.
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCanceled<long>(cancellationToken);
            using (NoSynchronizationContextScope.Enter())
                return GetLength(true);
        }

        /// <summary>
        /// Shared implementation of <see cref="Length"/> and <see cref="GetLengthAsync"/>.
        /// </summary>
        async Task<long> GetLength(bool async)
        {
            CheckDisposed();

            var old = _pos;
            var retval = await Seek(0, SeekOrigin.End, async);

            // Seek back only when the position actually moved, saving a round-trip otherwise.
            if (retval != old)
                await Seek(old, SeekOrigin.Begin, async);

            return retval;
        }

        /// <summary>
        /// Seeks in the stream to the specified position. This requires a round-trip to the backend.
        /// </summary>
        /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
        /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
        /// <returns>The new position within the stream.</returns>
        public override long Seek(long offset, SeekOrigin origin)
            => Seek(offset, origin, false).GetAwaiter().GetResult();

        /// <summary>
        /// Seeks in the stream to the specified position. This requires a round-trip to the backend.
        /// </summary>
        /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
        /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
        /// <param name="cancellationToken">
        /// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.
        /// It is only checked before the operation starts.
        /// </param>
        /// <returns>The new position within the stream.</returns>
        public Task<long> SeekAsync(long offset, SeekOrigin origin, CancellationToken cancellationToken = default)
        {
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCanceled<long>(cancellationToken);
            using (NoSynchronizationContextScope.Enter())
                return Seek(offset, origin, true);
        }

        /// <summary>
        /// Shared implementation of <see cref="Seek(long, SeekOrigin)"/> and <see cref="SeekAsync"/>.
        /// Stores the position reported by the server in <c>_pos</c> and returns it.
        /// </summary>
        async Task<long> Seek(long offset, SeekOrigin origin, bool async)
        {
            if (origin < SeekOrigin.Begin || origin > SeekOrigin.End)
                throw new ArgumentException("Invalid origin");
            if (!Has64BitSupport && offset != (int)offset)
                throw new ArgumentOutOfRangeException(nameof(offset), "offset must fit in 32 bits for PostgreSQL versions older than 9.3");

            CheckDisposed();

            // The SeekOrigin value is passed to the server as-is (cast to int) as the whence argument.
            return _manager.Has64BitSupport
                ? _pos = await _manager.ExecuteFunction<long>("lo_lseek64", async, _fd, offset, (int)origin)
                : _pos = await _manager.ExecuteFunction<int>("lo_lseek", async, _fd, (int)offset, (int)origin);
        }

        /// <summary>
        /// Does nothing.
        /// </summary>
        public override void Flush() {}

        /// <summary>
        /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes.
        /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32.
        /// </summary>
        /// <param name="value">Number of bytes to either truncate or enlarge the large object.</param>
        public override void SetLength(long value)
            => SetLength(value, false).GetAwaiter().GetResult();

        /// <summary>
        /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes.
        /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32.
        /// </summary>
        /// <param name="value">Number of bytes to either truncate or enlarge the large object.</param>
        /// <param name="cancellationToken">
        /// Cancellation token. If it is already canceled, this method throws synchronously.
        /// </param>
        public Task SetLength(long value, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            using (NoSynchronizationContextScope.Enter())
                return SetLength(value, true);
        }

        /// <summary>
        /// Shared implementation of the synchronous and asynchronous SetLength overloads.
        /// </summary>
        async Task SetLength(long value, bool async)
        {
            if (value < 0)
                throw new ArgumentOutOfRangeException(nameof(value));
            if (!Has64BitSupport && value != (int)value)
                throw new ArgumentOutOfRangeException(nameof(value), "offset must fit in 32 bits for PostgreSQL versions older than 9.3");

            CheckDisposed();

            if (!_writeable)
                throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions");

            // The integer result of the truncate functions is not used.
            if (_manager.Has64BitSupport)
                await _manager.ExecuteFunction<int>("lo_truncate64", async, _fd, value);
            else
                await _manager.ExecuteFunction<int>("lo_truncate", async, _fd, (int)value);
        }

        /// <summary>
        /// Releases resources at the backend allocated for this stream.
        /// </summary>
        public override void Close()
        {
            if (!_disposed)
            {
                // Marked as disposed only after lo_close succeeds, so a failed close can be retried.
                _manager.ExecuteFunction<int>("lo_close", false, _fd).GetAwaiter().GetResult();
                _disposed = true;
            }
        }

        /// <summary>
        /// Releases resources at the backend allocated for this stream, iff disposing is true.
        /// </summary>
        /// <param name="disposing">Whether to release resources allocated at the backend.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                Close();
            }
        }
    }
}
X Tutup