using System;
using System.Buffers;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Npgsql.BackendMessages;
using Npgsql.Internal;
using Npgsql.Internal.Converters;
using Npgsql.PostgresTypes;
using Npgsql.Schema;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
#pragma warning disable CA2222 // Do not decrease inherited member visibility
namespace Npgsql;
/// <summary>
/// Reads a forward-only stream of rows from a data source.
/// </summary>
#pragma warning disable CA1010
public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
#pragma warning restore CA1010
{
// Cached pre-completed tasks returned from the TryRead() fast path, avoiding a Task
// allocation per row. Must be Task<bool> — callers read .Result (see Read()) and
// ReadAsync returns them directly; the type argument was lost in the original.
static readonly Task<bool> TrueTask = Task.FromResult(true);
static readonly Task<bool> FalseTask = Task.FromResult(false);
internal NpgsqlCommand Command { get; private set; } = default!;
internal NpgsqlConnector Connector { get; }
NpgsqlConnection? _connection;
/// <summary>
/// The behavior of the command with which this reader was executed.
/// </summary>
CommandBehavior _behavior;
/// <summary>
/// In multiplexing, this is <see langword="null"/> as the sending is managed in the write multiplexing loop,
/// and does not need to be awaited by the reader.
/// </summary>
Task? _sendTask;
internal ReaderState State = ReaderState.Disposed;
internal NpgsqlReadBuffer Buffer = default!;
PgReader PgReader => Buffer.PgReader;
/// <summary>
/// Holds the list of statements being executed by this reader.
/// </summary>
// Element type restored: statements are assigned to PostgresException.BatchCommand
// in the NextResult catch block, so these are NpgsqlBatchCommand instances.
List<NpgsqlBatchCommand> _statements = default!;
/// <summary>
/// The index of the current query resultset we're processing (within a multiquery)
/// </summary>
internal int StatementIndex { get; private set; }
/// <summary>
/// The number of columns in the current row
/// </summary>
int _numColumns;
/// <summary>
/// Records, for each column, its starting offset and length in the current row.
/// Used only in non-sequential mode.
/// </summary>
readonly List<(int Offset, int Length)> _columns = new();
int _columnsStartPos;
/// <summary>
/// The index of the column that we're on, i.e. that has already been parsed, is
/// in memory and can be retrieved. Initialized to -1, which means we're on the column
/// count (which comes before the first column).
/// </summary>
int _column;
/// <summary>
/// The position in the buffer at which the current data row message ends.
/// Used only when the row is consumed non-sequentially.
/// </summary>
int _dataMsgEnd;
/// <summary>
/// Determines, if we can consume the row non-sequentially.
/// Mostly useful for a sequential mode, when the row is already in the buffer.
/// Should always be true for the non-sequential mode.
/// </summary>
bool _canConsumeRowNonSequentially;
/// <summary>
/// The RowDescription message for the current resultset being processed
/// </summary>
internal RowDescriptionMessage? RowDescription;
/// <summary>
/// Stores the last converter info resolved by column, to speed up repeated reading.
/// </summary>
ColumnInfo[]? ColumnInfoCache { get; set; }
ulong? _recordsAffected;
/// <summary>
/// Whether the current result set has rows
/// </summary>
bool _hasRows;
/// <summary>
/// Is raised whenever Close() is called.
/// </summary>
public event EventHandler? ReaderClosed;
bool _isSchemaOnly;
bool _isSequential;
internal NpgsqlNestedDataReader? CachedFreeNestedDataReader;
long _startTimestamp;
readonly ILogger _commandLogger;
/// <summary>
/// Binds this reader to the given connector; the command logger is taken from the connector.
/// </summary>
internal NpgsqlDataReader(NpgsqlConnector connector)
    => (Connector, _commandLogger) = (connector, connector.CommandLogger);
/// <summary>
/// (Re)initializes this pooled reader instance for a new command execution.
/// </summary>
/// <param name="command">The command being executed.</param>
/// <param name="behavior">The command behavior flags requested by the caller.</param>
/// <param name="statements">The batch commands whose resultsets this reader will traverse.</param>
/// <param name="startTimestamp">Timestamp of execution start, for duration logging/metrics.</param>
/// <param name="sendTask">The in-flight send task, when sending isn't handled by the multiplexing write loop.</param>
internal void Init(
    NpgsqlCommand command,
    CommandBehavior behavior,
    // Type argument restored — was mangled to a bare (non-compiling) `List`.
    List<NpgsqlBatchCommand> statements,
    long startTimestamp = 0,
    Task? sendTask = null)
{
    // A previous use must have returned its column info cache (see NextResult/Close).
    Debug.Assert(ColumnInfoCache is null);
    Command = command;
    _connection = command.InternalConnection;
    _behavior = behavior;
    _isSchemaOnly = _behavior.HasFlag(CommandBehavior.SchemaOnly);
    _isSequential = _behavior.HasFlag(CommandBehavior.SequentialAccess);
    _statements = statements;
    // -1 means "before the first statement"; NextResult advances to 0.
    StatementIndex = -1;
    _sendTask = sendTask;
    State = ReaderState.BetweenResults;
    _recordsAffected = null;
    _startTimestamp = startTimestamp;
}
#region Read
/// <summary>
/// Advances the reader to the next record in a result set.
/// </summary>
/// <returns><see langword="true"/> if there are more rows; otherwise <see langword="false"/>.</returns>
/// <remarks>
/// The default position of a data reader is before the first record. Therefore, you must call Read to begin accessing data.
/// </remarks>
public override bool Read()
{
    CheckClosedOrDisposed();

    // Fast path: the next message is already buffered and TryRead returned a completed task.
    var fastPath = TryRead();
    if (fastPath is not null)
        return fastPath.Result;

    // Slow path: run the async core synchronously.
    return Read(false).GetAwaiter().GetResult();
}
/// <summary>
/// This is the asynchronous version of <see cref="Read()"/>
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
// Return type restored to Task<bool> — required by the DbDataReader.ReadAsync override
// contract and by TryRead()'s cached TrueTask/FalseTask results.
public override Task<bool> ReadAsync(CancellationToken cancellationToken)
{
    CheckClosedOrDisposed();
    // TryRead returns a cached completed task when the next message is already buffered;
    // otherwise fall through to the async core.
    return TryRead() ?? Read(async: true, cancellationToken);
}
// This is an optimized execution path that avoids calling any async methods for the (usual)
// case where the next row (or CommandComplete) is already in memory.
// Returns null when the fast path cannot be taken and the caller must use the async core.
// Return type restored to Task<bool>? (was mangled to bare Task?).
Task<bool>? TryRead()
{
    switch (State)
    {
    case ReaderState.BeforeResult:
        // First Read() after NextResult. Data row has already been processed.
        State = ReaderState.InResult;
        return TrueTask;
    case ReaderState.InResult:
        break;
    default:
        return FalseTask;
    }

    // We have a special case path for SingleRow.
    if (_behavior.HasFlag(CommandBehavior.SingleRow) || !_canConsumeRowNonSequentially)
        return null;

    ConsumeRowNonSequential();

    // Message header: 1-byte message code + 4-byte length.
    const int headerSize = sizeof(byte) + sizeof(int);
    var buffer = Buffer;
    var readPosition = buffer.ReadPosition;
    var bytesLeft = buffer.FilledBytes - readPosition;
    if (bytesLeft < headerSize)
        return null;

    var messageCode = (BackendMessageCode)buffer.ReadByte();
    var len = buffer.ReadInt32() - sizeof(int); // Transmitted length includes itself
    var isDataRow = messageCode is BackendMessageCode.DataRow;
    // sizeof(short) is for the number of columns
    var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof(short) : headerSize + len;
    if (bytesLeft < sufficientBytes
        || !isDataRow && (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
        // Could be an error, let main read handle it.
        || Connector.ParseResultSetMessage(buffer, messageCode, len) is not { } msg)
    {
        // Rewind what we consumed above so the slow path re-reads the header.
        buffer.ReadPosition = readPosition;
        return null;
    }

    ProcessMessage(msg);
    return isDataRow ? TrueTask : FalseTask;
}
// Async core of Read()/ReadAsync(): advances to the next data row of the current resultset.
// Return type restored to Task<bool>; the RFQ Expect call's type argument was also mangled away.
async Task<bool> Read(bool async, CancellationToken cancellationToken = default)
{
    using var registration = Connector.StartNestedCancellableOperation(cancellationToken);

    try
    {
        switch (State)
        {
        case ReaderState.BeforeResult:
            // First Read() after NextResult. Data row has already been processed.
            State = ReaderState.InResult;
            return true;
        case ReaderState.InResult:
            await ConsumeRow(async).ConfigureAwait(false);
            if (_behavior.HasFlag(CommandBehavior.SingleRow))
            {
                // TODO: See optimization proposal in #410
                await Consume(async).ConfigureAwait(false);
                return false;
            }
            break;
        case ReaderState.BetweenResults:
        case ReaderState.Consumed:
        case ReaderState.Closed:
        case ReaderState.Disposed:
            return false;
        default:
            ThrowHelper.ThrowArgumentOutOfRangeException();
            return false;
        }

        var msg = await ReadMessage(async).ConfigureAwait(false);
        switch (msg.Code)
        {
        case BackendMessageCode.DataRow:
            ProcessMessage(msg);
            return true;
        case BackendMessageCode.CommandComplete:
        case BackendMessageCode.EmptyQueryResponse:
            ProcessMessage(msg);
            // With an error barrier, each statement is followed by its own ReadyForQuery.
            if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
                Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
            return false;
        default:
            throw Connector.UnexpectedMessageReceived(msg.Code);
        }
    }
    catch
    {
        // Break may have progressed the reader already.
        if (State is not ReaderState.Closed)
            State = ReaderState.Consumed;
        throw;
    }
}
// Reads the next backend message, honoring sequential mode.
// Return type restored to ValueTask<IBackendMessage> — the result is assigned and its
// .Code inspected by callers, and the non-sequential branch forwards Connector.ReadMessage.
ValueTask<IBackendMessage> ReadMessage(bool async)
{
    return _isSequential ? ReadMessageSequential(Connector, async) : Connector.ReadMessage(async);

    static async ValueTask<IBackendMessage> ReadMessageSequential(NpgsqlConnector connector, bool async)
    {
        var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
        if (msg.Code == BackendMessageCode.DataRow)
        {
            // Make sure that the datarow's column count is already buffered
            await connector.ReadBuffer.Ensure(2, async).ConfigureAwait(false);
            return msg;
        }
        return msg;
    }
}
#endregion
#region NextResult
/// <summary>
/// Advances the reader to the next result when reading the results of a batch of statements.
/// </summary>
/// <returns><see langword="true"/> if there are more result sets; otherwise <see langword="false"/>.</returns>
public override bool NextResult()
{
    // Schema-only commands take a dedicated path that never reads data rows.
    var task = _isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false);
    return task.GetAwaiter().GetResult();
}
/// <summary>
/// This is the asynchronous version of <see cref="NextResult()"/>.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
// Return type restored to Task<bool> — required by the DbDataReader.NextResultAsync
// override contract and by the bool-returning NextResult/NextResultSchemaOnly cores.
public override Task<bool> NextResultAsync(CancellationToken cancellationToken)
    => _isSchemaOnly
        ? NextResultSchemaOnly(async: true, cancellationToken: cancellationToken)
        : NextResult(async: true, cancellationToken: cancellationToken);
/// <summary>
/// Internal implementation of NextResult
/// </summary>
// Async core of NextResult/NextResultAsync. Restorations/fixes relative to the mangled source:
// - return type Task<bool>;
// - the Expect<T> type arguments (BindComplete/CloseCompleted/ParseComplete/ReadyForQuery),
//   which were stripped, leaving non-compiling bare Expect(...) calls;
// - ArrayPool<ColumnInfo> type argument;
// - BUG FIX: in the message switch after reading the first resultset message, the default
//   case created the exception via Connector.UnexpectedMessageReceived(msg.Code) but never
//   threw it, silently continuing on a protocol violation. It is now thrown, matching every
//   other use of UnexpectedMessageReceived in this file.
async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationToken cancellationToken = default)
{
    Debug.Assert(!_isSchemaOnly);

    CheckClosedOrDisposed();

    if (State is ReaderState.Consumed)
        return false;

    try
    {
        // When called recursively from Consume, the outer call already owns the cancellable operation.
        using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken);

        // If we're in the middle of a resultset, consume it
        if (State is ReaderState.BeforeResult or ReaderState.InResult)
            await ConsumeResultSet(async).ConfigureAwait(false);

        Debug.Assert(State is ReaderState.BetweenResults);
        _hasRows = false;

        var statements = _statements;
        var statementIndex = StatementIndex;
        if (statementIndex >= 0)
        {
            // Persist the resolved converter infos onto the prepared statement's row description
            // so subsequent executions skip re-resolution.
            if (RowDescription is { } description && statements[statementIndex].IsPrepared && ColumnInfoCache is { } cache)
                description.SetConverterInfoCache(new(cache, 0, _numColumns));

            if (statementIndex is 0 && _behavior.HasFlag(CommandBehavior.SingleResult) && !isConsuming)
            {
                await Consume(async).ConfigureAwait(false);
                return false;
            }
        }

        // We are now at the end of the previous result set. Read up to the next result set, if any.
        // Non-prepared statements receive ParseComplete, BindComplete, DescriptionRow/NoData,
        // prepared statements receive only BindComplete
        for (statementIndex = ++StatementIndex; statementIndex < statements.Count; statementIndex = ++StatementIndex)
        {
            var statement = statements[statementIndex];
            IBackendMessage msg;
            if (statement.TryGetPrepared(out var preparedStatement))
            {
                Expect<BindCompleteMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                RowDescription = preparedStatement.Description;
            }
            else // Non-prepared/preparing flow
            {
                preparedStatement = statement.PreparedStatement;
                if (preparedStatement != null)
                {
                    Debug.Assert(!preparedStatement.IsPrepared);
                    // If this statement replaces an older prepared statement, the backend first
                    // acknowledges the Close of the old one.
                    if (preparedStatement.StatementBeingReplaced != null)
                    {
                        Expect<CloseCompletedMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                        preparedStatement.StatementBeingReplaced.CompleteUnprepare();
                        preparedStatement.StatementBeingReplaced = null;
                    }
                }

                Expect<ParseCompleteMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);

                if (statement.IsPreparing)
                {
                    preparedStatement!.State = PreparedState.Prepared;
                    Connector.PreparedStatementManager.NumPrepared++;
                    statement.IsPreparing = false;
                }

                Expect<BindCompleteMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                RowDescription = statement.Description = msg.Code switch
                {
                    BackendMessageCode.NoData => null,
                    // RowDescription messages are cached on the connector, but if we're auto-preparing, we need to
                    // clone our own copy which will last beyond the lifetime of this invocation.
                    BackendMessageCode.RowDescription => preparedStatement == null
                        ? (RowDescriptionMessage)msg
                        : ((RowDescriptionMessage)msg).Clone(),
                    _ => throw Connector.UnexpectedMessageReceived(msg.Code)
                };
            }

            if (RowDescription is not null)
            {
                // Reuse the pooled cache when it's big enough; otherwise return it and rent a bigger one.
                if (ColumnInfoCache?.Length >= RowDescription.Count)
                    Array.Clear(ColumnInfoCache, 0, RowDescription.Count);
                else
                {
                    if (ColumnInfoCache is { } cache)
                        ArrayPool<ColumnInfo>.Shared.Return(cache, clearArray: true);
                    ColumnInfoCache = ArrayPool<ColumnInfo>.Shared.Rent(RowDescription.Count);
                }
                if (statement.IsPrepared)
                    RowDescription.LoadConverterInfoCache(ColumnInfoCache);
            }
            else
            {
                // Statement did not generate a resultset (e.g. INSERT)
                // Read and process its completion message and move on to the next statement
                // No need to read sequentially as it's not a DataRow
                msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                switch (msg.Code)
                {
                case BackendMessageCode.CommandComplete:
                case BackendMessageCode.EmptyQueryResponse:
                    break;
                case BackendMessageCode.CopyInResponse:
                    throw Connector.Break(new NotSupportedException(
                        "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql. " +
                        "If you are trying to execute a SQL script created by pg_dump, pass the '--inserts' switch to disable generating COPY statements."));
                case BackendMessageCode.CopyOutResponse:
                    throw Connector.Break(new NotSupportedException(
                        "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql."));
                default:
                    throw Connector.UnexpectedMessageReceived(msg.Code);
                }
                ProcessMessage(msg);

                if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                    Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);

                continue;
            }

            if (!Command.IsWrappedByBatch && StatementIndex == 0 && Command._parameters?.HasOutputParameters == true)
            {
                // If output parameters are present and this is the first row of the first resultset,
                // we must always read it in non-sequential mode because it will be traversed twice (once
                // here for the parameters, then as a regular row).
                msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                ProcessMessage(msg);
                if (msg.Code == BackendMessageCode.DataRow)
                    PopulateOutputParameters();
            }
            else
            {
                msg = await ReadMessage(async).ConfigureAwait(false);
                ProcessMessage(msg);
            }

            switch (msg.Code)
            {
            case BackendMessageCode.DataRow:
                Connector.State = ConnectorState.Fetching;
                return true;
            case BackendMessageCode.CommandComplete:
                // An empty resultset still counts as a resultset: return true with no rows.
                if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                    Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                return true;
            default:
                // Previously the exception was constructed but not thrown — see header comment.
                throw Connector.UnexpectedMessageReceived(msg.Code);
            }
        }

        // There are no more queries, we're done. Read the RFQ.
        // (If the last statement had an error barrier, its RFQ was already consumed above.)
        if (_statements.Count is 0 || !(_statements[_statements.Count - 1].AppendErrorBarrier ?? Command.EnableErrorBarriers))
            Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);

        State = ReaderState.Consumed;
        RowDescription = null;
        return false;
    }
    catch (Exception e)
    {
        if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count)
        {
            var statement = _statements[StatementIndex];

            // Reference the triggering statement from the exception
            postgresException.BatchCommand = statement;

            // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since
            // the exception is very likely to escape the using statement of the command, and by that time some other user may
            // already be using the recycled instance.
            Command.IsCacheable = false;

            // If the schema of a table changes after a statement is prepared on that table, PostgreSQL errors with
            // 0A000: cached plan must not change result type. 0A000 seems like a non-specific code, but it's very unlikely the
            // statement would successfully execute anyway, so invalidate the prepared statement.
            if (postgresException.SqlState == PostgresErrorCodes.FeatureNotSupported &&
                statement.PreparedStatement is { } preparedStatement)
            {
                preparedStatement.State = PreparedState.Invalidated;
                Command.ResetPreparation();
                foreach (var s in Command.InternalBatchCommands)
                    s.ResetPreparation();
            }
        }

        // For the statement that errored, if it was being prepared we need to update our bookkeeping to put them back in unprepared
        // state.
        for (; StatementIndex < _statements.Count; StatementIndex++)
        {
            var statement = _statements[StatementIndex];
            if (statement.IsPreparing)
            {
                statement.IsPreparing = false;
                statement.PreparedStatement!.AbortPrepare();
            }

            // In normal, non-isolated batching, we've consumed the result set and are done.
            // However, if the command has error barrier, we now have to consume results from the commands after it (unless it's the
            // last one).
            // Note that Consume calls NextResult (this method) recursively, the isConsuming flag tells us we're in this mode.
            if ((statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) && StatementIndex < _statements.Count - 1)
            {
                if (isConsuming)
                    throw;

                switch (State)
                {
                case ReaderState.Consumed:
                case ReaderState.Closed:
                case ReaderState.Disposed:
                    // The exception may have caused the connector to break (e.g. I/O), and so the reader is already closed.
                    break;
                default:
                    // We provide Consume with the first exception which we've just caught.
                    // If it encounters other exceptions while consuming the rest of the result set, it will raise an AggregateException,
                    // otherwise it will rethrow this first exception.
                    await Consume(async, firstException: e).ConfigureAwait(false);
                    break; // Never reached, Consume always throws above
                }
            }
        }

        // Break may have progressed the reader already.
        if (State is not ReaderState.Closed)
            State = ReaderState.Consumed;
        throw;
    }

    // Skips any remaining data rows of the current resultset and processes its completion message(s).
    async ValueTask ConsumeResultSet(bool async)
    {
        await ConsumeRow(async).ConfigureAwait(false);
        while (true)
        {
            var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip).ConfigureAwait(false);
            switch (completedMsg.Code)
            {
            case BackendMessageCode.CommandComplete:
            case BackendMessageCode.EmptyQueryResponse:
                ProcessMessage(completedMsg);

                var statement = _statements[StatementIndex];
                if (statement.IsPrepared && ColumnInfoCache is not null)
                    RowDescription!.SetConverterInfoCache(new(ColumnInfoCache, 0, _numColumns));

                if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                    Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                break;
            default:
                // TODO if we hit an ErrorResponse here (PG doesn't do this *today*) we should probably throw.
                continue;
            }
            break;
        }
    }
}
void PopulateOutputParameters()
{
// The first row in a stored procedure command that has output parameters needs to be traversed twice -
// once for populating the output parameters and once for the actual result set traversal. So in this
// case we can't be sequential.
Debug.Assert(StatementIndex == 0);
Debug.Assert(RowDescription != null);
Debug.Assert(State == ReaderState.BeforeResult);
var currentPosition = Buffer.ReadPosition;
// Temporarily set our state to InResult to allow us to read the values
State = ReaderState.InResult;
var pending = new Queue