X Tutup
using System; using System.Buffers.Binary; using System.Diagnostics; using System.IO; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member namespace Npgsql { /// /// A buffer used by Npgsql to read data from the socket efficiently. /// Provides methods which decode different values types and tracks the current position. /// public sealed partial class NpgsqlReadBuffer { #region Fields and Properties public NpgsqlConnection Connection => Connector.Connection!; internal readonly NpgsqlConnector Connector; internal Stream Underlying { private get; set; } /// /// Wraps SocketAsyncEventArgs for better async I/O as long as we're not doing SSL. /// internal AwaitableSocket? AwaitableSocket { get; set; } /// /// The total byte length of the buffer. /// internal int Size { get; } internal Encoding TextEncoding { get; } /// /// Same as , except that it does not throw an exception if an invalid char is /// encountered (exception fallback), but rather replaces it with a question mark character (replacement /// fallback). /// internal Encoding RelaxedTextEncoding { get; } internal int ReadPosition { get; set; } internal int ReadBytesLeft => FilledBytes - ReadPosition; internal readonly byte[] Buffer; internal int FilledBytes; ColumnStream? _columnStream; /// /// The minimum buffer size possible. /// internal const int MinimumSize = 4096; internal const int DefaultSize = 8192; #endregion #region Constructors internal NpgsqlReadBuffer( NpgsqlConnector connector, Stream stream, int size, Encoding textEncoding, Encoding relaxedTextEncoding) { if (size < MinimumSize) { throw new ArgumentOutOfRangeException(nameof(size), size, "Buffer size must be at least " + MinimumSize); } Connector = connector; Underlying = stream; Size = size; Buffer = new byte[Size]; TextEncoding = textEncoding; RelaxedTextEncoding = relaxedTextEncoding; } #endregion #region I/O /// /// Ensures that bytes are available in the buffer, and if /// not, reads from the socket until enough is available. /// public Task Ensure(int count, bool async) => Ensure(count, async, false); internal void Ensure(int count) { if (count <= ReadBytesLeft) return; Ensure(count, false).GetAwaiter().GetResult(); } internal Task Ensure(int count, bool async, bool dontBreakOnTimeouts) { return count <= ReadBytesLeft ? Task.CompletedTask : EnsureLong(); async Task EnsureLong() { Debug.Assert(count <= Size); Debug.Assert(count > ReadBytesLeft); count -= ReadBytesLeft; if (count <= 0) { return; } if (ReadPosition == FilledBytes) { Clear(); } else if (count > Size - FilledBytes) { Array.Copy(Buffer, ReadPosition, Buffer, 0, ReadBytesLeft); FilledBytes = ReadBytesLeft; ReadPosition = 0; } try { var totalRead = 0; while (count > 0) { var toRead = Size - FilledBytes; int read; if (async) { if (AwaitableSocket == null) // SSL read = await Underlying.ReadAsync(Buffer, FilledBytes, toRead); else // Non-SSL async I/O, optimized { AwaitableSocket.SetBuffer(Buffer, FilledBytes, toRead); await AwaitableSocket.ReceiveAsync(); read = AwaitableSocket.BytesTransferred; } } else // Sync I/O read = Underlying.Read(Buffer, FilledBytes, toRead); if (read == 0) throw new EndOfStreamException(); count -= read; FilledBytes += read; totalRead += read; } NpgsqlEventSource.Log.BytesRead(totalRead); } // We have a special case when reading async notifications - a timeout may be normal // shouldn't be fatal // Note that mono throws SocketException with the wrong error (see #1330) catch (IOException e) when ( dontBreakOnTimeouts && (e.InnerException as SocketException)?.SocketErrorCode == (Type.GetType("Mono.Runtime") == null ? SocketError.TimedOut : SocketError.WouldBlock) ) { throw new TimeoutException("Timeout while reading from stream"); } catch (Exception e) { Connector.Break(); throw new NpgsqlException("Exception while reading from stream", e); } } } internal Task ReadMore(bool async) => Ensure(ReadBytesLeft + 1, async); internal NpgsqlReadBuffer AllocateOversize(int count) { Debug.Assert(count > Size); var tempBuf = new NpgsqlReadBuffer(Connector, Underlying, count, TextEncoding, RelaxedTextEncoding); CopyTo(tempBuf); Clear(); return tempBuf; } /// /// Does not perform any I/O - assuming that the bytes to be skipped are in the memory buffer. /// /// internal void Skip(long len) { Debug.Assert(ReadBytesLeft >= len); ReadPosition += (int)len; } /// /// Skip a given number of bytes. /// public async Task Skip(long len, bool async) { Debug.Assert(len >= 0); if (len > ReadBytesLeft) { len -= ReadBytesLeft; while (len > Size) { Clear(); await Ensure(Size, async); len -= Size; } Clear(); await Ensure((int)len, async); } ReadPosition += (int)len; } #endregion #region Read Simple [MethodImpl(MethodImplOptions.AggressiveInlining)] public sbyte ReadSByte() => Read(); [MethodImpl(MethodImplOptions.AggressiveInlining)] public byte ReadByte() => Read(); [MethodImpl(MethodImplOptions.AggressiveInlining)] public short ReadInt16() => ReadInt16(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public short ReadInt16(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public ushort ReadUInt16() => ReadUInt16(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public ushort ReadUInt16(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public int ReadInt32() => ReadInt32(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public int ReadInt32(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public uint ReadUInt32() => ReadUInt32(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public uint ReadUInt32(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public long ReadInt64() => ReadInt64(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public long ReadInt64(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public ulong ReadUInt64() => ReadUInt64(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public ulong ReadUInt64(bool littleEndian) { var result = Read(); return littleEndian == BitConverter.IsLittleEndian ? result : BinaryPrimitives.ReverseEndianness(result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public float ReadSingle() => ReadSingle(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public float ReadSingle(bool littleEndian) { var result = ReadInt32(littleEndian); return Unsafe.As(ref result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public double ReadDouble() => ReadDouble(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] public double ReadDouble(bool littleEndian) { var result = ReadInt64(littleEndian); return Unsafe.As(ref result); } [MethodImpl(MethodImplOptions.AggressiveInlining)] T Read() { if (Unsafe.SizeOf() > ReadBytesLeft) ThrowNotSpaceLeft(); var result = Unsafe.ReadUnaligned(ref Buffer[ReadPosition]); ReadPosition += Unsafe.SizeOf(); return result; } [MethodImpl(MethodImplOptions.NoInlining)] static void ThrowNotSpaceLeft() => throw new InvalidOperationException("There is not enough space left in the buffer."); public string ReadString(int byteLen) { Debug.Assert(byteLen <= ReadBytesLeft); var result = TextEncoding.GetString(Buffer, ReadPosition, byteLen); ReadPosition += byteLen; return result; } public char[] ReadChars(int byteLen) { Debug.Assert(byteLen <= ReadBytesLeft); var result = TextEncoding.GetChars(Buffer, ReadPosition, byteLen); ReadPosition += byteLen; return result; } public void ReadBytes(Span output) { Debug.Assert(output.Length <= ReadBytesLeft); new Span(Buffer, ReadPosition, output.Length).CopyTo(output); ReadPosition += output.Length; } public void ReadBytes(byte[] output, int outputOffset, int len) => ReadBytes(new Span(output, outputOffset, len)); public ReadOnlySpan ReadSpan(int len) { Debug.Assert(len <= ReadBytesLeft); return new ReadOnlySpan(Buffer, ReadPosition, len); } public ReadOnlyMemory ReadMemory(int len) { Debug.Assert(len <= ReadBytesLeft); return new ReadOnlyMemory(Buffer, ReadPosition, len); } #endregion #region Read Complex public ValueTask ReadBytes(byte[] output, int outputOffset, int len, bool async) { var readFromBuffer = Math.Min(ReadBytesLeft, len); if (readFromBuffer > 0) { System.Buffer.BlockCopy(Buffer, ReadPosition, output, outputOffset, readFromBuffer); ReadPosition += len; return new ValueTask(readFromBuffer); } return new ValueTask(ReadBytesLong()); async Task ReadBytesLong() { Debug.Assert(ReadPosition == 0); Clear(); try { var read = async ? await Underlying.ReadAsync(output, outputOffset, len) : Underlying.Read(output, outputOffset, len); if (read == 0) throw new EndOfStreamException(); return read; } catch (Exception e) { Connector.Break(); throw new NpgsqlException("Exception while reading from stream", e); } } } public Stream GetStream(int len, bool canSeek) { if (_columnStream == null) _columnStream = new ColumnStream(this); _columnStream.Init(len, canSeek); return _columnStream; } /// /// Seeks the first null terminator (\0) and returns the string up to it. The buffer must already /// contain the entire string and its terminator. /// public string ReadNullTerminatedString() => ReadNullTerminatedString(TextEncoding); /// /// Seeks the first null terminator (\0) and returns the string up to it. The buffer must already /// contain the entire string and its terminator. If any character could not be decoded, a question /// mark character is returned instead of throwing an exception. /// public string ReadNullTerminatedStringRelaxed() => ReadNullTerminatedString(RelaxedTextEncoding); /// /// Seeks the first null terminator (\0) and returns the string up to it. The buffer must already /// contain the entire string and its terminator. /// /// Decodes the messages with this encoding. string ReadNullTerminatedString(Encoding encoding) { int i; for (i = ReadPosition; Buffer[i] != 0; i++) Debug.Assert(i <= ReadPosition + ReadBytesLeft); Debug.Assert(i >= ReadPosition); var result = encoding.GetString(Buffer, ReadPosition, i - ReadPosition); ReadPosition = i + 1; return result; } public ReadOnlySpan GetNullTerminatedBytes() { int i; for (i = ReadPosition; Buffer[i] != 0; i++) Debug.Assert(i <= ReadPosition + ReadBytesLeft); Debug.Assert(i >= ReadPosition); var result = new ReadOnlySpan(Buffer, ReadPosition, i - ReadPosition); ReadPosition = i + 1; return result; } #endregion #region Misc internal void Clear() { ReadPosition = 0; FilledBytes = 0; } internal void CopyTo(NpgsqlReadBuffer other) { Debug.Assert(other.Size - other.FilledBytes >= ReadBytesLeft); Array.Copy(Buffer, ReadPosition, other.Buffer, other.FilledBytes, ReadBytesLeft); other.FilledBytes += ReadBytesLeft; } #endregion } }
X Tutup