using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Npgsql.BackendMessages;
using Npgsql.Internal;
using Npgsql.Internal.Postgres;
using NpgsqlTypes;
using InfiniteTimeout = System.Threading.Timeout;
using static Npgsql.Util.Statics;
namespace Npgsql;
/// <summary>
/// Provides an API for a binary COPY TO operation, a high-performance data export mechanism from
/// a PostgreSQL table. Initiated by <see cref="NpgsqlConnection.BeginBinaryExport(string)"/>.
/// </summary>
public sealed class NpgsqlBinaryExporter : ICancelable
{
    // Sentinel values for _column: not currently inside a row / inside a row, before the first column.
    const int BeforeRow = -2;
    const int BeforeColumn = -1;

    #region Fields and Properties

    NpgsqlConnector _connector;
    NpgsqlReadBuffer _buf;
    ExporterState _state = ExporterState.Uninitialized;

    // Cumulative read position at which the current CopyData message ends; used on dispose
    // to skip whatever remains of the in-progress message.
    long _endOfMessagePos;

    short _column;
    ulong _rowsExported;

    PgReader PgReader => _buf.PgReader;

    /// <summary>
    /// The number of columns, as returned from the backend in the CopyOutResponse.
    /// </summary>
    int NumColumns { get; set; }

    // Lazily-populated converter info per column ordinal; filled on first read of each column.
    PgConverterInfo[] _columnInfoCache;

    readonly ILogger _copyLogger;

    /// <summary>
    /// Current timeout
    /// </summary>
    public TimeSpan Timeout
    {
        set => _buf.Timeout = value > TimeSpan.Zero ? value : InfiniteTimeout.InfiniteTimeSpan;
    }

    Activity? _activity;

    #endregion

    #region Construction / Initialization

    internal NpgsqlBinaryExporter(NpgsqlConnector connector)
    {
        _connector = connector;
        _buf = connector.ReadBuffer;
        _column = BeforeRow;
        _columnInfoCache = null!;
        _copyLogger = connector.LoggingConfiguration.CopyLogger;
    }

