#region License
// The PostgreSQL License
//
// Copyright (C) 2015 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Runtime.CompilerServices;
using Npgsql.BackendMessages;
using Npgsql.FrontendMessages;
using NpgsqlTypes;
namespace Npgsql
{
/// <summary>
/// Provides an API for a binary COPY TO operation, a high-performance data export mechanism from
/// a PostgreSQL table. Initiated by <see cref="NpgsqlConnection.BeginBinaryExport"/>.
/// </summary>
public class NpgsqlBinaryExporter : IDisposable, ICancelable
{
#region Fields and Properties
// Connector the COPY operation runs on; set to null by Cleanup().
NpgsqlConnector _connector;

// The connector's buffer, from which all COPY data is read; set to null by Cleanup().
NpgsqlBuffer _buf;

// Registry used to look up a type handler by CLR type or by NpgsqlDbType; set to null by Cleanup().
TypeHandlerRegistry _registry;

// True once StartRow() has read a column count of -1, i.e. there are no further rows.
bool _isConsumed;

// True once Cleanup() has run; public members then refuse to operate.
bool _isDisposed;

// Number of bytes of the current CopyData message that have not been consumed yet.
int _leftToReadInDataMsg;

// Length of the current column: int.MinValue means "not read yet", -1 means the column is null.
int _columnLen;

// Index of the current column: -1 when no row has been started, NumColumns when the
// current row has been read to its end.
short _column;

/// <summary>
/// The number of columns, as returned from the backend in the CopyOutResponse.
/// </summary>
internal int NumColumns { get; private set; }
#endregion
#region Construction / Initialization
/// <summary>
/// Starts a binary COPY TO operation: sends <paramref name="copyToCommand"/> as a query, reads the
/// backend's copy response and then the binary COPY header.
/// Any failure during this sequence breaks the connector before the exception is rethrown.
/// </summary>
/// <param name="connector">The connector on which the COPY command is executed.</param>
/// <param name="copyToCommand">The COPY command to send; it must trigger a binary transfer.</param>
/// <exception cref="ArgumentException">The command triggered a text transfer instead of a binary one.</exception>
internal NpgsqlBinaryExporter(NpgsqlConnector connector, string copyToCommand)
{
_connector = connector;
_buf = connector.Buffer;
_registry = connector.TypeHandlerRegistry;
_columnLen = int.MinValue; // Mark that the (first) column length hasn't been read yet
_column = -1; // No row has been started yet
try
{
_connector.SendSingleMessage(new QueryMessage(copyToCommand));
// TODO: Failure will break the connection (e.g. if we get CopyOutResponse), handle more gracefully
// NOTE(review): ReadExpecting() appears here without a type argument although the result is used
// as a copy-out response (IsBinary, NumColumns) - confirm the expected message type.
var copyOutResponse = _connector.ReadExpecting();
if (!copyOutResponse.IsBinary) {
// This is thrown inside the try block, so the catch below also breaks the connector for it.
throw new ArgumentException("copyToCommand triggered a text transfer, only binary is allowed", "copyToCommand");
}
NumColumns = copyOutResponse.NumColumns;
ReadHeader();
}
catch
{
// The protocol state is unknown after a failure, so the connector is marked as broken.
_connector.Break();
throw;
}
}
/// <summary>
/// Reads the first CopyData message and validates the binary COPY header at its start:
/// the signature bytes, the flags field (must be zero) and the header extension field.
/// Leaves <c>_leftToReadInDataMsg</c> at the number of message bytes that follow the header.
/// </summary>
/// <exception cref="Exception">The data does not start with the expected binary signature.</exception>
/// <exception cref="NotSupportedException">The header carries non-zero flags.</exception>
void ReadHeader()
{
    _leftToReadInDataMsg = _connector.ReadExpecting().Length;

    // Signature, followed by a 4-byte flags field and a 4-byte header extension field
    var headerLen = NpgsqlRawCopyStream.BinarySignature.Length + 4 + 4;
    _buf.Ensure(headerLen);

    // Compare the signature byte by byte, stopping at the first mismatch
    foreach (var expected in NpgsqlRawCopyStream.BinarySignature)
    {
        if (_buf.ReadByte() != expected) {
            throw new Exception("Invalid COPY binary signature at beginning!");
        }
    }

    if (_buf.ReadInt32() != 0) {
        throw new NotSupportedException("Unsupported flags in COPY operation (OID inclusion?)");
    }

    // Header extensions, currently unused: the value is read and discarded
    _buf.ReadInt32();

    _leftToReadInDataMsg -= headerLen;
}
#endregion
#region Read
/// <summary>
/// Starts reading a single row, must be invoked before reading any columns.
/// </summary>
/// <returns>
/// The number of columns in the row. -1 if there are no further rows.
/// Note: This will currently be the same value for all rows, but this may change in the future.
/// </returns>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
/// <exception cref="InvalidOperationException">The previous row has not been read to its end.</exception>
public int StartRow()
{
CheckDisposed();
// Once the end of the data has been seen, keep reporting "no further rows"
if (_isConsumed) { return -1; }
// The very first row (i.e. _column == -1) is included in the header's CopyData message.
// Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
// message per row).
if (_column == NumColumns)
{
// NOTE(review): ReadExpecting() appears without a type argument here - confirm the expected message type.
_leftToReadInDataMsg = _connector.ReadExpecting().Length;
}
else if (_column != -1)
{
throw new InvalidOperationException("Already in the middle of a row");
}
// Every row starts with a 16-bit column count
_buf.Ensure(2);
_leftToReadInDataMsg -= 2;
var numColumns = _buf.ReadInt16();
// A column count of -1 means there are no further rows
if (numColumns == -1)
{
Contract.Assume(_leftToReadInDataMsg == 0);
// Consume the three messages that follow the end of the data.
// NOTE(review): these three calls are textually identical here; presumably each one expected a
// different message type whose type argument is no longer visible - confirm before changing them.
_connector.ReadExpecting();
_connector.ReadExpecting();
_connector.ReadExpecting();
_column = -1;
_isConsumed = true;
return -1;
}
Contract.Assume(numColumns == NumColumns);
_column = 0;
return NumColumns;
}
/// <summary>
/// Reads the current column, returns its value and moves ahead to the next column.
/// If the column is null an exception is thrown.
/// </summary>
/// <typeparam name="T">
/// The type of the column to be read. This must correspond to the actual type or data
/// corruption will occur. If in doubt, use the overload accepting an
/// <see cref="NpgsqlDbType"/> to manually specify the type.
/// </typeparam>
/// <returns>The value of the column</returns>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
/// <exception cref="InvalidOperationException">No row is currently being read.</exception>
public T Read()
{
    CheckDisposed();

    // A column can only be read between StartRow() and the end of that row
    var insideRow = _column != -1 && _column != NumColumns;
    if (!insideRow) {
        throw new InvalidOperationException("Not reading a row");
    }

    // The handler is chosen from the requested CLR type alone
    return DoRead(_registry[typeof(T)]);
}
/// <summary>
/// Reads the current column, returns its value according to <paramref name="type"/> and
/// moves ahead to the next column.
/// If the column is null an exception is thrown.
/// </summary>
/// <param name="type">
/// In some cases <typeparamref name="T"/> isn't enough to infer the data type coming in from the
/// database. This parameter can be used to unambiguously specify the type. An example is the JSONB
/// type, for which <typeparamref name="T"/> will be a simple string but for which
/// <paramref name="type"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
/// </param>
/// <typeparam name="T">The .NET type of the column to be read.</typeparam>
/// <returns>The value of the column</returns>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
/// <exception cref="InvalidOperationException">No row is currently being read.</exception>
public T Read(NpgsqlDbType type)
{
    CheckDisposed();

    // A column can only be read between StartRow() and the end of that row
    var insideRow = _column != -1 && _column != NumColumns;
    if (!insideRow) {
        throw new InvalidOperationException("Not reading a row");
    }

    // The handler is chosen from the explicitly given database type
    return DoRead(_registry[type]);
}
/// <summary>
/// Reads the current column with the given handler and advances to the next column.
/// Any exception thrown while reading - including the one raised for a null column - breaks
/// the connector and releases this exporter's references before it is rethrown.
/// </summary>
/// <param name="handler">The type handler used to decode the column's bytes.</param>
/// <returns>The decoded value of the column.</returns>
/// <exception cref="InvalidCastException">The column is null.</exception>
T DoRead(TypeHandler handler)
{
try {
ReadColumnLenIfNeeded();
if (_columnLen == -1) {
// A length of -1 marks a null column, which cannot be returned as a value
throw new InvalidCastException("Column is null");
}
// NOTE(review): ReadFully() appears without a type argument here - confirm how the result type is determined.
var result = handler.ReadFully(_buf, _columnLen);
// Account for the column bytes just consumed from the current CopyData message
_leftToReadInDataMsg -= _columnLen;
_columnLen = int.MinValue; // Mark that the (next) column length hasn't been read yet
_column++;
return result;
} catch {
// After a failed read the exporter is unusable: break the connector and drop all references
_connector.Break();
Cleanup();
throw;
}
}
/// <summary>
/// Returns whether the current column is null.
/// </summary>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
/// <exception cref="InvalidOperationException">No row is currently being read.</exception>
public bool IsNull
{
    get
    {
        CheckDisposed();

        // Outside of a row the next bytes in the buffer are not a column length; reading them
        // here would desynchronize the stream, so reject the call as the Read methods do.
        if (_column == -1 || _column == NumColumns) {
            throw new InvalidOperationException("Not reading a row");
        }

        ReadColumnLenIfNeeded();

        // A length of -1 marks a null column
        return _columnLen == -1;
    }
}
/// <summary>
/// Skips the current column without interpreting its value.
/// </summary>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
/// <exception cref="InvalidOperationException">No row is currently being read.</exception>
public void Skip()
{
    CheckDisposed();

    // Skipping outside of a row would push _column past NumColumns and consume bytes that are
    // not column data, so reject the call as the Read methods do.
    if (_column == -1 || _column == NumColumns) {
        throw new InvalidOperationException("Not reading a row");
    }

    ReadColumnLenIfNeeded();

    // A null column (length -1) has no payload to skip
    if (_columnLen != -1) {
        _buf.Skip(_columnLen);
        // Keep the per-message byte count in sync, exactly as DoRead does; Dispose relies on
        // it to skip the remainder of the current CopyData message.
        _leftToReadInDataMsg -= _columnLen;
    }

    _columnLen = int.MinValue; // Mark that the (next) column length hasn't been read yet
    _column++;
}
#endregion
#region Utilities
/// <summary>
/// Reads the 4-byte length of the current column into <c>_columnLen</c>, unless it has
/// already been read (i.e. <c>_columnLen</c> is no longer <c>int.MinValue</c>).
/// </summary>
void ReadColumnLenIfNeeded()
{
    // Anything other than the int.MinValue marker means the length is already known
    if (_columnLen != int.MinValue) {
        return;
    }

    _buf.Ensure(4);
    _columnLen = _buf.ReadInt32();
    _leftToReadInDataMsg -= 4;
}
/// <summary>
/// Throws if this exporter has already been cleaned up.
/// </summary>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
void CheckDisposed()
{
    if (!_isDisposed) {
        return;
    }

    throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
}
#endregion
#region Cancel / Close / Dispose
/// <summary>
/// Cancels an ongoing export.
/// </summary>
/// <exception cref="ObjectDisposedException">The exporter has been disposed.</exception>
public void Cancel()
{
    // Cleanup() sets _connector to null, so without this check a call made after the
    // operation has ended would fail with a NullReferenceException.
    CheckDisposed();
    _connector.CancelRequest();
}
/// <summary>
/// Completes the binary export and sets the connection back to idle state.
/// If the data has not been read to its end, the remainder is skipped first.
/// Calling this method more than once has no effect.
/// </summary>
public void Dispose()
{
    if (_isDisposed) { return; }

    try
    {
        if (!_isConsumed)
        {
            // Finish the current CopyData message
            _buf.Skip(_leftToReadInDataMsg);
            // Read to the end
            _connector.SkipUntil(BackendMessageCode.CopyDone);
            _connector.ReadExpecting();
            _connector.ReadExpecting();
        }
        _connector.State = ConnectorState.Ready;
    }
    catch
    {
        // The protocol state is unknown after a failed drain; mark the connector as broken,
        // as the constructor and DoRead do on failure.
        _connector.Break();
        throw;
    }
    finally
    {
        // Always release the references and mark the exporter as disposed, so that a second
        // Dispose() does not try to drain a connection that has already failed.
        Cleanup();
    }
}
/// <summary>
/// Marks this exporter as disposed and drops its references to the connector, the
/// buffer and the type handler registry.
/// </summary>
void Cleanup()
{
    _isDisposed = true;

    _buf = null;
    _registry = null;
    _connector = null;
}
#endregion
}
}