X Tutup
// created on 4/3/2003 at 19:45 // Npgsql.NpgsqlBinaryRow.cs // // Author: // Francisco Jr. (fxjrlists@yahoo.com.br) // // Copyright (C) 2002 The Npgsql Development Team // npgsql-general@gborg.postgresql.org // http://gborg.postgresql.org/project/npgsql/projdisplay.php // // Permission to use, copy, modify, and distribute this software and its // documentation for any purpose, without fee, and without a written // agreement is hereby granted, provided that the above copyright notice // and this paragraph and the following two paragraphs appear in all copies. // // IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY // FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, // INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS // DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF // THE POSSIBILITY OF SUCH DAMAGE. // // THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES, // INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY // AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS // ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS // TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. using System; using System.Collections.Generic; using System.IO; using System.Reflection; using System.Resources; using System.Text; using System.Threading; using System.Threading.Tasks; using Npgsql.Localization; using NpgsqlTypes; namespace Npgsql { /// /// This is the abstract base class for NpgsqlAsciiRow and NpgsqlBinaryRow. /// internal abstract class NpgsqlRow : IStreamOwner { public abstract object Get(int index); public abstract Task GetAsync(int index); public abstract int NumFields { get; } public abstract bool IsDBNull(int index); public abstract Task IsDBNullAsync(int index); public abstract void Dispose(); public abstract long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length); public abstract long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length); } internal sealed class CachingRow : NpgsqlRow { private readonly List _data = new List(); private readonly ForwardsOnlyRow _inner; public CachingRow(ForwardsOnlyRow fo) { _inner = fo; } public override object Get(int index) { if ((index < 0) || (index >= NumFields)) { throw new IndexOutOfRangeException("this[] index value"); } while (_data.Count <= index) { _data.Add(_inner.Get(_data.Count)); } return _data[index]; } /// /// Async implementation of . /// /// Note that since the CachingRow has already read all the columns into memory, no I/O /// operation is needed and therefore this method simply calls /// public override Task GetAsync(int index) { return PGUtil.TaskFromResult(Get(index)); } public override int NumFields { get { return _inner.NumFields; } } public override bool IsDBNull(int index) { return Get(index) == DBNull.Value; } /// /// Async implementation of . /// /// Note that since the CachingRow has already read all the columns into memory, no I/O /// operation is needed and therefore this method simply calls /// public override Task IsDBNullAsync(int index) { return PGUtil.TaskFromResult(IsDBNull(index)); } public override long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { byte[] source = (byte[]) Get(i); if (buffer == null) { return source.Length - fieldOffset; } long finalLength = Math.Max(0, Math.Min(length, source.Length - fieldOffset)); Array.Copy(source, fieldOffset, buffer, bufferoffset, finalLength); return finalLength; } public override long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { string source = (string) Get(i); if (buffer == null) { return source.Length - fieldoffset; } long finalLength = Math.Max(0, Math.Min(length, source.Length - fieldoffset)); Array.Copy(source.ToCharArray(), fieldoffset, buffer, bufferoffset, finalLength); return finalLength; } public override void Dispose() { _inner.Dispose(); } } internal sealed partial class ForwardsOnlyRow : NpgsqlRow { /// /// The index of the current field in the stream, i.e. the one that hasn't /// been read yet /// private int _i; private readonly RowReader _reader; public ForwardsOnlyRow(RowReader reader) { _reader = reader; } [GenerateAsync] private void Seek(int index, bool consume) { if (index < 0 || index >= NumFields) { throw new IndexOutOfRangeException(); } var d = index - _i; if (d < 0) throw new InvalidOperationException(string.Format(L10N.RowSequentialFieldError, index, _i)); if (d > 0) { _reader.Skip(d); _i += d; } if (consume) _i++; } public void SetRowDescription(NpgsqlRowDescription rowDescr) { _reader.SetRowDescription(rowDescr); } [GenerateAsync(withOverride: true)] public override object Get(int index) { Seek(index, true); return _reader.GetNext(); } public override long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { if (buffer == null) { throw new NotSupportedException(); } if (!_reader.CanGetByteStream(i)) { throw new InvalidCastException(); } Seek(i, true); _reader.SkipBytesTo(fieldOffset); return _reader.Read(buffer, bufferoffset, length); } public override long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { if (buffer == null) { throw new NotSupportedException(); } if (!_reader.CanGetCharStream(i)) { throw new InvalidCastException(); } Seek(i, true); _reader.SkipCharsTo(fieldoffset); return _reader.Read(buffer, bufferoffset, length); } public override int NumFields { get { return _reader.NumFields; } } [GenerateAsync(withOverride: true)] public override bool IsDBNull(int index) { Seek(index, false); return _reader.IsNull; } public override void Dispose() { _reader.Dispose(); } } /// /// Reads a row, field by field, allowing a DataRow to be built appropriately. /// internal abstract partial class RowReader : IStreamOwner { /// /// Reads part of a field, as needed (for /// and /// protected abstract class Streamer : IStreamOwner { protected readonly Stream _stream; protected int _remainingBytes; private int _alreadyRead = 0; protected Streamer(Stream stream, int remainingBytes) { _stream = stream; _remainingBytes = remainingBytes; } public int AlreadyRead { get { return _alreadyRead; } protected set { _alreadyRead = value; } } public void Dispose() { _stream.EatStreamBytes(_remainingBytes); } } /// /// Adds further functionality to stream that is dependant upon the type of data read. /// protected abstract class Streamer : Streamer { protected Streamer(Stream stream, int remainingBytes) : base(stream, remainingBytes) { } public abstract int DoRead(T[] output, int outputIdx, int length); public abstract int DoSkip(int length); public int Read(T[] output, int outputIdx, int length) { int ret = DoRead(output, outputIdx, length); AlreadyRead += ret; return ret; } private void Skip(int length) { AlreadyRead += DoSkip(length); } public void SkipTo(long position) { if (position < AlreadyRead) { throw new InvalidOperationException(); } Skip((int) position - AlreadyRead); } } /// /// Completes the implementation of Streamer for char data. /// protected sealed class CharStreamer : Streamer { public CharStreamer(Stream stream, int remainingBytes) : base(stream, remainingBytes) { } public override int DoRead(char[] output, int outputIdx, int length) { return _stream.ReadChars(output, length, ref _remainingBytes, outputIdx); } public override int DoSkip(int length) { return _stream.SkipChars(length, ref _remainingBytes); } } /// /// Completes the implementation of Streamer for byte data. /// protected sealed class ByteStreamer : Streamer { public ByteStreamer(Stream stream, int remainingBytes) : base(stream, remainingBytes) { } public override int DoRead(byte[] output, int outputIdx, int length) { return _stream.ReadEscapedBytes(output, length, ref _remainingBytes, outputIdx); } public override int DoSkip(int length) { return _stream.SkipEscapedBytes(length, ref _remainingBytes); } } protected static readonly Encoding UTF8Encoding = Encoding.UTF8; protected NpgsqlRowDescription _rowDesc; private readonly Stream _stream; private Streamer _streamer; protected int _currentField = -1; public RowReader(Stream stream) { _stream = stream; } public virtual void SetRowDescription(NpgsqlRowDescription rowDesc) { _rowDesc = rowDesc; } protected Streamer CurrentStreamer { get { return _streamer; } set { if (_streamer != null) { _streamer.Dispose(); } _streamer = value; } } public bool CurrentlyStreaming { get { return _streamer != null; } } public bool CanGetByteStream(int index) { //TODO: Add support for byte[] being read as a stream of bytes. return _rowDesc[index].TypeInfo.NpgsqlDbType == NpgsqlDbType.Bytea; } public bool CanGetCharStream(int index) { //TODO: Add support for arrays of string types? return _rowDesc[index].TypeInfo.Type.Equals(typeof (string)); } protected Streamer CurrentByteStreamer { get { if (CurrentStreamer == null) { if (!CanGetByteStream(_currentField + 1)) { throw new InvalidCastException(); } ++_currentField; return (CurrentStreamer = new ByteStreamer(Stream, GetNextFieldCount())) as ByteStreamer; } else if (!(CurrentStreamer is Streamer)) { throw new InvalidOperationException(); } else { return CurrentStreamer as ByteStreamer; } } } protected Streamer CurrentCharStreamer { get { if (CurrentStreamer == null) { if (!CanGetCharStream(_currentField + 1)) { throw new InvalidCastException(); } ++_currentField; return (CurrentStreamer = new CharStreamer(Stream, GetNextFieldCount())) as CharStreamer; } else if (!(CurrentStreamer is Streamer)) { throw new InvalidOperationException(); } else { return CurrentStreamer as CharStreamer; } } } protected Stream Stream { get { return _stream; } } protected NpgsqlRowDescription.FieldData FieldData { get { return _rowDesc[_currentField]; } } public int NumFields { get { return _rowDesc.NumFields; } } protected int CurrentField { get { return _currentField; } } protected abstract object ReadNext(); protected abstract Task ReadNextAsync(); [GenerateAsync] public object GetNext() { if (++_currentField == _rowDesc.NumFields) { throw new IndexOutOfRangeException(); } return ReadNext(); } public abstract bool IsNull { get; } protected abstract void SkipOne(); protected abstract Task SkipOneAsync(); [GenerateAsync] public void Skip(int count) { if (count > 0) { if (_currentField + count >= _rowDesc.NumFields) { throw new IndexOutOfRangeException(); } while (count-- > 0) { ++_currentField; SkipOne(); } } } protected abstract int GetNextFieldCount(); public int Read(byte[] output, int outputIdx, int length) { return CurrentByteStreamer.Read(output, outputIdx, length); } public void SkipBytesTo(long position) { CurrentByteStreamer.SkipTo(position); } public int Read(char[] output, int outputIdx, int length) { return CurrentCharStreamer.Read(output, outputIdx, length); } public void SkipCharsTo(long position) { CurrentCharStreamer.SkipTo(position); } public virtual void Dispose() {} } }
X Tutup