#region License
// The PostgreSQL License
//
// Copyright (C) 2017 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using Npgsql.BackendMessages;
using Npgsql.FrontendMessages;
using Npgsql.Logging;
#pragma warning disable 1591
namespace Npgsql
{
/// <summary>
/// Provides an API for a raw binary COPY operation, a high-performance data import/export mechanism to
/// a PostgreSQL table. Initiated by <see cref="NpgsqlConnection.BeginRawBinaryCopy"/>.
/// </summary>
/// <remarks>
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
/// </remarks>
public sealed class NpgsqlRawCopyStream : Stream, ICancelable
{
#region Fields and Properties
// Connector that owns this COPY operation; reset to null by Cleanup().
NpgsqlConnector _connector;
// The connector's read and write buffers, captured by the constructor; reset to null by Cleanup().
NpgsqlReadBuffer _readBuf;
NpgsqlWriteBuffer _writeBuf;
// Number of bytes of the current CopyData message that Read() has not yet returned.
int _leftToReadInDataMsg;
// Set once the operation has ended (by Cleanup(), or by Cancel() on a writable stream).
bool _isDisposed;
// Set by Read() once CopyDone has been received; further reads return 0.
bool _isConsumed;
// The constructor sets exactly one of these, depending on the response to the COPY command.
readonly bool _canRead;
readonly bool _canWrite;
/// <summary>
/// The IsBinary value reported by the CopyInResponse/CopyOutResponse message
/// received in the constructor.
/// </summary>
internal bool IsBinary { get; private set; }
public override bool CanRead { get { return _canRead; } }
public override bool CanWrite { get { return _canWrite; } }
/// <summary>
/// The copy binary format header signature
/// </summary>
internal static readonly byte[] BinarySignature = new byte[]
{
    (byte)'P', (byte)'G', (byte)'C', (byte)'O', (byte)'P', (byte)'Y',
    (byte)'\n', 255, (byte)'\r', (byte)'\n', 0
};
static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger();
#endregion
#region Constructor
/// <summary>
/// Sends <paramref name="copyCommand"/> through <paramref name="connector"/> and configures the
/// stream for writing (CopyInResponse) or reading (CopyOutResponse) based on the first message
/// that comes back.
/// </summary>
/// <param name="connector">The connector whose buffers are used for the operation.</param>
/// <param name="copyCommand">The COPY command text to send.</param>
internal NpgsqlRawCopyStream(NpgsqlConnector connector, string copyCommand)
{
    _connector = connector;
    _readBuf = connector.ReadBuffer;
    _writeBuf = connector.WriteBuffer;

    _connector.SendQuery(copyCommand);
    var response = _connector.ReadMessage();
    var code = response.Code;

    if (code == BackendMessageCode.CopyInResponse)
    {
        // Import: the stream becomes write-only and the write buffer enters copy mode.
        var inResponse = (CopyInResponseMessage)response;
        IsBinary = inResponse.IsBinary;
        _canWrite = true;
        _writeBuf.StartCopyMode();
    }
    else if (code == BackendMessageCode.CopyOutResponse)
    {
        // Export: the stream becomes read-only.
        var outResponse = (CopyOutResponseMessage)response;
        IsBinary = outResponse.IsBinary;
        _canRead = true;
    }
    else
    {
        throw _connector.UnexpectedMessageReceived(response.Code);
    }
}
#endregion
#region Write
/// <summary>
/// Writes <paramref name="count"/> bytes from <paramref name="buffer"/>, starting at
/// <paramref name="offset"/>, to the COPY operation.
/// </summary>
/// <param name="buffer">The array holding the data to send.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to send.</param>
/// <param name="count">Number of bytes to send.</param>
/// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
/// <exception cref="InvalidOperationException">The stream is not open for writing.</exception>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> or <paramref name="count"/> is negative.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.
/// </exception>
public override void Write(byte[] buffer, int offset, int count)
{
    CheckDisposed();
    if (!CanWrite)
        throw new InvalidOperationException("Stream not open for writing");

    // Reject bad arguments before anything reaches the write buffer: a negative count would
    // otherwise pass both the "count == 0" and the "fits in the buffer" checks below.
    if (buffer == null)
        throw new ArgumentNullException(nameof(buffer));
    if (offset < 0)
        throw new ArgumentOutOfRangeException(nameof(offset));
    if (count < 0)
        throw new ArgumentOutOfRangeException(nameof(count));
    // Written as a subtraction so that offset + count cannot overflow.
    if (buffer.Length - offset < count)
        throw new ArgumentException("Offset and count exceed the length of the buffer.");

    if (count == 0) { return; }

    // Fast path: the data fits in the space left in the write buffer.
    if (count <= _writeBuf.WriteSpaceLeft)
    {
        _writeBuf.WriteBytes(buffer, offset, count);
        return;
    }

    try
    {
        // Value is too big, flush.
        Flush();
        if (count <= _writeBuf.WriteSpaceLeft)
        {
            _writeBuf.WriteBytes(buffer, offset, count);
            return;
        }
        // Value is too big even after a flush - bypass the buffer and write directly.
        _writeBuf.DirectWrite(buffer, offset, count);
    }
    catch
    {
        // Any failure while flushing or writing breaks the connector and ends the
        // operation before the exception is rethrown.
        _connector.Break();
        Cleanup();
        throw;
    }
}
/// <summary>
/// Flushes the write buffer.
/// </summary>
/// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
public override void Flush()
{
    CheckDisposed();

    var pending = _writeBuf;
    pending.Flush();
}
#endregion
#region Read
/// <summary>
/// Reads COPY data into <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">The array that receives the data.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>
/// The number of bytes stored. This is capped by both the bytes currently available in the read
/// buffer and the bytes remaining in the current CopyData message, so it may be less than
/// <paramref name="count"/>. Returns 0 once CopyDone has been received.
/// </returns>
/// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
/// <exception cref="InvalidOperationException">The stream is not open for reading.</exception>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> or <paramref name="count"/> is negative.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.
/// </exception>
public override int Read(byte[] buffer, int offset, int count)
{
    CheckDisposed();
    if (!CanRead)
        throw new InvalidOperationException("Stream not open for reading");

    // Reject bad arguments before any message is consumed or any counter is updated, so a
    // caller error cannot leave the stream out of step with the read buffer.
    if (buffer == null)
        throw new ArgumentNullException(nameof(buffer));
    if (offset < 0)
        throw new ArgumentOutOfRangeException(nameof(offset));
    if (count < 0)
        throw new ArgumentOutOfRangeException(nameof(count));
    // Written as a subtraction so that offset + count cannot overflow.
    if (buffer.Length - offset < count)
        throw new ArgumentException("Offset and count exceed the length of the buffer.");

    if (_isConsumed) {
        return 0;
    }

    if (_leftToReadInDataMsg == 0)
    {
        // We've consumed the current DataMessage (or haven't yet received the first),
        // read the next message
        var msg = _connector.ReadMessage();
        switch (msg.Code) {
        case BackendMessageCode.CopyData:
            _leftToReadInDataMsg = ((CopyDataMessage)msg).Length;
            break;
        case BackendMessageCode.CopyDone:
            // NOTE(review): presumably these consume the two messages that follow CopyDone;
            // the expected message types are not visible on these calls - confirm.
            _connector.ReadExpecting();
            _connector.ReadExpecting();
            _isConsumed = true;
            return 0;
        default:
            throw _connector.UnexpectedMessageReceived(msg.Code);
        }
    }

    Debug.Assert(_leftToReadInDataMsg > 0);

    // If our buffer is empty, read in more. Otherwise return whatever is there, even if the
    // user asked for more (normal socket behavior)
    if (_readBuf.ReadBytesLeft == 0) {
        // Blocks synchronously on the task returned by ReadMore.
        _readBuf.ReadMore(false).GetAwaiter().GetResult();
    }

    Debug.Assert(_readBuf.ReadBytesLeft > 0);

    var maxCount = Math.Min(_readBuf.ReadBytesLeft, _leftToReadInDataMsg);
    if (count > maxCount) {
        count = maxCount;
    }

    // Copy first, then account for the bytes, so the counter only moves on a successful copy.
    _readBuf.ReadBytes(buffer, offset, count);
    _leftToReadInDataMsg -= count;
    return count;
}
#endregion
#region Cancel
/// <summary>
/// Cancels and terminates an ongoing operation. Any data already written will be discarded.
/// </summary>
/// <remarks>
/// On a writable stream this clears the write buffer, sends a CopyFail message and reads the
/// response. On a readable stream it only issues a cancel request through the connector; the
/// stream itself stays open.
/// </remarks>
/// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
public void Cancel()
{
    CheckDisposed();

    if (CanWrite)
    {
        _isDisposed = true;
        _writeBuf.EndCopyMode();
        _writeBuf.Clear();
        _connector.SendMessage(new CopyFailMessage());
        try
        {
            var msg = _connector.ReadMessage();
            // The CopyFail should immediately trigger an exception from the read above.
            _connector.Break();
            throw new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
        }
        catch (PostgresException e)
        {
            // The operation is over once the server has answered the CopyFail. Dispose() becomes
            // a no-op after a cancel (because _isDisposed is set), so the connector has to be
            // released here, the same way Dispose() releases it. Without this,
            // CurrentCopyOperation stays set and the user action is never ended.
            var connector = _connector;
            Cleanup();
            connector.EndUserAction();

            // 57014 is query_canceled, the expected outcome of a CopyFail.
            if (e.SqlState == "57014") { return; }
            throw;
        }
    }
    else
    {
        _connector.CancelRequest();
    }
}
#endregion
#region Dispose
/// <summary>
/// Ends the COPY operation. A writable stream flushes buffered data and sends CopyDone; a
/// readable stream that has not been fully consumed skips the rest of the current data
/// message and then skips messages until ReadyForQuery. In all cases the stream is cleaned up
/// and the connector's user action is ended.
/// </summary>
/// <param name="disposing">Nothing is done unless this is true.</param>
protected override void Dispose(bool disposing)
{
    if (!disposing || _isDisposed)
        return;

    try
    {
        if (CanWrite)
        {
            Flush();
            _writeBuf.EndCopyMode();
            _connector.SendMessage(CopyDoneMessage.Instance);
            // NOTE(review): presumably these consume the two messages that follow CopyDone;
            // the expected message types are not visible on these calls - confirm.
            _connector.ReadExpecting();
            _connector.ReadExpecting();
        }
        else if (!_isConsumed)
        {
            // Drop whatever is left of the current data message, then skip to ReadyForQuery.
            if (_leftToReadInDataMsg > 0)
                _readBuf.Skip(_leftToReadInDataMsg);

            _connector.SkipUntil(BackendMessageCode.ReadyForQuery);
        }
    }
    finally
    {
        // Cleanup() sets _connector to null, so keep a reference for EndUserAction().
        var owner = _connector;
        Cleanup();
        owner.EndUserAction();
    }
}
/// <summary>
/// Logs the end of the operation, detaches this stream from the connector, drops the
/// references to the connector and its buffers, and marks the stream as disposed.
/// </summary>
void Cleanup()
{
    var connector = _connector;
    Log.Debug("COPY operation ended", connector.Id);
    connector.CurrentCopyOperation = null;

    _isDisposed = true;
    _writeBuf = null;
    _readBuf = null;
    _connector = null;
}
/// <summary>
/// Throws if the COPY operation has already ended.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream is marked as disposed.</exception>
void CheckDisposed()
{
    if (!_isDisposed)
        return;

    throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
}
#endregion
#region Unsupported
// Seeking, length and position are not supported: every member below except CanSeek throws
// NotSupportedException.
public override bool CanSeek
{
    get { return false; }
}

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

public override void SetLength(long value) => throw new NotSupportedException();

public override long Length
{
    get => throw new NotSupportedException();
}

public override long Position
{
    get => throw new NotSupportedException();
    set => throw new NotSupportedException();
}
#endregion
}
/// <summary>
/// Writer for a text import, initiated by <see cref="NpgsqlConnection.BeginTextImport"/>.
/// </summary>
/// <remarks>
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
/// </remarks>
public sealed class NpgsqlCopyTextWriter : StreamWriter, ICancelable
{
    /// <summary>
    /// Wraps <paramref name="underlying"/> in a text writer; a stream whose IsBinary flag is
    /// set is rejected.
    /// </summary>
    internal NpgsqlCopyTextWriter(NpgsqlRawCopyStream underlying)
        : base(underlying)
    {
        if (!underlying.IsBinary)
            return;

        throw new Exception("Can't use a binary copy stream for text writing");
    }

    /// <summary>
    /// Cancels and terminates an ongoing import. Any data already written will be discarded.
    /// </summary>
    public void Cancel()
    {
        var copyStream = (NpgsqlRawCopyStream)BaseStream;
        copyStream.Cancel();
    }
}
/// <summary>
/// Reader for a text export, initiated by <see cref="NpgsqlConnection.BeginTextExport"/>.
/// </summary>
/// <remarks>
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
/// </remarks>
public sealed class NpgsqlCopyTextReader : StreamReader, ICancelable
{
    /// <summary>
    /// Wraps <paramref name="underlying"/> in a text reader; a stream whose IsBinary flag is
    /// set is rejected.
    /// </summary>
    internal NpgsqlCopyTextReader(NpgsqlRawCopyStream underlying)
        : base(underlying)
    {
        if (!underlying.IsBinary)
            return;

        throw new Exception("Can't use a binary copy stream for text reading");
    }

    /// <summary>
    /// Cancels and terminates an ongoing export.
    /// </summary>
    public void Cancel()
    {
        var copyStream = (NpgsqlRawCopyStream)BaseStream;
        copyStream.Cancel();
    }
}
}