#region License
// The PostgreSQL License
//
// Copyright (C) 2017 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.PostgresTypes;
using Npgsql.Schema;
using Npgsql.TypeHandlers;
using Npgsql.TypeHandling;
using NpgsqlTypes;
#pragma warning disable CA2222 // Do not decrease inherited member visibility
namespace Npgsql
{
///
/// Reads a forward-only stream of rows from a data source.
///
#pragma warning disable CA1010
public abstract class NpgsqlDataReader : DbDataReader
#pragma warning restore CA1010
#if NETSTANDARD1_3
, IDbColumnSchemaGenerator
#endif
{
internal NpgsqlCommand Command { get; }
internal readonly NpgsqlConnector Connector;
readonly NpgsqlConnection _connection;
readonly CommandBehavior _behavior;
readonly Task _sendTask;
internal ReaderState State;
internal NpgsqlReadBuffer Buffer;
///
/// Holds the list of statements being executed by this reader.
///
readonly List _statements;
///
/// The index of the current query resultset we're processing (within a multiquery)
///
internal int StatementIndex { get; private set; }
///
/// For streaming types (e.g. bytea), holds the byte length of the column.
/// Does not include the length prefix.
///
internal int ColumnLen;
internal int PosInColumn;
int _charPos;
///
/// The RowDescription message for the current resultset being processed
///
[CanBeNull]
internal RowDescriptionMessage RowDescription;
uint? _recordsAffected;
///
/// Whether the current result set has rows
///
bool _hasRows;
///
/// Is raised whenever Close() is called.
///
public event EventHandler ReaderClosed;
bool IsSchemaOnly => (_behavior & CommandBehavior.SchemaOnly) != 0;
static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger();
internal NpgsqlDataReader(NpgsqlCommand command, CommandBehavior behavior, List statements, Task sendTask)
{
Command = command;
_connection = command.Connection;
Connector = _connection.Connector;
_behavior = behavior;
_statements = statements;
StatementIndex = -1;
_sendTask = sendTask;
State = ReaderState.BetweenResults;
}
#region Read
///
/// Advances the reader to the next record in a result set.
///
/// true if there are more rows; otherwise false.
///
/// The default position of a data reader is before the first record. Therefore, you must call Read to begin accessing data.
///
public override bool Read() => Read(false).GetAwaiter().GetResult();
///
/// This is the asynchronous version of The cancellation token is currently ignored.
///
/// Ignored for now.
/// A task representing the asynchronous operation.
public override Task ReadAsync(CancellationToken cancellationToken)
=> SynchronizationContextSwitcher.NoContext(async () => await Read(true));
async Task Read(bool async)
{
switch (State)
{
case ReaderState.BeforeResult:
// First Read() after NextResult. Data row has already been processed.
State = ReaderState.InResult;
return true;
case ReaderState.InResult:
await ConsumeRow(async);
if ((_behavior & CommandBehavior.SingleRow) != 0)
{
// TODO: See optimization proposal in #410
await Consume(async);
return false;
}
break;
case ReaderState.BetweenResults:
case ReaderState.Consumed:
case ReaderState.Closed:
return false;
default:
throw new ArgumentOutOfRangeException();
}
try
{
var msg = await ReadMessage(async);
ProcessMessage(msg);
return msg.Code == BackendMessageCode.DataRow;
}
catch (PostgresException)
{
State = ReaderState.Consumed;
throw;
}
}
void ProcessMessage(IBackendMessage msg)
{
Debug.Assert(msg != null);
switch (msg.Code)
{
case BackendMessageCode.DataRow:
Debug.Assert(RowDescription != null);
if (Connector.State != ConnectorState.Fetching)
Connector.State = ConnectorState.Fetching;
ProcessDataMessage((DataRowMessage)msg);
_hasRows = true;
switch (State)
{
case ReaderState.BetweenResults:
State = ReaderState.BeforeResult;
break;
case ReaderState.BeforeResult:
State = ReaderState.InResult;
break;
case ReaderState.InResult:
break;
default:
throw Connector.UnexpectedMessageReceived(BackendMessageCode.DataRow);
}
return;
case BackendMessageCode.CompletedResponse:
var completed = (CommandCompleteMessage) msg;
switch (completed.StatementType)
{
case StatementType.Update:
case StatementType.Insert:
case StatementType.Delete:
case StatementType.Copy:
case StatementType.Move:
if (!_recordsAffected.HasValue)
_recordsAffected = 0;
_recordsAffected += completed.Rows;
break;
}
_statements[StatementIndex].ApplyCommandComplete(completed);
goto case BackendMessageCode.EmptyQueryResponse;
case BackendMessageCode.EmptyQueryResponse:
State = ReaderState.BetweenResults;
return;
case BackendMessageCode.ReadyForQuery:
State = ReaderState.Consumed;
return;
default:
throw new Exception("Received unexpected backend message of type " + msg.Code);
}
}
internal abstract ValueTask ReadMessage(bool async);
internal abstract void ProcessDataMessage(DataRowMessage dataMsg);
internal abstract Task SeekToColumn(int column, bool async);
internal abstract Task SeekInColumn(int posInColumn, bool async);
internal abstract ValueTask GetStreamInternal(int column, bool async);
internal abstract Task ConsumeRow(bool async);
#endregion
#region NextResult
///
/// Advances the reader to the next result when reading the results of a batch of statements.
///
///
public sealed override bool NextResult()
{
try
{
return (IsSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false))
.GetAwaiter().GetResult();
}
catch (PostgresException e)
{
State = ReaderState.Consumed;
if ((StatementIndex >= 0) && (StatementIndex < _statements.Count))
e.Statement = _statements[StatementIndex];
throw;
}
}
///
/// This is the asynchronous version of NextResult.
/// The parameter is currently ignored.
///
/// Currently ignored.
/// A task representing the asynchronous operation.
public override Task NextResultAsync(CancellationToken cancellationToken)
=> SynchronizationContextSwitcher.NoContext(async () =>
{
try
{
return IsSchemaOnly ? await NextResultSchemaOnly(true) : await NextResult(true);
}
catch (PostgresException e)
{
State = ReaderState.Consumed;
if (StatementIndex >= 0 && StatementIndex < _statements.Count)
e.Statement = _statements[StatementIndex];
throw;
}
});
internal virtual async Task NextResult(bool async)
{
IBackendMessage msg;
Debug.Assert(!IsSchemaOnly);
// If we're in the middle of a resultset, consume it
switch (State)
{
case ReaderState.BeforeResult:
case ReaderState.InResult:
await ConsumeRow(async);
var completedMsg = await Connector.SkipUntil(BackendMessageCode.CompletedResponse, BackendMessageCode.EmptyQueryResponse, async);
ProcessMessage(completedMsg);
break;
case ReaderState.BetweenResults:
break;
case ReaderState.Consumed:
case ReaderState.Closed:
return false;
default:
throw new ArgumentOutOfRangeException();
}
Debug.Assert(State == ReaderState.BetweenResults);
_hasRows = false;
if ((_behavior & CommandBehavior.SingleResult) != 0 && StatementIndex == 0)
{
if (State == ReaderState.BetweenResults)
await Consume(async);
return false;
}
// We are now at the end of the previous result set. Read up to the next result set, if any.
// Non-prepared statements receive ParseComplete, BindComplete, DescriptionRow/NoData,
// prepared statements receive only BindComplete
for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++)
{
var statement = _statements[StatementIndex];
if (statement.IsPrepared)
{
await Connector.ReadExpecting(async);
RowDescription = statement.Description;
}
else // Non-prepared flow
{
var pStatement = statement.PreparedStatement;
if (pStatement != null)
{
Debug.Assert(!pStatement.IsPrepared);
Debug.Assert(pStatement.Description == null);
if (pStatement.StatementBeingReplaced != null)
{
await Connector.ReadExpecting(async);
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}
}
await Connector.ReadExpecting(async);
await Connector.ReadExpecting(async);
msg = await Connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.NoData:
RowDescription = statement.Description = null;
break;
case BackendMessageCode.RowDescription:
// We have a resultset
RowDescription = statement.Description = (RowDescriptionMessage)msg;
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
if (pStatement != null)
{
Debug.Assert(!pStatement.IsPrepared);
pStatement.CompletePrepare();
}
}
msg = await ReadMessage(async);
if (RowDescription == null)
{
// Statement did not generate a resultset (e.g. INSERT)
// Read and process its completion message and move on to the next statement
switch (msg.Code)
{
case BackendMessageCode.CompletedResponse:
case BackendMessageCode.EmptyQueryResponse:
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
ProcessMessage(msg);
continue;
}
switch (msg.Code)
{
case BackendMessageCode.DataRow:
case BackendMessageCode.CompletedResponse:
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
ProcessMessage(msg);
return true;
}
// There are no more queries, we're done. Read to the RFQ.
ProcessMessage(Connector.ReadExpecting());
RowDescription = null;
return false;
}
///
/// Note that in SchemaOnly mode there are no resultsets, and we read nothing from the backend (all
/// RowDescriptions have already been processed and are available)
///
async Task NextResultSchemaOnly(bool async)
{
Debug.Assert(IsSchemaOnly);
for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++)
{
var statement = _statements[StatementIndex];
if (statement.IsPrepared)
{
// Row descriptions have already been populated in the statement objects at the
// Prepare phase
RowDescription = _statements[StatementIndex].Description;
}
else
{
await Connector.ReadExpecting(async);
await Connector.ReadExpecting(async);
var msg = await Connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.NoData:
RowDescription = _statements[StatementIndex].Description = null;
break;
case BackendMessageCode.RowDescription:
// We have a resultset
RowDescription = _statements[StatementIndex].Description = (RowDescriptionMessage)msg;
Command.FixupRowDescription(RowDescription, StatementIndex == 0);
break;
default:
throw Connector.UnexpectedMessageReceived(msg.Code);
}
}
// Found a resultset
if (RowDescription != null)
return true;
}
// There are no more queries, we're done. Read to the RFQ.
if (!_statements.All(s => s.IsPrepared))
{
ProcessMessage(await Connector.ReadExpecting(async));
RowDescription = null;
}
return false;
}
#endregion
///
/// Gets a value indicating the depth of nesting for the current row. Always returns zero.
///
public override int Depth => 0;
///
/// Gets a value indicating whether the data reader is closed.
///
public override bool IsClosed => State == ReaderState.Closed;
///
/// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement.
///
public override int RecordsAffected => _recordsAffected.HasValue ? (int)_recordsAffected.Value : -1;
///
/// Returns details about each statement that this reader will or has executed.
///
///
/// Note that some fields (i.e. rows and oid) are only populated as the reader
/// traverses the result.
///
/// For commands with multiple queries, this exposes the number of rows affected on
/// a statement-by-statement basis, unlike
/// which exposes an aggregation across all statements.
///
public IReadOnlyList Statements => _statements.AsReadOnly();
///
/// Gets a value that indicates whether this DbDataReader contains one or more rows.
///
public override bool HasRows => State == ReaderState.Closed
? throw new InvalidOperationException("Invalid attempt to call HasRows when reader is closed.")
: _hasRows;
///
/// Indicates whether the reader is currently positioned on a row, i.e. whether reading a
/// column is possible.
/// This property is different from in that will
/// return true even if attempting to read a column will fail, e.g. before
/// has been called
///
[PublicAPI]
public bool IsOnRow => State == ReaderState.InResult;
///
/// Gets the name of the column, given the zero-based column ordinal.
///
/// The zero-based column ordinal.
/// The name of the specified column.
public override string GetName(int ordinal)
{
CheckResultSet();
CheckColumn(ordinal);
return RowDescription[ordinal].Name;
}
///
/// Gets the number of columns in the current row.
///
public override int FieldCount => RowDescription?.NumFields ?? 0;
#region Cleanup / Dispose
///
/// Consumes all result sets for this reader, leaving the connector ready for sending and processing further
/// queries
///
async Task Consume(bool async)
{
if (IsSchemaOnly && _statements.All(s => s.IsPrepared))
{
State = ReaderState.Consumed;
return;
}
switch (State)
{
case ReaderState.BeforeResult:
case ReaderState.InResult:
await ConsumeRow(async);
break;
}
// Skip over the other result sets, processing only CommandCompleted for RecordsAffected
while (true)
{
var msg = await Connector.SkipUntil(BackendMessageCode.CompletedResponse, BackendMessageCode.ReadyForQuery, async);
switch (msg.Code)
{
case BackendMessageCode.CompletedResponse:
ProcessMessage(msg);
continue;
case BackendMessageCode.ReadyForQuery:
ProcessMessage(msg);
return;
default:
throw new NpgsqlException("Unexpected message of type " + msg.Code);
}
}
}
///
/// Releases the resources used by the NpgsqlDataReader.
///
protected override void Dispose(bool disposing) => Close();
///
/// Closes the reader, allowing a new command to be executed.
///
#if NETSTANDARD1_3
public void Close()
#else
public override void Close()
#endif
=> Close(false, false).GetAwaiter().GetResult();
///
/// Closes the reader, allowing a new command to be executed.
///
public Task CloseAsync() => Close(false, true);
internal async Task Close(bool connectionClosing, bool async)
{
if (State == ReaderState.Closed)
return;
switch (Connector.State)
{
case ConnectorState.Broken:
case ConnectorState.Closed:
// This may have happen because an I/O error while reading a value, or some non-safe
// exception thrown from a type handler. Or if the connection was closed while the reader
// was still open
State = ReaderState.Closed;
Command.State = CommandState.Idle;
ReaderClosed?.Invoke(this, EventArgs.Empty);
return;
}
if (State != ReaderState.Consumed)
await Consume(async);
await Cleanup(async, connectionClosing);
}
internal async Task Cleanup(bool async, bool connectionClosing=false)
{
Log.Trace("Cleaning up reader", Connector.Id);
// Make sure the send task for this command, which may have executed asynchronously and in
// parallel with the reading, has completed, throwing any exceptions it generated.
if (async)
await _sendTask;
else
_sendTask.GetAwaiter().GetResult();
State = ReaderState.Closed;
Command.State = CommandState.Idle;
Connector.CurrentReader = null;
Connector.EndUserAction();
// If the reader is being closed as part of the connection closing, we don't apply
// the reader's CommandBehavior.CloseConnection
if ((_behavior & CommandBehavior.CloseConnection) != 0 && !connectionClosing)
_connection.Close();
if (ReaderClosed != null)
{
ReaderClosed(this, EventArgs.Empty);
ReaderClosed = null;
}
}
#endregion
#region Simple value getters
///
/// Gets the value of the specified column as a Boolean.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override bool GetBoolean(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a byte.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override byte GetByte(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a single character.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override char GetChar(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 16-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override short GetInt16(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 32-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override int GetInt32(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a 64-bit signed integer.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override long GetInt64(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a object.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override DateTime GetDateTime(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override string GetString(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a object.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override decimal GetDecimal(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a double-precision floating point number.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override double GetDouble(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a single-precision floating point number.
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override float GetFloat(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a globally-unique identifier (GUID).
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override Guid GetGuid(int ordinal) => GetFieldValue(ordinal);
///
/// Populates an array of objects with the column values of the current row.
///
/// An array of Object into which to copy the attribute columns.
/// The number of instances of in the array.
public override int GetValues(object[] values)
{
if (values == null)
throw new ArgumentNullException(nameof(values));
CheckRow();
var count = Math.Min(FieldCount, values.Length);
for (var i = 0; i < count; i++)
values[i] = GetValue(i);
return count;
}
///
/// Gets the value of the specified column as an instance of .
///
/// The zero-based column ordinal.
/// The value of the specified column.
public override object this[int ordinal] => GetValue(ordinal);
#endregion
#region Provider-specific type getters
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for dates.
///
///
/// PostgreSQL's date type represents dates from 4713 BC to 5874897 AD, while .NET's DateTime
/// only supports years from 1 to 1999. If you require years outside this range use this accessor.
/// The standard method will also return this type, but has
/// the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlDate GetDate(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as a TimeSpan,
///
///
/// PostgreSQL's interval type has has a resolution of 1 microsecond and ranges from
/// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds
/// and ranges from roughly -29247 to 29247 years.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public TimeSpan GetTimeSpan(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for time spans.
///
///
/// PostgreSQL's interval type has has a resolution of 1 microsecond and ranges from
/// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds
/// and ranges from roughly -29247 to 29247 years. If you require values from outside TimeSpan's
/// range use this accessor.
/// The standard ADO.NET method will also return this
/// type, but has the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlTimeSpan GetInterval(int ordinal) => GetFieldValue(ordinal);
///
/// Gets the value of the specified column as an ,
/// Npgsql's provider-specific type for date/time timestamps. Note that this type covers
/// both PostgreSQL's "timestamp with time zone" and "timestamp without time zone" types,
/// which differ only in how they are converted upon input/output.
///
///
/// PostgreSQL's timestamp type represents dates from 4713 BC to 5874897 AD, while .NET's DateTime
/// only supports years from 1 to 1999. If you require years outside this range use this accessor.
/// The standard method will also return this type, but has
/// the disadvantage of boxing the value.
/// See http://www.postgresql.org/docs/current/static/datatype-datetime.html
///
/// The zero-based column ordinal.
/// The value of the specified column.
public NpgsqlDateTime GetTimeStamp(int ordinal) => GetFieldValue(ordinal);
#endregion
#region Special binary getters
///
/// Reads a stream of bytes from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
///
/// The zero-based column ordinal.
/// The index within the row from which to begin the read operation.
/// The buffer into which to copy the data.
/// The index with the buffer to which the data will be copied.
/// The maximum number of characters to read.
/// The actual number of bytes read.
public override long GetBytes(int ordinal, long dataOffset, [CanBeNull] byte[] buffer, int bufferOffset, int length)
{
CheckRowAndOrdinal(ordinal);
if (dataOffset < 0 || dataOffset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(dataOffset), dataOffset, $"dataOffset must be between {0} and {int.MaxValue}");
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length))
throw new IndexOutOfRangeException($"bufferOffset must be between {0} and {(buffer.Length - 1)}");
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
throw new IndexOutOfRangeException($"length must be between {0} and {buffer.Length - bufferOffset}");
var fieldDescription = RowDescription[ordinal];
var handler = fieldDescription.Handler;
if (!(handler is ByteaHandler || handler is PostgisGeometryHandler))
throw new InvalidCastException("GetBytes() not supported for type " + fieldDescription.Name);
SeekToColumn(ordinal, false).GetAwaiter().GetResult();
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
if (buffer == null)
return ColumnLen;
var offset = (int)dataOffset;
SeekInColumn(offset, false).GetAwaiter().GetResult();
// Attempt to read beyond the end of the column
if (offset + length > ColumnLen)
length = ColumnLen - offset;
return Buffer.ReadAllBytes(buffer, bufferOffset, length, false, false).Result;
}
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public override Stream GetStream(int ordinal) => GetStream(ordinal, false).Result;
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public Task GetStreamAsync(int ordinal)
=> SynchronizationContextSwitcher.NoContext(async () => await GetStream(ordinal, true));
ValueTask GetStream(int ordinal, bool async)
{
CheckRowAndOrdinal(ordinal);
var fieldDescription = RowDescription[ordinal];
var handler = fieldDescription.Handler as ByteaHandler;
if (handler == null)
throw new InvalidCastException($"GetStream() not supported for type {fieldDescription.Handler.PgDisplayName}");
return GetStreamInternal(ordinal, async);
}
#endregion
#region Special text getters
///
/// Reads a stream of characters from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
///
/// The zero-based column ordinal.
/// The index within the row from which to begin the read operation.
/// The buffer into which to copy the data.
/// The index with the buffer to which the data will be copied.
/// The maximum number of characters to read.
/// The actual number of characters read.
public override long GetChars(int ordinal, long dataOffset, [CanBeNull] char[] buffer, int bufferOffset, int length)
{
CheckRowAndOrdinal(ordinal);
if (dataOffset < 0 || dataOffset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(dataOffset), dataOffset, $"dataOffset must be between {0} and {int.MaxValue}");
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length))
throw new IndexOutOfRangeException($"bufferOffset must be between {0} and {(buffer.Length - 1)}");
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
throw new IndexOutOfRangeException($"length must be between {0} and {buffer.Length - bufferOffset}");
var fieldDescription = RowDescription[ordinal];
var handler = fieldDescription.Handler as TextHandler;
if (handler == null)
throw new InvalidCastException("GetChars() not supported for type " + fieldDescription.Name);
SeekToColumn(ordinal, false).GetAwaiter().GetResult();
if (ColumnLen == -1)
throw new InvalidCastException("Column is null");
if (PosInColumn == 0)
_charPos = 0;
if (buffer == null)
{
// Note: Getting the length of a text column means decoding the entire field,
// very inefficient and also consumes the column in sequential mode. But this seems to
// be SqlClient's behavior as well.
Buffer.SkipChars(int.MaxValue, ColumnLen - PosInColumn, out var bytesSkipped, out var charsSkipped);
Debug.Assert(bytesSkipped == ColumnLen - PosInColumn);
PosInColumn += bytesSkipped;
_charPos += charsSkipped;
return _charPos;
}
if (PosInColumn == ColumnLen || dataOffset < _charPos)
{
// Either the column has already been read (e.g. GetString()) or a previous GetChars()
// has positioned us in the column *after* the requested read start offset. Seek back
// (this will throw for sequential)
SeekInColumn(0, false).GetAwaiter().GetResult();
_charPos = 0;
}
if (dataOffset > _charPos)
{
var charsToSkip = (int)dataOffset - _charPos;
int bytesSkipped, charsSkipped;
Buffer.SkipChars(charsToSkip, ColumnLen - PosInColumn, out bytesSkipped, out charsSkipped);
PosInColumn += bytesSkipped;
_charPos += charsSkipped;
if (charsSkipped < charsToSkip)
{
// TODO: What is the actual required behavior here?
throw new IndexOutOfRangeException();
}
}
Buffer.ReadAllChars(buffer, bufferOffset, length, ColumnLen - PosInColumn, out var bytesRead, out var charsRead);
PosInColumn += bytesRead;
_charPos += charsRead;
return charsRead;
}
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public override TextReader GetTextReader(int ordinal)
=> GetTextReader(ordinal, false).Result;
///
/// Retrieves data as a .
///
/// The zero-based column ordinal.
/// The returned object.
public Task GetTextReaderAsync(int ordinal)
=> SynchronizationContextSwitcher.NoContext(async () => await GetTextReader(ordinal, true));
async ValueTask GetTextReader(int ordinal, bool async)
{
CheckRowAndOrdinal(ordinal);
var fieldDescription = RowDescription[ordinal];
var handler = fieldDescription.Handler as ITextReaderHandler;
if (handler == null)
throw new InvalidCastException($"GetTextReader() not supported for type {fieldDescription.Handler.PgDisplayName}");
var stream = async
? await GetStreamInternal(ordinal, async)
: GetStreamInternal(ordinal, async).Result;
return handler.GetTextReader(stream);
}
#endregion
///
/// Gets the value of the specified column as an instance of .
///
/// The name of the column.
/// The value of the specified column.
public override object this[string name] => GetValue(GetOrdinal(name));
///
/// Gets the column ordinal given the name of the column.
///
/// The name of the column.
/// The zero-based column ordinal.
public override int GetOrdinal(string name)
{
CheckResultSet();
if (string.IsNullOrEmpty(name))
throw new ArgumentException("name cannot be empty", nameof(name));
return RowDescription.GetFieldIndex(name);
}
///
/// Gets a representation of the PostgreSQL data type for the specified field.
/// The returned representation can be used to access various information about the field.
///
/// The zero-based column index.
[PublicAPI]
public PostgresType GetPostgresType(int ordinal)
{
CheckResultSet();
CheckColumn(ordinal);
return RowDescription[ordinal].PostgresType;
}
///
/// Gets the data type information for the specified field.
/// This will be the PostgreSQL type name (e.g. int4) as in the pg_type table,
/// not the .NET type (see for that).
///
/// The zero-based column index.
///
public override string GetDataTypeName(int ordinal) => GetPostgresType(ordinal).DisplayName;
///
/// Gets the OID for the PostgreSQL type for the specified field, as it appears in the pg_type table.
///
///
/// This is a PostgreSQL-internal value that should not be relied upon and should only be used for
/// debugging purposes.
///
/// The zero-based column index.
public uint GetDataTypeOID(int ordinal)
{
CheckResultSet();
CheckColumn(ordinal);
return RowDescription[ordinal].TypeOID;
}
///
/// Gets the data type of the specified column.
///
/// The zero-based column ordinal.
/// The data type of the specified column.
[NotNull]
public override Type GetFieldType(int ordinal)
{
CheckResultSet();
CheckColumn(ordinal);
var type = Command.ObjectResultTypes?[ordinal];
return type ?? RowDescription[ordinal].FieldType;
}
///
/// Returns the provider-specific field type of the specified column.
///
/// The zero-based column ordinal.
/// The Type object that describes the data type of the specified column.
public override Type GetProviderSpecificFieldType(int ordinal)
{
CheckResultSet();
CheckColumn(ordinal);
var fieldDescription = RowDescription[ordinal];
return fieldDescription.Handler.GetProviderSpecificFieldType(fieldDescription);
}
///
/// Gets all provider-specific attribute columns in the collection for the current row.
///
/// An array of Object into which to copy the attribute columns.
/// The number of instances of in the array.
public override int GetProviderSpecificValues(object[] values)
{
if (values == null)
throw new ArgumentNullException(nameof(values));
CheckRow();
var count = Math.Min(FieldCount, values.Length);
for (var i = 0; i < count; i++)
values[i] = GetProviderSpecificValue(i);
return count;
}
///
/// Returns an that can be used to iterate through the rows in the data reader.
///
/// An that can be used to iterate through the rows in the data reader.
public override IEnumerator GetEnumerator()
{
#if NETSTANDARD1_3
throw new NotSupportedException("GetEnumerator not yet supported in .NET Core");
#else
return new DbEnumerator(this);
#endif
}
#region New (CoreCLR) schema API
///
/// Returns schema information for the columns in the current resultset.
///
///
public ReadOnlyCollection GetColumnSchema()
=> new DbColumnSchemaGenerator(_connection, RowDescription, (_behavior & CommandBehavior.KeyInfo) != 0)
.GetColumnSchema();
#if NETSTANDARD1_3
ReadOnlyCollection IDbColumnSchemaGenerator.GetColumnSchema()
=> new ReadOnlyCollection(GetColumnSchema().Cast().ToList());
#endif
#endregion
#region Schema metadata table
#if !NETSTANDARD1_3
///
/// Returns a System.Data.DataTable that describes the column metadata of the DataReader.
///
[CanBeNull]
public override DataTable GetSchemaTable()
{
if (FieldCount == 0) // No resultset
return null;
var table = new DataTable("SchemaTable");
table.Columns.Add("AllowDBNull", typeof(bool));
table.Columns.Add("BaseCatalogName", typeof(string));
table.Columns.Add("BaseColumnName", typeof(string));
table.Columns.Add("BaseSchemaName", typeof(string));
table.Columns.Add("BaseTableName", typeof(string));
table.Columns.Add("ColumnName", typeof(string));
table.Columns.Add("ColumnOrdinal", typeof(int));
table.Columns.Add("ColumnSize", typeof(int));
table.Columns.Add("DataType", typeof(Type));
table.Columns.Add("IsUnique", typeof(bool));
table.Columns.Add("IsKey", typeof(bool));
table.Columns.Add("IsAliased", typeof(bool));
table.Columns.Add("IsExpression", typeof(bool));
table.Columns.Add("IsIdentity", typeof(bool));
table.Columns.Add("IsAutoIncrement", typeof(bool));
table.Columns.Add("IsRowVersion", typeof(bool));
table.Columns.Add("IsHidden", typeof(bool));
table.Columns.Add("IsLong", typeof(bool));
table.Columns.Add("IsReadOnly", typeof(bool));
table.Columns.Add("NumericPrecision", typeof(int));
table.Columns.Add("NumericScale", typeof(int));
table.Columns.Add("ProviderSpecificDataType", typeof(Type));
table.Columns.Add("ProviderType", typeof(Type));
foreach (var column in GetColumnSchema())
{
var row = table.NewRow();
row["AllowDBNull"] = (object)column.AllowDBNull ?? DBNull.Value;
row["BaseColumnName"] = column.BaseColumnName;
row["BaseCatalogName"] = column.BaseCatalogName;
row["BaseSchemaName"] = column.BaseSchemaName;
row["BaseTableName"] = column.BaseTableName;
row["ColumnName"] = column.ColumnName;
row["ColumnOrdinal"] = column.ColumnOrdinal ?? -1;
row["ColumnSize"] = column.ColumnSize ?? -1;
row["DataType"] = row["ProviderType"] = column.DataType; // Non-standard
row["IsUnique"] = column.IsUnique == true;
row["IsKey"] = column.IsKey == true;
row["IsAliased"] = column.IsAliased == true;
row["IsExpression"] = column.IsExpression == true;
row["IsAutoIncrement"] = column.IsAutoIncrement == true;
row["IsIdentity"] = column.IsIdentity == true;
row["IsRowVersion"] = false;
row["IsHidden"] = column.IsHidden == true;
row["IsLong"] = column.IsLong == true;
row["NumericPrecision"] = column.NumericPrecision ?? 0;
row["NumericScale"] = column.NumericScale ?? 0;
table.Rows.Add(row);
}
return table;
}
#endif
#endregion Schema metadata table
#region Checks
internal void CheckRowAndOrdinal(int ordinal)
{
CheckRow();
CheckColumn(ordinal);
}
void CheckRow()
{
if (!IsOnRow)
throw new InvalidOperationException("No row is available");
}
// ReSharper disable once UnusedParameter.Local
internal void CheckColumn(int column)
{
if (column < 0 || column >= FieldCount)
throw new IndexOutOfRangeException($"Column must be between {0} and {(FieldCount - 1)}");
}
void CheckResultSet()
{
if (FieldCount == 0)
throw new InvalidOperationException("No resultset is currently being traversed");
}
#endregion
}
enum ReaderState
{
BeforeResult,
InResult,
BetweenResults,
Consumed,
Closed,
}
}