using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Runtime.ExceptionServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.TypeMapping;
using Npgsql.Util;
using static Npgsql.Util.Statics;
namespace Npgsql
{
///
/// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are
/// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool.
///
sealed partial class NpgsqlConnector : IDisposable
{
#region Fields and Properties
///
/// The physical connection socket to the backend.
///
Socket _socket = default!;
///
/// The physical connection stream to the backend, without anything on top.
///
NetworkStream _baseStream = default!;
///
/// The physical connection stream to the backend, layered with an SSL/TLS stream if in secure mode.
///
Stream _stream = default!;
internal NpgsqlConnectionStringBuilder Settings { get; }
internal string ConnectionString { get; }
ProvideClientCertificatesCallback? ProvideClientCertificatesCallback { get; }
RemoteCertificateValidationCallback? UserCertificateValidationCallback { get; }
ProvidePasswordCallback? ProvidePasswordCallback { get; }
internal Encoding TextEncoding { get; private set; } = default!;
///
/// Same as , except that it does not throw an exception if an invalid char is
/// encountered (exception fallback), but rather replaces it with a question mark character (replacement
/// fallback).
///
internal Encoding RelaxedTextEncoding { get; private set; } = default!;
///
/// Buffer used for reading data.
///
internal NpgsqlReadBuffer ReadBuffer { get; private set; } = default!;
///
/// If we read a data row that's bigger than , we allocate an oversize buffer.
/// The original (smaller) buffer is stored here, and restored when the connection is reset.
///
NpgsqlReadBuffer? _origReadBuffer;
///
/// Buffer used for writing data.
///
internal NpgsqlWriteBuffer WriteBuffer { get; private set; } = default!;
///
/// The secret key of the backend for this connector, used for query cancellation.
///
int _backendSecretKey;
///
/// The process ID of the backend for this connector.
///
internal int BackendProcessId { get; private set; }
///
/// A unique ID identifying this connector, used for logging. Currently mapped to BackendProcessId
///
internal int Id => BackendProcessId;
internal NpgsqlDatabaseInfo DatabaseInfo { get; private set; } = default!;
internal ConnectorTypeMapper TypeMapper { get; set; } = default!;
///
/// The current transaction status for this connector.
///
internal TransactionStatus TransactionStatus { get; set; }
///
/// A transaction object for this connector. Since only one transaction can be in progress at any given time,
/// this instance is recycled. To check whether a transaction is currently in progress on this connector,
/// see .
///
internal NpgsqlTransaction Transaction { get; }
///
/// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't
/// owned (i.e. idle in the pool)
///
internal NpgsqlConnection? Connection { get; set; }
///
/// The number of messages that were prepended to the current message chain, but not yet sent.
/// Note that this only tracks messages which produce a ReadyForQuery message
///
int _pendingPrependedResponses;
internal NpgsqlDataReader? CurrentReader;
internal PreparedStatementManager PreparedStatementManager;
///
/// If the connector is currently in COPY mode, holds a reference to the importer/exporter object.
/// Otherwise null.
///
internal ICancelable? CurrentCopyOperation;
///
/// Holds all run-time parameters received from the backend (via ParameterStatus messages)
///
internal readonly Dictionary PostgresParameters;
///
/// Holds all run-time parameters in raw, binary format for efficient handling without allocations.
///
readonly List<(byte[] Name, byte[] Value)> _rawParameters = new List<(byte[], byte[])>();
///
/// The timeout for reading messages that are part of the user's command
/// (i.e. which aren't internal prepended commands).
///
internal int UserTimeout { private get; set; }
int ReceiveTimeout
{
set
{
// TODO: Socket.ReceiveTimeout doesn't work for async.
if (value != _currentTimeout)
_socket.ReceiveTimeout = _currentTimeout = value;
}
}
///
/// Contains the current value of the socket's ReceiveTimeout, used to determine whether
/// we need to change it when commands are received.
///
int _currentTimeout;
// This is used by NpgsqlCommand, but we place it on the connector because only one instance is needed
// at any one time (per connection).
internal SqlQueryParser SqlParser { get; }
///
/// A lock that's taken while a user action is in progress, e.g. a command being executed.
/// Only used when keepalive is enabled, otherwise null.
///
SemaphoreSlim? _userLock;
///
/// A lock that's taken while a cancellation is being delivered; new queries are blocked until the
/// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
/// command will accidentally cancel a later one, see #615.
///
internal object CancelLock { get; }
readonly bool _isKeepAliveEnabled;
readonly Timer? _keepAliveTimer;
///
/// The command currently being executed by the connector, null otherwise.
/// Used only for concurrent use error reporting purposes.
///
NpgsqlCommand? _currentCommand;
///
/// If pooled, the pool index on which this connector will be returned to the pool.
///
internal int PoolIndex { get; set; } = int.MaxValue;
internal int ClearCounter { get; set; }
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlConnector));
#endregion
#region Constants
///
/// The minimum timeout that can be set on internal commands such as COMMIT, ROLLBACK.
///
internal const int MinimumInternalCommandTimeout = 3;
#endregion
#region Reusable Message Objects
byte[]? _resetWithoutDeallocateMessage;
int _resetWithoutDeallocateResponseCount;
// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new CommandCompleteMessage();
readonly ReadyForQueryMessage _readyForQueryMessage = new ReadyForQueryMessage();
readonly ParameterDescriptionMessage _parameterDescriptionMessage = new ParameterDescriptionMessage();
readonly DataRowMessage _dataRowMessage = new DataRowMessage();
readonly RowDescriptionMessage _rowDescriptionMessage = new RowDescriptionMessage();
// Since COPY is rarely used, allocate these lazily
CopyInResponseMessage? _copyInResponseMessage;
CopyOutResponseMessage? _copyOutResponseMessage;
CopyDataMessage? _copyDataMessage;
#endregion
internal NpgsqlDataReader DataReader { get; }
#region Constructors
internal NpgsqlConnector(NpgsqlConnection connection)
: this(connection.Settings, connection.OriginalConnectionString)
{
Connection = connection;
Connection.Connector = this;
ProvideClientCertificatesCallback = Connection.ProvideClientCertificatesCallback;
UserCertificateValidationCallback = Connection.UserCertificateValidationCallback;
ProvidePasswordCallback = Connection.ProvidePasswordCallback;
}
NpgsqlConnector(NpgsqlConnector connector)
: this(connector.Settings, connector.ConnectionString)
{
ProvideClientCertificatesCallback = connector.ProvideClientCertificatesCallback;
UserCertificateValidationCallback = connector.UserCertificateValidationCallback;
ProvidePasswordCallback = connector.ProvidePasswordCallback;
}
///
/// Creates a new connector with the given connection string.
///
/// The parsed connection string.
/// The connection string.
NpgsqlConnector(NpgsqlConnectionStringBuilder settings, string connectionString)
{
State = ConnectorState.Closed;
TransactionStatus = TransactionStatus.Idle;
Settings = settings;
ConnectionString = connectionString;
PostgresParameters = new Dictionary();
SqlParser = new SqlQueryParser();
Transaction = new NpgsqlTransaction(this);
CancelLock = new object();
_isKeepAliveEnabled = Settings.KeepAlive > 0;
if (_isKeepAliveEnabled)
{
_userLock = new SemaphoreSlim(1, 1);
_keepAliveTimer = new Timer(PerformKeepAlive, null, Timeout.Infinite, Timeout.Infinite);
}
DataReader = new NpgsqlDataReader(this);
// TODO: Not just for automatic preparation anymore...
PreparedStatementManager = new PreparedStatementManager(this);
}
#endregion
#region Configuration settings
string Host => Settings.Host!;
int Port => Settings.Port;
string KerberosServiceName => Settings.KerberosServiceName;
SslMode SslMode => Settings.SslMode;
int ConnectionTimeout => Settings.Timeout;
bool IntegratedSecurity => Settings.IntegratedSecurity;
internal bool ConvertInfinityDateTime => Settings.ConvertInfinityDateTime;
int InternalCommandTimeout
{
get
{
var internalTimeout = Settings.InternalCommandTimeout;
if (internalTimeout == -1)
return Math.Max(Settings.CommandTimeout, MinimumInternalCommandTimeout) * 1000;
Debug.Assert(internalTimeout == 0 || internalTimeout >= MinimumInternalCommandTimeout);
return internalTimeout * 1000;
}
}
#endregion Configuration settings
#region State management
int _state;
///
/// Gets the current state of the connector
///
internal ConnectorState State
{
get => (ConnectorState)_state;
set
{
var newState = (int)value;
if (newState == _state)
return;
Interlocked.Exchange(ref _state, newState);
}
}
///
/// Returns whether the connector is open, regardless of any task it is currently performing
///
bool IsConnected
=> State switch
{
ConnectorState.Ready => true,
ConnectorState.Executing => true,
ConnectorState.Fetching => true,
ConnectorState.Waiting => true,
ConnectorState.Copy => true,
ConnectorState.Closed => false,
ConnectorState.Connecting => false,
ConnectorState.Broken => false,
_ => throw new ArgumentOutOfRangeException("Unknown state: " + State)
};
internal bool IsReady => State == ConnectorState.Ready;
internal bool IsClosed => State == ConnectorState.Closed;
internal bool IsBroken => State == ConnectorState.Broken;
bool _isConnecting;
#endregion
#region Open
///
/// Opens the physical connection to the server.
///
/// Usually called by the RequestConnector
/// Method of the connection pool manager.
internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
Debug.Assert(Connection != null && Connection.Connector == this);
Debug.Assert(State == ConnectorState.Closed);
if (string.IsNullOrWhiteSpace(Host))
throw new ArgumentException("Host can't be null");
_isConnecting = true;
State = ConnectorState.Connecting;
try {
await RawOpen(timeout, async, cancellationToken);
var username = GetUsername();
if (Settings.Database == null)
Settings.Database = username;
WriteStartupMessage(username);
await Flush(async);
timeout.Check();
await Authenticate(username, timeout, async);
// We treat BackendKeyData as optional because some PostgreSQL-like database
// don't send it (CockroachDB, CrateDB)
var msg = await ReadMessage(async);
if (msg.Code == BackendMessageCode.BackendKeyData)
{
var keyDataMsg = (BackendKeyDataMessage)msg;
BackendProcessId = keyDataMsg.BackendProcessId;
_backendSecretKey = keyDataMsg.BackendSecretKey;
msg = await ReadMessage(async);
}
if (msg.Code != BackendMessageCode.ReadyForQuery)
throw new NpgsqlException($"Received backend message {msg.Code} while expecting ReadyForQuery. Please file a bug.");
State = ConnectorState.Ready;
await LoadDatabaseInfo(timeout, async);
if (Settings.Pooling && DatabaseInfo.SupportsDiscard)
GenerateResetMessage();
Counters.NumberOfNonPooledConnections.Increment();
Counters.HardConnectsPerSecond.Increment();
Log.Trace($"Opened connection to {Host}:{Port}");
// If an exception occurs during open, Break() below shouldn't close the connection, which would also
// update pool state. Instead we let the exception propagate and get handled by the calling pool code.
// We use an extra state flag because the connector's State varies during the type loading query
// above (Executing, Fetching...). Note also that Break() gets called from ReadMessageLong().
_isConnecting = false;
}
catch
{
Break();
throw;
}
}
internal async Task LoadDatabaseInfo(NpgsqlTimeout timeout, bool async)
{
// The type loading below will need to send queries to the database, and that depends on a type mapper
// being set up (even if its empty)
TypeMapper = new ConnectorTypeMapper(this);
if (!NpgsqlDatabaseInfo.Cache.TryGetValue(ConnectionString, out var database))
NpgsqlDatabaseInfo.Cache[ConnectionString] = database = await NpgsqlDatabaseInfo.Load(Connection!, timeout, async);
DatabaseInfo = database;
TypeMapper.Bind(DatabaseInfo);
}
void WriteStartupMessage(string username)
{
var startupParams = new Dictionary
{
["user"] = username,
["client_encoding"] =
Settings.ClientEncoding ??
PostgresEnvironment.ClientEncoding ??
"UTF8"
};
startupParams["database"] = Settings.Database!;
if (Settings.ApplicationName?.Length > 0)
startupParams["application_name"] = Settings.ApplicationName;
if (Settings.SearchPath?.Length > 0)
startupParams["search_path"] = Settings.SearchPath;
var timezone = Settings.Timezone ?? PostgresEnvironment.TimeZone;
if (timezone != null)
startupParams["TimeZone"] = timezone;
WriteStartup(startupParams);
}
string GetUsername()
{
var username = Settings.Username;
if (username?.Length > 0)
return username;
username = PostgresEnvironment.User;
if (username?.Length > 0)
return username;
#if NET461
if (PGUtil.IsWindows && Type.GetType("Mono.Runtime") == null)
{
username = WindowsUsernameProvider.GetUsername(Settings.IncludeRealm);
if (username?.Length > 0)
return username;
}
#endif
if (!PGUtil.IsWindows)
{
username = KerberosUsernameProvider.GetUsername(Settings.IncludeRealm);
if (username?.Length > 0)
return username;
}
username = Environment.UserName;
if (username?.Length > 0)
return username;
throw new NpgsqlException("No username could be found, please specify one explicitly");
}
async Task RawOpen(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
try
{
if (async)
await ConnectAsync(timeout, cancellationToken);
else
Connect(timeout);
_baseStream = new NetworkStream(_socket, true);
_stream = _baseStream;
if (Settings.Encoding == "UTF8")
{
TextEncoding = PGUtil.UTF8Encoding;
RelaxedTextEncoding = PGUtil.RelaxedUTF8Encoding;
}
else
{
TextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ExceptionFallback, DecoderFallback.ExceptionFallback);
RelaxedTextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ReplacementFallback, DecoderFallback.ReplacementFallback);
}
ReadBuffer = new NpgsqlReadBuffer(this, _stream, Settings.ReadBufferSize, TextEncoding, RelaxedTextEncoding);
WriteBuffer = new NpgsqlWriteBuffer(this, _stream, Settings.WriteBufferSize, TextEncoding);
if (SslMode == SslMode.Require || SslMode == SslMode.Prefer)
{
WriteSslRequest();
await Flush(async);
await ReadBuffer.Ensure(1, async);
var response = (char)ReadBuffer.ReadByte();
timeout.Check();
switch (response)
{
default:
throw new NpgsqlException($"Received unknown response {response} for SSLRequest (expecting S or N)");
case 'N':
if (SslMode == SslMode.Require)
throw new NpgsqlException("SSL connection requested. No SSL enabled connection from this host is configured.");
break;
case 'S':
var clientCertificates = new X509CertificateCollection();
var certPath = Settings.ClientCertificate ?? PostgresEnvironment.SslCert;
var certPathExists = true;
if (certPath is null)
{
certPath = PostgresEnvironment.SslCertDefault;
certPathExists = File.Exists(certPath);
}
if (certPathExists)
clientCertificates.Add(new X509Certificate(certPath));
ProvideClientCertificatesCallback?.Invoke(clientCertificates);
RemoteCertificateValidationCallback certificateValidationCallback;
if (Settings.TrustServerCertificate)
certificateValidationCallback = (sender, certificate, chain, errors) => true;
else if (UserCertificateValidationCallback != null)
certificateValidationCallback = UserCertificateValidationCallback;
else
certificateValidationCallback = DefaultUserCertificateValidationCallback;
var sslStream = new SslStream(_stream, leaveInnerStreamOpen: false, certificateValidationCallback);
if (async)
await sslStream.AuthenticateAsClientAsync(Host, clientCertificates, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, Settings.CheckCertificateRevocation);
else
sslStream.AuthenticateAsClient(Host, clientCertificates, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, Settings.CheckCertificateRevocation);
_stream = sslStream;
timeout.Check();
ReadBuffer.Clear(); // Reset to empty after reading single SSL char
ReadBuffer.Underlying = _stream;
WriteBuffer.Underlying = _stream;
IsSecure = true;
Log.Trace("SSL negotiation successful");
break;
}
}
if (!IsSecure)
{
WriteBuffer.AwaitableSocket = new AwaitableSocket(new SocketAsyncEventArgs(), _socket);
ReadBuffer.AwaitableSocket = new AwaitableSocket(new SocketAsyncEventArgs(), _socket);
}
Log.Trace($"Socket connected to {Host}:{Port}");
}
catch
{
#pragma warning disable CS8625
try { _stream?.Dispose(); } catch {
// ignored
}
_stream = null;
try { _baseStream?.Dispose(); }
catch
{
// ignored
}
_baseStream = null;
try { _socket?.Dispose(); }
catch
{
// ignored
}
_socket = null;
throw;
#pragma warning restore CS8625
}
}
void Connect(NpgsqlTimeout timeout)
{
EndPoint[] endpoints;
if (!string.IsNullOrEmpty(Host) && Host[0] == '/')
{
endpoints = new EndPoint[] { new UnixEndPoint(Path.Combine(Host, $".s.PGSQL.{Port}")) };
}
else
{
// Note that there aren't any timeout-able DNS methods, and we want to use sync-only
// methods (not to rely on any TP threads etc.)
endpoints = Dns.GetHostAddresses(Host).Select(a => new IPEndPoint(a, Port)).ToArray();
timeout.Check();
}
// Give each endpoint an equal share of the remaining time
var perEndpointTimeout = -1; // Default to infinity
if (timeout.IsSet)
{
var timeoutTicks = timeout.TimeLeft.Ticks;
if (timeoutTicks <= 0)
throw new TimeoutException();
perEndpointTimeout = (int)(timeoutTicks / endpoints.Length / 10);
}
for (var i = 0; i < endpoints.Length; i++)
{
var endpoint = endpoints[i];
Log.Trace($"Attempting to connect to {endpoint}");
var protocolType = endpoint.AddressFamily == AddressFamily.InterNetwork ? ProtocolType.Tcp : ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType)
{
Blocking = false
};
try
{
try
{
socket.Connect(endpoint);
}
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
throw;
}
var write = new List { socket };
var error = new List { socket };
Socket.Select(null, write, error, perEndpointTimeout);
var errorCode = (int) socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error);
if (errorCode != 0)
throw new SocketException(errorCode);
if (!write.Any())
throw new TimeoutException("Timeout during connection attempt");
socket.Blocking = true;
SetSocketOptions(socket);
_socket = socket;
return;
}
catch (Exception e)
{
try { socket.Dispose(); }
catch
{
// ignored
}
Log.Trace($"Failed to connect to {endpoint}", e);
if (i == endpoints.Length - 1)
throw new NpgsqlException("Exception while connecting", e);
}
}
}
async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken)
{
EndPoint[] endpoints;
if (!string.IsNullOrEmpty(Host) && Host[0] == '/')
{
endpoints = new EndPoint[] { new UnixEndPoint(Path.Combine(Host, $".s.PGSQL.{Port}")) };
}
else
{
// Note that there aren't any timeoutable or cancellable DNS methods
endpoints = (await Dns.GetHostAddressesAsync(Host).WithCancellation(cancellationToken))
.Select(a => new IPEndPoint(a, Port)).ToArray();
}
// Give each IP an equal share of the remaining time
var perIpTimespan = default(TimeSpan);
var perIpTimeout = timeout;
if (timeout.IsSet)
{
var timeoutTicks = timeout.TimeLeft.Ticks;
if (timeoutTicks <= 0)
throw new TimeoutException();
perIpTimespan = new TimeSpan(timeoutTicks / endpoints.Length);
perIpTimeout = new NpgsqlTimeout(perIpTimespan);
}
for (var i = 0; i < endpoints.Length; i++)
{
var endpoint = endpoints[i];
Log.Trace($"Attempting to connect to {endpoint}");
var protocolType = endpoint.AddressFamily == AddressFamily.InterNetwork ? ProtocolType.Tcp : ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType);
try
{
await socket.ConnectAsync(endpoint)
.WithCancellationAndTimeout(perIpTimeout, cancellationToken);
SetSocketOptions(socket);
_socket = socket;
return;
}
catch (Exception e)
{
try { socket.Dispose(); }
catch
{
// ignored
}
cancellationToken.ThrowIfCancellationRequested();
if (e is OperationCanceledException)
e = new TimeoutException("Timeout during connection attempt");
Log.Trace($"Failed to connect to {endpoint}", e);
if (i == endpoints.Length - 1)
{
throw new NpgsqlException("Exception while connecting", e);
}
}
}
}
void SetSocketOptions(Socket socket)
{
if (socket.AddressFamily == AddressFamily.InterNetwork)
socket.NoDelay = true;
if (Settings.SocketReceiveBufferSize > 0)
socket.ReceiveBufferSize = Settings.SocketReceiveBufferSize;
if (Settings.SocketSendBufferSize > 0)
socket.SendBufferSize = Settings.SocketSendBufferSize;
if (Settings.TcpKeepAlive)
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
if (Settings.TcpKeepAliveInterval > 0 && Settings.TcpKeepAliveTime == 0)
throw new ArgumentException("If TcpKeepAliveInterval is defined, TcpKeepAliveTime must be defined as well");
if (Settings.TcpKeepAliveTime > 0)
{
if (!PGUtil.IsWindows)
throw new PlatformNotSupportedException(
"Npgsql management of TCP keepalive is supported only on Windows. " +
"TCP keepalives can still be used on other systems but are enabled via the TcpKeepAlive option or configured globally for the machine, see the relevant docs.");
var time = Settings.TcpKeepAliveTime;
var interval = Settings.TcpKeepAliveInterval > 0
? Settings.TcpKeepAliveInterval
: Settings.TcpKeepAliveTime;
// For the following see https://msdn.microsoft.com/en-us/library/dd877220.aspx
var uintSize = Marshal.SizeOf(typeof(uint));
var inOptionValues = new byte[uintSize * 3];
BitConverter.GetBytes((uint)1).CopyTo(inOptionValues, 0);
BitConverter.GetBytes((uint)time).CopyTo(inOptionValues, uintSize);
BitConverter.GetBytes((uint)interval).CopyTo(inOptionValues, uintSize * 2);
var result = socket.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
if (result != 0)
throw new NpgsqlException($"Got non-zero value when trying to set TCP keepalive: {result}");
}
}
#endregion
#region Frontend message processing
///
/// Prepends a message to be sent at the beginning of the next message chain.
///
internal void PrependInternalMessage(byte[] rawMessage, int responseMessageCount)
{
_pendingPrependedResponses += responseMessageCount;
var t = WritePregenerated(rawMessage);
Debug.Assert(t.IsCompleted, "Could not fully write pregenerated message into the buffer");
}
#endregion
#region Backend message processing
internal IBackendMessage ReadMessage(DataRowLoadingMode dataRowLoadingMode=DataRowLoadingMode.NonSequential)
=> ReadMessage(false, dataRowLoadingMode).GetAwaiter().GetResult()!;
internal ValueTask ReadMessage(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential)
=> DoReadMessage(async, dataRowLoadingMode)!;
internal ValueTask ReadMessageWithNotifications(bool async)
=> DoReadMessage(async, DataRowLoadingMode.NonSequential, true);
ValueTask DoReadMessage(
bool async,
DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential,
bool readingNotifications = false)
{
if (_pendingPrependedResponses > 0 ||
dataRowLoadingMode != DataRowLoadingMode.NonSequential ||
readingNotifications ||
ReadBuffer.ReadBytesLeft < 5)
{
return ReadMessageLong(dataRowLoadingMode, readingNotifications);
}
var messageCode = (BackendMessageCode)ReadBuffer.ReadByte();
switch (messageCode)
{
case BackendMessageCode.NoticeResponse:
case BackendMessageCode.NotificationResponse:
case BackendMessageCode.ParameterStatus:
case BackendMessageCode.ErrorResponse:
ReadBuffer.ReadPosition--;
return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false);
}
PGUtil.ValidateBackendMessageCode(messageCode);
var len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself
if (len > ReadBuffer.ReadBytesLeft)
{
ReadBuffer.ReadPosition -= 5;
return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false);
}
return new ValueTask(ParseServerMessage(ReadBuffer, messageCode, len, false));
async ValueTask ReadMessageLong(
DataRowLoadingMode dataRowLoadingMode2,
bool readingNotifications2,
bool isReadingPrependedMessage = false)
{
// First read the responses of any prepended messages.
if (_pendingPrependedResponses > 0 && !isReadingPrependedMessage)
{
try
{
// TODO: There could be room for optimization here, rather than the async call(s)
ReceiveTimeout = InternalCommandTimeout;
for (; _pendingPrependedResponses > 0; _pendingPrependedResponses--)
await ReadMessageLong(DataRowLoadingMode.Skip, false, true);
}
catch (PostgresException)
{
Break();
throw;
}
}
PostgresException? error = null;
try
{
ReceiveTimeout = UserTimeout;
while (true)
{
await ReadBuffer.Ensure(5, async, readingNotifications2);
messageCode = (BackendMessageCode)ReadBuffer.ReadByte();
PGUtil.ValidateBackendMessageCode(messageCode);
len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself
if ((messageCode == BackendMessageCode.DataRow &&
dataRowLoadingMode2 != DataRowLoadingMode.NonSequential) ||
messageCode == BackendMessageCode.CopyData)
{
if (dataRowLoadingMode2 == DataRowLoadingMode.Skip)
{
await ReadBuffer.Skip(len, async);
continue;
}
}
else if (len > ReadBuffer.ReadBytesLeft)
{
if (len > ReadBuffer.Size)
{
if (_origReadBuffer == null)
_origReadBuffer = ReadBuffer;
ReadBuffer = ReadBuffer.AllocateOversize(len);
}
await ReadBuffer.Ensure(len, async);
}
var msg = ParseServerMessage(ReadBuffer, messageCode, len, isReadingPrependedMessage);
switch (messageCode)
{
case BackendMessageCode.ErrorResponse:
Debug.Assert(msg == null);
// An ErrorResponse is (almost) always followed by a ReadyForQuery. Save the error
// and throw it as an exception when the ReadyForQuery is received (next).
error = PostgresException.Load(ReadBuffer);
if (State == ConnectorState.Connecting)
{
// During the startup/authentication phase, an ErrorResponse isn't followed by
// an RFQ. Instead, the server closes the connection immediately
throw error;
}
continue;
case BackendMessageCode.ReadyForQuery:
if (error != null)
{
NpgsqlEventSource.Log.CommandFailed();
throw error;
}
break;
// Asynchronous messages which can come anytime, they have already been handled
// in ParseServerMessage. Read the next message.
case BackendMessageCode.NoticeResponse:
case BackendMessageCode.NotificationResponse:
case BackendMessageCode.ParameterStatus:
Debug.Assert(msg == null);
if (!readingNotifications2)
continue;
return null;
}
Debug.Assert(msg != null, "Message is null for code: " + messageCode);
return msg;
}
}
catch (PostgresException)
{
if (CurrentReader != null)
{
// The reader cleanup will call EndUserAction
await CurrentReader.Cleanup(async);
}
else
{
EndUserAction();
}
throw;
}
catch (NpgsqlException)
{
// An ErrorResponse isn't followed by ReadyForQuery
if (error != null)
ExceptionDispatchInfo.Capture(error).Throw();
throw;
}
}
}
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage)
{
switch (code)
{
case BackendMessageCode.RowDescription:
return _rowDescriptionMessage.Load(buf, TypeMapper);
case BackendMessageCode.DataRow:
return _dataRowMessage.Load(len);
case BackendMessageCode.CompletedResponse:
return _commandCompleteMessage.Load(buf, len);
case BackendMessageCode.ReadyForQuery:
var rfq = _readyForQueryMessage.Load(buf);
if (!isPrependedMessage) {
// Transaction status on prepended messages shouldn't be processed, because there may be prepended messages
// before the begin transaction message. In this case, they will contain transaction status Idle, which will
// clear our Pending transaction status. Only process transaction status on RFQ's from user-provided, non
// prepended messages.
ProcessNewTransactionStatus(rfq.TransactionStatusIndicator);
}
return rfq;
case BackendMessageCode.EmptyQueryResponse:
return EmptyQueryMessage.Instance;
case BackendMessageCode.ParseComplete:
return ParseCompleteMessage.Instance;
case BackendMessageCode.ParameterDescription:
return _parameterDescriptionMessage.Load(buf);
case BackendMessageCode.BindComplete:
return BindCompleteMessage.Instance;
case BackendMessageCode.NoData:
return NoDataMessage.Instance;
case BackendMessageCode.CloseComplete:
return CloseCompletedMessage.Instance;
case BackendMessageCode.ParameterStatus:
ReadParameterStatus(buf.GetNullTerminatedBytes(), buf.GetNullTerminatedBytes());
return null;
case BackendMessageCode.NoticeResponse:
var notice = PostgresNotice.Load(buf);
Log.Debug($"Received notice: {notice.MessageText}", Id);
Connection?.OnNotice(notice);
return null;
case BackendMessageCode.NotificationResponse:
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
return null;
case BackendMessageCode.AuthenticationRequest:
var authType = (AuthenticationRequestType)buf.ReadInt32();
return authType switch
{
AuthenticationRequestType.AuthenticationOk => (AuthenticationRequestMessage)AuthenticationOkMessage.Instance,
AuthenticationRequestType.AuthenticationCleartextPassword => AuthenticationCleartextPasswordMessage.Instance,
AuthenticationRequestType.AuthenticationMD5Password => AuthenticationMD5PasswordMessage.Load(buf),
AuthenticationRequestType.AuthenticationGSS => AuthenticationGSSMessage.Instance,
AuthenticationRequestType.AuthenticationSSPI => AuthenticationSSPIMessage.Instance,
AuthenticationRequestType.AuthenticationGSSContinue => AuthenticationGSSContinueMessage.Load(buf, len),
AuthenticationRequestType.AuthenticationSASL => new AuthenticationSASLMessage(buf),
AuthenticationRequestType.AuthenticationSASLContinue => new AuthenticationSASLContinueMessage(buf, len - 4),
AuthenticationRequestType.AuthenticationSASLFinal => new AuthenticationSASLFinalMessage(buf, len - 4),
_ => throw new NotSupportedException($"Authentication method not supported (Received: {authType})")
};
case BackendMessageCode.BackendKeyData:
return new BackendKeyDataMessage(buf);
case BackendMessageCode.CopyInResponse:
return (_copyInResponseMessage ??= new CopyInResponseMessage()).Load(ReadBuffer);
case BackendMessageCode.CopyOutResponse:
return (_copyOutResponseMessage ??= new CopyOutResponseMessage()).Load(ReadBuffer);
case BackendMessageCode.CopyData:
return (_copyDataMessage ??= new CopyDataMessage()).Load(len);
case BackendMessageCode.CopyDone:
return CopyDoneMessage.Instance;
case BackendMessageCode.PortalSuspended:
throw new NpgsqlException("Unimplemented message: " + code);
case BackendMessageCode.ErrorResponse:
return null;
case BackendMessageCode.FunctionCallResponse:
// We don't use the obsolete function call protocol
throw new NpgsqlException("Unexpected backend message: " + code);
default:
throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {code} of enum {nameof(BackendMessageCode)}. Please file a bug.");
}
}
///
/// Reads backend messages and discards them, stopping only after a message of the given type has
/// been seen. Only a sync I/O version of this method exists - in async flows we inline the loop
/// rather than calling an additional async method, in order to avoid the overhead.
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal IBackendMessage SkipUntil(BackendMessageCode stopAt)
{
Debug.Assert(stopAt != BackendMessageCode.DataRow, "Shouldn't be used for rows, doesn't know about sequential");
while (true)
{
var msg = ReadMessage(false, DataRowLoadingMode.Skip).GetAwaiter().GetResult()!;
Debug.Assert(!(msg is DataRowMessage));
if (msg.Code == stopAt)
return msg;
}
}
#endregion Backend message processing
#region Transactions
internal Task Rollback(bool async)
{
Log.Debug("Rolling back transaction", Id);
using (StartUserAction())
return ExecuteInternalCommand(PregeneratedMessages.RollbackTransaction, async);
}
internal bool InTransaction
=> TransactionStatus switch
{
TransactionStatus.Idle => false,
TransactionStatus.Pending => true,
TransactionStatus.InTransactionBlock => true,
TransactionStatus.InFailedTransactionBlock => true,
_ => throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug.")
};
///
/// Handles a new transaction indicator received on a ReadyForQuery message
///
void ProcessNewTransactionStatus(TransactionStatus newStatus)
{
if (newStatus == TransactionStatus)
return;
TransactionStatus = newStatus switch
{
TransactionStatus.Idle => newStatus,
TransactionStatus.InTransactionBlock => newStatus,
TransactionStatus.InFailedTransactionBlock => newStatus,
TransactionStatus.Pending => throw new Exception("Invalid TransactionStatus (should be frontend-only)"),
_ => throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {newStatus} of enum {nameof(TransactionStatus)}. Please file a bug.")
};
}
void ClearTransaction()
{
Transaction.DisposeImmediately();
TransactionStatus = TransactionStatus.Idle;
}
#endregion
#region SSL
///
/// Returns whether SSL is being used for the connection
///
internal bool IsSecure { get; private set; }
#pragma warning disable CA1801 // Review unused parameters
static bool DefaultUserCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
=> sslPolicyErrors == SslPolicyErrors.None;
#pragma warning restore CA1801 // Review unused parameters
#endregion SSL
#region Cancel
///
/// Creates another connector and sends a cancel request through it for this connector.
///
internal void CancelRequest()
{
if (BackendProcessId == 0)
throw new NpgsqlException("Cancellation not supported on this database (no BackendKeyData was received during connection)");
Log.Debug("Sending cancellation...", Id);
lock (CancelLock)
{
try
{
var cancelConnector = new NpgsqlConnector(this);
cancelConnector.DoCancelRequest(BackendProcessId, _backendSecretKey);
}
catch (Exception e)
{
var socketException = e.InnerException as SocketException;
if (socketException == null || socketException.SocketErrorCode != SocketError.ConnectionReset)
Log.Debug("Exception caught while attempting to cancel command", e, Id);
}
}
}
void DoCancelRequest(int backendProcessId, int backendSecretKey)
{
Debug.Assert(State == ConnectorState.Closed);
try
{
RawOpen(new NpgsqlTimeout(TimeSpan.FromSeconds(ConnectionTimeout)), false, CancellationToken.None)
.GetAwaiter().GetResult();
WriteCancelRequest(backendProcessId, backendSecretKey);
Flush();
Debug.Assert(ReadBuffer.ReadPosition == 0);
// Now wait for the server to close the connection, better chance of the cancellation
// actually being delivered before we continue with the user's logic.
var count = _stream.Read(ReadBuffer.Buffer, 0, 1);
if (count > 0)
Log.Error("Received response after sending cancel request, shouldn't happen! First byte: " + ReadBuffer.Buffer[0]);
}
finally
{
lock (this)
Cleanup();
}
}
#endregion Cancel
#region Close / Reset
internal bool HasOngoingOperation => CurrentReader != null || CurrentCopyOperation != null;
///
/// Closes ongoing operations, i.e. an open reader exists or a COPY operation still in progress, as
/// part of a connection close.
/// Does nothing if the thread has been aborted - the connector will be closed immediately.
///
internal async Task CloseOngoingOperations(bool async)
{
var reader = CurrentReader;
var copyOperation = CurrentCopyOperation;
if (reader != null)
await reader.Close(connectionClosing: true, async);
if (copyOperation == null)
return;
// TODO: There's probably a race condition as the COPY operation may finish on its own during the next few lines
// Note: we only want to cancel import operations, since in these cases cancel is safe.
// Export cancellations go through the PostgreSQL "asynchronous" cancel mechanism and are
// therefore vulnerable to the race condition in #615.
if (copyOperation is NpgsqlBinaryImporter ||
copyOperation is NpgsqlCopyTextWriter ||
copyOperation is NpgsqlRawCopyStream rawCopyStream && rawCopyStream.CanWrite)
{
try
{
copyOperation.Cancel();
}
catch (Exception e)
{
Log.Warn("Error while cancelling COPY on connector close", e, Id);
}
}
try
{
copyOperation.Dispose();
}
catch (Exception e)
{
Log.Warn("Error while disposing cancelled COPY on connector close", e, Id);
}
}
// TODO in theory this should be async-optional, but the only I/O done here is the Terminate Flush, which is
// very unlikely to block (plus locking would need to be worked out)
internal void Close()
{
lock (this)
{
Log.Trace("Closing connector", Id);
if (IsReady)
{
try
{
WriteTerminate();
Flush();
}
catch (Exception e)
{
Log.Error("Exception while closing connector", e, Id);
Debug.Assert(IsBroken);
}
}
switch (State)
{
case ConnectorState.Broken:
case ConnectorState.Closed:
return;
}
State = ConnectorState.Closed;
Counters.NumberOfNonPooledConnections.Decrement();
Counters.HardDisconnectsPerSecond.Increment();
Cleanup();
}
}
public void Dispose() => Close();
///
/// Called when an unexpected message has been received during an action. Breaks the
/// connector and returns the appropriate message.
///
internal Exception UnexpectedMessageReceived(BackendMessageCode received)
{
Break();
return new Exception($"Received unexpected backend message {received}. Please file a bug.");
}
///
/// Called when a connector becomes completely unusable, e.g. when an unexpected I/O exception is raised or when
/// we lose protocol sync.
/// Note that fatal errors during the Open phase do *not* pass through here.
///
internal void Break()
{
Debug.Assert(!IsClosed);
lock (this)
{
if (State == ConnectorState.Broken)
return;
Log.Error("Breaking connector", Id);
State = ConnectorState.Broken;
var conn = Connection;
Cleanup();
// We have no connection if we're broken by a keepalive occuring while the connector is in the pool
// When we break, we normally need to call into NpgsqlConnection to reset its state.
// The exception to this is when we're connecting, in which case the exception bubbles up and
// try/catch above takes care of everything.
if (conn != null && !_isConnecting)
{
// Note that the connection's full state is usually calculated from the connector's, but in
// states closed/broken the connector is null. We therefore need a way to distinguish between
// Closed and Broken on the connection.
conn.Close(wasBroken: true, async: false);
}
}
}
///
/// Closes the socket and cleans up client-side resources associated with this connector.
///
///
/// This method doesn't actually perform any meaningful I/O, and therefore is sync-only.
///
void Cleanup()
{
Debug.Assert(Monitor.IsEntered(this));
Log.Trace("Cleaning up connector", Id);
try
{
_stream?.Dispose();
}
catch
{
// ignored
}
if (CurrentReader != null)
{
CurrentReader.Command.State = CommandState.Idle;
try
{
// Will never complete asynchronously (stream is already closed)
CurrentReader.Close();
}
catch
{
// ignored
}
CurrentReader = null;
}
ClearTransaction();
#pragma warning disable CS8625
_stream = null;
_baseStream = null;
ReadBuffer?.AwaitableSocket?.Dispose();
ReadBuffer = null;
WriteBuffer?.AwaitableSocket?.Dispose();
WriteBuffer = null;
Connection = null;
PostgresParameters.Clear();
_currentCommand = null;
if (_isKeepAliveEnabled)
{
_userLock!.Dispose();
_userLock = null;
_keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite);
_keepAliveTimer.Dispose();
}
#pragma warning restore CS8625
}
void GenerateResetMessage()
{
var sb = new StringBuilder("SET SESSION AUTHORIZATION DEFAULT;RESET ALL;");
_resetWithoutDeallocateResponseCount = 2;
if (DatabaseInfo.SupportsCloseAll)
{
sb.Append("CLOSE ALL;");
_resetWithoutDeallocateResponseCount++;
}
if (DatabaseInfo.SupportsUnlisten)
{
sb.Append("UNLISTEN *;");
_resetWithoutDeallocateResponseCount++;
}
if (DatabaseInfo.SupportsAdvisoryLocks)
{
sb.Append("SELECT pg_advisory_unlock_all();");
_resetWithoutDeallocateResponseCount += 2;
}
if (DatabaseInfo.SupportsDiscardSequences)
{
sb.Append("DISCARD SEQUENCES;");
_resetWithoutDeallocateResponseCount++;
}
if (DatabaseInfo.SupportsDiscardTemp)
{
sb.Append("DISCARD TEMP");
_resetWithoutDeallocateResponseCount++;
}
_resetWithoutDeallocateResponseCount++; // One ReadyForQuery at the end
_resetWithoutDeallocateMessage = PregeneratedMessages.Generate(WriteBuffer, sb.ToString());
}
///
/// Called when a pooled connection is closed, and its connector is returned to the pool.
/// Resets the connector back to its initial state, releasing server-side sources
/// (e.g. prepared statements), resetting parameters to their defaults, and resetting client-side
/// state
///
///
/// It's important that this method be idempotent, since some race conditions in the pool
/// can cause it to be called twice (and also the user may close the connection right after
/// allocating it, without doing anything).
///
internal void Reset()
{
Debug.Assert(State == ConnectorState.Ready);
Connection = null;
switch (State)
{
case ConnectorState.Ready:
break;
case ConnectorState.Closed:
case ConnectorState.Broken:
return;
case ConnectorState.Connecting:
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.Copy:
case ConnectorState.Waiting:
throw new InvalidOperationException("Reset() called on connector with state " + State);
default:
throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {State} of enum {nameof(ConnectorState)}. Please file a bug.");
}
// Our buffer may contain unsent prepended messages (such as BeginTransaction), clear it out completely
WriteBuffer.Clear();
_pendingPrependedResponses = 0;
// We may have allocated an oversize read buffer, switch back to the original one
if (_origReadBuffer != null)
{
ReadBuffer = _origReadBuffer;
_origReadBuffer = null;
}
// Must rollback transaction before sending DISCARD ALL
switch (TransactionStatus)
{
case TransactionStatus.Idle:
break;
case TransactionStatus.Pending:
// BeginTransaction() was called, but was left in the write buffer and not yet sent to server.
// Just clear the transaction state.
ClearTransaction();
break;
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
Rollback(false);
ClearTransaction();
break;
default:
throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug.");
}
if (!Settings.NoResetOnClose && DatabaseInfo.SupportsDiscard)
{
if (PreparedStatementManager.NumPrepared > 0)
{
// We have prepared statements, so we can't reset the connection state with DISCARD ALL
// Note: the send buffer has been cleared above, and we assume all this will fit in it.
PrependInternalMessage(_resetWithoutDeallocateMessage!, _resetWithoutDeallocateResponseCount);
}
else
{
// There are no prepared statements.
// We simply send DISCARD ALL which is more efficient than sending the above messages separately
PrependInternalMessage(PregeneratedMessages.DiscardAll, 2);
}
}
}
internal void UnprepareAll()
{
ExecuteInternalCommand("DEALLOCATE ALL");
PreparedStatementManager.ClearAll();
}
#endregion Close / Reset
#region Locking
internal UserAction StartUserAction(NpgsqlCommand command)
=> StartUserAction(ConnectorState.Executing, command);
internal UserAction StartUserAction(ConnectorState newState=ConnectorState.Executing, NpgsqlCommand? command = null)
{
// If keepalive is enabled, we must protect state transitions with a SemaphoreSlim
// (which itself must be protected by a lock, since its dispose isn't thread-safe).
// This will make the keepalive abort safely if a user query is in progress, and make
// the user query wait if a keepalive is in progress.
// If keepalive isn't enabled, we don't use the semaphore and rely only on the connector's
// state (updated via Interlocked.Exchange) to detect concurrent use, on a best-effort basis.
if (!_isKeepAliveEnabled)
return DoStartUserAction();
lock (this)
{
if (!_userLock!.Wait(0))
{
var currentCommand = _currentCommand;
throw currentCommand == null
? new NpgsqlOperationInProgressException(State)
: new NpgsqlOperationInProgressException(currentCommand);
}
try
{
// Disable keepalive, it will be restarted at the end of the user action
_keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite);
// We now have both locks and are sure nothing else is running.
// Check that the connector is ready.
return DoStartUserAction();
}
catch
{
_userLock.Release();
throw;
}
}
UserAction DoStartUserAction()
{
switch (State)
{
case ConnectorState.Ready:
break;
case ConnectorState.Closed:
case ConnectorState.Broken:
throw new InvalidOperationException("Connection is not open");
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.Waiting:
case ConnectorState.Connecting:
case ConnectorState.Copy:
var currentCommand = _currentCommand;
throw currentCommand == null
? new NpgsqlOperationInProgressException(State)
: new NpgsqlOperationInProgressException(currentCommand);
default:
throw new ArgumentOutOfRangeException(nameof(State), State, "Invalid connector state: " + State);
}
Debug.Assert(IsReady);
Log.Trace("Start user action", Id);
State = newState;
_currentCommand = command;
return new UserAction(this);
}
}
internal void EndUserAction()
{
Debug.Assert(CurrentReader == null);
if (_isKeepAliveEnabled)
{
lock (this)
{
if (IsReady || !IsConnected)
return;
var keepAlive = Settings.KeepAlive * 1000;
_keepAliveTimer!.Change(keepAlive, keepAlive);
Log.Trace("End user action", Id);
_currentCommand = null;
_userLock!.Release();
State = ConnectorState.Ready;
}
}
else
{
if (IsReady || !IsConnected)
return;
Log.Trace("End user action", Id);
_currentCommand = null;
State = ConnectorState.Ready;
}
}
///
/// An IDisposable wrapper around .
///
internal readonly struct UserAction : IDisposable
{
readonly NpgsqlConnector _connector;
internal UserAction(NpgsqlConnector connector)
{
_connector = connector;
}
public void Dispose() => _connector.EndUserAction();
}
#endregion
#region Keepalive
#pragma warning disable CA1801 // Review unused parameters
void PerformKeepAlive(object? state)
{
Debug.Assert(_isKeepAliveEnabled);
// SemaphoreSlim.Dispose() isn't thread-safe - it may be in progress so we shouldn't try to wait on it;
// we need a standard lock to protect it.
if (!Monitor.TryEnter(this))
return;
try
{
// There may already be a user action, or the connector may be closed etc.
if (!IsReady)
return;
Log.Trace("Performed keepalive", Id);
WritePregenerated(PregeneratedMessages.KeepAlive);
Flush();
SkipUntil(BackendMessageCode.ReadyForQuery);
}
catch (Exception e)
{
Log.Error("Keepalive failure", e, Id);
try
{
Break();
}
catch (Exception e2)
{
Log.Error("Further exception while breaking connector on keepalive failure", e2, Id);
}
}
finally
{
Monitor.Exit(this);
}
}
#pragma warning restore CA1801 // Review unused parameters
#endregion
#region Wait
public bool Wait(int timeout)
{
if ((timeout == 0 || timeout == -1) && IsSecure)
throw new NotSupportedException("Wait() with timeout isn't supported when SSL is used, see https://github.com/npgsql/npgsql/issues/1501");
using (StartUserAction(ConnectorState.Waiting))
{
// We may have prepended messages in the connection's write buffer - these need to be flushed now.
Flush();
var keepaliveMs = Settings.KeepAlive * 1000;
while (true)
{
var timeoutForKeepalive = _isKeepAliveEnabled && (timeout == 0 || timeout == -1 || keepaliveMs < timeout);
UserTimeout = timeoutForKeepalive ? keepaliveMs : timeout;
try
{
var msg = ReadMessageWithNotifications(false).Result;
if (msg != null)
{
Break();
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while waiting");
}
return true;
}
catch (TimeoutException)
{
if (!timeoutForKeepalive) // We really timed out
return false;
}
// Time for a keepalive
var keepaliveTime = Stopwatch.StartNew();
WritePregenerated(PregeneratedMessages.KeepAlive);
Flush();
var receivedNotification = false;
var expectedMessageCode = BackendMessageCode.RowDescription;
while (true)
{
var msg = ReadMessage(false).Result;
if (msg == null)
{
receivedNotification = true;
continue;
}
if (msg.Code != expectedMessageCode)
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while expecting {expectedMessageCode} as part of keepalive");
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
expectedMessageCode = BackendMessageCode.DataRow;
continue;
case BackendMessageCode.DataRow:
// DataRow is usually consumed by a reader, here we have to skip it manually.
ReadBuffer.Skip(((DataRowMessage)msg).Length);
expectedMessageCode = BackendMessageCode.CompletedResponse;
continue;
case BackendMessageCode.CompletedResponse:
expectedMessageCode = BackendMessageCode.ReadyForQuery;
continue;
case BackendMessageCode.ReadyForQuery:
break;
}
Log.Trace("Performed keepalive", Id);
if (receivedNotification)
return true; // Notification was received during the keepalive
break;
}
if (timeout > 0)
timeout -= (keepaliveMs + (int)keepaliveTime.ElapsedMilliseconds);
}
}
}
public Task WaitAsync(CancellationToken cancellationToken)
{
using (NoSynchronizationContextScope.Enter())
return DoWaitAsync(cancellationToken);
}
async Task DoWaitAsync(CancellationToken cancellationToken)
{
var keepaliveSent = false;
var keepaliveLock = new SemaphoreSlim(1, 1);
TimerCallback performKeepaliveMethod = state =>
{
if (!keepaliveLock.Wait(0))
return;
try
{
if (keepaliveSent)
return;
keepaliveSent = true;
WritePregenerated(PregeneratedMessages.KeepAlive);
Flush();
}
finally
{
keepaliveLock.Release();
}
};
using (StartUserAction(ConnectorState.Waiting))
using (cancellationToken.Register(() => performKeepaliveMethod(null)))
{
// We may have prepended messages in the connection's write buffer - these need to be flushed now.
Flush();
Timer? keepaliveTimer = null;
if (_isKeepAliveEnabled)
keepaliveTimer = new Timer(performKeepaliveMethod, null, Settings.KeepAlive*1000, Timeout.Infinite);
try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var msg = await ReadMessageWithNotifications(true);
if (!keepaliveSent)
{
if (msg != null)
{
Break();
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while waiting");
}
return;
}
// A keepalive was sent. Consume the response (RowDescription, CommandComplete,
// ReadyForQuery) while also keeping track if an async message was received in between.
keepaliveLock.Wait();
try
{
var receivedNotification = false;
var expectedMessageCode = BackendMessageCode.RowDescription;
while (true)
{
while (msg == null)
{
receivedNotification = true;
msg = await ReadMessage(true);
}
if (msg.Code != expectedMessageCode)
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while expecting {expectedMessageCode} as part of keepalive");
var finishedKeepalive = false;
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
expectedMessageCode = BackendMessageCode.DataRow;
break;
case BackendMessageCode.DataRow:
// DataRow is usually consumed by a reader, here we have to skip it manually.
await ReadBuffer.Skip(((DataRowMessage)msg).Length, true);
expectedMessageCode = BackendMessageCode.CompletedResponse;
break;
case BackendMessageCode.CompletedResponse:
expectedMessageCode = BackendMessageCode.ReadyForQuery;
break;
case BackendMessageCode.ReadyForQuery:
finishedKeepalive = true;
break;
}
if (!finishedKeepalive)
{
msg = await ReadMessage(true);
continue;
}
Log.Trace("Performed keepalive", Id);
if (receivedNotification)
return; // Notification was received during the keepalive
cancellationToken.ThrowIfCancellationRequested();
// Keepalive completed without notification, set up the next one and continue waiting
keepaliveTimer!.Change(Settings.KeepAlive*1000, Timeout.Infinite);
keepaliveSent = false;
break;
}
}
finally
{
keepaliveLock.Release();
}
}
}
finally
{
keepaliveTimer?.Dispose();
}
}
}
#endregion
#region Supported features and PostgreSQL settings
internal bool UseConformingStrings { get; private set; }
///
/// The connection's timezone as reported by PostgreSQL, in the IANA/Olson database format.
///
internal string Timezone { get; private set; } = default!;
#endregion Supported features and PostgreSQL settings
#region Execute internal command
internal void ExecuteInternalCommand(string query)
=> ExecuteInternalCommand(query, false).GetAwaiter().GetResult();
internal async Task ExecuteInternalCommand(string query, bool async)
{
Log.Trace($"Executing internal command: {query}", Id);
await WriteQuery(query, async);
await Flush(async);
Expect(await ReadMessage(async), this);
Expect(await ReadMessage(async), this);
}
internal async Task ExecuteInternalCommand(byte[] data, bool async)
{
Debug.Assert(State != ConnectorState.Ready, "Forgot to start a user action...");
Log.Trace($"Executing internal pregenerated command", Id);
await WritePregenerated(data, async);
await Flush(async);
Expect(await ReadMessage(async), this);
Expect(await ReadMessage(async), this);
}
#endregion
#region Misc
void ReadParameterStatus(ReadOnlySpan incomingName, ReadOnlySpan incomingValue)
{
byte[] rawName;
byte[] rawValue;
foreach (var current in _rawParameters)
if (incomingName.SequenceEqual(current.Name))
{
if (incomingValue.SequenceEqual(current.Value))
return;
rawName = current.Name;
rawValue = incomingValue.ToArray();
goto ProcessParameter;
}
rawName = incomingName.ToArray();
rawValue = incomingValue.ToArray();
_rawParameters.Add((rawName, rawValue));
ProcessParameter:
var name = TextEncoding.GetString(rawName);
var value = TextEncoding.GetString(rawValue);
PostgresParameters[name] = value;
switch (name)
{
case "standard_conforming_strings":
UseConformingStrings = value == "on";
SqlParser.StandardConformingStrings = UseConformingStrings;
return;
case "TimeZone":
Timezone = value;
return;
}
}
#endregion Misc
}
#region Enums
///
/// Expresses the exact state of a connector.
///
enum ConnectorState
{
///
/// The connector has either not yet been opened or has been closed.
///
Closed,
///
/// The connector is currently connecting to a PostgreSQL server.
///
Connecting,
///
/// The connector is connected and may be used to send a new query.
///
Ready,
///
/// The connector is waiting for a response to a query which has been sent to the server.
///
Executing,
///
/// The connector is currently fetching and processing query results.
///
Fetching,
///
/// The connector is currently waiting for asynchronous notifications to arrive.
///
Waiting,
///
/// The connection was broken because an unexpected error occurred which left it in an unknown state.
/// This state isn't implemented yet.
///
Broken,
///
/// The connector is engaged in a COPY operation.
///
Copy,
}
#pragma warning disable CA1717
enum TransactionStatus : byte
#pragma warning restore CA1717
{
///
/// Currently not in a transaction block
///
Idle = (byte)'I',
///
/// Currently in a transaction block
///
InTransactionBlock = (byte)'T',
///
/// Currently in a failed transaction block (queries will be rejected until block is ended)
///
InFailedTransactionBlock = (byte)'E',
///
/// A new transaction has been requested but not yet transmitted to the backend. It will be transmitted
/// prepended to the next query.
/// This is a client-side state option only, and is never transmitted from the backend.
///
Pending = byte.MaxValue,
}
///
/// Specifies how to load/parse DataRow messages as they're received from the backend.
///
internal enum DataRowLoadingMode
{
///
/// Load DataRows in non-sequential mode
///
NonSequential,
///
/// Load DataRows in sequential mode
///
Sequential,
///
/// Skip DataRow messages altogether
///
Skip
}
#endregion
}