// Npgsql.NpgsqlCopyOutStream.cs
//
// Author:
// Kalle Hallivuori
//
// Copyright (C) 2007 The Npgsql Development Team
// npgsql-general@gborg.postgresql.org
// http://gborg.postgresql.org/project/npgsql/projdisplay.php
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
using System;
using System.IO;
namespace Npgsql
{
///
/// Stream for reading data from a table or select on a PostgreSQL version 7.4 or newer database during an active COPY TO STDOUT operation.
/// Passes data exactly as provided by the server.
///
internal class NpgsqlCopyOutStream : Stream
{
private NpgsqlConnector _context;
private long _bytesPassed = 0;
private byte[] _buf = null;
private int _bufOffset = 0;
///
/// True while this stream can be used to read copy data from server
///
private bool IsActive
{
get { return _context != null && _context.State == ConnectorState.CopyOut && _context.Mediator.CopyStream == this; }
}
///
/// Created only by NpgsqlCopyOutState.StartCopy()
///
internal NpgsqlCopyOutStream(NpgsqlConnector context)
{
_context = context;
}
///
/// True
///
public override bool CanRead
{
get { return true; }
}
///
/// False
///
public override bool CanWrite
{
get { return false; }
}
///
/// False
///
public override bool CanSeek
{
get { return false; }
}
///
/// Number of bytes read so far
///
public override long Length
{
get { return _bytesPassed; }
}
///
/// Number of bytes read so far; can not be set.
///
public override long Position
{
get { return _bytesPassed; }
set { throw new NotSupportedException("Tried to set Position of network stream " + this); }
}
///
/// Discards copy data as long as server pushes it. Returns after operation is finished.
/// Does nothing if this stream is not the active copy operation reader.
///
public override void Close()
{
if (_context != null)
{
if (IsActive)
{
while (_context.GetCopyOutData() != null)
{
; // flush rest
}
}
if (_context.Mediator.CopyStream == this)
{
_context.Mediator.CopyStream = null;
}
_context = null;
}
}
///
/// Not writable.
///
public override void Write(byte[] buf, int off, int len)
{
throw new NotSupportedException("Tried to write non-writable " + this);
}
///
/// Not flushable.
///
public override void Flush()
{
throw new NotSupportedException("Tried to flush read-only " + this);
}
///
/// Copies data read from server to given byte buffer.
/// Since server returns data row by row, length will differ each time, but it is only zero once the operation ends.
/// Can be mixed with calls to the more efficient NpgsqlCopyOutStream.Read() : byte[] though that would not make much sense.
///
public override int Read(byte[] buf, int off, int len)
{
if (! IsActive)
{
throw new ObjectDisposedException("Reading from closed " + this);
}
if (_buf == null) // otherwise _buf still contains data that did not fit into request buffer in an earlier call
{
_buf = Read();
_bufOffset = 0;
}
if (off + len > buf.Length)
{
len = buf.Length - off;
}
int i = 0;
if (_buf != null)
{
for (; _bufOffset < _buf.Length && i < len; i++)
{
buf[off + i] = _buf[_bufOffset++];
}
if (_bufOffset >= _buf.Length)
{
_buf = null; // whole of our contents fit into request buffer
}
_bytesPassed += i;
}
return i;
}
///
/// Not seekable
///
public override long Seek(long pos, SeekOrigin so)
{
throw new NotSupportedException("Tried to seek non-seekable " + this);
}
///
/// Not supported
///
public override void SetLength(long len)
{
throw new NotSupportedException("Tried to set length of network stream " + this);
}
///
/// Returns a whole row of data from server without extra work.
/// If standard Stream.Read(...) has been called before, it's internal buffers remains are returned.
///
public byte[] Read()
{
byte[] result;
if (_buf == null)
{
result = _context.GetCopyOutData();
}
else if (_bufOffset < 1)
{
result = _buf;
}
else
{
result = new byte[_buf.Length - _bufOffset];
for (int i = 0; i < result.Length; i++)
{
result[i] = _buf[_bufOffset + i];
}
_buf = null;
}
return result;
}
}
}