X Tutup
using Npgsql.FrontendMessages; using System; using System.Collections.Generic; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using AsyncRewriter; namespace Npgsql { /// /// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server. /// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream must be wrapped inside a database transaction. /// public partial class NpgsqlLargeObjectStream : Stream { NpgsqlLargeObjectManager _manager; uint _oid; int _fd; long _pos; bool _writeable; bool _disposed; private NpgsqlLargeObjectStream() { } internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, uint oid, int fd, bool writeable) { _manager = manager; _oid = oid; _fd = fd; _pos = 0; _writeable = writeable; } void CheckDisposed() { if (_disposed) throw new InvalidOperationException("Object disposed"); } /// /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB. /// This property returns true whether the PostgreSQL version is >= 9.3. /// public bool Has64BitSupport { get { return _manager._connection.PostgreSqlVersion >= new Version(9, 3); } } /// /// Reads count bytes from the large object. The only case when fewer bytes are read is when end of stream is reached. /// /// The buffer where read data should be stored. /// The offset in the buffer where the first byte should be read. /// The maximum number of bytes that should be read. /// How many bytes actually read, or 0 if end of file was already reached. [RewriteAsync] public override int Read(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException("buffer"); if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (count < 0) throw new ArgumentOutOfRangeException("count"); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); Contract.EndContractBlock(); CheckDisposed(); int chunkCount = Math.Min(count, _manager.MaxTransferBlockSize); int read = 0; while (read < count) { var bytesRead = _manager.ExecuteFunctionGetBytes("loread", buffer, offset + read, count - read, _fd, chunkCount); _pos += bytesRead; read += bytesRead; if (bytesRead < chunkCount) { return read; } } return read; } /// /// Writes count bytes to the large object. /// /// The buffer to write data from. /// The offset in the buffer at which to begin copying bytes. /// The number of bytes to write. [RewriteAsync] public override void Write(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException("buffer"); if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (count < 0) throw new ArgumentOutOfRangeException("count"); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); Contract.EndContractBlock(); CheckDisposed(); if (!_writeable) throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions"); int totalWritten = 0; while (totalWritten < count) { var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize); var bytesWritten = _manager.ExecuteFunction("lowrite", _fd, new ArraySegment(buffer, offset + totalWritten, chunkSize)); totalWritten += bytesWritten; if (bytesWritten != chunkSize) throw PGUtil.ThrowIfReached(); _pos += bytesWritten; } } /// /// CanTimeout always returns false. /// public override bool CanTimeout { get { return false; // TODO? } } /// /// CanRead always returns true, unless the stream has been closed. /// public override bool CanRead { get { return true && !_disposed; } } /// /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed. /// public override bool CanWrite { get { return _writeable && !_disposed; } } /// /// CanSeek always returns true, unless the stream has been closed. /// public override bool CanSeek { get { return true && !_disposed; } } /// /// Returns the current position in the stream. Getting the current position does not need a round-trip to the server, however setting the current position does. /// public override long Position { get { CheckDisposed(); return _pos; } set { Seek(value, SeekOrigin.Begin); } } /// /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again. /// public override long Length { get { return GetLengthInternal(); } } // TODO: uncomment this when finally implementing async /*public Task GetLengthAsync() { return GetLengthInternalAsync(); }*/ [RewriteAsync] long GetLengthInternal() { CheckDisposed(); long old = _pos; long retval = Seek(0, SeekOrigin.End); if (retval != old) Seek(old, SeekOrigin.Begin); return retval; } /// /// Seeks in the stream to the specified position. This requires a round-trip to the backend. /// /// A byte offset relative to the origin parameter. /// A value of type SeekOrigin indicating the reference point used to obtain the new position. /// [RewriteAsync] public override long Seek(long offset, SeekOrigin origin) { if (origin < SeekOrigin.Begin || origin > SeekOrigin.End) throw new ArgumentException("Invalid origin"); if (!Has64BitSupport && offset != (long)(int)offset) throw new ArgumentOutOfRangeException("offset", "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); Contract.EndContractBlock(); CheckDisposed(); if (_manager.Has64BitSupport) return _pos = _manager.ExecuteFunction("lo_lseek64", _fd, offset, (int)origin); else return _pos = _manager.ExecuteFunction("lo_lseek", _fd, (int)offset, (int)origin); } /// /// Does nothing. /// [RewriteAsync] public override void Flush() { } /// /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes. /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32. /// /// Number of bytes to either truncate or enlarge the large object. [RewriteAsync] public override void SetLength(long value) { if (value < 0) throw new ArgumentOutOfRangeException("value"); if (!Has64BitSupport && value != (long)(int)value) throw new ArgumentOutOfRangeException("value", "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); Contract.EndContractBlock(); CheckDisposed(); if (!_writeable) throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions"); if (_manager.Has64BitSupport) _manager.ExecuteFunction("lo_truncate64", _fd, value); else _manager.ExecuteFunction("lo_truncate", _fd, (int)value); } /// /// Releases resources at the backend allocated for this stream. /// #if DNXCORE50 void Close() #else public override void Close() #endif { if (!_disposed) { _manager.ExecuteFunction("lo_close", _fd); _disposed = true; } } /// /// Releases resources at the backend allocated for this stream, iff disposing is true. /// /// Whether to release resources allocated at the backend. protected override void Dispose(bool disposing) { if (disposing) { Close(); } } } }
X Tutup