    /// <summary>
    /// Sends the COPY command to the backend and validates that a binary CopyOutResponse comes back,
    /// then reads the COPY binary header. Throws (and breaks the connector) on a text transfer.
    /// </summary>
    internal async Task Init(string copyToCommand, bool async, CancellationToken cancellationToken = default)
    {
        Debug.Assert(_activity is null);
        _activity = _connector.TraceCopyStart(copyToCommand, "COPY TO");
        try
        {
            await _connector.WriteQuery(copyToCommand, async, cancellationToken).ConfigureAwait(false);
            await _connector.Flush(async, cancellationToken).ConfigureAwait(false);

            using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);

            CopyOutResponseMessage copyOutResponse;
            var msg = await _connector.ReadMessage(async).ConfigureAwait(false);
            switch (msg.Code)
            {
            case BackendMessageCode.CopyOutResponse:
                copyOutResponse = (CopyOutResponseMessage)msg;
                if (!copyOutResponse.IsBinary)
                {
                    throw _connector.Break(
                        new ArgumentException("copyToCommand triggered a text transfer, only binary is allowed",
                            nameof(copyToCommand)));
                }
                break;
            case BackendMessageCode.CommandComplete:
                throw new InvalidOperationException(
                    "This API only supports import/export from the client, i.e. COPY commands containing TO/FROM STDIN. " +
                    "To import/export with files on your PostgreSQL machine, simply execute the command with ExecuteNonQuery. " +
                    "Note that your data has been successfully imported/exported.");
            default:
                throw _connector.UnexpectedMessageReceived(msg.Code);
            }

            _state = ExporterState.Ready;
            NumColumns = copyOutResponse.NumColumns;
            _columnInfoCache = new PgConverterInfo[NumColumns];
            _rowsExported = 0;
            _endOfMessagePos = _buf.CumulativeReadPosition;
            await ReadHeader(async).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            TraceSetException(e);
            throw;
        }
    }

    // Reads and validates the binary COPY header: the fixed signature, a flags field
    // (must be zero - OID inclusion is unsupported) and the header-extension length.
    async Task ReadHeader(bool async)
    {
        var msg = await _connector.ReadMessage(async).ConfigureAwait(false);
        _endOfMessagePos = _buf.CumulativeReadPosition + Expect<CopyDataMessage>(msg, _connector).Length;
        var headerLen = NpgsqlRawCopyStream.BinarySignature.Length + 4 + 4;
        await _buf.Ensure(headerLen, async).ConfigureAwait(false);

        foreach (var t in NpgsqlRawCopyStream.BinarySignature)
            if (_buf.ReadByte() != t)
                throw new NpgsqlException("Invalid COPY binary signature at beginning!");

        var flags = _buf.ReadInt32();
        if (flags != 0)
            throw new NotSupportedException("Unsupported flags in COPY operation (OID inclusion?)");

        _buf.ReadInt32(); // Header extensions, currently unused
    }

    #endregion

    #region Read

    /// <summary>
    /// Starts reading a single row, must be invoked before reading any columns.
    /// </summary>
    /// <returns>
    /// The number of columns in the row. -1 if there are no further rows.
    /// Note: This will currently be the same value for all rows, but this may change in the future.
    /// </returns>
    public int StartRow() => StartRow(false).GetAwaiter().GetResult();

    /// <summary>
    /// Starts reading a single row, must be invoked before reading any columns.
    /// </summary>
    /// <returns>
    /// The number of columns in the row. -1 if there are no further rows.
    /// Note: This will currently be the same value for all rows, but this may change in the future.
    /// </returns>
    public ValueTask<int> StartRowAsync(CancellationToken cancellationToken = default) => StartRow(true, cancellationToken);

    async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        if (_state == ExporterState.Consumed)
            return -1;

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

        // Consume and advance any active column.
        if (_column >= 0)
        {
            if (async)
                await PgReader.CommitAsync().ConfigureAwait(false);
            else
                PgReader.Commit();
            _column++;
        }

        // The very first row (i.e. _column == -1) is included in the header's CopyData message.
        // Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
        // message per row).
        if (_column == NumColumns)
        {
            var msg = Expect<CopyDataMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
            _endOfMessagePos = _buf.CumulativeReadPosition + msg.Length;
        }
        else if (_column != BeforeRow)
            ThrowHelper.ThrowInvalidOperationException("Already in the middle of a row");

        await _buf.Ensure(2, async).ConfigureAwait(false);

        var numColumns = _buf.ReadInt16();
        if (numColumns == -1)
        {
            // A column count of -1 is the binary COPY trailer; the backend then concludes the
            // operation with CopyDone, CommandComplete and ReadyForQuery.
            Expect<CopyDoneMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
            Expect<CommandCompleteMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
            Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
            _column = BeforeRow;
            _state = ExporterState.Consumed;
            return -1;
        }

        Debug.Assert(numColumns == NumColumns);

        _column = BeforeColumn;
        _rowsExported++;
        return NumColumns;
    }

    /// <summary>
    /// Reads the current column, returns its value and moves ahead to the next column.
    /// If the column is null an exception is thrown.
    /// </summary>
    /// <typeparam name="T">
    /// The type of the column to be read. This must correspond to the actual type or data
    /// corruption will occur. If in doubt, use <see cref="Read{T}(NpgsqlDbType)"/> to manually
    /// specify the type.
    /// </typeparam>
    /// <returns>The value of the column</returns>
    public T Read<T>()
        => Read<T>(null);

    /// <summary>
    /// Reads the current column, returns its value and moves ahead to the next column.
    /// If the column is null an exception is thrown.
    /// </summary>
    /// <typeparam name="T">
    /// The type of the column to be read. This must correspond to the actual type or data
    /// corruption will occur. If in doubt, use <see cref="ReadAsync{T}(NpgsqlDbType, CancellationToken)"/> to manually
    /// specify the type.
    /// </typeparam>
    /// <returns>The value of the column</returns>
    public ValueTask<T> ReadAsync<T>(CancellationToken cancellationToken = default)
        => ReadAsync<T>(null, cancellationToken);

    /// <summary>
    /// Reads the current column, returns its value according to <paramref name="type"/> and
    /// moves ahead to the next column.
    /// If the column is null an exception is thrown.
    /// </summary>
    /// <param name="type">
    /// In some cases <typeparamref name="T"/> isn't enough to infer the data type coming in from the
    /// database. This parameter can be used to unambiguously specify the type. An example is the JSONB
    /// type, for which <typeparamref name="T"/> will be a simple string but for which
    /// <paramref name="type"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
    /// </param>
    /// <typeparam name="T">The .NET type of the column to be read.</typeparam>
    /// <returns>The value of the column</returns>
    public T Read<T>(NpgsqlDbType type)
        => Read<T>((NpgsqlDbType?)type);

    /// <summary>
    /// Reads the current column, returns its value according to <paramref name="type"/> and
    /// moves ahead to the next column.
    /// If the column is null an exception is thrown.
    /// </summary>
    /// <param name="type">
    /// In some cases <typeparamref name="T"/> isn't enough to infer the data type coming in from the
    /// database. This parameter can be used to unambiguously specify the type. An example is the JSONB
    /// type, for which <typeparamref name="T"/> will be a simple string but for which
    /// <paramref name="type"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
    /// </param>
    /// <param name="cancellationToken">
    /// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
    /// </param>
    /// <typeparam name="T">The .NET type of the column to be read.</typeparam>
    /// <returns>The value of the column</returns>
    public ValueTask<T> ReadAsync<T>(NpgsqlDbType type, CancellationToken cancellationToken = default)
        => ReadAsync<T>((NpgsqlDbType?)type, cancellationToken);

    T Read<T>(NpgsqlDbType? type)
    {
        ThrowIfNotOnRow();

        if (!IsInitializedAndAtStart)
            MoveNextColumn(resumableOp: false);

        var reader = PgReader;
        try
        {
            if (reader.FieldIsDbNull)
                return DbNullOrThrow<T>();

            var info = GetInfo(typeof(T), type, out var asObject);
            reader.StartRead(info.BufferRequirement);
            var result = asObject
                ? (T)info.Converter.ReadAsObject(reader)
                : info.Converter.UnsafeDowncast<T>().Read(reader);
            reader.EndRead();
            return result;
        }
        finally
        {
            // Don't delay committing the current column, just do it immediately (as opposed to on the next action: Read, IsNull, Skip).
            // Zero length columns would otherwise create an edge-case where we'd have to immediately commit as we won't know whether we're at the end.
            // To guarantee the commit happens in that case we would still need this try finally, at which point it's just better to be consistent.
            reader.Commit();
        }
    }

    async ValueTask<T> ReadAsync<T>(NpgsqlDbType? type, CancellationToken cancellationToken)
    {
        ThrowIfNotOnRow();

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);

        if (!IsInitializedAndAtStart)
            await MoveNextColumnAsync(resumableOp: false).ConfigureAwait(false);

        var reader = PgReader;
        try
        {
            if (reader.FieldIsDbNull)
                return DbNullOrThrow<T>();

            var info = GetInfo(typeof(T), type, out var asObject);
            await reader.StartReadAsync(info.BufferRequirement, cancellationToken).ConfigureAwait(false);
            var result = asObject
                ? (T)await info.Converter.ReadAsObjectAsync(reader, cancellationToken).ConfigureAwait(false)
                : await info.Converter.UnsafeDowncast<T>().ReadAsync(reader, cancellationToken).ConfigureAwait(false);
            await reader.EndReadAsync().ConfigureAwait(false);
            return result;
        }
        finally
        {
            // Don't delay committing the current column, just do it immediately (as opposed to on the next action: Read, IsNull, Skip).
            // Zero length columns would otherwise create an edge-case where we'd have to immediately commit as we won't know whether we're at the end.
            // To guarantee the commit happens in that case we would still need this try finally, at which point it's just better to be consistent.
            await reader.CommitAsync().ConfigureAwait(false);
        }
    }

    static T DbNullOrThrow<T>()
    {
        // When T is a Nullable, we support returning null
        if (default(T) is null && typeof(T).IsValueType)
            return default!;
        throw new InvalidCastException("Column is null");
    }

    // Returns the (cached) converter info for the current column, creating and caching it on first use.
    PgConverterInfo GetInfo(Type type, NpgsqlDbType? npgsqlDbType, out bool asObject)
    {
        ref var cachedInfo = ref _columnInfoCache[_column];
        var converterInfo = cachedInfo.IsDefault ? cachedInfo = CreateConverterInfo(type, npgsqlDbType) : cachedInfo;
        asObject = converterInfo.IsBoxingConverter;
        return converterInfo;
    }

    PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
    {
        var options = _connector.SerializerOptions;
        PgTypeId? pgTypeId = null;
        if (npgsqlDbType.HasValue)
        {
            pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
                ? options.GetCanonicalTypeId(name)
                // Handle plugin types via lookup.
                : GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
        }
        var info = options.GetTypeInfoInternal(type, pgTypeId)
            ?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");

        // Binary export has no type info so we only do caller-directed interpretation of data.
        return info.Bind(new Field("?",
            info.PgTypeId ?? ((PgResolverTypeInfo)info).GetDefaultResolution(null).PgTypeId, -1), DataFormat.Binary);

        PgTypeId GetRepresentationalOrDefault(string dataTypeName)
        {
            var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
            return options.ToCanonicalTypeId(type.GetRepresentationalType());
        }
    }

    /// <summary>
    /// Returns whether the current column is null.
    /// </summary>
    public bool IsNull
    {
        get
        {
            ThrowIfNotOnRow();

            if (!IsInitializedAndAtStart)
                MoveNextColumn(resumableOp: true);

            return PgReader.FieldIsDbNull;
        }
    }

    /// <summary>
    /// Skips the current column without interpreting its value.
    /// </summary>
    public void Skip()
    {
        ThrowIfNotOnRow();

        if (!IsInitializedAndAtStart)
            MoveNextColumn(resumableOp: false);

        PgReader.Commit();
    }

    /// <summary>
    /// Skips the current column without interpreting its value.
    /// </summary>
    public async Task SkipAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfNotOnRow();

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

        if (!IsInitializedAndAtStart)
            await MoveNextColumnAsync(resumableOp: false).ConfigureAwait(false);

        await PgReader.CommitAsync().ConfigureAwait(false);
    }

    #endregion

    #region Utilities

    // True when the PgReader has been initialized for the current column and hasn't been consumed yet.
    bool IsInitializedAndAtStart => PgReader.Initialized && (PgReader.FieldIsDbNull || PgReader.FieldAtStart);

    // Commits the previous column, advances the ordinal and initializes the PgReader with the
    // next column's length prefix.
    void MoveNextColumn(bool resumableOp)
    {
        PgReader.Commit();

        if (_column + 1 == NumColumns)
            ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
        _column++;
        _buf.Ensure(sizeof(int));
        var columnLen = _buf.ReadInt32();
        PgReader.Init(columnLen, DataFormat.Binary, resumableOp);
    }

    async ValueTask MoveNextColumnAsync(bool resumableOp)
    {
        await PgReader.CommitAsync().ConfigureAwait(false);

        if (_column + 1 == NumColumns)
            ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
        _column++;
        await _buf.Ensure(sizeof(int), async: true).ConfigureAwait(false);
        var columnLen = _buf.ReadInt32();
        PgReader.Init(columnLen, DataFormat.Binary, resumableOp);
    }

    void ThrowIfNotOnRow()
    {
        ThrowIfDisposed();
        if (_column is BeforeRow)
            ThrowHelper.ThrowInvalidOperationException("Not reading a row");
    }

    void ThrowIfDisposed()
    {
        if (_state == ExporterState.Disposed)
            ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlBinaryExporter), "The COPY operation has already ended.");
    }

    #endregion

    #region Cancel / Close / Dispose

    /// <summary>
    /// Cancels an ongoing export.
    /// </summary>
    public void Cancel() => _connector.PerformImmediateUserCancellation();

    /// <summary>
    /// Async cancels an ongoing export.
    /// </summary>
    public Task CancelAsync()
    {
        Cancel();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Completes the binary export and sets the connection back to idle state
    /// </summary>
    public void Dispose() => DisposeAsync(async: false).GetAwaiter().GetResult();

    /// <summary>
    /// Async completes the binary export and sets the connection back to idle state
    /// </summary>
    /// <returns></returns>
    public ValueTask DisposeAsync() => DisposeAsync(async: true);

    async ValueTask DisposeAsync(bool async)
    {
        if (_state == ExporterState.Disposed)
            return;

        try
        {
            if (_state is ExporterState.Consumed or ExporterState.Uninitialized)
            {
                LogMessages.BinaryCopyOperationCompleted(_copyLogger, _rowsExported, _connector.Id);
                TraceExportStop();
            }
            else if (!_connector.IsBroken)
            {
                try
                {
                    using var registration = _connector.StartNestedCancellableOperation(attemptPgCancellation: false);
                    // Be sure to commit the reader.
                    if (async)
                        await PgReader.CommitAsync().ConfigureAwait(false);
                    else
                        PgReader.Commit();
                    // Finish the current CopyData message
                    await _buf.Skip(async, checked((int)(_endOfMessagePos - _buf.CumulativeReadPosition))).ConfigureAwait(false);
                    // Read to the end
                    _connector.SkipUntil(BackendMessageCode.CopyDone);
                    // We intentionally do not pass a CancellationToken since we don't want to cancel cleanup
                    Expect<CommandCompleteMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
                    Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
                    TraceExportStop();
                }
                catch (OperationCanceledException e) when (e.InnerException is PostgresException { SqlState: PostgresErrorCodes.QueryCanceled })
                {
                    LogMessages.CopyOperationCancelled(_copyLogger, _connector.Id);
                    TraceExportStop();
                }
                catch (Exception e)
                {
                    LogMessages.ExceptionWhenDisposingCopyOperation(_copyLogger, _connector.Id, e);
                    TraceSetException(e);
                }
            }
        }
        finally
        {
            _connector.EndUserAction();
            Cleanup();
        }

        // Detaches this exporter from the connector and releases its buffer references.
        void Cleanup()
        {
            Debug.Assert(_state != ExporterState.Disposed);
            var connector = _connector;
            if (!ReferenceEquals(connector, null))
            {
                connector.CurrentCopyOperation = null;
                _connector.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
                _connector = null!;
            }

            _buf = null!;
            _state = ExporterState.Disposed;
        }
    }

    #endregion

    #region Tracing

    void TraceExportStop()
    {
        if (_activity is not null)
        {
            NpgsqlActivitySource.CopyStop(_activity, _rowsExported);
            _activity = null;
        }
    }

    void TraceSetException(Exception exception)
    {
        if (_activity is not null)
        {
            NpgsqlActivitySource.SetException(_activity, exception);
            _activity = null;
        }
    }

    #endregion Tracing

    #region Enums

    // Lifecycle of the exporter: Uninitialized -> Ready -> Consumed -> Disposed.
    enum ExporterState
    {
        Uninitialized,
        Ready,
        Consumed,
        Disposed
    }

    #endregion Enums
}