using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql
{
public sealed partial class NpgsqlReadBuffer
{
internal sealed class ColumnStream : Stream
{
readonly NpgsqlReadBuffer _buf;
int _start, _len, _read;
bool _canSeek;
internal bool IsDisposed { get; private set; }
internal ColumnStream(NpgsqlReadBuffer buf)
=> _buf = buf;
internal void Init(int len, bool canSeek)
{
Debug.Assert(!canSeek || _buf.ReadBytesLeft >= len,
"Seekable stream constructed but not all data is in buffer (sequential)");
_start = _buf.ReadPosition;
_len = len;
_read = 0;
_canSeek = canSeek;
IsDisposed = false;
}
public override bool CanRead => true;
public override bool CanWrite => false;
public override bool CanSeek => _canSeek;
public override long Length
{
get
{
CheckDisposed();
return _len;
}
}
public override void SetLength(long value)
=> throw new NotSupportedException();
public override long Position
{
get
{
CheckDisposed();
return _read;
}
set
{
if (value < 0)
throw new ArgumentOutOfRangeException(nameof(value), "Non - negative number required.");
Seek(_start + value, SeekOrigin.Begin);
}
}
public override long Seek(long offset, SeekOrigin origin)
{
CheckDisposed();
if (!_canSeek)
throw new NotSupportedException();
if (offset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(offset), "Stream length must be non-negative and less than 2^31 - 1 - origin.");
const string seekBeforeBegin = "An attempt was made to move the position before the beginning of the stream.";
switch (origin)
{
case SeekOrigin.Begin:
{
var tempPosition = unchecked(_start + (int)offset);
if (offset < 0 || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = _start;
return tempPosition;
}
case SeekOrigin.Current:
{
var tempPosition = unchecked(_buf.ReadPosition + (int)offset);
if (unchecked(_buf.ReadPosition + offset) < _start || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = tempPosition;
return tempPosition;
}
case SeekOrigin.End:
{
var tempPosition = unchecked(_len + (int)offset);
if (unchecked(_len + offset) < _start || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = tempPosition;
return tempPosition;
}
default:
throw new ArgumentOutOfRangeException(nameof(origin), "Invalid seek origin.");
}
}
public override void Flush()
=> throw new NotSupportedException();
public override Task FlushAsync(CancellationToken cancellationToken)
=> throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count)
=> Read(buffer, offset, count, CancellationToken.None, false).GetAwaiter().GetResult();
public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
using (NoSynchronizationContextScope.Enter())
return Read(buffer, offset, count, cancellationToken, true).AsTask();
}
ValueTask Read(byte[] buffer, int offset, int count, CancellationToken cancellationToken, bool async)
{
CheckDisposed();
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0)
throw new ArgumentNullException(nameof(offset));
if (count < 0)
throw new ArgumentNullException(nameof(count));
if (buffer.Length - offset < count)
throw new ArgumentException("Offset and length were out of bounds for the array or count is greater than the number of elements from index to the end of the source collection.");
if (cancellationToken.IsCancellationRequested)
return new ValueTask(Task.FromCanceled(cancellationToken));
count = Math.Min(count, _len - _read);
if (count == 0)
return new ValueTask(0);
var task = _buf.ReadBytes(buffer, offset, count, async);
if (task.IsCompleted) // This may be a bug in the new version of ValueTask
{
_read += task.Result;
return task;
}
return new ValueTask(ReadLong(task, async));
}
async Task ReadLong(ValueTask task, bool async)
{
var read = async
? await task
: task.GetAwaiter().GetResult();
_read += read;
return read;
}
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
void CheckDisposed()
{
if (IsDisposed)
throw new ObjectDisposedException(null);
}
protected override void Dispose(bool disposing)
{
if (IsDisposed)
return;
var leftToSkip = _len - _read;
if (leftToSkip > 0)
_buf.Skip(leftToSkip, false).GetAwaiter().GetResult();
IsDisposed = true;
}
}
}
}