X Tutup
#region License // The PostgreSQL License // // Copyright (C) 2016 The Npgsql Development Team // // Permission to use, copy, modify, and distribute this software and its // documentation for any purpose, without fee, and without a written // agreement is hereby granted, provided that the above copyright notice // and this paragraph and the following two paragraphs appear in all copies. // // IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY // FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, // INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS // DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF // THE POSSIBILITY OF SUCH DAMAGE. // // THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES, // INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY // AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS // ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS // TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. #endregion using Npgsql.FrontendMessages; using System; using System.Collections.Generic; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using AsyncRewriter; namespace Npgsql { /// /// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server. /// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream must be wrapped inside a database transaction. /// public partial class NpgsqlLargeObjectStream : Stream { NpgsqlLargeObjectManager _manager; int _fd; long _pos; bool _writeable; bool _disposed; private NpgsqlLargeObjectStream() { } internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, uint oid, int fd, bool writeable) { _manager = manager; _fd = fd; _pos = 0; _writeable = writeable; } void CheckDisposed() { if (_disposed) throw new InvalidOperationException("Object disposed"); } /// /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB. /// This property returns true whether the PostgreSQL version is >= 9.3. /// public bool Has64BitSupport => _manager._connection.PostgreSqlVersion >= new Version(9, 3); /// /// Reads count bytes from the large object. The only case when fewer bytes are read is when end of stream is reached. /// /// The buffer where read data should be stored. /// The offset in the buffer where the first byte should be read. /// The maximum number of bytes that should be read. /// How many bytes actually read, or 0 if end of file was already reached. [RewriteAsync] public override int Read(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset)); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); Contract.EndContractBlock(); CheckDisposed(); int chunkCount = Math.Min(count, _manager.MaxTransferBlockSize); int read = 0; while (read < count) { var bytesRead = _manager.ExecuteFunctionGetBytes("loread", buffer, offset + read, count - read, _fd, chunkCount); _pos += bytesRead; read += bytesRead; if (bytesRead < chunkCount) { return read; } } return read; } /// /// Writes count bytes to the large object. /// /// The buffer to write data from. /// The offset in the buffer at which to begin copying bytes. /// The number of bytes to write. [RewriteAsync] public override void Write(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset)); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); if (buffer.Length - offset < count) throw new ArgumentException("Invalid offset or count for this buffer"); Contract.EndContractBlock(); CheckDisposed(); if (!_writeable) throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions"); int totalWritten = 0; while (totalWritten < count) { var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize); var bytesWritten = _manager.ExecuteFunction("lowrite", _fd, new ArraySegment(buffer, offset + totalWritten, chunkSize)); totalWritten += bytesWritten; if (bytesWritten != chunkSize) throw PGUtil.ThrowIfReached(); _pos += bytesWritten; } } /// /// CanTimeout always returns false. /// public override bool CanTimeout => false; /// /// CanRead always returns true, unless the stream has been closed. /// public override bool CanRead => true && !_disposed; /// /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed. /// public override bool CanWrite => _writeable && !_disposed; /// /// CanSeek always returns true, unless the stream has been closed. /// public override bool CanSeek => true && !_disposed; /// /// Returns the current position in the stream. Getting the current position does not need a round-trip to the server, however setting the current position does. /// public override long Position { get { CheckDisposed(); return _pos; } set { Seek(value, SeekOrigin.Begin); } } /// /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again. /// public override long Length => GetLengthInternal(); // TODO: uncomment this when finally implementing async /*public Task GetLengthAsync() { return GetLengthInternalAsync(); }*/ [RewriteAsync] long GetLengthInternal() { CheckDisposed(); long old = _pos; long retval = Seek(0, SeekOrigin.End); if (retval != old) Seek(old, SeekOrigin.Begin); return retval; } /// /// Seeks in the stream to the specified position. This requires a round-trip to the backend. /// /// A byte offset relative to the origin parameter. /// A value of type SeekOrigin indicating the reference point used to obtain the new position. /// [RewriteAsync] public override long Seek(long offset, SeekOrigin origin) { if (origin < SeekOrigin.Begin || origin > SeekOrigin.End) throw new ArgumentException("Invalid origin"); if (!Has64BitSupport && offset != (long)(int)offset) throw new ArgumentOutOfRangeException(nameof(offset), "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); Contract.EndContractBlock(); CheckDisposed(); if (_manager.Has64BitSupport) return _pos = _manager.ExecuteFunction("lo_lseek64", _fd, offset, (int)origin); else return _pos = _manager.ExecuteFunction("lo_lseek", _fd, (int)offset, (int)origin); } /// /// Does nothing. /// [RewriteAsync] public override void Flush() { } /// /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes. /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32. /// /// Number of bytes to either truncate or enlarge the large object. [RewriteAsync] public override void SetLength(long value) { if (value < 0) throw new ArgumentOutOfRangeException(nameof(value)); if (!Has64BitSupport && value != (long)(int)value) throw new ArgumentOutOfRangeException(nameof(value), "offset must fit in 32 bits for PostgreSQL versions older than 9.3"); Contract.EndContractBlock(); CheckDisposed(); if (!_writeable) throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions"); if (_manager.Has64BitSupport) _manager.ExecuteFunction("lo_truncate64", _fd, value); else _manager.ExecuteFunction("lo_truncate", _fd, (int)value); } /// /// Releases resources at the backend allocated for this stream. /// #if NET45 || NET451 || DNX451 public override void Close() #else void Close() #endif { if (!_disposed) { _manager.ExecuteFunction("lo_close", _fd); _disposed = true; } } /// /// Releases resources at the backend allocated for this stream, iff disposing is true. /// /// Whether to release resources allocated at the backend. protected override void Dispose(bool disposing) { if (disposing) { Close(); } } } }
X Tutup