using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Npgsql.BackendMessages;
using Npgsql.Internal;
using Npgsql.Internal.Postgres;
using NpgsqlTypes;
using InfiniteTimeout = System.Threading.Timeout;
using static Npgsql.Util.Statics;
namespace Npgsql;
/// <summary>
/// Provides an API for a binary COPY FROM operation, a high-performance data import mechanism to
/// a PostgreSQL table. Initiated by <see cref="NpgsqlConnection.BeginBinaryImport(string)"/>.
/// </summary>
/// <remarks>
/// See https://www.postgresql.org/docs/current/static/sql-copy.html.
/// </remarks>
public sealed class NpgsqlBinaryImporter : ICancelable
{
#region Fields and Properties
// Connector and write buffer used for the whole operation; both are nulled out by Cleanup.
NpgsqlConnector _connector;
NpgsqlWriteBuffer _buf;
// Lifecycle state; starts as Uninitialized and is moved to Ready by Init.
ImporterState _state = ImporterState.Uninitialized;
/// <summary>
/// The number of columns in the current (not-yet-written) row.
/// Holds -1 until the first row has been started.
/// </summary>
short _column;
// Incremented once per started row (see StartRow).
ulong _rowsImported;
/// <summary>
/// The number of columns, as returned from the backend in the CopyInResponse.
/// </summary>
int NumColumns => _params.Length;
// True when a row has been started but not all of its columns have been written yet.
bool InMiddleOfRow => _column != -1 && _column != NumColumns;
// One slot per column, allocated in Init; each slot is filled the first time that column is written.
NpgsqlParameter?[] _params;
readonly ILogger _copyLogger;
PgWriter _pgWriter = null!; // Setup in Init
// Tracing activity for this COPY operation; reset to null once it has been stopped or marked as failed.
Activity? _activity;
/// <summary>
/// Sets the timeout applied to both the write buffer and the connector's read buffer.
/// A value that is zero or negative is translated to an infinite timeout.
/// </summary>
public TimeSpan Timeout
{
    set
    {
        // Start from "infinite" and only narrow it when a strictly positive value was supplied.
        var effective = InfiniteTimeout.InfiniteTimeSpan;
        if (value > TimeSpan.Zero)
            effective = value;

        _buf.Timeout = effective;
        _connector.ReadBuffer.Timeout = effective;
    }
}
#endregion
#region Construction / Initialization
/// <summary>
/// Creates an importer bound to <paramref name="connector"/>. The importer is not usable until
/// <see cref="Init"/> has completed.
/// </summary>
/// <param name="connector">The connector whose write buffer and copy logger are used.</param>
internal NpgsqlBinaryImporter(NpgsqlConnector connector)
{
    // No row has been started yet.
    _column = -1;
    // The parameter slots are allocated by Init.
    _params = null!;

    _connector = connector;
    _buf = connector.WriteBuffer;
    _copyLogger = connector.LoggingConfiguration.CopyLogger;
}
/// <summary>
/// Sends the COPY command, validates the backend's response and prepares the importer for writing rows.
/// </summary>
/// <param name="copyFromCommand">The COPY command text to send to the backend.</param>
/// <param name="async">Whether I/O is performed asynchronously.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <remarks>
/// On success the state becomes Ready, the per-column parameter slots are allocated, the buffer enters
/// copy mode and the binary header is written. Any exception is recorded on the tracing activity and rethrown.
/// </remarks>
internal async Task Init(string copyFromCommand, bool async, CancellationToken cancellationToken = default)
{
    Debug.Assert(_activity is null);
    _activity = _connector.TraceCopyStart(copyFromCommand, "COPY FROM");

    try
    {
        await _connector.WriteQuery(copyFromCommand, async, cancellationToken).ConfigureAwait(false);
        await _connector.Flush(async, cancellationToken).ConfigureAwait(false);

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);

        var response = await _connector.ReadMessage(async).ConfigureAwait(false);

        // The command completed without ever entering copy-in mode.
        if (response.Code is BackendMessageCode.CommandComplete)
        {
            throw new InvalidOperationException(
                "This API only supports import/export from the client, i.e. COPY commands containing TO/FROM STDIN. " +
                "To import/export with files on your PostgreSQL machine, simply execute the command with ExecuteNonQuery. " +
                "Note that your data has been successfully imported/exported.");
        }

        if (response.Code is not BackendMessageCode.CopyInResponse)
            throw _connector.UnexpectedMessageReceived(response.Code);

        var copyInResponse = (CopyInResponseMessage)response;
        if (!copyInResponse.IsBinary)
        {
            var textTransfer = new ArgumentException(
                "copyFromCommand triggered a text transfer, only binary is allowed",
                nameof(copyFromCommand));
            throw _connector.Break(textTransfer);
        }

        _state = ImporterState.Ready;
        _params = new NpgsqlParameter[copyInResponse.NumColumns];
        _rowsImported = 0;
        _buf.StartCopyMode();
        WriteHeader();
        // Only init after header.
        _pgWriter = _buf.GetWriter(_connector.DatabaseInfo);
    }
    catch (Exception e)
    {
        TraceSetException(e);
        throw;
    }
}
/// <summary>
/// Writes the binary COPY header to the buffer: the signature followed by two zero 32-bit fields.
/// </summary>
void WriteHeader()
{
    var signature = NpgsqlRawCopyStream.BinarySignature;
    _buf.WriteBytes(signature, 0, signature.Length);

    // Flags field. OID inclusion not supported at the moment.
    _buf.WriteInt32(0);
    // Header extension area length
    _buf.WriteInt32(0);
}
#endregion
#region Write
/// <summary>
/// Starts writing a single row, must be invoked before writing any columns.
/// </summary>
public void StartRow() => StartRow(false).GetAwaiter().GetResult();
/// <summary>
/// Starts writing a single row, must be invoked before writing any columns.
/// </summary>
public Task StartRowAsync(CancellationToken cancellationToken = default) => StartRow(async: true, cancellationToken);
/// <summary>
/// Shared implementation of StartRow/StartRowAsync: writes the per-row column count to the buffer
/// and resets the column position to the first column.
/// </summary>
/// <param name="async">Whether a required buffer flush is performed asynchronously.</param>
/// <param name="cancellationToken">Token checked before the row is started and passed to the flush.</param>
/// <exception cref="InvalidOperationException">
/// The previous row was started but not all of its columns were written.
/// </exception>
async Task StartRow(bool async, CancellationToken cancellationToken = default)
{
    CheckReady();
    cancellationToken.ThrowIfCancellationRequested();

    if (InMiddleOfRow)
    {
        // Here _column is exactly the number of values already written to the unfinished row, so it is
        // reported as-is. ThrowColumnMismatch reports _column + 1, which is only correct when one value
        // too many is being written, and would overstate the count by one in this situation.
        throw new InvalidOperationException(
            $"The binary import operation was started with {NumColumns} column(s), but {_column} value(s) were provided.");
    }

    // The row header is a 16-bit column count; make sure it fits before writing.
    if (_buf.WriteSpaceLeft < 2)
        await _buf.Flush(async, cancellationToken).ConfigureAwait(false);
    _buf.WriteInt16((short)NumColumns);
    // The buffer was written to directly, so the PgWriter has to pick up the new position.
    _pgWriter.RefreshBuffer();

    _column = 0;
    _rowsImported++;
}
/// <summary>
/// Writes a single column in the current row.
/// </summary>
/// <param name="value">The value to be written</param>
/// <typeparam name="T">
/// The type of the column to be written. This must correspond to the actual type or data
/// corruption will occur. If in doubt, use <see cref="Write{T}(T, NpgsqlDbType)"/> to manually
/// specify the type.
/// </typeparam>
public void Write(T value)
=> Write(async: false, value, npgsqlDbType: null, dataTypeName: null).GetAwaiter().GetResult();
/// <summary>
/// Writes a single column in the current row.
/// </summary>
/// <param name="value">The value to be written</param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <typeparam name="T">
/// The type of the column to be written. This must correspond to the actual type or data
/// corruption will occur. If in doubt, use <see cref="Write{T}(T, NpgsqlDbType)"/> to manually
/// specify the type.
/// </typeparam>
public Task WriteAsync(T value, CancellationToken cancellationToken = default)
=> Write(async: true, value, npgsqlDbType: null, dataTypeName: null, cancellationToken);
/// <summary>
/// Writes a single column in the current row as type <paramref name="npgsqlDbType"/>.
/// </summary>
/// <param name="value">The value to be written</param>
/// <param name="npgsqlDbType">
/// In some cases <typeparamref name="T"/> isn't enough to infer the data type to be written to
/// the database. This parameter can be used to unambiguously specify the type. An example is
/// the JSONB type, for which <typeparamref name="T"/> will be a simple string but for which
/// <paramref name="npgsqlDbType"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
/// </param>
/// <typeparam name="T">The .NET type of the column to be written.</typeparam>
public void Write(T value, NpgsqlDbType npgsqlDbType) =>
Write(async: false, value, npgsqlDbType, dataTypeName: null).GetAwaiter().GetResult();
/// <summary>
/// Writes a single column in the current row as type <paramref name="npgsqlDbType"/>.
/// </summary>
/// <param name="value">The value to be written</param>
/// <param name="npgsqlDbType">
/// In some cases <typeparamref name="T"/> isn't enough to infer the data type to be written to
/// the database. This parameter can be used to unambiguously specify the type. An example is
/// the JSONB type, for which <typeparamref name="T"/> will be a simple string but for which
/// <paramref name="npgsqlDbType"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
/// </param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <typeparam name="T">The .NET type of the column to be written.</typeparam>
public Task WriteAsync(T value, NpgsqlDbType npgsqlDbType, CancellationToken cancellationToken = default)
=> Write(async: true, value, npgsqlDbType, dataTypeName: null, cancellationToken);
/// <summary>
/// Writes a single column in the current row as type <paramref name="dataTypeName"/>.
/// </summary>
/// <param name="value">The value to be written</param>
/// <param name="dataTypeName">
/// In some cases <typeparamref name="T"/> isn't enough to infer the data type to be written to
/// the database. This parameter can be used to unambiguously specify the type.
/// </param>
/// <typeparam name="T">The .NET type of the column to be written.</typeparam>
public void Write(T value, string dataTypeName) =>
Write(async: false, value, npgsqlDbType: null, dataTypeName).GetAwaiter().GetResult();
/// <summary>
/// Writes a single column in the current row as type <paramref name="dataTypeName"/>.
/// </summary>
/// <param name="value">The value to be written</param>
/// <param name="dataTypeName">
/// In some cases <typeparamref name="T"/> isn't enough to infer the data type to be written to
/// the database. This parameter can be used to unambiguously specify the type.
/// </param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <typeparam name="T">The .NET type of the column to be written.</typeparam>
public Task WriteAsync(T value, string dataTypeName, CancellationToken cancellationToken = default)
=> Write(async: true, value, npgsqlDbType: null, dataTypeName, cancellationToken);
/// <summary>
/// Shared implementation behind the public Write/WriteAsync overloads: writes one column value,
/// optionally pinning the PostgreSQL type through <paramref name="npgsqlDbType"/> or
/// <paramref name="dataTypeName"/>. Null-like values are routed to WriteNull instead.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The value resolves to a different PostgreSQL type than the one previously resolved for this column.
/// </exception>
Task Write(bool async, T value, NpgsqlDbType? npgsqlDbType, string? dataTypeName, CancellationToken cancellationToken = default)
{
// Handle DBNull:
// 1. when T = DBNull for backwards compatibility, DBNull as a type normally won't find a mapping.
// 2. when T = object we resolve oid 0 if DBNull is the first value, later column value oids would needlessly be limited to oid 0.
// Also handle null values for object typed parameters, these parameters require non null values to be seen as set.
if (typeof(T) == typeof(DBNull) || (typeof(T) == typeof(object) && value is null or DBNull))
return WriteNull(async, cancellationToken);
return Core(async, value, npgsqlDbType, dataTypeName, cancellationToken);
// Local async function containing the non-null write path.
async Task Core(bool async, T value, NpgsqlDbType? npgsqlDbType, string? dataTypeName, CancellationToken cancellationToken = default)
{
CheckReady();
cancellationToken.ThrowIfCancellationRequested();
CheckColumnIndex();
// Create the parameter objects for the first row or if the value type changes.
var newParam = false;
if (_params[_column] is not NpgsqlParameter param)
{
newParam = true;
param = new NpgsqlParameter();
if (npgsqlDbType is not null)
param._npgsqlDbType = npgsqlDbType;
if (dataTypeName is not null)
param._dataTypeName = dataTypeName;
}
// We only retrieve previous values if anything actually changed.
// For object typed parameters we must do so whenever setting NpgsqlParameter.Value would reset the type info.
PgTypeInfo? previousTypeInfo = null;
PgConverter? previousConverter = null;
PgTypeId previousTypeId = default;
if (!newParam && (
(typeof(T) == typeof(object) && param.ShouldResetObjectTypeInfo(value))
|| param._npgsqlDbType != npgsqlDbType
|| param._dataTypeName != dataTypeName))
{
param.GetResolutionInfo(out previousTypeInfo, out previousConverter, out previousTypeId);
// NOTE(review): the enclosing condition already requires !newParam, so this check is always true here.
if (!newParam)
{
param.ResetDbType();
if (npgsqlDbType is not null)
param._npgsqlDbType = npgsqlDbType;
if (dataTypeName is not null)
param._dataTypeName = dataTypeName;
}
}
// These actions can reset or change the type info, we'll check afterwards whether we're still consistent with the original values.
param.TypedValue = value;
param.ResolveTypeInfo(_connector.SerializerOptions, _connector.DbTypeResolver);
// A previously captured resolution that no longer matches means this column changed PostgreSQL type between rows.
if (previousTypeInfo is not null && previousConverter is not null && param.PgTypeId != previousTypeId)
{
var currentPgTypeId = param.PgTypeId;
// We should only rollback values when the stored instance was used. We'll throw before writing the new instance back anyway.
// Also always rolling back could set PgTypeInfos that were resolved for a type that doesn't match the T of the NpgsqlParameter.
if (!newParam)
param.SetResolutionInfo(previousTypeInfo, previousConverter, previousTypeId);
throw new InvalidOperationException($"Write for column {_column} resolves to a different PostgreSQL type: {currentPgTypeId} than the first row resolved to ({previousTypeId}). " +
$"Please make sure to use clr types that resolve to the same PostgreSQL type across rows. " +
$"Alternatively pass the same NpgsqlDbType or DataTypeName to ensure the PostgreSQL type ends up to be identical." );
}
// Store a newly created parameter only after its type resolution passed the check above.
if (newParam)
_params[_column] = param;
// The value is bound with the binary data format.
param.Bind(out _, out _, requiredFormat: DataFormat.Binary);
try
{
await param.Write(async, _pgWriter.WithFlushMode(async ? FlushMode.NonBlocking : FlushMode.Blocking), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// A failed write is recorded on the tracing activity and passed to the connector's Break before rethrowing.
TraceSetException(ex);
_connector.Break(ex);
throw;
}
// Advance to the next column only after the value was written successfully.
_column++;
}
}
/// <summary>
/// Writes a single null column value.
/// </summary>
public void WriteNull() => WriteNull(false).GetAwaiter().GetResult();
/// <summary>
/// Writes a single null column value.
/// </summary>
public Task WriteNullAsync(CancellationToken cancellationToken = default) => WriteNull(async: true, cancellationToken);
/// <summary>
/// Shared implementation of WriteNull/WriteNullAsync: writes a length of -1 for the current column
/// and advances to the next column.
/// </summary>
/// <param name="async">Whether a required buffer flush is performed asynchronously.</param>
/// <param name="cancellationToken">Token checked before writing and passed to the flush.</param>
async Task WriteNull(bool async, CancellationToken cancellationToken = default)
{
    CheckReady();
    // Throws only when cancellation has been requested, so no separate pre-check is needed.
    cancellationToken.ThrowIfCancellationRequested();
    CheckColumnIndex();

    // The null marker is a 32-bit length field; flush first when it would not fit.
    var needsFlush = _buf.WriteSpaceLeft < 4;
    if (needsFlush)
        await _buf.Flush(async, cancellationToken).ConfigureAwait(false);

    _buf.WriteInt32(-1);
    _pgWriter.RefreshBuffer();
    _column++;
}
/// <summary>
/// Writes an entire row of columns.
/// Equivalent to calling <see cref="StartRow()"/>, followed by multiple <see cref="Write{T}(T)"/>
/// on each value.
/// </summary>
/// <param name="values">An array of column values to be written as a single row</param>
public void WriteRow(params object?[] values) => WriteRow(false, CancellationToken.None, values).GetAwaiter().GetResult();
/// <summary>
/// Writes an entire row of columns.
/// Equivalent to calling <see cref="StartRowAsync(CancellationToken)"/>, followed by multiple <see cref="WriteAsync{T}(T, CancellationToken)"/>
/// on each value.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <param name="values">An array of column values to be written as a single row</param>
public Task WriteRowAsync(CancellationToken cancellationToken = default, params object?[] values)
=> WriteRow(async: true, cancellationToken, values);
/// <summary>
/// Shared implementation of WriteRow/WriteRowAsync: starts a row and then writes every element of
/// <paramref name="values"/> as one column, in order.
/// </summary>
/// <param name="async">Whether I/O is performed asynchronously.</param>
/// <param name="cancellationToken">Token passed on to StartRow and to each column write.</param>
/// <param name="values">The column values of the row; null or DBNull elements are written as NULL.</param>
/// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
async Task WriteRow(bool async, CancellationToken cancellationToken = default, params object?[] values)
{
    // Validate before StartRow: otherwise a null array would start a row (header written, row counter
    // incremented) and only then fail with a NullReferenceException while enumerating.
    if (values is null)
        throw new ArgumentNullException(nameof(values));

    await StartRow(async, cancellationToken).ConfigureAwait(false);
    foreach (var value in values)
        await Write(async, value, npgsqlDbType: null, dataTypeName: null, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Verifies that a row has been started and that it still has room for another column value.
/// </summary>
/// <exception cref="InvalidOperationException">
/// No row has been started, or the current row already has all of its columns written.
/// </exception>
void CheckColumnIndex()
{
    // Fast path: a row is active and the next column is within range.
    if (_column is not -1 && _column < NumColumns)
        return;

    Throw();

    // Kept out of line so the check itself stays small.
    [MethodImpl(MethodImplOptions.NoInlining)]
    void Throw()
    {
        if (_column is -1)
            throw new InvalidOperationException("A row hasn't been started");
        if (_column >= NumColumns)
            ThrowColumnMismatch();
    }
}
#endregion
#region Commit / Cancel / Close / Dispose
/// <summary>
/// Completes the import operation. The writer is unusable after this operation.
/// </summary>
public ulong Complete() => Complete(false).GetAwaiter().GetResult();
/// <summary>
/// Completes the import operation. The writer is unusable after this operation.
/// </summary>
public ValueTask CompleteAsync(CancellationToken cancellationToken = default) => Complete(async: true, cancellationToken);
/// <summary>
/// Shared implementation of Complete/CompleteAsync: writes the trailer, sends CopyDone and reads
/// the backend's reply, then marks the import as committed.
/// </summary>
/// <returns>The Rows value of the first message read back after CopyDone.</returns>
/// <exception cref="InvalidOperationException">
/// Called while a row is only partially written; the import is cancelled in that case.
/// </exception>
async ValueTask Complete(bool async, CancellationToken cancellationToken = default)
{
CheckReady();
using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
// Completing with a partially written row cancels the whole import and reports the misuse.
if (InMiddleOfRow)
{
await Cancel(async, cancellationToken).ConfigureAwait(false);
throw new InvalidOperationException("Binary importer closed in the middle of a row, cancelling import.");
}
try
{
// Write trailer
if (_buf.WriteSpaceLeft < 2)
await _buf.Flush(async, cancellationToken).ConfigureAwait(false);
_buf.WriteInt16(-1);
// Flush the remaining copy data before leaving copy mode and sending CopyDone.
await _buf.Flush(async, cancellationToken).ConfigureAwait(false);
_buf.EndCopyMode();
await _connector.WriteCopyDone(async, cancellationToken).ConfigureAwait(false);
await _connector.Flush(async, cancellationToken).ConfigureAwait(false);
// Two messages are read back; only the first one's row count is used.
var cmdComplete = Expect(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
Expect(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
_state = ImporterState.Committed;
return cmdComplete.Rows;
}
catch (Exception e)
{
// Any failure is recorded on the tracing activity and the importer is cleaned up before rethrowing.
TraceSetException(e);
Cleanup();
throw;
}
}
// Explicit ICancelable implementation: cancelling is done by closing the importer.
void ICancelable.Cancel() => Close();
// Explicit ICancelable implementation: awaits the public CloseAsync overload with its default token.
async Task ICancelable.CancelAsync() => await CloseAsync().ConfigureAwait(false);
/// <summary>
/// <para>
/// Terminates the ongoing binary import and puts the connection back into the idle state, where regular commands can be executed.
/// </para>
/// <para>
/// Note that if <see cref="Complete()" /> hasn't been invoked before calling this, the import will be cancelled and all changes will
/// be reverted.
/// </para>
/// </summary>
public void Dispose() => Close();
/// <summary>
/// <para>
/// Async terminates the ongoing binary import and puts the connection back into the idle state, where regular commands can be executed.
/// </para>
/// <para>
/// Note that if <see cref="CompleteAsync" /> hasn't been invoked before calling this, the import will be cancelled and all changes will
/// be reverted.
/// </para>
/// </summary>
// The bool argument selects the private CloseAsync(bool async, ...) overload, i.e. async: true.
public ValueTask DisposeAsync() => CloseAsync(true);
/// <summary>
/// Cancels the import: marks the importer as cancelled, discards buffered data, sends CopyFail and
/// waits for the backend's error response.
/// </summary>
/// <param name="async">Whether I/O is performed asynchronously.</param>
/// <param name="cancellationToken">Token passed to the write, flush and nested cancellable operation.</param>
/// <remarks>
/// A <see cref="PostgresException"/> whose SqlState is QueryCanceled is the expected outcome and is swallowed;
/// any other PostgresException propagates. Receiving a regular message instead of an error breaks the connector.
/// </remarks>
async Task Cancel(bool async, CancellationToken cancellationToken = default)
{
    _state = ImporterState.Cancelled;

    _buf.Clear();
    _buf.EndCopyMode();
    await _connector.WriteCopyFail(async, cancellationToken).ConfigureAwait(false);
    await _connector.Flush(async, cancellationToken).ConfigureAwait(false);

    try
    {
        using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
        var unexpected = await _connector.ReadMessage(async).ConfigureAwait(false);

        // The CopyFail should immediately trigger an exception from the read above.
        var failure = new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + unexpected.Code);
        throw _connector.Break(failure);
    }
    catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.QueryCanceled)
    {
        // Expected: the backend acknowledged the CopyFail by cancelling the query.
    }
}
/// <summary>
/// <para>
/// Terminates the ongoing binary import and puts the connection back into the idle state, where regular commands can be executed.
/// </para>
/// <para>
/// Note that if <see cref="Complete()" /> hasn't been invoked before calling this, the import will be cancelled and all changes will
/// be reverted.
/// </para>
/// </summary>
public void Close() => CloseAsync(async: false).GetAwaiter().GetResult();
/// <summary>
/// <para>
/// Async terminates the ongoing binary import and puts the connection back into the idle state, where regular commands can be executed.
/// </para>
/// <para>
/// Note that if <see cref="CompleteAsync" /> hasn't been invoked before calling this, the import will be cancelled and all changes will
/// be reverted.
/// </para>
/// </summary>
public ValueTask CloseAsync(CancellationToken cancellationToken = default) => CloseAsync(async: true, cancellationToken);
/// <summary>
/// Shared implementation of Close/CloseAsync. Does nothing when already disposed; cancels the import
/// when it is still in the Ready state; then stops tracing and releases the connector.
/// </summary>
/// <param name="async">Whether a required cancellation is performed asynchronously.</param>
/// <param name="cancellationToken">Token checked on entry and passed to the cancellation.</param>
async ValueTask CloseAsync(bool async, CancellationToken cancellationToken = default)
{
    cancellationToken.ThrowIfCancellationRequested();

    var state = _state;
    if (state is ImporterState.Disposed)
        return;

    if (state is ImporterState.Ready)
    {
        // Closing without completing cancels the import.
        await Cancel(async, cancellationToken).ConfigureAwait(false);
    }
    else if (state is not (ImporterState.Uninitialized or ImporterState.Cancelled or ImporterState.Committed))
    {
        throw new Exception("Invalid state: " + state);
    }

    TraceImportStop();
    Cleanup();
}
#pragma warning disable CS8625
/// <summary>
/// Logs completion of the copy operation, detaches this importer from its connector and moves it to the
/// Disposed state. Calling it again once disposed is a no-op.
/// </summary>
void Cleanup()
{
    if (_state is ImporterState.Disposed)
        return;

    var owner = _connector;

    // -1 is logged as the connector id when the connector reference is already gone.
    LogMessages.BinaryCopyOperationCompleted(_copyLogger, _rowsImported, owner?.Id ?? -1);

    if (owner is not null)
    {
        owner.EndUserAction();
        owner.CurrentCopyOperation = null;
        owner.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
        _connector = null;
    }

    _buf = null;
    _state = ImporterState.Disposed;
}
#pragma warning restore CS8625
/// <summary>
/// Verifies that the importer is in the Ready state and throws an exception describing the actual state otherwise.
/// </summary>
/// <exception cref="InvalidOperationException">The importer is uninitialized, cancelled or committed.</exception>
/// <exception cref="ObjectDisposedException">The importer has been disposed.</exception>
void CheckReady()
{
    var current = _state;
    if (current is ImporterState.Ready)
        return;

    Throw(current);

    // Kept out of line so the check itself stays small.
    [MethodImpl(MethodImplOptions.NoInlining)]
    static void Throw(ImporterState state)
    {
        Exception error = state switch
        {
            ImporterState.Uninitialized => new InvalidOperationException("The COPY operation has not been initialized."),
            ImporterState.Disposed => new ObjectDisposedException(typeof(NpgsqlBinaryImporter).FullName,
                "The COPY operation has already ended."),
            ImporterState.Cancelled => new InvalidOperationException("The COPY operation has already been cancelled."),
            ImporterState.Committed => new InvalidOperationException("The COPY operation has already been committed."),
            _ => new Exception("Invalid state: " + state)
        };
        throw error;
    }
}
#endregion
#region Enums
// Lifecycle states of the importer.
enum ImporterState
{
// Initial state, before Init has succeeded.
Uninitialized,
// Set by Init; the only state in which rows and columns may be written.
Ready,
// Set by Complete after the backend's reply has been read.
Committed,
// Set by Cancel.
Cancelled,
// Set by Cleanup once the importer has been detached from its connector.
Disposed
}
#endregion Enums
/// <summary>
/// Throws an <see cref="InvalidOperationException"/> stating the expected column count and reporting
/// the current column position plus one as the number of values provided.
/// </summary>
void ThrowColumnMismatch()
{
    var message = $"The binary import operation was started with {NumColumns} column(s), but {_column + 1} value(s) were provided.";
    throw new InvalidOperationException(message);
}
#region Tracing
/// <summary>
/// Stops the tracing activity, if there is one, and clears the reference to it. A committed import
/// reports the imported row count; a cancelled import reports zero rows.
/// </summary>
void TraceImportStop()
{
    if (_activity is not { } activity)
        return;

    if (_state is ImporterState.Committed)
        NpgsqlActivitySource.CopyStop(activity, _rowsImported);
    else if (_state is ImporterState.Cancelled)
        NpgsqlActivitySource.CopyStop(activity, rows: 0);
    else
        Debug.Fail("Invalid state: " + _state);

    _activity = null;
}
/// <summary>
/// Records <paramref name="exception"/> on the tracing activity, if there is one, and clears the reference
/// so the activity is not reported again.
/// </summary>
/// <param name="exception">The exception to attach to the activity.</param>
void TraceSetException(Exception exception)
{
    if (_activity is not { } activity)
        return;

    NpgsqlActivitySource.SetException(activity, exception);
    _activity = null;
}
#endregion Tracing
}