using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql.Internal;
[Experimental(NpgsqlDiagnostics.ConvertersExperimental)]
public class PgReader
{
const int UninitializedSentinel = -1;
// We don't want to add a ton of memory pressure for large strings.
internal const int MaxPreparedTextReaderSize = 1024 * 64;
readonly NpgsqlReadBuffer _buffer;
bool _resumable;
byte[]? _pooledArray;
NpgsqlReadBuffer.ColumnStream? _userActiveStream;
PreparedTextReader? _preparedTextReader;
long _fieldStartPos;
Size _fieldBufferRequirement;
DataFormat _fieldFormat;
int _fieldSize;
// This position is relative to _fieldStartPos, which is why it can be an int.
int _currentStartPos;
Size _currentBufferRequirement;
int _currentSize;
// GetChars Internal state
TextReader? _charsReadReader;
int _charsRead;
// GetChars User state
int? _charsReadOffset;
ArraySegment? _charsReadBuffer;
bool _requiresCleanup;
// The field reading process of doing init/commit and startread/endread pairs is very perf sensitive.
// So this is used in Commit as a fast-path alternative to FieldRemaining to detect if the field was consumed succesfully.
bool _fieldConsumed;
internal PgReader(NpgsqlReadBuffer buffer)
{
_buffer = buffer;
_fieldStartPos = UninitializedSentinel;
_currentSize = UninitializedSentinel;
}
internal bool Initialized => _fieldStartPos is not UninitializedSentinel;
int FieldOffset => (int)(_buffer.CumulativeReadPosition - _fieldStartPos);
int FieldSize => _fieldSize;
int FieldRemaining => FieldSize - FieldOffset;
internal bool FieldIsDbNull => FieldSize is -1;
internal bool FieldAtStart => FieldOffset is 0;
internal bool IsFieldConsumed(int offset) => FieldOffset > offset;
// TODO refactor out
internal long GetFieldStartPos(NpgsqlNestedDataReader nestedDataReader) => _fieldStartPos;
// TODO refactor out
internal int GetFieldOffset(NpgsqlNestedDataReader nestedDataReader) => FieldOffset;
internal bool NestedInitialized => _currentSize is not UninitializedSentinel;
int CurrentSize => NestedInitialized ? _currentSize : _fieldSize;
public ValueMetadata Current => new() { Size = CurrentSize, Format = _fieldFormat, BufferRequirement = CurrentBufferRequirement };
public int CurrentRemaining => NestedInitialized ? _currentSize - CurrentOffset : FieldRemaining;
internal Size CurrentBufferRequirement => NestedInitialized ? _currentBufferRequirement : _fieldBufferRequirement;
int CurrentOffset => FieldOffset - _currentStartPos;
internal bool Resumable => _resumable;
public bool IsResumed => Resumable && CurrentOffset > 0;
ArrayPool ArrayPool => ArrayPool.Shared;
// Here for testing purposes
internal void BreakConnection() => throw _buffer.Connector.Break(new Exception("Broken"));
internal void Revert(int size, int startPos, Size bufferRequirement)
{
if (startPos > FieldOffset)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(startPos), "Can't revert forwardly");
_currentStartPos = startPos;
_currentBufferRequirement = bufferRequirement;
_currentSize = size;
}
void CheckBounds(int count)
{
if (NpgsqlReadBuffer.BufferBoundsChecks)
Core(count);
[MethodImpl(MethodImplOptions.NoInlining)]
void Core(int count)
{
if (count > CurrentRemaining)
ThrowHelper.ThrowIndexOutOfRangeException("Attempt to read past the end of the current field size.");
}
}
public byte ReadByte()
{
CheckBounds(sizeof(byte));
var result = _buffer.ReadByte();
return result;
}
public short ReadInt16()
{
CheckBounds(sizeof(short));
var result = _buffer.ReadInt16();
return result;
}
public int ReadInt32()
{
CheckBounds(sizeof(int));
var result = _buffer.ReadInt32();
return result;
}
public long ReadInt64()
{
CheckBounds(sizeof(long));
var result = _buffer.ReadInt64();
return result;
}
public ushort ReadUInt16()
{
CheckBounds(sizeof(ushort));
var result = _buffer.ReadUInt16();
return result;
}
public uint ReadUInt32()
{
CheckBounds(sizeof(uint));
var result = _buffer.ReadUInt32();
return result;
}
public ulong ReadUInt64()
{
CheckBounds(sizeof(ulong));
var result = _buffer.ReadUInt64();
return result;
}
public float ReadFloat()
{
CheckBounds(sizeof(float));
var result = _buffer.ReadSingle();
return result;
}
public double ReadDouble()
{
CheckBounds(sizeof(double));
var result = _buffer.ReadDouble();
return result;
}
public void Read(Span destination)
{
CheckBounds(destination.Length);
_buffer.ReadBytes(destination);
}
public async ValueTask ReadNullTerminatedStringAsync(Encoding encoding, CancellationToken cancellationToken = default)
{
var result = await _buffer.ReadNullTerminatedString(encoding, async: true, cancellationToken).ConfigureAwait(false);
// Can only check after the fact.
CheckBounds(0);
return result;
}
public string ReadNullTerminatedString(Encoding encoding)
{
var result = _buffer.ReadNullTerminatedString(encoding, async: false, CancellationToken.None).GetAwaiter().GetResult();
CheckBounds(0);
return result;
}
public Stream GetStream(int? length = null) => GetColumnStream(false, length);
internal Stream GetStream(bool canSeek, int? length = null) => GetColumnStream(canSeek, length);
NpgsqlReadBuffer.ColumnStream GetColumnStream(bool canSeek = false, int? length = null)
{
if (length > CurrentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(length), "Length is larger than the current remaining value size");
_requiresCleanup = true;
// This will cause any previously handed out StreamReaders etc to throw, as intended.
if (_userActiveStream is not null)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();
length ??= CurrentRemaining;
CheckBounds(length.GetValueOrDefault());
return _userActiveStream = _buffer.CreateStream(length.GetValueOrDefault(), canSeek && length <= _buffer.ReadBytesLeft, consumeOnDispose: false);
}
public TextReader GetTextReader(Encoding encoding)
=> GetTextReader(async: false, encoding, CancellationToken.None).GetAwaiter().GetResult();
public ValueTask GetTextReaderAsync(Encoding encoding, CancellationToken cancellationToken)
=> GetTextReader(async: true, encoding, cancellationToken);
async ValueTask GetTextReader(bool async, Encoding encoding, CancellationToken cancellationToken)
{
_requiresCleanup = true;
if (CurrentRemaining > _buffer.ReadBytesLeft || CurrentRemaining > MaxPreparedTextReaderSize)
return new StreamReader(GetColumnStream(), encoding, detectEncodingFromByteOrderMarks: false);
if (_preparedTextReader is { IsDisposed: false })
{
_preparedTextReader.Dispose();
_preparedTextReader = null;
}
_preparedTextReader ??= new PreparedTextReader();
_preparedTextReader.Init(
encoding.GetString(async
? await ReadBytesAsync(CurrentRemaining, cancellationToken).ConfigureAwait(false)
: ReadBytes(CurrentRemaining)), GetColumnStream(canSeek: false, 0));
return _preparedTextReader;
}
public ValueTask ReadBytesAsync(Memory buffer, CancellationToken cancellationToken = default)
{
var count = buffer.Length;
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
_buffer.Buffer.AsSpan(offset, count).CopyTo(buffer.Span);
_buffer.ReadPosition += count;
return new();
}
return Slow(count, buffer, cancellationToken);
async ValueTask Slow(int count, Memory buffer, CancellationToken cancellationToken)
{
var stream = _buffer.CreateStream(count, canSeek: false);
await using var _ = stream.ConfigureAwait(false);
await stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
}
}
public void ReadBytes(Span buffer)
{
var count = buffer.Length;
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
_buffer.Buffer.AsSpan(offset, count).CopyTo(buffer);
_buffer.ReadPosition += count;
return;
}
Slow(count, buffer);
void Slow(int count, Span buffer)
{
using var stream = _buffer.CreateStream(count, canSeek: false);
stream.ReadExactly(buffer);
}
}
public bool TryReadBytes(int count, out ReadOnlySpan bytes)
{
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
bytes = new ReadOnlySpan(_buffer.Buffer, offset, count);
_buffer.ReadPosition += count;
return true;
}
bytes = default;
return false;
}
public bool TryReadBytes(int count, out ReadOnlyMemory bytes)
{
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
bytes = new ReadOnlyMemory(_buffer.Buffer, offset, count);
_buffer.ReadPosition += count;
return true;
}
bytes = default;
return false;
}
/// ReadBytes without memory management, the next read invalidates the underlying buffer(s), only use this for intermediate transformations.
public ReadOnlySequence ReadBytes(int count)
{
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
var result = new ReadOnlySequence(_buffer.Buffer, offset, count);
_buffer.ReadPosition += count;
return result;
}
var array = RentArray(count);
ReadBytes(array.AsSpan(0, count));
return new(array, 0, count);
}
/// ReadBytesAsync without memory management, the next read invalidates the underlying buffer(s), only use this for intermediate transformations.
public async ValueTask> ReadBytesAsync(int count, CancellationToken cancellationToken = default)
{
CheckBounds(count);
var offset = _buffer.ReadPosition;
var remaining = _buffer.FilledBytes - offset;
if (remaining >= count)
{
var result = new ReadOnlySequence(_buffer.Buffer, offset, count);
_buffer.ReadPosition += count;
return result;
}
var array = RentArray(count);
await ReadBytesAsync(array.AsMemory(0, count), cancellationToken).ConfigureAwait(false);
return new(array, 0, count);
}
public void Rewind(int count)
{
if (CurrentOffset < count)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to rewind past the current field start.");
if (_buffer.ReadPosition < count)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to rewind past the buffer start, some of this data is no longer part of the underlying buffer.");
// Shut down any streaming going on on the column
if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();
_buffer.ReadPosition -= count;
}
///
///
///
///
/// The stream length, if any
async ValueTask DisposeUserActiveStream(bool async)
{
if (async)
await (_userActiveStream?.DisposeAsync() ?? new()).ConfigureAwait(false);
else
_userActiveStream?.Dispose();
_userActiveStream = null;
}
internal int CharsRead => _charsRead;
internal bool CharsReadActive => _charsReadOffset is not null;
internal void GetCharsReadInfo(Encoding encoding, out int charsRead, out TextReader reader, out int charsOffset, out ArraySegment? buffer)
{
if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");
charsRead = _charsRead;
reader = _charsReadReader ??= GetTextReader(encoding);
charsOffset = _charsReadOffset ?? 0;
buffer = _charsReadBuffer;
}
internal void RestartCharsRead()
{
if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");
switch (_charsReadReader)
{
case PreparedTextReader reader:
reader.Restart();
break;
case StreamReader reader:
reader.BaseStream.Seek(0, SeekOrigin.Begin);
reader.DiscardBufferedData();
break;
}
_charsRead = 0;
}
internal void AdvanceCharsRead(int charsRead) => _charsRead += charsRead;
internal void StartCharsRead(int dataOffset, ArraySegment? buffer)
{
if (!Resumable)
ThrowHelper.ThrowInvalidOperationException("Reader was not initialized as resumable");
_charsReadOffset = dataOffset;
_charsReadBuffer = buffer;
}
internal void EndCharsRead()
{
if (!Resumable)
ThrowHelper.ThrowInvalidOperationException("Wasn't initialized as resumed");
if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");
_charsReadOffset = null;
_charsReadBuffer = null;
}
internal void Init(int fieldSize, DataFormat fieldFormat, bool resumable = false)
{
if (Initialized)
ThrowHelper.ThrowInvalidOperationException("Already initialized");
_fieldStartPos = _buffer.CumulativeReadPosition;
_fieldConsumed = false;
_fieldSize = fieldSize;
_fieldFormat = fieldFormat;
_resumable = resumable;
}
internal void StartRead(Size bufferRequirement)
{
Debug.Assert(FieldSize >= 0);
_fieldBufferRequirement = bufferRequirement;
if (ShouldBuffer(bufferRequirement))
BufferNoInlined(bufferRequirement);
[MethodImpl(MethodImplOptions.NoInlining)]
void BufferNoInlined(Size bufferRequirement)
=> Buffer(bufferRequirement);
}
internal ValueTask StartReadAsync(Size bufferRequirement, CancellationToken cancellationToken)
{
Debug.Assert(FieldSize >= 0);
_fieldBufferRequirement = bufferRequirement;
return ShouldBuffer(bufferRequirement) ? BufferAsync(bufferRequirement, cancellationToken) : new();
}
internal void EndRead()
{
if (_resumable || StreamActive)
return;
// If it was upper bound we should consume.
if (_fieldBufferRequirement is { Kind: SizeKind.UpperBound })
{
Consume(FieldRemaining);
return;
}
if (FieldOffset != FieldSize)
ThrowNotConsumedExactly();
_fieldConsumed = true;
}
internal ValueTask EndReadAsync()
{
if (_resumable || StreamActive)
return new();
// If it was upper bound we should consume.
if (_fieldBufferRequirement is { Kind: SizeKind.UpperBound })
return ConsumeAsync(FieldRemaining);
if (FieldOffset != FieldSize)
ThrowNotConsumedExactly();
_fieldConsumed = true;
return new();
}
internal async ValueTask BeginNestedRead(bool async, int size, Size bufferRequirement, CancellationToken cancellationToken = default)
{
if (size > CurrentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(size), "Cannot begin a read for a larger size than the current remaining size.");
if (size < 0)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(size), "Cannot be negative");
var previousSize = CurrentSize;
var previousStartPos = _currentStartPos;
var previousBufferRequirement = CurrentBufferRequirement;
_currentSize = size;
_currentBufferRequirement = bufferRequirement;
_currentStartPos = FieldOffset;
await Buffer(async, bufferRequirement, cancellationToken).ConfigureAwait(false);
return new NestedReadScope(async, this, previousSize, previousStartPos, previousBufferRequirement);
}
public NestedReadScope BeginNestedRead(int size, Size bufferRequirement)
=> BeginNestedRead(async: false, size, bufferRequirement, CancellationToken.None).GetAwaiter().GetResult();
public ValueTask BeginNestedReadAsync(int size, Size bufferRequirement, CancellationToken cancellationToken = default)
=> BeginNestedRead(async: true, size, bufferRequirement, cancellationToken);
/// Seek origin is the start of Current, e.g. Seek(0) rewinds to the start.
internal int Seek(int offset)
{
if (CurrentOffset > offset)
Rewind(CurrentOffset - offset);
else if (CurrentOffset < offset)
Consume(offset - CurrentOffset);
return FieldRemaining;
}
public void Consume(int? count = null)
{
if (count <= 0 || FieldSize < 0 || FieldRemaining == 0)
return;
var currentRemaining = CurrentRemaining;
var remaining = count ?? currentRemaining;
if (count > currentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to read past the end of the current field size.");
if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();
var origOffset = FieldOffset;
// A breaking exception unwind from a nested scope should not try to consume its remaining data.
if (!_buffer.Connector.IsBroken)
_buffer.Skip(remaining, allowIO: true);
Debug.Assert(FieldRemaining == FieldSize - origOffset - remaining);
}
public async ValueTask ConsumeAsync(int? count = null, CancellationToken cancellationToken = default)
{
if (count <= 0 || FieldSize < 0 || FieldRemaining == 0)
return;
var currentRemaining = CurrentRemaining;
var remaining = count ?? currentRemaining;
if (count > currentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to read past the end of the current field size.");
if (StreamActive)
await DisposeUserActiveStream(async: true).ConfigureAwait(false);
var origOffset = FieldOffset;
// A breaking exception unwind from a nested scope should not try to consume its remaining data.
if (!_buffer.Connector.IsBroken)
await _buffer.Skip(async:true, remaining).ConfigureAwait(false);
Debug.Assert(FieldRemaining == FieldSize - origOffset - remaining);
}
[MemberNotNullWhen(true, nameof(_userActiveStream))]
bool StreamActive => _userActiveStream is { IsDisposed: false };
internal void ThrowIfStreamActive()
{
if (StreamActive)
ThrowHelper.ThrowInvalidOperationException("A stream is already open for this reader");
}
[MethodImpl(MethodImplOptions.NoInlining)]
void Cleanup()
{
if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();
if (_pooledArray is not null)
{
ArrayPool.Return(_pooledArray);
_pooledArray = null;
}
if (_charsReadReader is not null)
{
_charsReadReader.Dispose();
_charsReadReader = null;
_charsRead = default;
}
_requiresCleanup = false;
}
void ResetCurrent()
{
_currentStartPos = 0;
_currentBufferRequirement = default;
_currentSize = UninitializedSentinel;
}
internal int Restart(bool resumable)
{
if (!Initialized)
ThrowHelper.ThrowInvalidOperationException("Cannot restart a non-initialized reader.");
// We resume if the reader was initialized as resumable and we're not explicitly restarting as non-resumable.
// When the field size is DbNullFieldSize (i.e. -1) we're always restarting as resumable, to allow rereading null values endlessly.
if ((Resumable && resumable) || FieldIsDbNull)
{
_resumable = resumable || FieldIsDbNull;
return FieldSize;
}
// From this point on we're not resuming, we're resetting any remaining state and rewinding our position.
// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
Cleanup();
if (NestedInitialized)
ResetCurrent();
_fieldConsumed = false;
_resumable = resumable;
Seek(0);
Debug.Assert(Initialized);
return FieldSize;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Commit()
{
if (!Initialized)
return;
// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
Cleanup();
if (NestedInitialized)
ResetCurrent();
// We make sure to fuly consume any FieldRemaining in the event of an exception or a nested scope not being disposed.
Debug.Assert(!NestedInitialized);
if (!_fieldConsumed && FieldRemaining > 0)
Consume();
_fieldStartPos = UninitializedSentinel;
Debug.Assert(!Initialized);
// These will always be re-initialized by Init()
// _fieldSize = default;
// _fieldFormat = default;
// _resumable = default;
// _fieldConsumed = default;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ValueTask CommitAsync()
{
if (!Initialized)
return new();
// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
Cleanup();
if (NestedInitialized)
ResetCurrent();
// We make sure to fuly consume any FieldRemaining in the event of an exception or a nested scope not being disposed.
Debug.Assert(!NestedInitialized);
if (!_fieldConsumed && FieldRemaining > 0)
return CommitAsync();
_fieldStartPos = UninitializedSentinel;
Debug.Assert(!Initialized);
// These will always be re-initialized by Init()
// _fieldSize = default;
// _fieldFormat = default;
// _resumable = default;
// _fieldConsumed = default;
return new();
async ValueTask CommitAsync()
{
await ConsumeAsync().ConfigureAwait(false);
_fieldStartPos = UninitializedSentinel;
Debug.Assert(!Initialized);
// These will always be re-initialized by Init()
// _fieldSize = default;
// _fieldFormat = default;
// _resumable = default;
// _fieldConsumed = default;
}
}
byte[] RentArray(int count)
{
_requiresCleanup = true;
var pooledArray = _pooledArray;
if (pooledArray is not null)
{
if (pooledArray.Length >= count)
return pooledArray;
ArrayPool.Return(pooledArray);
}
var array = _pooledArray = ArrayPool.Rent(count);
return array;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
int GetBufferRequirementByteCount(Size bufferRequirement)
=> bufferRequirement is { Kind: SizeKind.UpperBound }
? Math.Min(CurrentRemaining, bufferRequirement.Value)
: bufferRequirement.GetValueOrDefault();
internal bool ShouldBufferCurrent() => ShouldBuffer(CurrentBufferRequirement);
public bool ShouldBuffer(Size bufferRequirement)
=> ShouldBuffer(GetBufferRequirementByteCount(bufferRequirement));
public bool ShouldBuffer(int byteCount)
{
return _buffer.ReadBytesLeft < byteCount && ShouldBufferSlow(byteCount);
[MethodImpl(MethodImplOptions.NoInlining)]
bool ShouldBufferSlow(int byteCount)
{
if (byteCount > _buffer.Size)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the buffer size, this can never succeed by buffering data but requires a larger buffer size instead.");
if (byteCount > CurrentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the remaining length of the value, make sure the value is always at least this size or use an upper bound requirement instead.");
return true;
}
}
public void Buffer(Size bufferRequirement)
=> Buffer(GetBufferRequirementByteCount(bufferRequirement));
public void Buffer(int byteCount) => _buffer.Ensure(byteCount);
public ValueTask BufferAsync(Size bufferRequirement, CancellationToken cancellationToken)
=> BufferAsync(GetBufferRequirementByteCount(bufferRequirement), cancellationToken);
public ValueTask BufferAsync(int byteCount, CancellationToken cancellationToken) => _buffer.EnsureAsync(byteCount);
internal ValueTask Buffer(bool async, Size bufferRequirement, CancellationToken cancellationToken)
=> Buffer(async, GetBufferRequirementByteCount(bufferRequirement), cancellationToken);
internal ValueTask Buffer(bool async, int byteCount, CancellationToken cancellationToken)
{
if (async)
return BufferAsync(byteCount, cancellationToken);
Buffer(byteCount);
return new();
}
void ThrowNotConsumedExactly() =>
throw _buffer.Connector.Break(
new InvalidOperationException(
FieldOffset < FieldSize
? $"The read on this field has not consumed all of its bytes (pos: {FieldOffset}, len: {FieldSize})"
: $"The read on this field has consumed all of its bytes and read into the subsequent bytes (pos: {FieldOffset}, len: {FieldSize})"));
}
public readonly struct NestedReadScope : IDisposable, IAsyncDisposable
{
readonly PgReader _reader;
readonly int _previousSize;
readonly int _previousStartPos;
readonly Size _previousBufferRequirement;
readonly bool _async;
internal NestedReadScope(bool async, PgReader reader, int previousSize, int previousStartPos, Size previousBufferRequirement)
{
_async = async;
_reader = reader;
_previousSize = previousSize;
_previousStartPos = previousStartPos;
_previousBufferRequirement = previousBufferRequirement;
}
public void Dispose()
{
if (_async)
ThrowHelper.ThrowInvalidOperationException("Cannot synchronously dispose async scopes, call DisposeAsync instead.");
DisposeAsync().GetAwaiter().GetResult();
}
public ValueTask DisposeAsync()
{
if (_reader.CurrentRemaining > 0)
{
if (_async)
return AsyncCore(_reader, _previousSize, _previousStartPos, _previousBufferRequirement);
_reader.Consume();
}
_reader.Revert(_previousSize, _previousStartPos, _previousBufferRequirement);
return new();
static async ValueTask AsyncCore(PgReader reader, int previousSize, int previousStartPos, Size previousBufferRequirement)
{
await reader.ConsumeAsync().ConfigureAwait(false);
reader.Revert(previousSize, previousStartPos, previousBufferRequirement);
}
}
}