using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Text;
using Npgsql.BackendMessages;
using Npgsql.FrontendMessages;
#pragma warning disable 1591
namespace Npgsql
{
///
/// Provides an API for a raw binary COPY operation, a high-performance data import/export mechanism to
/// a PostgreSQL table. Initiated by
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public class NpgsqlRawCopyStream : Stream
{
#region Fields and Properties
NpgsqlConnector _connector;
NpgsqlBuffer _buf;
bool _writingDataMsg;
int _leftToReadInDataMsg;
bool _isDisposed, _isConsumed;
readonly bool _canRead;
readonly bool _canWrite;
internal bool IsBinary { get; private set; }
public override bool CanWrite { get { return _canWrite; } }
public override bool CanRead { get { return _canRead; } }
///
/// The copy binary format header signature
///
internal static readonly byte[] BinarySignature =
{
(byte)'P',(byte)'G',(byte)'C',(byte)'O',(byte)'P',(byte)'Y',
(byte)'\n', 255, (byte)'\r', (byte)'\n', 0
};
#endregion
#region Constructor
internal NpgsqlRawCopyStream(NpgsqlConnector connector, string copyCommand)
{
_connector = connector;
_buf = connector.Buffer;
_connector.SendSingleMessage(new QueryMessage(copyCommand));
var msg = _connector.ReadSingleMessage();
switch (msg.Code)
{
case BackendMessageCode.CopyInResponse:
var copyInResponse = (CopyInResponseMessage) msg;
IsBinary = copyInResponse.IsBinary;
_canWrite = true;
break;
case BackendMessageCode.CopyOutResponse:
var copyOutResponse = (CopyOutResponseMessage) msg;
IsBinary = copyOutResponse.IsBinary;
_canRead = true;
break;
default:
throw _connector.UnexpectedMessageReceived(msg.Code);
}
}
#endregion
#region Write
public override void Write(byte[] buffer, int offset, int count)
{
CheckDisposed();
if (!CanWrite)
throw new InvalidOperationException("Stream not open for writing");
if (count == 0) { return; }
EnsureDataMessage();
if (count <= _buf.WriteSpaceLeft)
{
_buf.WriteBytes(buffer, offset, count);
return;
}
// Buffer is too big. Write whatever will fit and flush.
var written = _buf.WriteSpaceLeft;
_buf.WriteBytes(buffer, offset, _buf.WriteSpaceLeft);
Flush();
offset += written;
count -= written;
// If the remainder fits in a single buffer, no problem.
if (count <= _buf.WriteSpaceLeft) {
EnsureDataMessage();
_buf.WriteBytes(buffer, offset, count);
return;
}
// Otherwise, write the CopyData header via our buffer and the remaining data directly to the socket
_buf.WriteByte((byte)BackendMessageCode.CopyData);
_buf.WriteInt32(count);
_buf.Flush();
_buf.Underlying.Write(buffer, offset, count);
}
public override void Flush()
{
CheckDisposed();
if (!_writingDataMsg) { return; }
// Need to update the length for the CopyData about to be sent
var pos = _buf.WritePosition;
_buf.WritePosition = 1;
_buf.WriteInt32(pos - 1);
_buf.WritePosition = pos;
_buf.Flush();
_writingDataMsg = false;
}
void EnsureDataMessage()
{
if (_writingDataMsg) { return; }
Contract.Assert(_buf.WritePosition == 0);
_buf.WriteByte((byte)BackendMessageCode.CopyData);
// Leave space for the message length
_buf.WriteInt32(0);
_writingDataMsg = true;
}
#endregion
#region Read
public override int Read(byte[] buffer, int offset, int count)
{
CheckDisposed();
if (!CanRead)
throw new InvalidOperationException("Stream not open for reading");
var totalRead = 0;
do {
if (_leftToReadInDataMsg == 0)
{
if (_isConsumed) { return 0; }
var msg = _connector.ReadSingleMessage();
switch (msg.Code) {
case BackendMessageCode.CopyData:
_leftToReadInDataMsg = ((CopyDataMessage)msg).Length;
break;
case BackendMessageCode.CopyDone:
_connector.ReadExpecting();
_connector.ReadExpecting();
_isConsumed = true;
goto done;
default:
throw _connector.UnexpectedMessageReceived(msg.Code);
}
}
var len = Math.Min(count, _leftToReadInDataMsg);
_buf.ReadBytes(buffer, offset, len);
offset += len;
count -= len;
_leftToReadInDataMsg -= len;
totalRead += len;
} while (count > 0);
done:
return totalRead;
}
#endregion
#region Cancel
///
/// Cancels and terminates an ongoing operation. Any data already written will be discarded.
///
public void Cancel()
{
CheckDisposed();
if (CanWrite)
{
_isDisposed = true;
_buf.Clear();
_connector.SendSingleMessage(new CopyFailMessage());
try
{
var msg = _connector.ReadSingleMessage();
// The CopyFail should immediately trigger an exception from the read above.
_connector.Break();
throw new Exception("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
}
catch (NpgsqlException e)
{
if (e.Code == "57014") { return; }
throw;
}
}
else
{
_connector.CancelRequest();
}
}
#endregion
#region Dispose
protected override void Dispose(bool disposing)
{
if (_isDisposed || !disposing) { return; }
if (CanWrite)
{
Flush();
_connector.SendSingleMessage(CopyDoneMessage.Instance);
_connector.ReadExpecting();
_connector.ReadExpecting();
}
else
{
if (!_isConsumed) {
_buf.Skip(_leftToReadInDataMsg);
_connector.SkipUntil(BackendMessageCode.ReadyForQuery);
}
}
_connector.EndUserAction();
_connector = null;
_buf = null;
_isDisposed = true;
}
void CheckDisposed()
{
if (_isDisposed) {
throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
}
}
#endregion
#region Invariants
[ContractInvariantMethod]
void ObjectInvariants()
{
Contract.Invariant(_isDisposed || (_connector != null && _buf != null));
Contract.Invariant(CanRead || CanWrite);
Contract.Invariant(_buf == null || _buf == _connector.Buffer);
}
#endregion
#region Unsupported
public override bool CanSeek { get { return false; } }
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
#endregion
}
///
/// Writer for a text import, initiated by .
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public class NpgsqlCopyTextWriter : StreamWriter
{
internal NpgsqlCopyTextWriter(NpgsqlRawCopyStream underlying) : base(underlying)
{
if (underlying.IsBinary)
throw new Exception("Can't use a binary copy stream for text writing");
Contract.EndContractBlock();
}
///
/// Cancels and terminates an ongoing import. Any data already written will be discarded.
///
public void Cancel()
{
((NpgsqlRawCopyStream)BaseStream).Cancel();
}
}
///
/// Reader for a text export, initiated by .
///
///
/// See http://www.postgresql.org/docs/current/static/sql-copy.html.
///
public class NpgsqlCopyTextReader : StreamReader
{
internal NpgsqlCopyTextReader(NpgsqlRawCopyStream underlying) : base(underlying)
{
if (underlying.IsBinary)
throw new Exception("Can't use a binary copy stream for text reading");
Contract.EndContractBlock();
}
///
/// Cancels and terminates an ongoing import.
///
public void Cancel()
{
((NpgsqlRawCopyStream)BaseStream).Cancel();
}
}
}