#region License
// The PostgreSQL License
//
// Copyright (C) 2016 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using Npgsql.FrontendMessages;
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AsyncRewriter;
namespace Npgsql
{
/// <summary>
/// An interface to remotely control the seekable stream for an opened large object on a PostgreSQL server.
/// Note that the OpenRead/OpenReadWrite method as well as all operations performed on this stream must be wrapped inside a database transaction.
/// </summary>
public partial class NpgsqlLargeObjectStream : Stream
{
    // Manager through which every server-side large object function is invoked.
    NpgsqlLargeObjectManager _manager;

    // Descriptor handed to each server-side call made for this stream.
    int _fd;

    // Client-side copy of the stream position; updated after every read, write and seek.
    long _pos;

    // Whether the stream was opened with write permissions.
    bool _writeable;

    // Set once the stream has been closed.
    bool _disposed;

    private NpgsqlLargeObjectStream() { }

    internal NpgsqlLargeObjectStream(NpgsqlLargeObjectManager manager, uint oid, int fd, bool writeable)
    {
        _manager = manager;
        _fd = fd;
        _pos = 0;
        _writeable = writeable;
    }

    /// <summary>
    /// Throws if this stream has already been closed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    void CheckDisposed()
    {
        // ObjectDisposedException derives from InvalidOperationException, so callers
        // catching the latter are unaffected.
        if (_disposed)
            throw new ObjectDisposedException(nameof(NpgsqlLargeObjectStream), "Object disposed");
    }

    /// <summary>
    /// Since PostgreSQL 9.3, large objects larger than 2GB can be handled, up to 4TB.
    /// This property returns true if the PostgreSQL version is &gt;= 9.3.
    /// </summary>
    public bool Has64BitSupport => _manager._connection.PostgreSqlVersion >= new Version(9, 3);

    /// <summary>
    /// Reads <paramref name="count"/> bytes from the large object. The only case when fewer bytes are read is when end of stream is reached.
    /// </summary>
    /// <param name="buffer">The buffer where read data should be stored.</param>
    /// <param name="offset">The offset in the buffer where the first byte should be read.</param>
    /// <param name="count">The maximum number of bytes that should be read.</param>
    /// <returns>How many bytes actually read, or 0 if end of file was already reached.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range described by <paramref name="offset"/> and <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    [RewriteAsync]
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count)
            throw new ArgumentException("Invalid offset or count for this buffer");
        Contract.EndContractBlock();

        CheckDisposed();

        var totalRead = 0;
        while (totalRead < count)
        {
            // Size each request from what is still missing, exactly as Write does.
            // Requesting a full block when less room is left in the buffer would ask
            // the server for more bytes than can be stored, letting the server-side
            // position run ahead of _pos.
            var chunkSize = Math.Min(count - totalRead, _manager.MaxTransferBlockSize);
            var bytesRead = _manager.ExecuteFunctionGetBytes("loread", buffer, offset + totalRead, count - totalRead, _fd, chunkSize);
            _pos += bytesRead;
            totalRead += bytesRead;

            // A short chunk means the end of the large object was reached.
            if (bytesRead < chunkSize)
                break;
        }

