using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using static Npgsql.Util.Statics;
#pragma warning disable 1591
namespace Npgsql
{
///
/// Provides an API for a raw binary COPY operation, a high-performance data import/export mechanism to
/// a PostgreSQL table. Initiated by
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public sealed class NpgsqlRawCopyStream : Stream, ICancelable
{
#region Fields and Properties
NpgsqlConnector _connector;
NpgsqlReadBuffer _readBuf;
NpgsqlWriteBuffer _writeBuf;
int _leftToReadInDataMsg;
bool _isDisposed, _isConsumed;
readonly bool _canRead;
readonly bool _canWrite;
internal bool IsBinary { get; private set; }
public override bool CanWrite => _canWrite;
public override bool CanRead => _canRead;
///
/// The copy binary format header signature
///
internal static readonly byte[] BinarySignature =
{
(byte)'P',(byte)'G',(byte)'C',(byte)'O',(byte)'P',(byte)'Y',
(byte)'\n', 255, (byte)'\r', (byte)'\n', 0
};
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlRawCopyStream));
#endregion
#region Constructor
internal NpgsqlRawCopyStream(NpgsqlConnector connector, string copyCommand)
{
_connector = connector;
_readBuf = connector.ReadBuffer;
_writeBuf = connector.WriteBuffer;
_connector.WriteQuery(copyCommand);
_connector.Flush();
var msg = _connector.ReadMessage();
switch (msg.Code)
{
case BackendMessageCode.CopyInResponse:
var copyInResponse = (CopyInResponseMessage) msg;
IsBinary = copyInResponse.IsBinary;
_canWrite = true;
_writeBuf.StartCopyMode();
break;
case BackendMessageCode.CopyOutResponse:
var copyOutResponse = (CopyOutResponseMessage) msg;
IsBinary = copyOutResponse.IsBinary;
_canRead = true;
break;
case BackendMessageCode.CompletedResponse:
throw new InvalidOperationException(
"This API only supports import/export from the client, i.e. COPY commands containing TO/FROM STDIN. " +
"To import/export with files on your PostgreSQL machine, simply execute the command with ExecuteNonQuery. " +
"Note that your data has been successfully imported/exported.");
default:
throw _connector.UnexpectedMessageReceived(msg.Code);
}
}
#endregion
#region Write
public override void Write(byte[] buffer, int offset, int count) => Write(buffer, offset, count, false).GetAwaiter().GetResult();
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
using (NoSynchronizationContextScope.Enter())
return Write(buffer, offset, count, true);
}
async Task Write(byte[] buffer, int offset, int count, bool async)
{
CheckDisposed();
if (!CanWrite)
throw new InvalidOperationException("Stream not open for writing");
if (count == 0) { return; }
if (count <= _writeBuf.WriteSpaceLeft)
{
_writeBuf.WriteBytes(buffer, offset, count);
return;
}
try {
// Value is too big, flush.
await FlushAsync(async);
if (count <= _writeBuf.WriteSpaceLeft)
{
_writeBuf.WriteBytes(buffer, offset, count);
return;
}
// Value is too big even after a flush - bypass the buffer and write directly.
await _writeBuf.DirectWrite(buffer, offset, count, async);
} catch {
_connector.Break();
Cleanup();
throw;
}
}
public override void Flush() => FlushAsync(false).GetAwaiter().GetResult();
public override Task FlushAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
using (NoSynchronizationContextScope.Enter())
return FlushAsync(true);
}
Task FlushAsync(bool async)
{
CheckDisposed();
return _writeBuf.Flush(async);
}
#endregion
#region Read
public override int Read(byte[] buffer, int offset, int count) => Read(buffer, offset, count, false).GetAwaiter().GetResult();
public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
using (NoSynchronizationContextScope.Enter())
return Read(buffer, offset, count, true);
}
async Task Read(byte[] buffer, int offset, int count, bool async)
{
CheckDisposed();
if (!CanRead)
throw new InvalidOperationException("Stream not open for reading");
if (_isConsumed) {
return 0;
}
if (_leftToReadInDataMsg == 0)
{
// We've consumed the current DataMessage (or haven't yet received the first),
// read the next message
var msg = await _connector.ReadMessage(async);
switch (msg.Code) {
case BackendMessageCode.CopyData:
_leftToReadInDataMsg = ((CopyDataMessage)msg).Length;
break;
case BackendMessageCode.CopyDone:
Expect(await _connector.ReadMessage(async), _connector);
Expect(await _connector.ReadMessage(async), _connector);
_isConsumed = true;
return 0;
default:
throw _connector.UnexpectedMessageReceived(msg.Code);
}
}
Debug.Assert(_leftToReadInDataMsg > 0);
// If our buffer is empty, read in more. Otherwise return whatever is there, even if the
// user asked for more (normal socket behavior)
if (_readBuf.ReadBytesLeft == 0) {
await _readBuf.ReadMore(async);
}
Debug.Assert(_readBuf.ReadBytesLeft > 0);
var maxCount = Math.Min(_readBuf.ReadBytesLeft, _leftToReadInDataMsg);
if (count > maxCount) {
count = maxCount;
}
_leftToReadInDataMsg -= count;
_readBuf.ReadBytes(buffer, offset, count);
return count;
}
#endregion
#region Cancel
///
/// Cancels and terminates an ongoing operation. Any data already written will be discarded.
///
public void Cancel() => Cancel(false).GetAwaiter().GetResult();
///
/// Cancels and terminates an ongoing operation. Any data already written will be discarded.
///
public Task CancelAsync()
{
using (NoSynchronizationContextScope.Enter())
return Cancel(true);
}
async Task Cancel(bool async)
{
CheckDisposed();
if (CanWrite)
{
_isDisposed = true;
_writeBuf.EndCopyMode();
_writeBuf.Clear();
await _connector.WriteCopyFail(async);
await _connector.Flush(async);
try
{
var msg = await _connector.ReadMessage(async);
// The CopyFail should immediately trigger an exception from the read above.
_connector.Break();
throw new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
}
catch (PostgresException e)
{
if (e.SqlState == PostgresErrorCodes.QueryCanceled)
return;
throw;
}
}
else
{
_connector.CancelRequest();
}
}
#endregion
#region Dispose
protected override void Dispose(bool disposing) => DisposeAsync(disposing, false).GetAwaiter().GetResult();
async ValueTask DisposeAsync(bool disposing, bool async)
{
if (_isDisposed || !disposing) { return; }
try
{
if (CanWrite)
{
await FlushAsync(async);
_writeBuf.EndCopyMode();
await _connector.WriteCopyDone(async);
await _connector.Flush(async);
Expect(await _connector.ReadMessage(async), _connector);
Expect(await _connector.ReadMessage(async), _connector);
}
else
{
if (!_isConsumed)
{
if (_leftToReadInDataMsg > 0)
{
await _readBuf.Skip(_leftToReadInDataMsg, async);
}
_connector.SkipUntil(BackendMessageCode.ReadyForQuery);
}
}
}
finally
{
var connector = _connector;
Cleanup();
connector.EndUserAction();
}
}
#pragma warning disable CS8625
void Cleanup()
{
Log.Debug("COPY operation ended", _connector.Id);
_connector.CurrentCopyOperation = null;
_connector = null;
_readBuf = null;
_writeBuf = null;
_isDisposed = true;
}
#pragma warning restore CS8625
void CheckDisposed()
{
if (_isDisposed) {
throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
}
}
#endregion
#region Unsupported
public override bool CanSeek => false;
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
#endregion
}
///
/// Writer for a text import, initiated by .
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public sealed class NpgsqlCopyTextWriter : StreamWriter, ICancelable
{
internal NpgsqlCopyTextWriter(NpgsqlConnector connector, NpgsqlRawCopyStream underlying) : base(underlying)
{
if (underlying.IsBinary)
{
connector.Break();
throw new Exception("Can't use a binary copy stream for text writing");
}
}
///
/// Cancels and terminates an ongoing import. Any data already written will be discarded.
///
public void Cancel()
{
((NpgsqlRawCopyStream)BaseStream).Cancel();
}
///
/// Cancels and terminates an ongoing import. Any data already written will be discarded.
///
public Task CancelAsync()
{
using (NoSynchronizationContextScope.Enter())
return ((NpgsqlRawCopyStream)BaseStream).CancelAsync();
}
}
///
/// Reader for a text export, initiated by .
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public sealed class NpgsqlCopyTextReader : StreamReader, ICancelable
{
internal NpgsqlCopyTextReader(NpgsqlConnector connector, NpgsqlRawCopyStream underlying) : base(underlying)
{
if (underlying.IsBinary)
{
connector.Break();
throw new Exception("Can't use a binary copy stream for text reading");
}
}
///
/// Cancels and terminates an ongoing import.
///
public void Cancel()
{
((NpgsqlRawCopyStream)BaseStream).Cancel();
}
///
/// Cancels and terminates an ongoing import. Any data already written will be discarded.
///
public Task CancelAsync()
{
using (NoSynchronizationContextScope.Enter())
return ((NpgsqlRawCopyStream)BaseStream).CancelAsync();
}
}
}