using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Npgsql.BackendMessages;
using Npgsql.TypeHandlers;
using Npgsql.TypeHandling;
namespace Npgsql
{
/// <summary>
/// The default, non-sequential reader, which buffers entire rows in memory.
/// </summary>
#pragma warning disable CA1010
sealed class NpgsqlDefaultDataReader : NpgsqlDataReader
#pragma warning restore CA1010
{
/// <summary>
/// The number of columns in the current row
/// </summary>
int _numColumns;
/// <summary>
/// Buffer position at which each column of the current row starts, i.e. the position of the
/// column's 4-byte length prefix. Created on the first row and refilled by ProcessDataMessage
/// for every subsequent row.
/// </summary>
[CanBeNull]
List _columnOffsets;
/// <summary>
/// Buffer read position just past the last column of the current row; ConsumeRow seeks here.
/// </summary>
int _endOffset;
/// <summary>
/// Ordinal of the column most recently selected through SeekToColumn, or -1 when no column
/// has been selected since the current row arrived.
/// </summary>
int _column;
/// <summary>
/// List of all streams that have been opened on the current row, and need to be disposed of
/// when the row is consumed.
/// </summary>
[CanBeNull]
List _streams;
/// <summary>
/// Creates the buffering reader; every argument is handed unchanged to the base reader.
/// </summary>
internal NpgsqlDefaultDataReader(
    NpgsqlCommand command,
    CommandBehavior behavior,
    List statements,
    Task sendTask)
    : base(command, behavior, statements, sendTask)
{
}
/// <summary>
/// Reads the next backend message by delegating to the connector.
/// </summary>
/// <param name="async">Passed through unchanged to the connector's ReadMessage.</param>
internal override ValueTask ReadMessage(bool async)
{
    return Connector.ReadMessage(async);
}
/// <summary>
/// Advances to the next result set. When the command declares output parameters and this is the
/// first statement, the output parameters are filled from the first row once the base call has
/// finished successfully.
/// </summary>
/// <param name="async">Passed through unchanged to the base implementation.</param>
/// <returns>A task producing the value returned by the base implementation.</returns>
internal override Task NextResult(bool async)
{
    var task = base.NextResult(async);
    if (!Command.Parameters.HasOutputParameters || StatementIndex != 0)
        return task;

    // Populate the output parameters from the first row of the first resultset.
    // The scheduler and options are explicit so the continuation does not depend on the ambient
    // TaskScheduler.Current and runs inline when the base task has already completed.
    return task.ContinueWith(
        (t, state) =>
        {
            // Observe the antecedent first: GetAwaiter().GetResult() rethrows the original
            // exception (Result would wrap it in an AggregateException), and a failed base call
            // must not lead to reading output parameters from the reader.
            var result = t.GetAwaiter().GetResult();
            if (HasRows)
                PopulateOutputParameters();
            return result;
        },
        null,
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}
/// <summary>
/// The first row in a stored procedure command that has output parameters needs to be traversed twice -
/// once for populating the output parameters and once for the actual result set traversal. So in this
/// case we can't be sequential.
/// </summary>
/// <remarks>
/// Output parameters whose name matches a column receive that column's value. The remaining output
/// parameters are then assigned, in declaration order, the values of the columns that were not
/// claimed by name.
/// </remarks>
void PopulateOutputParameters()
{
    Debug.Assert(Command.Parameters.Any(p => p.IsOutputDirection));
    Debug.Assert(StatementIndex == 0);
    Debug.Assert(RowDescription != null);
    Debug.Assert(State == ReaderState.BeforeResult);

    // Temporarily set our state to InResult to allow us to read the values
    State = ReaderState.InResult;
    try
    {
        var pending = new Queue<NpgsqlParameter>();
        // Column ordinals already consumed by a name match; a set keeps the lookup below cheap.
        var taken = new HashSet<int>();

        foreach (var p in Command.Parameters.Where(p => p.IsOutputDirection))
        {
            if (RowDescription.TryGetFieldIndex(p.CleanName, out var idx))
            {
                // TODO: Provider-specific check?
                p.Value = GetValue(idx);
                taken.Add(idx);
            }
            else
                pending.Enqueue(p);
        }

        // Hand the unclaimed columns, left to right, to the parameters that had no name match.
        for (var i = 0; pending.Count != 0 && i != _numColumns; ++i)
        {
            // TODO: Need to get the provider-specific value based on the out param's type
            if (!taken.Contains(i))
                pending.Dequeue().Value = GetValue(i);
        }
    }
    finally
    {
        // Set the state back, also when reading a value throws. GetValue breaks the connector on
        // unexpected failures, which may already have moved the reader to another state, so only
        // our own temporary change is undone.
        if (State == ReaderState.InResult)
            State = ReaderState.BeforeResult;
    }
}
/// <summary>
/// Finishes the current row: moves the buffer to the end of the row and disposes every stream
/// that was opened on it.
/// </summary>
/// <param name="async">Not used here; the row is fully buffered.</param>
/// <returns>An already-completed task.</returns>
internal override Task ConsumeRow(bool async)
{
    Debug.Assert(State == ReaderState.InResult || State == ReaderState.BeforeResult);

    // Skip whatever part of the row has not been read.
    Buffer.Seek(_endOffset, SeekOrigin.Begin);

    var openStreams = _streams;
    if (openStreams == null)
        return PGUtil.CompletedTask;

    for (var i = 0; i < openStreams.Count; i++)
        openStreams[i].Dispose();
    openStreams.Clear();

    return PGUtil.CompletedTask;
}
/// <summary>
/// Indexes a newly arrived DataRow: records the buffer position of every column and of the end of
/// the row, without decoding any value.
/// </summary>
/// <param name="dataMsg">The DataRow message; not inspected here, the row is read from the connector's buffer.</param>
internal override void ProcessDataMessage(DataRowMessage dataMsg)
{
    // The connector's buffer can actually change between DataRows:
    // If a large DataRow exceeding the connector's current read buffer arrives, and we're
    // reading in non-sequential mode, a new oversize buffer is allocated. We thus have to
    // recapture the connector's buffer on each new DataRow.
    Buffer = Connector.ReadBuffer;

    _numColumns = Buffer.ReadInt16();
    _column = -1;
    Debug.Assert(RowDescription.NumFields == _numColumns);

    // Reuse the offsets list across rows; allocate it only for the first one.
    if (_columnOffsets != null)
        _columnOffsets.Clear();
    else
        _columnOffsets = new List(_numColumns);

    for (var remaining = _numColumns; remaining > 0; remaining--)
    {
        // Remember where the column's length prefix sits, then step over its payload.
        _columnOffsets.Add(Buffer.ReadPosition);
        var length = Buffer.ReadInt32();
        if (length == -1)
            continue; // length -1 marks a null column: there is no payload to skip
        Buffer.Seek(length, SeekOrigin.Current);
    }

    _endOffset = Buffer.ReadPosition;
}
// The non-sequential reader has the whole row buffered in memory, so no I/O is needed:
// the value is read synchronously and wrapped in a completed task.
public override Task GetFieldValueAsync(int column, CancellationToken cancellationToken)
{
    var value = GetFieldValue(column);
    return Task.FromResult(value);
}
/// <summary>
/// Reads the value of the specified column as a T, using the column's type handler.
/// </summary>
/// <param name="column">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
/// <exception cref="InvalidCastException">The column is null.</exception>
public override T GetFieldValue(int column)
{
    CheckRowAndOrdinal(column);
    SeekToColumn(column);
    if (ColumnLen == -1)
        throw new InvalidCastException("Column is null");

    var field = RowDescription[column];
    try
    {
        // object is served through the handler's untyped read path.
        if (typeof(T) == typeof(object))
            return (T)field.Handler.ReadAsObject(Buffer, ColumnLen, field);

        return field.Handler.Read(Buffer, ColumnLen, field);
    }
    catch (NpgsqlSafeReadException safe)
    {
        // Surface the wrapped exception; the connector is not broken for this case.
        throw safe.InnerException;
    }
    catch
    {
        // Any other failure breaks the connector before propagating.
        Connector.Break();
        throw;
    }
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column, or <see cref="DBNull.Value"/> when the column is null.</returns>
public override object GetValue(int ordinal)
{
    CheckRowAndOrdinal(ordinal);
    SeekToColumn(ordinal);
    if (ColumnLen == -1)
        return DBNull.Value;

    var field = RowDescription[ordinal];
    object value;
    try
    {
        value = field.Handler.ReadAsObject(Buffer, ColumnLen, field);
    }
    catch (NpgsqlSafeReadException safe)
    {
        // Surface the wrapped exception; the connector is not broken for this case.
        throw safe.InnerException;
    }
    catch
    {
        // Any other failure breaks the connector before propagating.
        Connector.Break();
        throw;
    }

    // Used for Entity Framework <= 6 compatibility
    var requestedType = Command.ObjectResultTypes?[ordinal];
    if (requestedType == null)
        return value;

    if (requestedType == typeof(DateTimeOffset))
        return new DateTimeOffset((DateTime)value);

    return Convert.ChangeType(value, requestedType);
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>, read through the
/// type handler's ReadPsvAsObject path.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column, or <see cref="DBNull.Value"/> when the column is null.</returns>
public override object GetProviderSpecificValue(int ordinal)
{
    CheckRowAndOrdinal(ordinal);
    SeekToColumn(ordinal);
    if (ColumnLen == -1)
        return DBNull.Value;

    var field = RowDescription[ordinal];
    try
    {
        // TODO: Maybe call a non-async method which would allow simple type handlers (and
        // maybe also text) to read without going through async...
        return field.Handler.ReadPsvAsObject(Buffer, ColumnLen, field);
    }
    catch (NpgsqlSafeReadException safe)
    {
        // Surface the wrapped exception; the connector is not broken for this case.
        throw safe.InnerException;
    }
    catch
    {
        // Any other failure breaks the connector before propagating.
        Connector.Break();
        throw;
    }
}
/// <summary>
/// Gets a value that indicates whether the column contains nonexistent or missing values.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns><b>true</b> if the specified column is equivalent to <see cref="DBNull"/>; otherwise <b>false</b>.</returns>
public override bool IsDBNull(int ordinal)
{
    CheckRowAndOrdinal(ordinal);
    SeekToColumn(ordinal);

    // A length prefix of -1 marks a null column.
    var isNull = ColumnLen == -1;
    return isNull;
}
/// <summary>
/// Opens a read-only stream over the bytes of the specified column, directly on top of the row
/// buffer. The stream is tracked so that ConsumeRow can dispose it.
/// </summary>
/// <param name="column">The zero-based column ordinal.</param>
/// <param name="async">Not used here; the row is fully buffered.</param>
/// <exception cref="InvalidCastException">The column is null.</exception>
internal override ValueTask GetStreamInternal(int column, bool async)
{
    SeekToColumn(column);
    if (ColumnLen == -1)
        throw new InvalidCastException("Column is null");

    // After SeekToColumn the read position is at the first byte of the column's payload.
    var columnStream = new MemoryStream(
        Buffer.Buffer,
        Buffer.ReadPosition,
        ColumnLen,
        writable: false,
        publiclyVisible: false);

    (_streams ?? (_streams = new List())).Add(columnStream);
    return new ValueTask(columnStream);
}
// Synchronous convenience wrapper: the returned task is ignored, since the two-argument overload
// always returns an already-completed task.
void SeekToColumn(int column)
{
    SeekToColumn(column, false);
}
/// <summary>
/// Positions the buffer on the specified column, reads its length prefix into ColumnLen and leaves
/// the read position at the start of the column's payload.
/// </summary>
/// <param name="column">The zero-based column ordinal.</param>
/// <param name="async">Not used here; the row is fully buffered.</param>
/// <returns>An already-completed task.</returns>
internal override Task SeekToColumn(int column, bool async)
{
    var columnStart = _columnOffsets[column];
    Buffer.Seek(columnStart, SeekOrigin.Begin);

    ColumnLen = Buffer.ReadInt32();
    _column = column;
    PosInColumn = 0;

    return PGUtil.CompletedTask;
}
/// <summary>
/// Moves the read position to the given offset inside the current column's payload, capped at
/// ColumnLen.
/// </summary>
/// <param name="posInColumn">Requested offset from the start of the column's payload.</param>
/// <param name="async">Not used here; the row is fully buffered.</param>
/// <returns>An already-completed task.</returns>
internal override Task SeekInColumn(int posInColumn, bool async)
{
    var target = Math.Min(posInColumn, ColumnLen);

    // The payload starts 4 bytes after the column offset, past the length prefix.
    Buffer.Seek(_columnOffsets[_column] + 4 + target, SeekOrigin.Begin);
    PosInColumn = target;

    return PGUtil.CompletedTask;
}
}
}