        return totalRead;
    }

    /// <summary>
    /// Writes <paramref name="count"/> bytes to the large object.
    /// </summary>
    /// <param name="buffer">The buffer to write data from.</param>
    /// <param name="offset">The offset in the buffer at which to begin copying bytes.</param>
    /// <param name="count">The number of bytes to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range described by <paramref name="offset"/> and <paramref name="count"/> does not fit in <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    /// <exception cref="NotSupportedException">The stream was opened without write permissions.</exception>
    [RewriteAsync]
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count)
            throw new ArgumentException("Invalid offset or count for this buffer");
        Contract.EndContractBlock();

        CheckDisposed();

        if (!_writeable)
            throw new NotSupportedException("Write cannot be called on a stream opened with no write permissions");

        var totalWritten = 0;
        while (totalWritten < count)
        {
            var chunkSize = Math.Min(count - totalWritten, _manager.MaxTransferBlockSize);
            var bytesWritten = _manager.ExecuteFunction("lowrite", _fd, new ArraySegment<byte>(buffer, offset + totalWritten, chunkSize));
            totalWritten += bytesWritten;

            // Record the progress before the sanity check below so the cached
            // position still reflects what was written if that check throws.
            _pos += bytesWritten;

            if (bytesWritten != chunkSize)
                throw PGUtil.ThrowIfReached();
        }
    }

    /// <summary>
    /// CanTimeout always returns false.
    /// </summary>
    public override bool CanTimeout => false;

    /// <summary>
    /// CanRead always returns true, unless the stream has been closed.
    /// </summary>
    public override bool CanRead => !_disposed;

    /// <summary>
    /// CanWrite returns true if the stream was opened with write permissions, and the stream has not been closed.
    /// </summary>
    public override bool CanWrite => _writeable && !_disposed;

    /// <summary>
    /// CanSeek always returns true, unless the stream has been closed.
    /// </summary>
    public override bool CanSeek => !_disposed;

    /// <summary>
    /// Returns the current position in the stream. Getting the current position does not need a round-trip to the server, however setting the current position does.
    /// </summary>
    public override long Position
    {
        get
        {
            CheckDisposed();
            return _pos;
        }
        set
        {
            Seek(value, SeekOrigin.Begin);
        }
    }

    /// <summary>
    /// Gets the length of the large object. This internally seeks to the end of the stream to retrieve the length, and then back again.
    /// </summary>
    public override long Length => GetLengthInternal();

    // TODO: uncomment this when finally implementing async
    /*public Task GetLengthAsync()
    {
        return GetLengthInternalAsync();
    }*/

    /// <summary>
    /// Determines the length by seeking to the end of the stream, then restores the previous position.
    /// </summary>
    /// <returns>The position reported by the seek to the end of the stream.</returns>
    [RewriteAsync]
    long GetLengthInternal()
    {
        CheckDisposed();

        var previousPosition = _pos;
        var length = Seek(0, SeekOrigin.End);

        // Seeking back is only needed when the stream was not already at its end.
        if (length != previousPosition)
            Seek(previousPosition, SeekOrigin.Begin);

        return length;
    }

    /// <summary>
    /// Seeks in the stream to the specified position. This requires a round-trip to the backend.
    /// </summary>
    /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
    /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
    /// <returns>The new position, as reported by the backend.</returns>
    /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid SeekOrigin value.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> does not fit in 32 bits and the server is older than 9.3.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    [RewriteAsync]
    public override long Seek(long offset, SeekOrigin origin)
    {
        if (origin < SeekOrigin.Begin || origin > SeekOrigin.End)
            throw new ArgumentException("Invalid origin");
        if (!Has64BitSupport && offset != (long)(int)offset)
            throw new ArgumentOutOfRangeException(nameof(offset), "offset must fit in 32 bits for PostgreSQL versions older than 9.3");
        Contract.EndContractBlock();

        CheckDisposed();

        // NOTE(review): ExecuteFunction is declared outside this file; confirm that the
        // lo_lseek64 call yields a 64-bit result so large positions are not truncated.
        if (_manager.Has64BitSupport)
            return _pos = _manager.ExecuteFunction("lo_lseek64", _fd, offset, (int)origin);
        else
            return _pos = _manager.ExecuteFunction("lo_lseek", _fd, (int)offset, (int)origin);
    }

    /// <summary>
    /// Does nothing.
    /// </summary>
    [RewriteAsync]
    public override void Flush()
    {
    }

    /// <summary>
    /// Truncates or enlarges the large object to the given size. If enlarging, the large object is extended with null bytes.
    /// For PostgreSQL versions earlier than 9.3, the value must fit in an Int32.
    /// </summary>
    /// <param name="value">Number of bytes to either truncate or enlarge the large object.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative, or does not fit in 32 bits and the server is older than 9.3.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    /// <exception cref="NotSupportedException">The stream was opened without write permissions.</exception>
    [RewriteAsync]
    public override void SetLength(long value)
    {
        if (value < 0)
            throw new ArgumentOutOfRangeException(nameof(value));
        if (!Has64BitSupport && value != (long)(int)value)
            throw new ArgumentOutOfRangeException(nameof(value), "value must fit in 32 bits for PostgreSQL versions older than 9.3");
        Contract.EndContractBlock();

        CheckDisposed();

        if (!_writeable)
            throw new NotSupportedException("SetLength cannot be called on a stream opened with no write permissions");

        if (_manager.Has64BitSupport)
            _manager.ExecuteFunction("lo_truncate64", _fd, value);
        else
            _manager.ExecuteFunction("lo_truncate", _fd, (int)value);
    }

    /// <summary>
    /// Releases resources at the backend allocated for this stream.
    /// </summary>
#if NET45 || NET451 || DNX451
    public override void Close()
#else
    void Close()
#endif
    {
        // Only the first call issues lo_close; later calls are no-ops.
        if (!_disposed)
        {
            _manager.ExecuteFunction("lo_close", _fd);
            _disposed = true;
        }
    }

    /// <summary>
    /// Releases resources at the backend allocated for this stream, iff disposing is true.
    /// </summary>
    /// <param name="disposing">Whether to release resources allocated at the backend.</param>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
            {
                Close();
            }
        }
        finally
        {
            // Let the base class run its own cleanup even if closing at the backend failed.
            base.Dispose(disposing);
        }
    }
}
}