using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.PostgresTypes;
using Npgsql.Schema;
using Npgsql.TypeHandlers;
using Npgsql.TypeHandling;
using Npgsql.Util;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
#pragma warning disable CA2222 // Do not decrease inherited member visibility
namespace Npgsql
{
///
/// Reads a forward-only stream of rows from a data source.
///
#pragma warning disable CA1010
public sealed class NpgsqlDataReader : DbDataReader
#pragma warning restore CA1010
#if !NET461
, IDbColumnSchemaGenerator
#endif
{
internal NpgsqlCommand Command { get; private set; } = default!;
internal NpgsqlConnector Connector { get; }
NpgsqlConnection _connection = default!;
///
/// The behavior of the command with which this reader was executed.
///
CommandBehavior _behavior;
Task _sendTask = default!;
internal ReaderState State;
internal NpgsqlReadBuffer Buffer = default!;
///
/// Holds the list of statements being executed by this reader.
///
List _statements = default!;
///
/// The index of the current query resultset we're processing (within a multiquery)
///
internal int StatementIndex { get; private set; }
///
/// The number of columns in the current row
///
int _numColumns;
///
/// Records, for each column, its starting offset and length in the current row.
/// Used only in non-sequential mode.
///
readonly List<(int Offset, int Length)> _columns = new List<(int Offset, int Length)>();
///
/// The index of the column that we're on, i.e. that has already been parsed, is
/// is memory and can be retrieved. Initialized to -1, which means we're on the column
/// count (which comes before the first column).
///
int _column;
///
/// For streaming types (e.g. bytea), holds the byte length of the column.
/// Does not include the length prefix.
///
internal int ColumnLen;
internal int PosInColumn;
///
/// The position in the buffer at which the current data row message ends.
/// Used only in non-sequential mode.
///
int _dataMsgEnd;
int _charPos;
///
/// The RowDescription message for the current resultset being processed
///
internal RowDescriptionMessage? RowDescription;
ulong? _recordsAffected;
///
/// Whether the current result set has rows
///
bool _hasRows;
///
/// Is raised whenever Close() is called.
///
public event EventHandler? ReaderClosed;
bool _isSchemaOnly;
bool _isSequential;
///
/// A stream that has been opened on a column.
///
NpgsqlReadBuffer.ColumnStream? ColumnStream;
///
/// Used for internal temporary purposes
///
char[]? _tempCharBuf;
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlDataReader));
internal NpgsqlDataReader(NpgsqlConnector connector)
{
Connector = connector;
}
internal void Init(NpgsqlCommand command, CommandBehavior behavior, List statements, Task sendTask)
{
Command = command;
Debug.Assert(command.Connection == Connector.Connection);
_connection = command.Connection;
_behavior = behavior;
_isSchemaOnly = _behavior.HasFlag(CommandBehavior.SchemaOnly);
_isSequential = _behavior.HasFlag(CommandBehavior.SequentialAccess);
_statements = statements;
StatementIndex = -1;
_sendTask = sendTask;
State = ReaderState.BetweenResults;
_recordsAffected = null;
}
#region Read
///
/// Advances the reader to the next record in a result set.
///
/// true if there are more rows; otherwise false .
///
/// The default position of a data reader is before the first record. Therefore, you must call Read to begin accessing data.
///
public override bool Read()
{
CheckClosed();
var fastRead = TryFastRead();
return fastRead.HasValue
? fastRead.Value
: Read(false).GetAwaiter().GetResult();
}
///
/// This is the asynchronous version of The cancellation token is currently ignored.
///
/// The token to monitor for cancellation requests.
/// A task representing the asynchronous operation.
public override Task ReadAsync(CancellationToken cancellationToken)
{
CheckClosed();
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
var fastRead = TryFastRead();
if (fastRead.HasValue)
return fastRead.Value ? PGUtil.TrueTask : PGUtil.FalseTask;
using (NoSynchronizationContextScope.Enter())
return Read(true);
}
bool? TryFastRead()
{
// This is an optimized execution path that avoids calling any async methods for the (usual)
// case where the next row (or CommandComplete) is already in memory.
if (_behavior.HasFlag(CommandBehavior.SingleRow))
return null;
switch (State)
{
case ReaderState.BeforeResult:
// First Read() after NextResult. Data row has already been processed.
State = ReaderState.InResult;
return true;
case ReaderState.InResult:
if (_isSequential)
return null;
ConsumeRowNonSequential();
break;
case ReaderState.BetweenResults:
case ReaderState.Consumed:
case ReaderState.Closed:
return false;
}
var readBuf = Connector.ReadBuffer;
if (readBuf.ReadBytesLeft < 5)
return null;
var messageCode = (BackendMessageCode)readBuf.ReadByte();
var len = readBuf.ReadInt32() - 4; // Transmitted length includes itself
if (messageCode != BackendMessageCode.DataRow || readBuf.ReadBytesLeft < len)
{
readBuf.ReadPosition -= 5;
return null;
}
var msg = Connector.ParseServerMessage(readBuf, messageCode, len, false)!;
Debug.Assert(msg.Code == BackendMessageCode.DataRow);
ProcessMessage(msg);
return true;
}
async Task Read(bool async)
{
switch (State)
{
case ReaderState.BeforeResult:
// First Read() after NextResult. Data row has already been processed.
State = ReaderState.InResult;
return true;
case ReaderState.InResult:
await ConsumeRow(async);
if (_behavior.HasFlag(CommandBehavior.SingleRow))
{
// TODO: See optimization proposal in #410
await Consume(async);
return false;
}
break;
case ReaderState.BetweenResults:
case ReaderState.Consumed:
case ReaderState.Closed:
return false;
default:
throw new ArgumentOutOfRangeException();
}
try
{
var msg2 = await ReadMessage(async);
ProcessMessage(msg2);
return msg2.Code == BackendMessageCode.DataRow;
}
catch (PostgresException)
{
State = ReaderState.Consumed;
throw;
}
}
ValueTask ReadMessage(bool async)
{
return _isSequential
? ReadMessageSequential(async)
: Connector.ReadMessage(async, DataRowLoadingMode.NonSequential);
async ValueTask ReadMessageSequential(bool async2)
{
var msg = await Connector.ReadMessage(async2, DataRowLoadingMode.Sequential);
if (msg.Code == BackendMessageCode.DataRow)
{
// Make sure that the datarow's column count is already buffered
await Connector.ReadBuffer.Ensure(2, async);
return msg;
}
return msg;
}
}
#endregion
#region NextResult
///
/// Advances the reader to the next result when reading the results of a batch of statements.
///
///
public sealed override bool NextResult()
{
try
{
return (_isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false))
.GetAwaiter().GetResult();
}
catch (PostgresException e)
{
State = ReaderState.Consumed;
if (StatementIndex >= 0 && StatementIndex < _statements.Count)
e.Statement = _statements[StatementIndex];
throw;
}
}
///
/// This is the asynchronous version of NextResult.
/// The parameter is currently ignored.
///
/// The token to monitor for cancellation requests.
/// A task representing the asynchronous operation.
public override Task NextResultAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
try
{
using (NoSynchronizationContextScope.Enter())
return _isSchemaOnly ? NextResultSchemaOnly(true) : NextResult(true);
}
catch (PostgresException e)
{
State = ReaderState.Consumed;
if (StatementIndex >= 0 && StatementIndex < _statements.Count)
e.Statement = _statements[StatementIndex];
throw;
}
}
///
/// Internal implementation of NextResult
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
async Task NextResult(bool async, bool isConsuming=false)
{
CheckClosed();
IBackendMessage msg;
Debug.Assert(!_isSchemaOnly);
// If we're in the middle of a resultset, consume it
switch (State)
{
case ReaderState.BeforeResult:
case ReaderState.InResult:
await ConsumeRow(async);
while (true)
{
var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip);
switch (completedMsg.Code)
{
case BackendMessageCode.CompletedResponse:
case BackendMessageCode.EmptyQueryResponse:
ProcessMessage(completedMsg);
break;
default:
continue;
}
break;
}
break;
case ReaderState.BetweenResults:
break;
case ReaderState.Consumed:
case ReaderState.Closed:
return false;
default:
throw new ArgumentOutOfRangeException();
}
Debug.Assert(State == ReaderState.BetweenResults);
_hasRows = false;
if (_behavior.HasFlag(CommandBehavior.SingleResult) && StatementIndex == 0 && !isConsuming)
{
await Consume(async);
return false;
}
// We are now at the end of the previous result set. Read up to the next result set, if any.
// Non-prepared statements receive ParseComplete, BindComplete, DescriptionRow/NoData,
// prepared statements receive only BindComplete
for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++)
{
var statement = _statements[StatementIndex];
if (statement.IsPrepared)
{
Expect(await Connector.ReadMessage(async), Connector);
RowDescription = statement.Description;
}
else // Non-prepared flow
{
var pStatement = statement.PreparedStatement;
if (pStatement != null)
{
Debug.Assert(!pStatement.IsPrepared);
Debug.Assert(pStatement.Description == null);
if (pStatement.StatementBeingReplaced != null)
{
Expect(await Connector.ReadMessage(async), Connector);
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}
}
try
{
Expect(await Connector.ReadMessage(async), Connector);
}
catch
{
// An exception occurred. Check if any statements we being prepared and revert our bookkeeping.
pStatement?.CompleteUnprepare();
throw;
}
pStatement?.CompletePrepare();
Expect(await Connector.ReadMessage(async), Connector);
msg = await Connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.NoData:
RowDescription = statement.Description = null;
break;
case BackendMessageCode.RowDescription:
// We have a resultset
RowDescription = statement.Description = (RowDescriptionMessage)msg;
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
}
if (RowDescription == null)
{
// Statement did not generate a resultset (e.g. INSERT)
// Read and process its completion message and move on to the next statement
msg = await ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.CompletedResponse:
case BackendMessageCode.EmptyQueryResponse:
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
ProcessMessage(msg);
continue;
}
if (StatementIndex == 0 && Command.Parameters.HasOutputParameters)
{
// If output parameters are present and this is the first row of the first resultset,
// we must always read it in non-sequential mode because it will be traversed twice (once
// here for the parameters, then as a regular row).
msg = await Connector.ReadMessage(async, DataRowLoadingMode.NonSequential);
ProcessMessage(msg);
if (msg.Code == BackendMessageCode.DataRow)
PopulateOutputParameters();
}
else
{
msg = await ReadMessage(async);
ProcessMessage(msg);
}
switch (msg.Code)
{
case BackendMessageCode.DataRow:
case BackendMessageCode.CompletedResponse:
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
return true;
}
// There are no more queries, we're done. Read to the RFQ.
ProcessMessage(Expect(await Connector.ReadMessage(async), Connector));
RowDescription = null;
return false;
}
void PopulateOutputParameters()
{
// The first row in a stored procedure command that has output parameters needs to be traversed twice -
// once for populating the output parameters and once for the actual result set traversal. So in this
// case we can't be sequential.
Debug.Assert(Command.Parameters.Any(p => p.IsOutputDirection));
Debug.Assert(StatementIndex == 0);
Debug.Assert(RowDescription != null);
Debug.Assert(State == ReaderState.BeforeResult);
// Temporarily set our state to InResult to allow us to read the values
State = ReaderState.InResult;
var pending = new Queue();
var taken = new List();
for (var i = 0; i < FieldCount; i++)
{
if (Command.Parameters.TryGetValue(GetName(i), out var p) && p.IsOutputDirection)
{
p.Value = GetValue(i);
taken.Add(p);
}
else
pending.Enqueue(GetValue(i));
}
// Not sure where this odd behavior comes from: all output parameters which did not get matched by
// name now get populated with column values which weren't matched. Keeping this for backwards compat,
// opened #2252 for investigation.
foreach (var p in Command.Parameters.Where(p => p.IsOutputDirection && !taken.Contains(p)))
{
if (pending.Count == 0)
break;
p.Value = pending.Dequeue();
}
State = ReaderState.BeforeResult; // Set the state back
}
///
/// Note that in SchemaOnly mode there are no resultsets, and we read nothing from the backend (all
/// RowDescriptions have already been processed and are available)
///
async Task NextResultSchemaOnly(bool async)
{
Debug.Assert(_isSchemaOnly);
for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++)
{
var statement = _statements[StatementIndex];
if (statement.IsPrepared)
{
// Row descriptions have already been populated in the statement objects at the
// Prepare phase
RowDescription = _statements[StatementIndex].Description;
}
else
{
Expect(await Connector.ReadMessage(async), Connector);
Expect(await Connector.ReadMessage(async), Connector);
var msg = await Connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.NoData:
RowDescription = _statements[StatementIndex].Description = null;
break;
case BackendMessageCode.RowDescription:
// We have a resultset
RowDescription = _statements[StatementIndex].Description = (RowDescriptionMessage)msg;
Command.FixupRowDescription(RowDescription, StatementIndex == 0);
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
}
// Found a resultset
if (RowDescription != null)
return true;
}
// There are no more queries, we're done. Read to the RFQ.
if (!_statements.All(s => s.IsPrepared))
{
ProcessMessage(Expect(await Connector.ReadMessage(async), Connector));
RowDescription = null;
}
return false;
}
#endregion
#region ProcessMessage
internal void ProcessMessage(IBackendMessage msg)
{
switch (msg.Code)
{
case BackendMessageCode.DataRow:
ProcessDataRowMessage((DataRowMessage)msg);
return;
case BackendMessageCode.CompletedResponse:
var completed = (CommandCompleteMessage)msg;
switch (completed.StatementType)
{
case StatementType.Update:
case StatementType.Insert:
case StatementType.Delete:
case StatementType.Copy:
case StatementType.Move:
if (!_recordsAffected.HasValue)
_recordsAffected = 0;
_recordsAffected += completed.Rows;
break;
}
_statements[StatementIndex].ApplyCommandComplete(completed);
goto case BackendMessageCode.EmptyQueryResponse;
case BackendMessageCode.EmptyQueryResponse:
State = ReaderState.BetweenResults;
return;
case BackendMessageCode.ReadyForQuery:
State = ReaderState.Consumed;
return;
default:
throw new Exception("Received unexpected backend message of type " + msg.Code);
}
}
void ProcessDataRowMessage(DataRowMessage msg)
{
Connector.State = ConnectorState.Fetching;
// The connector's buffer can actually change between DataRows:
// If a large DataRow exceeding the connector's current read buffer arrives, and we're
// reading in non-sequential mode, a new oversize buffer is allocated. We thus have to
// recapture the connector's buffer on each new DataRow.
// Note that this can happen even in sequential mode, if the row description message is big
// (see #2003)
Buffer = Connector.ReadBuffer;
_hasRows = true;
_column = -1;
ColumnLen = -1;
PosInColumn = 0;
// We assume that the row's number of columns is identical to the description's
_numColumns = Buffer.ReadInt16();
Debug.Assert(_numColumns == RowDescription!.NumFields,
$"Row's number of columns ({_numColumns}) differs from the row description's ({RowDescription.NumFields})");
if (!_isSequential)
{
_dataMsgEnd = Buffer.ReadPosition + msg.Length - 2;
// Initialize our columns array with the offset and length of the first column
_columns.Clear();
var len = Buffer.ReadInt32();
_columns.Add((Buffer.ReadPosition, len));
}
switch (State)
{
case ReaderState.BetweenResults:
State = ReaderState.BeforeResult;
break;
case ReaderState.BeforeResult:
State = ReaderState.InResult;
break;
case ReaderState.InResult:
break;
default:
throw Connector.UnexpectedMessageReceived(BackendMessageCode.DataRow);
}
}
#endregion
///
/// Gets a value indicating the depth of nesting for the current row. Always returns zero.
///
public override int Depth => 0;
///
/// Gets a value indicating whether the data reader is closed.
///
public override bool IsClosed => State == ReaderState.Closed;
///
/// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement.
///
public override int RecordsAffected => _recordsAffected.HasValue ? (int)_recordsAffected.Value : -1;
///
/// Returns details about each statement that this reader will or has executed.
///
///
/// Note that some fields (i.e. rows and oid) are only populated as the reader
/// traverses the result.
///
/// For commands with multiple queries, this exposes the number of rows affected on
/// a statement-by-statement basis, unlike
/// which exposes an aggregation across all statements.
///
public IReadOnlyList Statements => _statements.AsReadOnly();
///
/// Gets a value that indicates whether this DbDataReader contains one or more rows.
///
public override bool HasRows => State == ReaderState.Closed
? throw new InvalidOperationException("Invalid attempt to call HasRows when reader is closed.")
: _hasRows;
///
/// Indicates whether the reader is currently positioned on a row, i.e. whether reading a
/// column is possible.
/// This property is different from in that will
/// return true even if attempting to read a column will fail, e.g. before
/// has been called
///
[PublicAPI]
public bool IsOnRow => State == ReaderState.InResult;
///
/// Gets the name of the column, given the zero-based column ordinal.
///
/// The zero-based column ordinal.
/// The name of the specified column.
public override string GetName(int ordinal) => CheckRowDescriptionAndGetField(ordinal).Name;
///
/// Gets the number of columns in the current row.
///
public override int FieldCount
{
get
{
CheckClosed();
return RowDescription?.NumFields ?? 0;
}
}
#region Cleanup / Dispose
///
/// Consumes all result sets for this reader, leaving the connector ready for sending and processing further
/// queries
///
async Task Consume(bool async)
{
// Skip over the other result sets. Note that this does tally records affected
// from CommandComplete messages, and properly sets state for auto-prepared statements
if (_isSchemaOnly)
while (await NextResultSchemaOnly(async)) {}
else
while (await NextResult(async, true)) {}
}
///
/// Releases the resources used by the NpgsqlDataReader .
///
protected override void Dispose(bool disposing) => Close();
///
/// Closes the reader, allowing a new command to be executed.
///
public override void Close() => Close(false, false).GetAwaiter().GetResult();
///
/// Closes the reader, allowing a new command to be executed.
///
#if !NET461 && !NETSTANDARD2_0
public override Task CloseAsync()
#else
public Task CloseAsync()
#endif
=> Close(false, true);
internal async Task Close(bool connectionClosing, bool async)
{
if (State == ReaderState.Closed)
return;
switch (Connector.State)
{
case ConnectorState.Broken:
case ConnectorState.Closed:
// This may have happen because an I/O error while reading a value, or some non-safe
// exception thrown from a type handler. Or if the connection was closed while the reader
// was still open
State = ReaderState.Closed;
Command.State = CommandState.Idle;
ReaderClosed?.Invoke(this, EventArgs.Empty);
return;
}
if (State != ReaderState.Consumed)
await Consume(async);
await Cleanup(async, connectionClosing);
}
internal async Task Cleanup(bool async, bool connectionClosing=false)
{
Log.Trace("Cleaning up reader", Connector.Id);
// Make sure the send task for this command, which may have executed asynchronously and in
// parallel with the reading, has completed, throwing any exceptions it generated.
if (async)
await _sendTask;
else
_sendTask.GetAwaiter().GetResult();
State = ReaderState.Closed;
Command.State = CommandState.Idle;
Connector.CurrentReader = null;
Connector.EndUserAction();
NpgsqlEventSource.Log.CommandStop();
// If the reader is being closed as part of the connection closing, we don't apply
// the reader's CommandBehavior.CloseConnection
if (_behavior.HasFlag(CommandBehavior.CloseConnection) && !connectionClosing)
_connection.Close();
if (ReaderClosed != null)
{
ReaderClosed(this, EventArgs.Empty);
ReaderClosed = null;
}
}
#endregion
#region Simple value getters
///
/// Gets the value of the specified column as a Boolean.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override bool GetBoolean(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a byte.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override byte GetByte(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a single character.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override char GetChar(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 16-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override short GetInt16(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 32-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override int GetInt32(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 64-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override long GetInt64(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a object.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override DateTime GetDateTime(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override string GetString(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a object.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override decimal GetDecimal(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a double-precision floating point number.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override double GetDouble(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a single-precision floating point number.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override float GetFloat(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a globally-unique identifier (GUID).
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override Guid GetGuid(int ordinal) => GetFieldValue(ordinal);
///
/// Populates an array of objects with the column values of the current row.
///
/// An array of Object into which to copy the attribute columns.
/// The number of instances of in the array.
public override int GetValues(object[] values)
{
if (values == null)
throw new ArgumentNullException(nameof(values));
CheckResultSet();
var count = Math.Min(FieldCount, values.Length);
for (var i = 0; i < count; i++)
values[i] = GetValue(i);
return count;
}
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override object this[int ordinal] => GetValue(ordinal);
#endregion
#region Provider-specific simple type getters
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for dates.
///
///
/// PostgreSQL's date type represents dates from 4713 BC to 5874897 AD, while .NET's DateTime
/// only supports years from 1 to 1999. If you require years outside this range use this accessor.
/// The standard method will also return this type, but has
/// the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlDate GetDate(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a TimeSpan,
///
///
/// PostgreSQL's interval type has has a resolution of 1 microsecond and ranges from
/// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds
/// and ranges from roughly -29247 to 29247 years.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public TimeSpan GetTimeSpan(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for time spans.
///
///
/// PostgreSQL's interval type has has a resolution of 1 microsecond and ranges from
/// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds
/// and ranges from roughly -29247 to 29247 years. If you require values from outside TimeSpan's
/// range use this accessor.
/// The standard ADO.NET method will also return this
/// type, but has the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlTimeSpan GetInterval(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for date/time timestamps. Note that this type covers
/// both PostgreSQL's "timestamp with time zone" and "timestamp without time zone" types,
/// which differ only in how they are converted upon input/output.
///
///
/// PostgreSQL's timestamp type represents dates from 4713 BC to 5874897 AD, while .NET's DateTime
/// only supports years from 1 to 1999. If you require years outside this range use this accessor.
/// The standard method will also return this type, but has
/// the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlDateTime GetTimeStamp(int ordinal) => GetFieldValue(ordinal);
#endregion
#region Special binary getters
///
/// Reads a stream of bytes from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
///
/// The zero-based column ordinal.
/// The index within the row from which to begin the read operation.
/// The buffer into which to copy the data.
/// The index with the buffer to which the data will be copied.
/// The maximum number of characters to read.
/// The actual number of bytes read.
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
if (dataOffset < 0 || dataOffset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(dataOffset), dataOffset, $"dataOffset must be between {0} and {int.MaxValue}");
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
throw new IndexOutOfRangeException($"bufferOffset must be between {0} and {(buffer.Length)}");
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
throw new IndexOutOfRangeException($"length must be between {0} and {buffer.Length - bufferOffset}");
var fieldDescription = CheckRowAndGetField(ordinal);
var handler = fieldDescription.Handler;
if (!(handler is ByteaHandler))
throw new InvalidCastException("GetBytes() not supported for type " + fieldDescription.Name);
SeekToColumn(ordinal, false).GetAwaiter().GetResult();
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
if (buffer == null)
return ColumnLen;
var dataOffset2 = (int)dataOffset;
SeekInColumn(dataOffset2, false).GetAwaiter().GetResult();
// Attempt to read beyond the end of the column
if (dataOffset2 + length > ColumnLen)
length = Math.Max(ColumnLen - dataOffset2, 0);
var left = length;
while (left > 0)
{
var read = Buffer.ReadBytes(buffer, bufferOffset, left, false).GetAwaiter().GetResult();
bufferOffset += read;
left -= read;
}
PosInColumn += length;
return length;
}
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public override Stream GetStream(int ordinal) => GetStream(ordinal, false).Result;
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The token to monitor for cancellation requests. The default value is .
/// The returned object.
public Task GetStreamAsync(int ordinal, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
using (NoSynchronizationContextScope.Enter())
return GetStream(ordinal, true).AsTask();
}
ValueTask GetStream(int ordinal, bool async)
{
var fieldDescription = CheckRowAndGetField(ordinal);
if (!(fieldDescription.Handler is ByteaHandler))
throw new InvalidCastException($"GetStream() not supported for type {fieldDescription.Handler.PgDisplayName}");
return GetStreamInternal(ordinal, async);
}
ValueTask GetStreamInternal(int ordinal, bool async)
{
if (ColumnStream != null && !ColumnStream.IsDisposed)
throw new InvalidOperationException("A stream is already open for this reader");
var t = SeekToColumn(ordinal, async);
if (!t.IsCompleted)
return new ValueTask(GetStreamLong(t));
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
PosInColumn += ColumnLen;
return new ValueTask(ColumnStream = (NpgsqlReadBuffer.ColumnStream)Buffer.GetStream(ColumnLen, !_isSequential));
async Task GetStreamLong(Task seekTask)
{
await seekTask;
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
PosInColumn += ColumnLen;
return ColumnStream = (NpgsqlReadBuffer.ColumnStream)Buffer.GetStream(ColumnLen, !_isSequential);
}
}
#endregion
#region Special text getters
///
/// Reads a stream of characters from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
///
/// The zero-based column ordinal.
/// The index within the row from which to begin the read operation.
/// The buffer into which to copy the data.
/// The index with the buffer to which the data will be copied.
/// The maximum number of characters to read.
/// The actual number of characters read.
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
if (dataOffset < 0 || dataOffset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(dataOffset), dataOffset, $"dataOffset must be between {0} and {int.MaxValue}");
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
throw new IndexOutOfRangeException($"bufferOffset must be between {0} and {(buffer.Length)}");
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
throw new IndexOutOfRangeException($"length must be between {0} and {buffer.Length - bufferOffset}");
var fieldDescription = CheckRowAndGetField(ordinal);
var handler = fieldDescription.Handler as TextHandler;
if (handler == null)
throw new InvalidCastException("GetChars() not supported for type " + fieldDescription.Name);
SeekToColumn(ordinal, false).GetAwaiter().GetResult();
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
if (PosInColumn == 0)
_charPos = 0;
var decoder = Buffer.TextEncoding.GetDecoder();
if (buffer == null)
{
// Note: Getting the length of a text column means decoding the entire field,
// very inefficient and also consumes the column in sequential mode. But this seems to
// be SqlClient's behavior as well.
var (bytesSkipped, charsSkipped) = SkipChars(decoder, int.MaxValue, ColumnLen - PosInColumn);
Debug.Assert(bytesSkipped == ColumnLen - PosInColumn);
PosInColumn += bytesSkipped;
_charPos += charsSkipped;
return _charPos;
}
if (PosInColumn == ColumnLen || dataOffset < _charPos)
{
// Either the column has already been read (e.g. GetString()) or a previous GetChars()
// has positioned us in the column *after* the requested read start offset. Seek back
// (this will throw for sequential)
SeekInColumn(0, false).GetAwaiter().GetResult();
_charPos = 0;
}
if (dataOffset > _charPos)
{
var charsToSkip = (int)dataOffset - _charPos;
var (bytesSkipped, charsSkipped) = SkipChars(decoder, charsToSkip, ColumnLen - PosInColumn);
decoder.Reset();
PosInColumn += bytesSkipped;
_charPos += charsSkipped;
if (charsSkipped < charsToSkip) // data offset is beyond the column's end
return 0;
}
// We're now positioned at the start of the segment of characters we need to read.
if (length == 0)
return 0;
var (bytesRead, charsRead) = DecodeChars(decoder, buffer, bufferOffset, length, ColumnLen - PosInColumn);
PosInColumn += bytesRead;
_charPos += charsRead;
return charsRead;
}
(int BytesRead, int CharsRead) DecodeChars(Decoder decoder, char[] output, int outputOffset, int charCount, int byteCount)
{
var (bytesRead, charsRead) = (0, 0);
while (true)
{
Buffer.Ensure(1); // Make sure we have at least some data
var maxBytes = Math.Min(byteCount - bytesRead, Buffer.ReadBytesLeft);
decoder.Convert(Buffer.Buffer, Buffer.ReadPosition, maxBytes, output, outputOffset, charCount - charsRead, false,
out var bytesUsed, out var charsUsed, out var completed);
Buffer.ReadPosition += bytesUsed;
bytesRead += bytesUsed;
charsRead += charsUsed;
if (charsRead == charCount || bytesRead == byteCount)
break;
outputOffset += charsUsed;
Buffer.Clear();
}
return (bytesRead, charsRead);
}
internal (int BytesSkipped, int CharsSkipped) SkipChars(Decoder decoder, int charCount, int byteCount)
{
// TODO: Allocate on the stack with Span
if (_tempCharBuf == null)
_tempCharBuf = new char[1024];
var (charsSkipped, bytesSkipped) = (0, 0);
while (charsSkipped < charCount && bytesSkipped < byteCount)
{
var (bytesRead, charsRead) = DecodeChars(decoder, _tempCharBuf, 0, Math.Min(charCount, _tempCharBuf.Length), byteCount);
bytesSkipped += bytesRead;
charsSkipped += charsRead;
}
return (bytesSkipped, charsSkipped);
}
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public override TextReader GetTextReader(int ordinal)
=> GetTextReader(ordinal, false).Result;
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The token to monitor for cancellation requests. The default value is .
/// The returned object.
public Task GetTextReaderAsync(int ordinal, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
using (NoSynchronizationContextScope.Enter())
return GetTextReader(ordinal, true).AsTask();
}
async ValueTask GetTextReader(int ordinal, bool async)
{
var fieldDescription = CheckRowAndGetField(ordinal);
if (!(fieldDescription.Handler is ITextReaderHandler handler))
throw new InvalidCastException($"GetTextReader() not supported for type {fieldDescription.Handler.PgDisplayName}");
var stream = async
? await GetStreamInternal(ordinal, async)
: GetStreamInternal(ordinal, async).Result;
return handler.GetTextReader(stream);
}
#endregion
#region GetFieldValue
///
/// Asynchronously gets the value of the specified column as a type.
///
/// The type of the value to be returned.
/// The type of the value to be returned.
/// The token to monitor for cancellation requests.
///
public override Task GetFieldValueAsync(int ordinal, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
// In non-sequential, we know that the column is already buffered - no I/O will take place
if (!_isSequential)
return Task.FromResult(GetFieldValue(ordinal));
using (NoSynchronizationContextScope.Enter())
return GetFieldValueSequential(ordinal, true).AsTask();
}
///
/// Synchronously gets the value of the specified column as a type.
///
/// Synchronously gets the value of the specified column as a type.
/// The column to be retrieved.
/// The column to be retrieved.
public override T GetFieldValue(int ordinal)
{
if (_isSequential)
return GetFieldValueSequential(ordinal, false).GetAwaiter().GetResult();
// In non-sequential, we know that the column is already buffered - no I/O will take place
var fieldDescription = CheckRowAndGetField(ordinal);
SeekToColumnNonSequential(ordinal);
if (ColumnLen == -1)
{
#pragma warning disable CS8653 // A default expression introduces a null value when 'T' is a non-nullable reference type.
// When T is a Nullable, we support returning null
if (NullableHandler.Exists)
return default;
#pragma warning restore CS8653
if (typeof(T) == typeof(object))
return (T)(object)DBNull.Value;
throw new InvalidCastException("Column is null");
}
try
{
return NullableHandler.Exists
? NullableHandler.Read(fieldDescription.Handler, Buffer, ColumnLen, fieldDescription)
: typeof(T) == typeof(object)
? (T)fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription)
: fieldDescription.Handler.Read(Buffer, ColumnLen, fieldDescription);
}
catch (NpgsqlSafeReadException e)
{
throw e.OriginalException;
}
catch
{
Connector.Break();
throw;
}
finally
{
// Important in case a NpgsqlSafeReadException was thrown, position must still be updated
PosInColumn += ColumnLen;
}
}
async ValueTask GetFieldValueSequential(int column, bool async)
{
var fieldDescription = CheckRowAndGetField(column);
await SeekToColumnSequential(column, async);
CheckColumnStart();
if (ColumnLen == -1)
{
#pragma warning disable CS8653 // A default expression introduces a null value when 'T' is a non-nullable reference type.
// When T is a Nullable, we support returning null
if (NullableHandler.Exists)
return default;
#pragma warning restore CS8653
if (typeof(T) == typeof(object))
return (T)(object)DBNull.Value;
throw new InvalidCastException("Column is null");
}
try
{
return NullableHandler.Exists
? ColumnLen <= Buffer.ReadBytesLeft
? NullableHandler.Read(fieldDescription.Handler, Buffer, ColumnLen, fieldDescription)
: await NullableHandler.ReadAsync(fieldDescription.Handler, Buffer, ColumnLen, async, fieldDescription)
: typeof(T) == typeof(object)
? ColumnLen <= Buffer.ReadBytesLeft
? (T)fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription)
: (T)await fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, async, fieldDescription)
: ColumnLen <= Buffer.ReadBytesLeft
? fieldDescription.Handler.Read(Buffer, ColumnLen, fieldDescription)
: await fieldDescription.Handler.Read(Buffer, ColumnLen, async, fieldDescription);
}
catch (NpgsqlSafeReadException e)
{
throw e.OriginalException;
}
catch
{
Connector.Break();
throw;
}
finally
{
// Important in case a NpgsqlSafeReadException was thrown, position must still be updated
PosInColumn += ColumnLen;
}
}
#endregion
#region GetValue
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override object GetValue(int ordinal)
{
var fieldDescription = CheckRowAndGetField(ordinal);
if (_isSequential) {
SeekToColumnSequential(ordinal, false).GetAwaiter().GetResult();
CheckColumnStart();
} else
SeekToColumnNonSequential(ordinal);
if (ColumnLen == -1)
return DBNull.Value;
object result;
try
{
result = _isSequential
? fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, false, fieldDescription).GetAwaiter().GetResult()
: fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription);
}
catch (NpgsqlSafeReadException e)
{
throw e.OriginalException;
}
catch
{
Connector.Break();
throw;
}
finally
{
// Important in case a NpgsqlSafeReadException was thrown, position must still be updated
PosInColumn += ColumnLen;
}
// Used for Entity Framework <= 6 compability
var objectResultType = Command.ObjectResultTypes?[ordinal];
if (objectResultType != null)
{
result = objectResultType == typeof(DateTimeOffset)
? new DateTimeOffset((DateTime)result)
: Convert.ChangeType(result, objectResultType)!;
}
return result;
}
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override object GetProviderSpecificValue(int ordinal)
{
var fieldDescription = CheckRowAndGetField(ordinal);
if (_isSequential)
{
SeekToColumnSequential(ordinal, false).GetAwaiter().GetResult();
CheckColumnStart();
}
else
SeekToColumnNonSequential(ordinal);
if (ColumnLen == -1)
return DBNull.Value;
try
{
return _isSequential
? fieldDescription.Handler.ReadPsvAsObject(Buffer, ColumnLen, false, fieldDescription).GetAwaiter().GetResult()
: fieldDescription.Handler.ReadPsvAsObject(Buffer, ColumnLen, fieldDescription);
}
catch (NpgsqlSafeReadException e)
{
throw e.OriginalException;
}
catch
{
Connector.Break();
throw;
}
finally
{
// Important in case a NpgsqlSafeReadException was thrown, position must still be updated
PosInColumn += ColumnLen;
}
}
///
/// Gets the value of the specified column as an instance of .
///
/// The name of the column.
/// The value of the specified column.
public override object this[string name] => GetValue(GetOrdinal(name));
#endregion
#region IsDBNull
///
/// Gets a value that indicates whether the column contains nonexistent or missing values.
///
/// The zero-based column ordinal.
/// true if the specified column is equivalent to ; otherwise false .
public override bool IsDBNull(int ordinal)
{
CheckRowAndGetField(ordinal);
if (_isSequential)
SeekToColumnSequential(ordinal, false).GetAwaiter().GetResult();
else
SeekToColumnNonSequential(ordinal);
return ColumnLen == -1;
}
///
/// An asynchronous version of , which gets a value that indicates whether the column contains non-existent or missing values.
/// The parameter is currently ignored.
///
/// The zero-based column to be retrieved.
/// The token to monitor for cancellation requests.
/// true if the specified column value is equivalent to otherwise false .
public override async Task IsDBNullAsync(int ordinal, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
CheckRowAndGetField(ordinal);
if (!_isSequential)
return IsDBNull(ordinal);
using (NoSynchronizationContextScope.Enter())
{
await SeekToColumn(ordinal, true);
return ColumnLen == -1;
}
}
#endregion
#region Other public accessors
///
/// Gets the column ordinal given the name of the column.
///
/// The name of the column.
/// The zero-based column ordinal.
public override int GetOrdinal(string name)
{
if (string.IsNullOrEmpty(name))
throw new ArgumentException("name cannot be empty", nameof(name));
if (State == ReaderState.Closed)
throw new InvalidOperationException("The reader is closed");
if (RowDescription is null)
throw new InvalidOperationException("No resultset is currently being traversed");
return RowDescription.GetFieldIndex(name);
}
///
/// Gets a representation of the PostgreSQL data type for the specified field.
/// The returned representation can be used to access various information about the field.
///
/// The zero-based column index.
[PublicAPI]
public PostgresType GetPostgresType(int ordinal) => CheckRowDescriptionAndGetField(ordinal).PostgresType;
///
/// Gets the data type information for the specified field.
/// This will be the PostgreSQL type name (e.g. double precision), not the .NET type
/// (see for that).
///
/// The zero-based column index.
public override string GetDataTypeName(int ordinal) => CheckRowDescriptionAndGetField(ordinal).TypeDisplayName;
///
/// Gets the OID for the PostgreSQL type for the specified field, as it appears in the pg_type table.
///
///
/// This is a PostgreSQL-internal value that should not be relied upon and should only be used for
/// debugging purposes.
///
/// The zero-based column index.
public uint GetDataTypeOID(int ordinal) => CheckRowDescriptionAndGetField(ordinal).TypeOID;
///
/// Gets the data type of the specified column.
///
/// The zero-based column ordinal.
/// The data type of the specified column.
public override Type GetFieldType(int ordinal)
=> Command.ObjectResultTypes?[ordinal]
?? CheckRowDescriptionAndGetField(ordinal).FieldType;
///
/// Returns the provider-specific field type of the specified column.
///
/// The zero-based column ordinal.
/// The Type object that describes the data type of the specified column.
public override Type GetProviderSpecificFieldType(int ordinal)
{
var fieldDescription = CheckRowDescriptionAndGetField(ordinal);
return fieldDescription.Handler.GetProviderSpecificFieldType(fieldDescription);
}
///
/// Gets all provider-specific attribute columns in the collection for the current row.
///
/// An array of Object into which to copy the attribute columns.
/// The number of instances of in the array.
public override int GetProviderSpecificValues(object[] values)
{
if (values == null)
throw new ArgumentNullException(nameof(values));
if (State != ReaderState.InResult)
throw new InvalidOperationException("No row is available");
var count = Math.Min(FieldCount, values.Length);
for (var i = 0; i < count; i++)
values[i] = GetProviderSpecificValue(i);
return count;
}
///
/// Returns an that can be used to iterate through the rows in the data reader.
///
/// An that can be used to iterate through the rows in the data reader.
public override IEnumerator GetEnumerator()
=> new DbEnumerator(this);
///
/// Returns schema information for the columns in the current resultset.
///
///
public ReadOnlyCollection GetColumnSchema()
=> RowDescription == null || RowDescription.Fields.Count == 0
? new List().AsReadOnly()
: new DbColumnSchemaGenerator(_connection, RowDescription, _behavior.HasFlag(CommandBehavior.KeyInfo))
.GetColumnSchema();
#if !NET461
ReadOnlyCollection IDbColumnSchemaGenerator.GetColumnSchema()
=> new ReadOnlyCollection(GetColumnSchema().Cast().ToList());
#endif
#endregion
#region Schema metadata table
///
/// Returns a System.Data.DataTable that describes the column metadata of the DataReader.
///
#nullable disable
public override DataTable GetSchemaTable()
#nullable enable
{
if (FieldCount == 0) // No resultset
return null;
var table = new DataTable("SchemaTable");
// Note: column order is important to match SqlClient's, some ADO.NET users appear
// to assume ordering (see #1671)
table.Columns.Add("ColumnName", typeof(string));
table.Columns.Add("ColumnOrdinal", typeof(int));
table.Columns.Add("ColumnSize", typeof(int));
table.Columns.Add("NumericPrecision", typeof(int));
table.Columns.Add("NumericScale", typeof(int));
table.Columns.Add("IsUnique", typeof(bool));
table.Columns.Add("IsKey", typeof(bool));
table.Columns.Add("BaseServerName", typeof(string));
table.Columns.Add("BaseCatalogName", typeof(string));
table.Columns.Add("BaseColumnName", typeof(string));
table.Columns.Add("BaseSchemaName", typeof(string));
table.Columns.Add("BaseTableName", typeof(string));
table.Columns.Add("DataType", typeof(Type));
table.Columns.Add("AllowDBNull", typeof(bool));
table.Columns.Add("ProviderType", typeof(int));
table.Columns.Add("IsAliased", typeof(bool));
table.Columns.Add("IsExpression", typeof(bool));
table.Columns.Add("IsIdentity", typeof(bool));
table.Columns.Add("IsAutoIncrement", typeof(bool));
table.Columns.Add("IsRowVersion", typeof(bool));
table.Columns.Add("IsHidden", typeof(bool));
table.Columns.Add("IsLong", typeof(bool));
table.Columns.Add("IsReadOnly", typeof(bool));
table.Columns.Add("ProviderSpecificDataType", typeof(Type));
table.Columns.Add("DataTypeName", typeof(string));
foreach (var column in GetColumnSchema())
{
var row = table.NewRow();
row["ColumnName"] = column.ColumnName;
row["ColumnOrdinal"] = column.ColumnOrdinal ?? -1;
row["ColumnSize"] = column.ColumnSize ?? -1;
row["NumericPrecision"] = column.NumericPrecision ?? 0;
row["NumericScale"] = column.NumericScale ?? 0;
row["IsUnique"] = column.IsUnique == true;
row["IsKey"] = column.IsKey == true;
row["BaseServerName"] = "";
row["BaseCatalogName"] = column.BaseCatalogName;
row["BaseColumnName"] = column.BaseColumnName;
row["BaseSchemaName"] = column.BaseSchemaName;
row["BaseTableName"] = column.BaseTableName;
row["DataType"] = column.DataType;
row["AllowDBNull"] = (object?)column.AllowDBNull ?? DBNull.Value;
row["ProviderType"] = column.NpgsqlDbType ?? NpgsqlDbType.Unknown;
row["IsAliased"] = column.IsAliased == true;
row["IsExpression"] = column.IsExpression == true;
row["IsIdentity"] = column.IsIdentity == true;
row["IsAutoIncrement"] = column.IsAutoIncrement == true;
row["IsRowVersion"] = false;
row["IsHidden"] = column.IsHidden == true;
row["IsLong"] = column.IsLong == true;
row["DataTypeName"] = column.DataTypeName;
table.Rows.Add(row);
}
return table;
}
#endregion Schema metadata table
#region Seeking
Task SeekToColumn(int column, bool async)
{
if (_isSequential)
return SeekToColumnSequential(column, async);
SeekToColumnNonSequential(column);
return Task.CompletedTask;
}
void SeekToColumnNonSequential(int column)
{
// Shut down any streaming going on on the column
if (ColumnStream != null)
{
ColumnStream.Dispose();
ColumnStream = null;
}
for (var lastColumnRead = _columns.Count; column >= lastColumnRead; lastColumnRead++)
{
int lastColumnLen;
(Buffer.ReadPosition, lastColumnLen) = _columns[lastColumnRead-1];
if (lastColumnLen != -1)
Buffer.ReadPosition += lastColumnLen;
var len = Buffer.ReadInt32();
_columns.Add((Buffer.ReadPosition, len));
}
(Buffer.ReadPosition, ColumnLen) = _columns[column];
_column = column;
PosInColumn = 0;
}
///
/// Seeks to the given column. The 4-byte length is read and stored in .
///
async Task SeekToColumnSequential(int column, bool async)
{
if (column < 0 || column >= _numColumns)
throw new IndexOutOfRangeException("Column index out of range");
if (column < _column)
throw new InvalidOperationException($"Invalid attempt to read from column ordinal '{column}'. With CommandBehavior.SequentialAccess, you may only read from column ordinal '{_column}' or greater.");
if (column == _column)
return;
// Need to seek forward
// Shut down any streaming going on on the column
if (ColumnStream != null)
{
ColumnStream.Dispose();
ColumnStream = null;
// Disposing the stream leaves us at the end of the column
PosInColumn = ColumnLen;
}
// Skip to end of column if needed
// TODO: Simplify by better initializing _columnLen/_posInColumn
var remainingInColumn = ColumnLen == -1 ? 0 : ColumnLen - PosInColumn;
if (remainingInColumn > 0)
await Buffer.Skip(remainingInColumn, async);
// Skip over unwanted fields
for (; _column < column - 1; _column++)
{
await Buffer.Ensure(4, async);
var len = Buffer.ReadInt32();
if (len != -1)
await Buffer.Skip(len, async);
}
await Buffer.Ensure(4, async);
ColumnLen = Buffer.ReadInt32();
PosInColumn = 0;
_column = column;
}
Task SeekInColumn(int posInColumn, bool async)
{
if (_isSequential)
return SeekInColumnSequential(posInColumn, async);
if (posInColumn > ColumnLen)
posInColumn = ColumnLen;
Buffer.ReadPosition = _columns[_column].Offset + posInColumn;
PosInColumn = posInColumn;
return Task.CompletedTask;
async Task SeekInColumnSequential(int posInColumn2, bool async2)
{
Debug.Assert(_column > -1);
if (posInColumn2 < PosInColumn)
throw new InvalidOperationException("Attempt to read a position in the column which has already been read");
if (posInColumn2 > ColumnLen)
posInColumn2 = ColumnLen;
if (posInColumn2 > PosInColumn)
{
await Buffer.Skip(posInColumn2 - PosInColumn, async2);
PosInColumn = posInColumn2;
}
}
}
#endregion
#region ConsumeRow
Task ConsumeRow(bool async)
{
Debug.Assert(State == ReaderState.InResult || State == ReaderState.BeforeResult);
if (_isSequential)
return ConsumeRowSequential(async);
else
{
ConsumeRowNonSequential();
return Task.CompletedTask;
}
async Task ConsumeRowSequential(bool async2)
{
if (ColumnStream != null)
{
ColumnStream.Dispose();
ColumnStream = null;
// Disposing the stream leaves us at the end of the column
PosInColumn = ColumnLen;
}
// TODO: Potential for code-sharing with ReadColumn above, which also skips
// Skip to end of column if needed
var remainingInColumn = ColumnLen == -1 ? 0 : ColumnLen - PosInColumn;
if (remainingInColumn > 0)
await Buffer.Skip(remainingInColumn, async2);
// Skip over the remaining columns in the row
for (; _column < _numColumns - 1; _column++)
{
await Buffer.Ensure(4, async2);
var len = Buffer.ReadInt32();
if (len != -1)
await Buffer.Skip(len, async2);
}
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ConsumeRowNonSequential()
{
Debug.Assert(State == ReaderState.InResult || State == ReaderState.BeforeResult);
if (ColumnStream != null)
{
ColumnStream.Dispose();
ColumnStream = null;
// Disposing the stream leaves us at the end of the column
PosInColumn = ColumnLen;
}
Buffer.ReadPosition = _dataMsgEnd;
}
#endregion
#region Checks
[MethodImpl(MethodImplOptions.AggressiveInlining)]
RowDescriptionMessage CheckResultSet()
{
switch (State)
{
case ReaderState.BeforeResult:
case ReaderState.InResult:
break;
case ReaderState.Closed:
throw new InvalidOperationException("The reader is closed");
default:
throw new InvalidOperationException("No resultset is currently being traversed");
}
return RowDescription!;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
FieldDescription CheckResultSetAndGetField(int column)
{
var rowDescription = CheckResultSet();
if (column < 0 || column >= rowDescription.NumFields)
throw new IndexOutOfRangeException($"Column must be between {0} and {rowDescription.NumFields - 1}");
return rowDescription[column];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
FieldDescription CheckRowAndGetField(int column)
{
switch (State)
{
case ReaderState.InResult:
break;
case ReaderState.Closed:
throw new InvalidOperationException("The reader is closed");
default:
throw new InvalidOperationException("No row is available");
}
if (column < 0 || column >= RowDescription!.NumFields)
throw new IndexOutOfRangeException($"Column must be between {0} and {RowDescription!.NumFields - 1}");
return RowDescription[column];
}
///
/// Checks that we have a RowDescription, but not necessary an actual resultset
/// (for operations which work in SchemaOnly mode.
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
FieldDescription CheckRowDescriptionAndGetField(int column)
{
if (RowDescription == null)
throw new InvalidOperationException("No resultset is currently being traversed");
if (column < 0 || column >= RowDescription.NumFields)
throw new IndexOutOfRangeException($"Column must be between {0} and {RowDescription.NumFields - 1}");
return RowDescription[column];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void CheckColumnStart()
{
Debug.Assert(_isSequential);
if (PosInColumn != 0)
throw new InvalidOperationException("Attempt to read a position in the column which has already been read");
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void CheckClosed()
{
if (State == ReaderState.Closed)
throw new InvalidOperationException("The reader is closed");
}
#endregion
}
enum ReaderState
{
BeforeResult,
InResult,
BetweenResults,
Consumed,
Closed,
}
}