using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Runtime.ExceptionServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Npgsql.BackendMessages;
using Npgsql.Util;
using Microsoft.Extensions.Logging;
using Npgsql.Properties;
using static Npgsql.Util.Statics;
namespace Npgsql.Internal;
///
/// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are
/// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool.
///
[Experimental(NpgsqlDiagnostics.ConvertersExperimental)]
public sealed partial class NpgsqlConnector
{
#region Fields and Properties
///
/// The physical connection socket to the backend.
///
Socket _socket = default!;
///
/// The physical connection stream to the backend, without anything on top.
///
NetworkStream _baseStream = default!;
///
/// The physical connection stream to the backend, layered with an SSL/TLS stream if in secure mode.
///
Stream _stream = default!;
///
/// The parsed connection string.
///
public NpgsqlConnectionStringBuilder Settings { get; }
Action? SslClientAuthenticationOptionsCallback { get; }
#pragma warning disable CS0618 // ProvidePasswordCallback is obsolete
ProvidePasswordCallback? ProvidePasswordCallback { get; }
#pragma warning restore CS0618
Action? NegotiateOptionsCallback { get; }
public Encoding TextEncoding { get; private set; } = default!;
///
/// Same as , except that it does not throw an exception if an invalid char is
/// encountered (exception fallback), but rather replaces it with a question mark character (replacement
/// fallback).
///
internal Encoding RelaxedTextEncoding { get; private set; } = default!;
///
/// Buffer used for reading data.
///
internal NpgsqlReadBuffer ReadBuffer { get; private set; } = default!;
///
/// If we read a data row that's bigger than , we allocate an oversize buffer.
/// The original (smaller) buffer is stored here, and restored when the connection is reset.
///
NpgsqlReadBuffer? _origReadBuffer;
///
/// Buffer used for writing data.
///
internal NpgsqlWriteBuffer WriteBuffer { get; private set; } = default!;
///
/// The secret key of the backend for this connector, used for query cancellation.
///
int _backendSecretKey;
///
/// The process ID of the backend for this connector.
///
internal int BackendProcessId { get; private set; }
string? _inferredUserName;
///
/// The user name that has been inferred when the connector was opened
///
internal string InferredUserName
{
get => _inferredUserName ?? throw new InvalidOperationException($"{nameof(InferredUserName)} cannot be accessed before the connector has been opened.");
private set => _inferredUserName = value;
}
bool SupportsPostgresCancellation => BackendProcessId != 0;
///
/// A unique ID identifying this connector, used for logging. Currently mapped to BackendProcessId
///
internal int Id => BackendProcessId;
internal NpgsqlDataSource.ReloadableState ReloadableState = null!;
///
/// Information about PostgreSQL and PostgreSQL-like databases (e.g. type definitions, capabilities...).
///
public NpgsqlDatabaseInfo DatabaseInfo => ReloadableState.DatabaseInfo;
internal PgSerializerOptions SerializerOptions => ReloadableState.SerializerOptions;
internal IDbTypeResolver? DbTypeResolver => ReloadableState.DbTypeResolver;
///
/// The current transaction status for this connector.
///
internal TransactionStatus TransactionStatus { get; set; }
///
/// A transaction object for this connector. Since only one transaction can be in progress at any given time,
/// this instance is recycled. To check whether a transaction is currently in progress on this connector,
/// see .
///
internal NpgsqlTransaction? Transaction { get; set; }
internal NpgsqlTransaction? UnboundTransaction { get; set; }
///
/// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't
/// owned (i.e. idle in the pool)
///
internal NpgsqlConnection? Connection { get; set; }
///
/// The number of messages that were prepended to the current message chain, but not yet sent.
/// Note that this only tracks messages which produce a ReadyForQuery message
///
internal int PendingPrependedResponses { get; set; }
///
/// A ManualResetEventSlim used to make sure a cancellation request doesn't run
/// while we're reading responses for the prepended query
/// as we can't gracefully handle their cancellation.
///
readonly ManualResetEventSlim ReadingPrependedMessagesMRE = new(initialState: true);
internal NpgsqlDataReader? CurrentReader;
internal PreparedStatementManager PreparedStatementManager { get; }
internal SqlQueryParser SqlQueryParser { get; } = new();
///
/// If the connector is currently in COPY mode, holds a reference to the importer/exporter object.
/// Otherwise null.
///
internal ICancelable? CurrentCopyOperation;
///
/// Holds all run-time parameters received from the backend (via ParameterStatus messages)
///
internal Dictionary PostgresParameters { get; }
///
/// Holds all run-time parameters in raw, binary format for efficient handling without allocations.
///
readonly List<(byte[] Name, byte[] Value)> _rawParameters = [];
///
/// If this connector was broken, this contains the exception that caused the break.
///
volatile Exception? _breakReason;
///
///
/// Used by the pool to indicate that I/O is currently in progress on this connector, so that another write
/// isn't started concurrently. Note that since we have only one write loop, this is only ever usedto
/// protect against an over-capacity writes into a connector that's currently *asynchronously* writing.
///
///
/// It is guaranteed that the currently-executing
/// Specifically, reading may occur - and the connector may even be returned to the pool - before this is
/// released.
///
///
internal volatile int MultiplexAsyncWritingLock;
///
internal void FlagAsNotWritableForMultiplexing()
{
Debug.Assert(Settings.Multiplexing);
Debug.Assert(CommandsInFlightCount > 0 || IsBroken || IsClosed,
$"About to mark multiplexing connector as non-writable, but {nameof(CommandsInFlightCount)} is {CommandsInFlightCount}");
Interlocked.Exchange(ref MultiplexAsyncWritingLock, 1);
}
///
internal void FlagAsWritableForMultiplexing()
{
Debug.Assert(Settings.Multiplexing);
if (Interlocked.CompareExchange(ref MultiplexAsyncWritingLock, 0, 1) != 1)
throw new Exception("Multiplexing lock was not taken when releasing. Please report a bug.");
}
///
/// A lock that's taken while a cancellation is being delivered; new queries are blocked until the
/// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
/// command will accidentally cancel a later one, see #615.
///
object CancelLock { get; } = new();
///
/// A lock that's taken to make sure no other concurrent operation is running.
/// Break takes it to set the state of the connector.
/// Anyone else should immediately check the state and exit
/// if the connector is closed.
///
object SyncObj { get; } = new();
///
/// A lock that's used to wait for the Cleanup to complete while breaking the connection.
///
object CleanupLock { get; } = new();
readonly bool _isKeepAliveEnabled;
readonly Timer? _keepAliveTimer;
///
/// The command currently being executed by the connector, null otherwise.
/// Used only for concurrent use error reporting purposes.
///
NpgsqlCommand? _currentCommand;
bool _sendResetOnClose;
///
/// The connector source (e.g. pool) from where this connector came, and to which it will be returned.
/// Note that in multi-host scenarios, this references the host-specific rather than the
/// .
///
internal NpgsqlDataSource DataSource { get; }
internal string UserFacingConnectionString => DataSource.ConnectionString;
///
/// Contains the UTC timestamp when this connector was opened, used to implement
/// .
///
internal DateTime OpenTimestamp { get; private set; }
internal int ClearCounter { get; set; }
volatile bool _postgresCancellationPerformed;
internal bool PostgresCancellationPerformed
{
get => _postgresCancellationPerformed;
private set => _postgresCancellationPerformed = value;
}
volatile bool _userCancellationRequested;
CancellationTokenRegistration _cancellationTokenRegistration;
internal bool UserCancellationRequested => _userCancellationRequested;
internal CancellationToken UserCancellationToken { get; set; }
internal bool AttemptPostgresCancellation { get; private set; }
static readonly TimeSpan _cancelImmediatelyTimeout = TimeSpan.Zero;
static readonly SslApplicationProtocol _alpnProtocol = new("postgresql");
#pragma warning disable CA1859
// We're casting to IDisposable to not explicitly reference X509Certificate2 for NativeAOT
// TODO: probably pointless now, needs to be rechecked
List? _certificates;
#pragma warning restore CA1859
internal NpgsqlLoggingConfiguration LoggingConfiguration { get; }
internal ILogger ConnectionLogger { get; }
internal ILogger CommandLogger { get; }
internal ILogger TransactionLogger { get; }
internal ILogger CopyLogger { get; }
internal readonly Stopwatch QueryLogStopWatch = new();
internal EndPoint? ConnectedEndPoint { get; private set; }
#endregion
#region Constants
///
/// The minimum timeout that can be set on internal commands such as COMMIT, ROLLBACK.
///
/// Precision is seconds
internal const int MinimumInternalCommandTimeout = 3;
#endregion
#region Reusable Message Objects
byte[]? _resetWithoutDeallocateMessage;
int _resetWithoutDeallocateResponseCount;
// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new();
readonly ReadyForQueryMessage _readyForQueryMessage = new();
readonly ParameterDescriptionMessage _parameterDescriptionMessage = new();
readonly DataRowMessage _dataRowMessage = new();
readonly RowDescriptionMessage _rowDescriptionMessage = new(connectorOwned: true);
// Since COPY is rarely used, allocate these lazily
CopyInResponseMessage? _copyInResponseMessage;
CopyOutResponseMessage? _copyOutResponseMessage;
CopyDataMessage? _copyDataMessage;
CopyBothResponseMessage? _copyBothResponseMessage;
#endregion
internal NpgsqlDataReader DataReader { get; set; }
internal NpgsqlDataReader? UnboundDataReader { get; set; }
#region Constructors
internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
: this(dataSource)
{
var sslClientAuthenticationOptionsCallback = conn.SslClientAuthenticationOptionsCallback;
#pragma warning disable CS0618 // Obsolete
var provideClientCertificatesCallback = conn.ProvideClientCertificatesCallback;
var userCertificateValidationCallback = conn.UserCertificateValidationCallback;
if (provideClientCertificatesCallback is not null ||
userCertificateValidationCallback is not null)
{
if (sslClientAuthenticationOptionsCallback is not null)
throw new NotSupportedException(NpgsqlStrings.SslClientAuthenticationOptionsCallbackWithOtherCallbacksNotSupported);
sslClientAuthenticationOptionsCallback = options =>
{
if (provideClientCertificatesCallback is not null)
{
options.ClientCertificates ??= new X509Certificate2Collection();
provideClientCertificatesCallback.Invoke(options.ClientCertificates);
}
if (userCertificateValidationCallback is not null)
{
options.RemoteCertificateValidationCallback = userCertificateValidationCallback;
}
};
}
if (sslClientAuthenticationOptionsCallback is not null)
SslClientAuthenticationOptionsCallback = sslClientAuthenticationOptionsCallback;
ProvidePasswordCallback = conn.ProvidePasswordCallback;
#pragma warning restore CS0618
}
NpgsqlConnector(NpgsqlConnector connector)
: this(connector.DataSource)
{
SslClientAuthenticationOptionsCallback = connector.SslClientAuthenticationOptionsCallback;
ProvidePasswordCallback = connector.ProvidePasswordCallback;
}
NpgsqlConnector(NpgsqlDataSource dataSource)
{
Debug.Assert(dataSource.OwnsConnectors);
DataSource = dataSource;
LoggingConfiguration = dataSource.LoggingConfiguration;
ConnectionLogger = LoggingConfiguration.ConnectionLogger;
CommandLogger = LoggingConfiguration.CommandLogger;
TransactionLogger = LoggingConfiguration.TransactionLogger;
CopyLogger = LoggingConfiguration.CopyLogger;
SslClientAuthenticationOptionsCallback = dataSource.SslClientAuthenticationOptionsCallback;
NegotiateOptionsCallback = dataSource.Configuration.NegotiateOptionsCallback;
State = ConnectorState.Closed;
TransactionStatus = TransactionStatus.Idle;
Settings = dataSource.Settings;
PostgresParameters = new Dictionary();
_isKeepAliveEnabled = Settings.KeepAlive > 0;
if (_isKeepAliveEnabled)
{
using (ExecutionContext.SuppressFlow()) // Don't capture the current ExecutionContext and its AsyncLocals onto the timer causing them to live forever
_keepAliveTimer = new Timer(PerformKeepAlive, null, Timeout.Infinite, Timeout.Infinite);
}
DataReader = new NpgsqlDataReader(this);
// TODO: Not just for automatic preparation anymore...
PreparedStatementManager = new PreparedStatementManager(this);
if (Settings.Multiplexing)
{
// Note: It's OK for this channel to be unbounded: each command enqueued to it is accompanied by sending
// it to PostgreSQL. If we overload it, a TCP zero window will make us block on the networking side
// anyway.
// Note: the in-flight channel can probably be single-writer, but that doesn't actually do anything
// at this point. And we currently rely on being able to complete the channel at any point (from
// Break). We may want to revisit this if an optimized, SingleWriter implementation is introduced.
var commandsInFlightChannel = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true });
CommandsInFlightReader = commandsInFlightChannel.Reader;
CommandsInFlightWriter = commandsInFlightChannel.Writer;
// TODO: Properly implement this
if (_isKeepAliveEnabled)
throw new NotImplementedException("Keepalive not yet implemented for multiplexing");
}
}
#endregion
#region Configuration settings
internal string Host => Settings.Host!;
internal int Port => Settings.Port;
internal string Database => Settings.Database!;
string KerberosServiceName => Settings.KerberosServiceName;
int ConnectionTimeout => Settings.Timeout;
#endregion Configuration settings
#region State management
int _state;
///
/// Gets the current state of the connector
///
internal ConnectorState State
{
get => (ConnectorState)_state;
set
{
var newState = (int)value;
if (newState == _state)
return;
if (newState is < 0 or > (int)ConnectorState.Replication)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(value), "Unknown state: " + value);
Interlocked.Exchange(ref _state, newState);
}
}
///
/// Returns whether the connector is open, regardless of any task it is currently performing
///
internal bool IsConnected => State is not (ConnectorState.Closed or ConnectorState.Connecting or ConnectorState.Broken);
internal bool IsReady => State == ConnectorState.Ready;
internal bool IsClosed => State == ConnectorState.Closed;
internal bool IsBroken => State == ConnectorState.Broken;
#endregion
#region Open
///
/// Opens the physical connection to the server.
///
/// Usually called by the RequestConnector
/// Method of the connection pool manager.
internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
Debug.Assert(State == ConnectorState.Closed);
State = ConnectorState.Connecting;
LogMessages.OpeningPhysicalConnection(ConnectionLogger, Host, Port, Database, UserFacingConnectionString);
var startOpenTimestamp = Stopwatch.GetTimestamp();
Activity? activity = null;
try
{
var username = await GetUsernameAsync(async, cancellationToken).ConfigureAwait(false);
activity = NpgsqlActivitySource.PhysicalConnectionOpen(this);
var gssEncMode = GetGssEncMode(Settings);
await OpenCore(this, username, Settings.SslMode, gssEncMode, timeout, async, cancellationToken).ConfigureAwait(false);
if (activity is not null)
NpgsqlActivitySource.Enrich(activity, this);
await DataSource.Bootstrap(this, timeout, forceReload: false, async, cancellationToken).ConfigureAwait(false);
// The connector directly references the current reloadable state reference, to protect it against changes by a concurrent
// ReloadTypes. We update them here before returning the connector from the pool.
ReloadableState = DataSource.CurrentReloadableState;
if (Settings.Pooling && Settings is { Multiplexing: false, NoResetOnClose: false } && DatabaseInfo.SupportsDiscard)
{
_sendResetOnClose = true;
GenerateResetMessage();
}
OpenTimestamp = DateTime.UtcNow;
if (Settings.Multiplexing)
{
// Start an infinite async loop, which processes incoming multiplexing traffic.
// It is intentionally not awaited and will run as long as the connector is alive.
// The CommandsInFlightWriter channel is completed in Cleanup, which should cause this task
// to complete.
// Make sure we do not flow AsyncLocals like Activity.Current
using var __ = ExecutionContext.SuppressFlow();
_ = Task.Run(MultiplexingReadLoop, CancellationToken.None)
.ContinueWith(t =>
{
// Note that we *must* observe the exception if the task is faulted.
ConnectionLogger.LogError(t.Exception!, "Exception bubbled out of multiplexing read loop", Id);
}, TaskContinuationOptions.OnlyOnFaulted);
}
if (_isKeepAliveEnabled)
{
// Start the keep alive mechanism to work by scheduling the timer.
// Otherwise, it doesn't work for cases when no query executed during
// the connection lifetime in case of a new connector.
lock (SyncObj)
{
var keepAlive = Settings.KeepAlive * 1000;
_keepAliveTimer!.Change(keepAlive, keepAlive);
}
}
if (DataSource.ConnectionInitializerAsync is not null)
{
Debug.Assert(DataSource.ConnectionInitializer is not null);
var tempConnection = new NpgsqlConnection(DataSource, this);
try
{
if (async)
await DataSource.ConnectionInitializerAsync(tempConnection).ConfigureAwait(false);
else
DataSource.ConnectionInitializer(tempConnection);
}
finally
{
// Note that we can't just close/dispose the NpgsqlConnection, since that puts the connector back in the pool.
// But we transition it to disposed immediately, in case the user decides to capture the NpgsqlConnection and use it
// later.
Connection?.MakeDisposed();
Connection = null;
}
}
activity?.Dispose();
LogMessages.OpenedPhysicalConnection(
ConnectionLogger, Host, Port, Database, UserFacingConnectionString,
(long)Stopwatch.GetElapsedTime(startOpenTimestamp).TotalMilliseconds, Id);
}
catch (Exception e)
{
if (activity is not null)
NpgsqlActivitySource.SetException(activity, e);
Break(e);
throw;
}
static async Task OpenCore(
NpgsqlConnector conn,
string username,
SslMode sslMode,
GssEncryptionMode gssEncMode,
NpgsqlTimeout timeout,
bool async,
CancellationToken cancellationToken)
{
// If we fail to connect to the socket, there is no reason to retry even if SslMode/GssEncryption allows it
await conn.RawOpen(timeout, async, cancellationToken).ConfigureAwait(false);
try
{
await conn.SetupEncryption(sslMode, gssEncMode, timeout, async, cancellationToken).ConfigureAwait(false);
timeout.CheckAndApply(conn);
conn.WriteStartupMessage(username);
await conn.Flush(async, cancellationToken).ConfigureAwait(false);
using var cancellationRegistration = conn.StartCancellableOperation(cancellationToken, attemptPgCancellation: false);
await conn.Authenticate(username, timeout, async, cancellationToken).ConfigureAwait(false);
}
// We handle any exception here because on Windows while receiving a response from Postgres
// We might hit connection reset, in which case the actual error will be lost
// And we only read some IO error
// In addition, this behavior mimics libpq, where it retries as long as GssEncryptionMode and SslMode allows it
catch (Exception e) when
// We might also get here OperationCancelledException/TimeoutException
// But it's fine to fall down and retry because we'll immediately exit with the exact same exception
//
// Any error after trying with GSS encryption
(gssEncMode == GssEncryptionMode.Prefer ||
// Auth error with/without SSL
(sslMode == SslMode.Prefer && conn.IsSslEncrypted || sslMode == SslMode.Allow && !conn.IsSslEncrypted))
{
if (gssEncMode == GssEncryptionMode.Prefer)
{
conn.ConnectionLogger.LogTrace(e, "Error while opening physical connection with GSS encryption, retrying without it");
gssEncMode = GssEncryptionMode.Disable;
}
else
sslMode = sslMode == SslMode.Prefer ? SslMode.Disable : SslMode.Require;
conn.Cleanup();
// If Prefer was specified and we failed (with SSL), retry without SSL.
// If Allow was specified and we failed (without SSL), retry with SSL
await OpenCore(
conn,
username,
sslMode,
gssEncMode,
timeout,
async,
cancellationToken).ConfigureAwait(false);
return;
}
// We treat BackendKeyData as optional because some PostgreSQL-like database
// don't send it (CockroachDB, CrateDB)
var msg = await conn.ReadMessage(async).ConfigureAwait(false);
if (msg.Code == BackendMessageCode.BackendKeyData)
{
var keyDataMsg = (BackendKeyDataMessage)msg;
conn.BackendProcessId = keyDataMsg.BackendProcessId;
conn._backendSecretKey = keyDataMsg.BackendSecretKey;
msg = await conn.ReadMessage(async).ConfigureAwait(false);
}
if (msg.Code != BackendMessageCode.ReadyForQuery)
throw new NpgsqlException($"Received backend message {msg.Code} while expecting ReadyForQuery. Please file a bug.");
conn.State = ConnectorState.Ready;
}
}
internal async ValueTask GSSEncrypt(bool async, bool isRequired, CancellationToken cancellationToken)
{
ConnectionLogger.LogTrace("Negotiating GSS encryption");
var targetName = $"{KerberosServiceName}/{Host}";
var clientOptions = new NegotiateAuthenticationClientOptions { TargetName = targetName };
NegotiateOptionsCallback?.Invoke(clientOptions);
var authentication = new NegotiateAuthentication(clientOptions);
try
{
var data = authentication.GetOutgoingBlob(ReadOnlySpan.Empty, out var statusCode)!;
if (statusCode != NegotiateAuthenticationStatusCode.ContinueNeeded)
{
// Unable to retrieve credentials
// If it's required, throw an appropriate exception
if (isRequired)
throw new NpgsqlException($"Unable to negotiate GSS encryption: {statusCode}");
return GssEncryptionResult.GetCredentialFailure;
}
WriteGSSEncryptRequest();
await Flush(async, cancellationToken).ConfigureAwait(false);
await ReadBuffer.Ensure(1, async).ConfigureAwait(false);
var response = (char)ReadBuffer.ReadByte();
// TODO: Server can respond with an error here
// but according to documentation we shouldn't display this error to the user/application
// since the server has not been authenticated (CVE-2024-10977)
// See https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-GSSAPI
switch (response)
{
default:
throw new NpgsqlException($"Received unknown response {response} for GSSEncRequest (expecting G or N)");
case 'N':
if (isRequired)
throw new NpgsqlException("GGS encryption requested. No GSS encryption enabled connection from this host is configured.");
return GssEncryptionResult.NegotiateFailure;
case 'G':
break;
}
if (ReadBuffer.ReadBytesLeft > 0)
throw new NpgsqlException(
"Additional unencrypted data received after GSS encryption negotiation - this should never happen, and may be an indication of a man-in-the-middle attack.");
var lengthBuffer = new byte[4];
await WriteGssEncryptMessage(async, data, lengthBuffer).ConfigureAwait(false);
while (true)
{
if (async)
await _stream.ReadExactlyAsync(lengthBuffer, cancellationToken).ConfigureAwait(false);
else
_stream.ReadExactly(lengthBuffer);
var messageLength = BitConverter.IsLittleEndian
? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned(ref lengthBuffer[0]))
: Unsafe.ReadUnaligned(ref lengthBuffer[0]);
var buffer = ArrayPool.Shared.Rent(messageLength);
if (async)
await _stream.ReadExactlyAsync(buffer.AsMemory(0, messageLength), cancellationToken).ConfigureAwait(false);
else
_stream.ReadExactly(buffer.AsSpan(0, messageLength));
data = authentication.GetOutgoingBlob(buffer.AsSpan(0, messageLength), out statusCode);
ArrayPool.Shared.Return(buffer, clearArray: true);
if (statusCode is not NegotiateAuthenticationStatusCode.Completed and not NegotiateAuthenticationStatusCode.ContinueNeeded)
throw new NpgsqlException($"Error while negotiating GSS encryption: {statusCode}");
// TODO: the code below is the copy from GSS/SSPI auth
// It's unknown whether it holds true here or not
// We might get NegotiateAuthenticationStatusCode.Completed but the data will not be null
// This can happen if it's the first cycle, in which case we have to send that data to complete handshake (#4888)
if (data is null)
{
Debug.Assert(statusCode == NegotiateAuthenticationStatusCode.Completed);
break;
}
await WriteGssEncryptMessage(async, data, lengthBuffer).ConfigureAwait(false);
}
_stream = new GSSStream(_stream, authentication);
ReadBuffer.Underlying = _stream;
WriteBuffer.Underlying = _stream;
IsGssEncrypted = true;
authentication = null;
ConnectionLogger.LogTrace("GSS encryption successful");
return GssEncryptionResult.Success;
async ValueTask WriteGssEncryptMessage(bool async, byte[] data, byte[] lengthBuffer)
{
BinaryPrimitives.WriteInt32BigEndian(lengthBuffer, data.Length);
if (async)
{
await _stream.WriteAsync(lengthBuffer, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(data, cancellationToken).ConfigureAwait(false);
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
else
{
_stream.Write(lengthBuffer);
_stream.Write(data);
_stream.Flush();
}
}
}
catch (Exception e)
{
throw new NpgsqlException("Exception while performing GSS encryption", e);
}
finally
{
authentication?.Dispose();
}
}
internal async ValueTask QueryDatabaseState(
NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken = default)
{
using var batch = CreateBatch();
batch.BatchCommands.Add(new NpgsqlBatchCommand("select pg_is_in_recovery()"));
batch.BatchCommands.Add(new NpgsqlBatchCommand("SHOW default_transaction_read_only"));
batch.Timeout = (int)timeout.CheckAndGetTimeLeft().TotalSeconds;
var reader = async ? await batch.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false) : batch.ExecuteReader();
try
{
if (async)
{
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
_isHotStandBy = reader.GetBoolean(0);
await reader.NextResultAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
else
{
reader.Read();
_isHotStandBy = reader.GetBoolean(0);
reader.NextResult();
reader.Read();
}
_isTransactionReadOnly = reader.GetString(0) != "off";
var databaseState = UpdateDatabaseState();
Debug.Assert(databaseState.HasValue);
return databaseState.Value;
}
finally
{
if (async)
await reader.DisposeAsync().ConfigureAwait(false);
else
reader.Dispose();
}
}
void WriteStartupMessage(string username)
{
var startupParams = new Dictionary
{
["user"] = username,
["client_encoding"] = Settings.ClientEncoding ??
PostgresEnvironment.ClientEncoding ??
"UTF8"
};
if (Settings.Database is not null)
startupParams["database"] = Settings.Database;
var applicationName = Settings.ApplicationName ?? PostgresEnvironment.AppName;
if (applicationName?.Length > 0)
startupParams["application_name"] = applicationName;
if (Settings.SearchPath?.Length > 0)
startupParams["search_path"] = Settings.SearchPath;
var timezone = Settings.Timezone ?? PostgresEnvironment.TimeZone;
if (timezone != null)
startupParams["TimeZone"] = timezone;
var options = Settings.Options ?? PostgresEnvironment.Options;
if (options?.Length > 0)
startupParams["options"] = options;
switch (Settings.ReplicationMode)
{
case ReplicationMode.Logical:
startupParams["replication"] = "database";
break;
case ReplicationMode.Physical:
startupParams["replication"] = "true";
break;
}
WriteStartup(startupParams);
}
ValueTask GetUsernameAsync(bool async, CancellationToken cancellationToken)
{
var username = Settings.Username;
if (username?.Length > 0)
{
InferredUserName = username;
return new(username);
}
username = PostgresEnvironment.User;
if (username?.Length > 0)
{
InferredUserName = username;
return new(username);
}
return GetUsernameAsyncInternal();
async ValueTask GetUsernameAsyncInternal()
{
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
username = await DataSource.IntegratedSecurityHandler.GetUsername(async, Settings.IncludeRealm, ConnectionLogger,
cancellationToken).ConfigureAwait(false);
if (username?.Length > 0)
{
InferredUserName = username;
return username;
}
}
username = Environment.UserName;
if (username?.Length > 0)
{
InferredUserName = username;
return username;
}
throw new NpgsqlException("No username could be found, please specify one explicitly");
}
}
async Task RawOpen(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
try
{
if (async)
await ConnectAsync(timeout, cancellationToken).ConfigureAwait(false);
else
Connect(timeout);
ConnectionLogger.LogTrace("Socket connected to {Host}:{Port}", Host, Port);
_baseStream = new NetworkStream(_socket, true);
_stream = _baseStream;
if (Settings.Encoding == "UTF8")
{
TextEncoding = NpgsqlWriteBuffer.UTF8Encoding;
RelaxedTextEncoding = NpgsqlWriteBuffer.RelaxedUTF8Encoding;
}
else
{
TextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ExceptionFallback, DecoderFallback.ExceptionFallback);
RelaxedTextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ReplacementFallback, DecoderFallback.ReplacementFallback);
}
ReadBuffer = new NpgsqlReadBuffer(this, _stream, _socket, Settings.ReadBufferSize, TextEncoding, RelaxedTextEncoding);
WriteBuffer = new NpgsqlWriteBuffer(this, _stream, _socket, Settings.WriteBufferSize, TextEncoding);
timeout.CheckAndApply(this);
IsSslEncrypted = false;
IsGssEncrypted = false;
}
catch
{
_stream?.Dispose();
_stream = null!;
_baseStream?.Dispose();
_baseStream = null!;
_socket?.Dispose();
_socket = null!;
throw;
}
}
async Task SetupEncryption(SslMode sslMode, GssEncryptionMode gssEncryptionMode, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
var gssEncryptResult = await TryNegotiateGssEncryption(gssEncryptionMode, async, cancellationToken).ConfigureAwait(false);
if (gssEncryptResult == GssEncryptionResult.Success)
return;
// TryNegotiateGssEncryption should already throw a much more meaningful exception
// if GSS encryption is required but for some reason we can't negotiate it.
// But since we have to return a specific result instead of generic true/false
// To make absolutely sure we didn't miss anything, recheck again
if (gssEncryptionMode == GssEncryptionMode.Require)
throw new NpgsqlException($"Unable to negotiate GSS encryption: {gssEncryptResult}");
timeout.CheckAndApply(this);
if (GetSslNegotiation(Settings) == SslNegotiation.Direct)
{
// We already check that in NpgsqlConnectionStringBuilder.PostProcessAndValidate, but since we also allow environment variables...
if (Settings.SslMode is not SslMode.Require and not SslMode.VerifyCA and not SslMode.VerifyFull)
throw new ArgumentException("SSL Mode has to be Require or higher to be used with direct SSL Negotiation");
if (gssEncryptResult == GssEncryptionResult.NegotiateFailure)
{
// We can be here only if it's fallback from preferred (but failed) gss encryption
// In this case, direct encryption isn't going to work anymore, so we throw a bogus exception to retry again without gss
// Alternatively, we can instead just go with the usual route of writing SslRequest, ignoring direct ssl
// But this is how libpq works
Debug.Assert(gssEncryptionMode == GssEncryptionMode.Prefer);
// The exception message doesn't matter since we're going to retry again
throw new NpgsqlException();
}
await DataSource.TransportSecurityHandler.NegotiateEncryption(async, this, sslMode, timeout, cancellationToken).ConfigureAwait(false);
if (ReadBuffer.ReadBytesLeft > 0)
throw new NpgsqlException("Additional unencrypted data received after SSL negotiation - this should never happen, and may be an indication of a man-in-the-middle attack.");
}
else if ((sslMode is SslMode.Prefer && DataSource.TransportSecurityHandler.SupportEncryption) ||
sslMode is SslMode.Require or SslMode.VerifyCA or SslMode.VerifyFull)
{
WriteSslRequest();
await Flush(async, cancellationToken).ConfigureAwait(false);
await ReadBuffer.Ensure(1, async).ConfigureAwait(false);
var response = (char)ReadBuffer.ReadByte();
timeout.CheckAndApply(this);
switch (response)
{
default:
throw new NpgsqlException($"Received unknown response {response} for SSLRequest (expecting S or N)");
case 'N':
if (sslMode != SslMode.Prefer)
throw new NpgsqlException("SSL connection requested. No SSL enabled connection from this host is configured.");
break;
case 'S':
await DataSource.TransportSecurityHandler.NegotiateEncryption(async, this, sslMode, timeout, cancellationToken).ConfigureAwait(false);
break;
}
if (ReadBuffer.ReadBytesLeft > 0)
throw new NpgsqlException("Additional unencrypted data received after SSL negotiation - this should never happen, and may be an indication of a man-in-the-middle attack.");
}
}
async ValueTask TryNegotiateGssEncryption(GssEncryptionMode gssEncryptionMode, bool async, CancellationToken cancellationToken)
{
// GetCredentialFailure is essentially a nop (since we didn't send anything over the wire)
// So we can proceed further as if gss encryption wasn't even attempted
if (gssEncryptionMode == GssEncryptionMode.Disable) return GssEncryptionResult.GetCredentialFailure;
// Same thing as above, though in this case user doesn't require GSS encryption but didn't enable encryption
// Most of the time they're using the default value, in which case also exit without throwing an error
if (gssEncryptionMode == GssEncryptionMode.Prefer && !DataSource.TransportSecurityHandler.SupportEncryption)
return GssEncryptionResult.GetCredentialFailure;
if (ConnectedEndPoint!.AddressFamily == AddressFamily.Unix)
{
if (gssEncryptionMode == GssEncryptionMode.Prefer)
return GssEncryptionResult.GetCredentialFailure;
Debug.Assert(gssEncryptionMode == GssEncryptionMode.Require);
throw new NpgsqlException("GSS encryption isn't supported over unix socket");
}
return await DataSource.IntegratedSecurityHandler.GSSEncrypt(async, gssEncryptionMode == GssEncryptionMode.Require, this, cancellationToken)
.ConfigureAwait(false);
}
static SslNegotiation GetSslNegotiation(NpgsqlConnectionStringBuilder settings)
{
if (settings.UserProvidedSslNegotiation is { } userProvidedSslNegotiation)
return userProvidedSslNegotiation;
if (PostgresEnvironment.SslNegotiation is { } sslNegotiationEnv)
{
if (Enum.TryParse(sslNegotiationEnv, ignoreCase: true, out var sslNegotiation))
return sslNegotiation;
}
// If user hasn't provided the value via connection string or environment variable
// Retrieve the default value from property
return settings.SslNegotiation;
}
static GssEncryptionMode GetGssEncMode(NpgsqlConnectionStringBuilder settings)
{
if (settings.UserProvidedGssEncMode is { } userProvidedGssEncMode)
return userProvidedGssEncMode;
if (PostgresEnvironment.GssEncryptionMode is { } gssEncModeEnv)
{
if (Enum.TryParse(gssEncModeEnv, ignoreCase: true, out var gssEncMode))
return gssEncMode;
}
// If user hasn't provided the value via connection string or environment variable
// Retrieve the default value from property
return settings.GssEncryptionMode;
}
internal async Task NegotiateEncryption(SslMode sslMode, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
ConnectionLogger.LogTrace("Negotiating SSL encryption");
var clientCertificates = new X509Certificate2Collection();
var certPath = Settings.SslCertificate ?? PostgresEnvironment.SslCert ?? PostgresEnvironment.SslCertDefault;
if (certPath != null)
{
var password = Settings.SslPassword;
if (!string.Equals(Path.GetExtension(certPath), ".pfx", StringComparison.OrdinalIgnoreCase))
{
// It's PEM time
var keyPath = Settings.SslKey ?? PostgresEnvironment.SslKey ?? PostgresEnvironment.SslKeyDefault;
// With PEM certificates we might have multiple certificates in a single file
// Where the first one is a leaf (and it has to have a private key)
// And others are intermediate between it and CA cert
// To support this, we first load the leaf certificate with private key
// And then we load everything else including the leaf, but without private key
// And afterwards we just get rid of the duplicate
var firstClientCert = string.IsNullOrEmpty(password)
? X509Certificate2.CreateFromPemFile(certPath, keyPath)
: X509Certificate2.CreateFromEncryptedPemFile(certPath, password, keyPath);
clientCertificates.Add(firstClientCert);
clientCertificates.ImportFromPemFile(certPath);
clientCertificates[1].Dispose();
clientCertificates.RemoveAt(1);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
for (var i = 0; i < clientCertificates.Count; i++)
{
var cert = clientCertificates[i];
// Windows crypto API has a bug with pem certs
// See #3650
using var previousCert = cert;
#if NET9_0_OR_GREATER
cert = X509CertificateLoader.LoadPkcs12(cert.Export(X509ContentType.Pkcs12), null);
#else
cert = new X509Certificate2(cert.Export(X509ContentType.Pkcs12));
#endif
clientCertificates[i] = cert;
}
}
}
// If it's empty, it's probably PFX
if (clientCertificates.Count == 0)
{
#if NET9_0_OR_GREATER
var certs = X509CertificateLoader.LoadPkcs12CollectionFromFile(certPath, password);
clientCertificates.AddRange(certs);
#else
var cert = new X509Certificate2(certPath, password);
clientCertificates.Add(cert);
#endif
}
var certificates = new List();
foreach (var certificate in clientCertificates)
certificates.Add(certificate);
_certificates = certificates;
}
try
{
var checkCertificateRevocation = Settings.CheckCertificateRevocation;
RemoteCertificateValidationCallback? certificateValidationCallback;
X509Certificate2Collection? caCerts;
string? certRootPath = null;
if (sslMode is SslMode.Prefer or SslMode.Require)
{
certificateValidationCallback = SslTrustServerValidation;
checkCertificateRevocation = false;
}
else if (((caCerts = DataSource.TransportSecurityHandler.RootCertificatesCallback?.Invoke()) is not null && caCerts.Count > 0) ||
(certRootPath = Settings.RootCertificate ??
PostgresEnvironment.SslCertRoot ?? PostgresEnvironment.SslCertRootDefault) is not null)
{
certificateValidationCallback = SslRootValidation(sslMode == SslMode.VerifyFull, certRootPath, caCerts);
}
else if (sslMode == SslMode.VerifyCA)
{
certificateValidationCallback = SslVerifyCAValidation;
}
else
{
Debug.Assert(sslMode == SslMode.VerifyFull);
certificateValidationCallback = SslVerifyFullValidation;
}
SslStreamCertificateContext? clientCertificateContext = null;
if (clientCertificates.Count > 0)
{
// SslClientAuthenticationOptions.ClientCertificates only sends trusted certificates or if they have private key
// Which makes us unable to send intermediate certificates
// Work around this by specifying the first certificate as target
// And others as additional
// See https://github.com/dotnet/runtime/issues/26323
var clientCertificate = clientCertificates[0];
clientCertificates.RemoveAt(0);
clientCertificateContext = SslStreamCertificateContext.Create(clientCertificate, clientCertificates);
}
var host = Host;
timeout.CheckAndApply(this);
var sslStream = new SslStream(_stream, leaveInnerStreamOpen: false);
var sslStreamOptions = new SslClientAuthenticationOptions
{
TargetHost = host,
ClientCertificateContext = clientCertificateContext,
EnabledSslProtocols = SslProtocols.None,
CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
RemoteCertificateValidationCallback = certificateValidationCallback,
ApplicationProtocols = [_alpnProtocol]
};
if (SslClientAuthenticationOptionsCallback is not null)
{
SslClientAuthenticationOptionsCallback.Invoke(sslStreamOptions);
// User changed remote certificate validation callback
// Check whether the change doesn't lead to unexpected behavior
if (sslStreamOptions.RemoteCertificateValidationCallback != certificateValidationCallback)
{
if (sslMode is SslMode.VerifyCA or SslMode.VerifyFull)
throw new ArgumentException(string.Format(NpgsqlStrings.CannotUseSslVerifyWithCustomValidationCallback, sslMode));
if (Settings.RootCertificate is not null)
throw new ArgumentException(NpgsqlStrings.CannotUseSslRootCertificateWithCustomValidationCallback);
if (DataSource.TransportSecurityHandler.RootCertificatesCallback is not null)
throw new ArgumentException(NpgsqlStrings.CannotUseValidationRootCertificateCallbackWithCustomValidationCallback);
}
}
try
{
if (async)
await sslStream.AuthenticateAsClientAsync(sslStreamOptions, cancellationToken).ConfigureAwait(false);
else
sslStream.AuthenticateAsClient(sslStreamOptions);
_stream = sslStream;
}
catch (Exception e)
{
sslStream.Dispose();
throw new NpgsqlException("Exception while performing SSL handshake", e);
}
ReadBuffer.Underlying = _stream;
WriteBuffer.Underlying = _stream;
IsSslEncrypted = true;
ConnectionLogger.LogTrace("SSL negotiation successful");
}
catch
{
_certificates?.ForEach(x => x.Dispose());
_certificates = null;
throw;
}
}
void Connect(NpgsqlTimeout timeout)
{
EndPoint[]? endpoints;
if (NpgsqlConnectionStringBuilder.IsUnixSocket(Host, Port, out var socketPath))
{
endpoints = [new UnixDomainSocketEndPoint(socketPath!)];
}
else
{
// Note that there aren't any timeout-able or cancellable DNS methods
try
{
endpoints = IPAddressesToEndpoints(Dns.GetHostAddresses(Host), Port);
}
catch (SocketException ex)
{
throw new NpgsqlException(ex.Message, ex);
}
}
// Give each endpoint an equal share of the remaining time
var perEndpointTimeout = -1; // Default to infinity
if (timeout.IsSet)
perEndpointTimeout = (int)(timeout.CheckAndGetTimeLeft().Ticks / endpoints.Length / 10);
for (var i = 0; i < endpoints.Length; i++)
{
var endpoint = endpoints[i];
ConnectionLogger.LogTrace("Attempting to connect to {Endpoint}", endpoint);
var protocolType =
endpoint.AddressFamily == AddressFamily.InterNetwork ||
endpoint.AddressFamily == AddressFamily.InterNetworkV6
? ProtocolType.Tcp
: ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType)
{
Blocking = false
};
try
{
// Some options are not applied after the socket is open, see #6013
SetSocketOptions(socket);
try
{
socket.Connect(endpoint);
}
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
throw;
}
var write = new List { socket };
var error = new List { socket };
Socket.Select(null, write, error, perEndpointTimeout);
var errorCode = (int) socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error)!;
if (errorCode != 0)
throw new SocketException(errorCode);
if (write.Count is 0)
throw new TimeoutException("Timeout during connection attempt");
socket.Blocking = true;
_socket = socket;
ConnectedEndPoint = endpoint;
return;
}
catch (Exception e)
{
try { socket.Dispose(); }
catch
{
// ignored
}
ConnectionLogger.LogTrace(e, "Failed to connect to {Endpoint}", endpoint);
if (i == endpoints.Length - 1)
throw new NpgsqlException($"Failed to connect to {endpoint}", e);
}
}
}
async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken)
{
EndPoint[] endpoints;
if (NpgsqlConnectionStringBuilder.IsUnixSocket(Host, Port, out var socketPath))
{
endpoints = [new UnixDomainSocketEndPoint(socketPath)];
}
else
{
IPAddress[] ipAddresses;
try
{
using var combinedCts = timeout.IsSet ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : null;
combinedCts?.CancelAfter(timeout.CheckAndGetTimeLeft());
var combinedToken = combinedCts?.Token ?? cancellationToken;
try
{
ipAddresses = await Dns.GetHostAddressesAsync(Host, combinedToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException();
}
}
catch (SocketException ex)
{
throw new NpgsqlException(ex.Message, ex);
}
endpoints = IPAddressesToEndpoints(ipAddresses, Port);
}
// Give each endpoint an equal share of the remaining time
var perEndpointTimeout = default(TimeSpan);
if (timeout.IsSet)
perEndpointTimeout = timeout.CheckAndGetTimeLeft() / endpoints.Length;
for (var i = 0; i < endpoints.Length; i++)
{
var endpointTimeout = timeout.IsSet ? new NpgsqlTimeout(perEndpointTimeout) : timeout;
Debug.Assert(timeout.IsSet == endpointTimeout.IsSet);
var endpoint = endpoints[i];
ConnectionLogger.LogTrace("Attempting to connect to {Endpoint}", endpoint);
var protocolType =
endpoint.AddressFamily == AddressFamily.InterNetwork ||
endpoint.AddressFamily == AddressFamily.InterNetworkV6
? ProtocolType.Tcp
: ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType);
try
{
// Some options are not applied after the socket is open, see #6013
SetSocketOptions(socket);
using var combinedCts = endpointTimeout.IsSet ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : null;
combinedCts?.CancelAfter(endpointTimeout.CheckAndGetTimeLeft());
var combinedToken = combinedCts?.Token ?? cancellationToken;
await socket.ConnectAsync(endpoint, combinedToken).ConfigureAwait(false);
_socket = socket;
ConnectedEndPoint = endpoint;
return;
}
catch (Exception e)
{
try
{
socket.Dispose();
}
catch
{
// ignored
}
cancellationToken.ThrowIfCancellationRequested();
if (e is OperationCanceledException)
e = new TimeoutException("Timeout during connection attempt");
else if (e is NpgsqlException)
e = e.InnerException!; // We throw NpgsqlException for timeouts, wrapping TimeoutException
ConnectionLogger.LogTrace(e, "Failed to connect to {Endpoint}", endpoint);
if (i == endpoints.Length - 1)
throw new NpgsqlException($"Failed to connect to {endpoint}", e);
}
}
}
EndPoint[] IPAddressesToEndpoints(IPAddress[] ipAddresses, int port)
{
var result = new EndPoint[ipAddresses.Length];
for (var i = 0; i < ipAddresses.Length; i++)
result[i] = new IPEndPoint(ipAddresses[i], port);
return result;
}
void SetSocketOptions(Socket socket)
{
if (socket.AddressFamily == AddressFamily.InterNetwork || socket.AddressFamily == AddressFamily.InterNetworkV6)
socket.NoDelay = true;
if (Settings.SocketReceiveBufferSize > 0)
socket.ReceiveBufferSize = Settings.SocketReceiveBufferSize;
if (Settings.SocketSendBufferSize > 0)
socket.SendBufferSize = Settings.SocketSendBufferSize;
if (Settings.TcpKeepAlive)
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
if (Settings is { TcpKeepAliveInterval: > 0, TcpKeepAliveTime: 0 })
throw new ArgumentException("If TcpKeepAliveInterval is defined, TcpKeepAliveTime must be defined as well");
if (Settings.TcpKeepAliveTime > 0)
{
var timeSeconds = Settings.TcpKeepAliveTime;
var intervalSeconds = Settings.TcpKeepAliveInterval > 0
? Settings.TcpKeepAliveInterval
: Settings.TcpKeepAliveTime;
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, timeSeconds);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSeconds);
}
}
#endregion
#region I/O
readonly ChannelReader? CommandsInFlightReader;
internal readonly ChannelWriter? CommandsInFlightWriter;
internal volatile int CommandsInFlightCount;
internal ManualResetValueTaskSource