X Tutup
using Npgsql.Internal; using Npgsql.Util; using System; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using System.Transactions; namespace Npgsql; /// /// An which manages connections for multiple hosts, is aware of their states (primary, secondary, /// offline...) and can perform failover and load balancing across them. /// /// /// See . /// public sealed class NpgsqlMultiHostDataSource : NpgsqlDataSource { internal override bool OwnsConnectors => false; readonly NpgsqlDataSource[] _pools; internal NpgsqlDataSource[] Pools => _pools; readonly MultiHostDataSourceWrapper[] _wrappers; volatile int _roundRobinIndex = -1; internal NpgsqlMultiHostDataSource(NpgsqlConnectionStringBuilder settings, NpgsqlDataSourceConfiguration dataSourceConfig) : base(settings, dataSourceConfig, reportMetrics: false) { var hosts = settings.Host!.Split(','); _pools = new NpgsqlDataSource[hosts.Length]; for (var i = 0; i < hosts.Length; i++) { var poolSettings = settings.Clone(); var host = hosts[i].AsSpan().Trim(); if (NpgsqlConnectionStringBuilder.TrySplitHostPort(host, out var newHost, out var newPort)) { poolSettings.Host = newHost; poolSettings.Port = newPort; } else poolSettings.Host = host.ToString(); _pools[i] = settings.Pooling ? new PoolingDataSource(poolSettings, dataSourceConfig) : new UnpooledDataSource(poolSettings, dataSourceConfig); } var targetSessionAttributeValues = Enum.GetValues(); var highestValue = 0; foreach (var value in targetSessionAttributeValues) if ((int)value > highestValue) highestValue = (int)value; _wrappers = new MultiHostDataSourceWrapper[highestValue + 1]; foreach (var targetSessionAttribute in targetSessionAttributeValues) _wrappers[(int)targetSessionAttribute] = new(this, targetSessionAttribute); } /// /// Returns a new, unopened connection from this data source. /// /// Specifies the server type (e.g. primary, standby). public NpgsqlConnection CreateConnection(TargetSessionAttributes targetSessionAttributes) => NpgsqlConnection.FromDataSource(_wrappers[(int)targetSessionAttributes]); /// /// Returns a new, opened connection from this data source. /// /// Specifies the server type (e.g. primary, standby). public NpgsqlConnection OpenConnection(TargetSessionAttributes targetSessionAttributes) { var connection = CreateConnection(targetSessionAttributes); try { connection.Open(); return connection; } catch { connection.Dispose(); throw; } } /// /// Returns a new, opened connection from this data source. /// /// Specifies the server type (e.g. primary, standby). /// /// An optional token to cancel the asynchronous operation. The default value is . /// public async ValueTask OpenConnectionAsync( TargetSessionAttributes targetSessionAttributes, CancellationToken cancellationToken = default) { var connection = CreateConnection(targetSessionAttributes); try { await connection.OpenAsync(cancellationToken).ConfigureAwait(false); return connection; } catch { await connection.DisposeAsync().ConfigureAwait(false); throw; } } /// /// Returns an that wraps this multi-host one with the given server type. /// /// Specifies the server type (e.g. primary, standby). public NpgsqlDataSource WithTargetSession(TargetSessionAttributes targetSessionAttributes) => _wrappers[(int)targetSessionAttributes]; static bool IsPreferred(DatabaseState state, TargetSessionAttributes preferredType) => state switch { DatabaseState.Offline => false, DatabaseState.Unknown => true, // We will check compatibility again after refreshing the database state DatabaseState.PrimaryReadWrite when preferredType is TargetSessionAttributes.Primary or TargetSessionAttributes.PreferPrimary or TargetSessionAttributes.ReadWrite => true, DatabaseState.PrimaryReadOnly when preferredType is TargetSessionAttributes.Primary or TargetSessionAttributes.PreferPrimary or TargetSessionAttributes.ReadOnly => true, DatabaseState.Standby when preferredType is TargetSessionAttributes.Standby or TargetSessionAttributes.PreferStandby or TargetSessionAttributes.ReadOnly => true, _ => preferredType == TargetSessionAttributes.Any }; static bool IsOnline(DatabaseState state, TargetSessionAttributes preferredType) { Debug.Assert(preferredType is TargetSessionAttributes.PreferPrimary or TargetSessionAttributes.PreferStandby); return state != DatabaseState.Offline; } async ValueTask TryGetIdleOrNew( NpgsqlConnection conn, TimeSpan timeoutPerHost, bool async, TargetSessionAttributes preferredType, Func stateValidator, int poolIndex, IList exceptions, CancellationToken cancellationToken) { var pools = _pools; for (var i = 0; i < pools.Length; i++) { var pool = pools[poolIndex]; poolIndex++; if (poolIndex == pools.Length) poolIndex = 0; var databaseState = pool.GetDatabaseState(); if (!stateValidator(databaseState, preferredType)) continue; NpgsqlConnector? connector = null; try { if (pool.TryGetIdleConnector(out connector)) { if (databaseState == DatabaseState.Unknown) { databaseState = await connector.QueryDatabaseState(new NpgsqlTimeout(timeoutPerHost), async, cancellationToken).ConfigureAwait(false); Debug.Assert(databaseState != DatabaseState.Unknown); if (!stateValidator(databaseState, preferredType)) { pool.Return(connector); continue; } } return connector; } else { connector = await pool.OpenNewConnector(conn, new NpgsqlTimeout(timeoutPerHost), async, cancellationToken).ConfigureAwait(false); if (connector is not null) { if (databaseState == DatabaseState.Unknown) { // While opening a new connector we might have refreshed the database state, check again databaseState = pool.GetDatabaseState(); if (databaseState == DatabaseState.Unknown) databaseState = await connector.QueryDatabaseState(new NpgsqlTimeout(timeoutPerHost), async, cancellationToken).ConfigureAwait(false); Debug.Assert(databaseState != DatabaseState.Unknown); if (!stateValidator(databaseState, preferredType)) { pool.Return(connector); continue; } } return connector; } } } catch (OperationCanceledException oce) when (cancellationToken.IsCancellationRequested && oce.CancellationToken == cancellationToken) { if (connector is not null) pool.Return(connector); throw; } catch (Exception ex) { exceptions.Add(ex); if (connector is not null) pool.Return(connector); } } return null; } async ValueTask TryGet( NpgsqlConnection conn, TimeSpan timeoutPerHost, bool async, TargetSessionAttributes preferredType, Func stateValidator, int poolIndex, IList exceptions, CancellationToken cancellationToken) { var pools = _pools; for (var i = 0; i < pools.Length; i++) { var pool = pools[poolIndex]; poolIndex++; if (poolIndex == pools.Length) poolIndex = 0; var databaseState = pool.GetDatabaseState(); if (!stateValidator(databaseState, preferredType)) continue; NpgsqlConnector? connector = null; try { connector = await pool.Get(conn, new NpgsqlTimeout(timeoutPerHost), async, cancellationToken).ConfigureAwait(false); if (databaseState == DatabaseState.Unknown) { // Get might have opened a new physical connection and refreshed the database state, check again databaseState = pool.GetDatabaseState(); if (databaseState == DatabaseState.Unknown) databaseState = await connector.QueryDatabaseState(new NpgsqlTimeout(timeoutPerHost), async, cancellationToken).ConfigureAwait(false); Debug.Assert(databaseState != DatabaseState.Unknown); if (!stateValidator(databaseState, preferredType)) { pool.Return(connector); continue; } } return connector; } catch (Exception ex) { exceptions.Add(ex); if (connector is not null) pool.Return(connector); } } return null; } internal override async ValueTask Get( NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken) { CheckDisposed(); var exceptions = new List(); var poolIndex = conn.Settings.LoadBalanceHosts ? GetRoundRobinIndex() : 0; var timeoutPerHost = timeout.IsSet ? timeout.CheckAndGetTimeLeft() : TimeSpan.Zero; var preferredType = GetTargetSessionAttributes(conn); var checkUnpreferred = preferredType is TargetSessionAttributes.PreferPrimary or TargetSessionAttributes.PreferStandby; var connector = await TryGetIdleOrNew(conn, timeoutPerHost, async, preferredType, IsPreferred, poolIndex, exceptions, cancellationToken).ConfigureAwait(false) ?? (checkUnpreferred ? await TryGetIdleOrNew(conn, timeoutPerHost, async, preferredType, IsOnline, poolIndex, exceptions, cancellationToken).ConfigureAwait(false) : null) ?? await TryGet(conn, timeoutPerHost, async, preferredType, IsPreferred, poolIndex, exceptions, cancellationToken).ConfigureAwait(false) ?? (checkUnpreferred ? await TryGet(conn, timeoutPerHost, async, preferredType, IsOnline, poolIndex, exceptions, cancellationToken).ConfigureAwait(false) : null); return connector ?? throw NoSuitableHostsException(exceptions); } static NpgsqlException NoSuitableHostsException(IList exceptions) { return exceptions.Count == 0 ? new NpgsqlException("No suitable host was found.") : exceptions[0] is PostgresException firstException && AllEqual(firstException, exceptions) ? firstException : new NpgsqlException("Unable to connect to a suitable host. Check inner exception for more details.", new AggregateException(exceptions)); static bool AllEqual(PostgresException first, IList exceptions) { foreach (var x in exceptions) if (x is not PostgresException ex || ex.SqlState != first.SqlState) return false; return true; } } int GetRoundRobinIndex() { while (true) { var index = Interlocked.Increment(ref _roundRobinIndex); if (index >= 0) return index % _pools.Length; // Worst case scenario - we've wrapped around integer counter if (index == int.MinValue) { // This is the thread which wrapped around the counter - reset it to 0 _roundRobinIndex = 0; return 0; } // This is not the thread which wrapped around the counter - just wait until it's 0 or more var sw = new SpinWait(); while (_roundRobinIndex < 0) sw.SpinOnce(); } } internal override void Return(NpgsqlConnector connector) => throw new NpgsqlException("Npgsql bug: a connector was returned to " + nameof(NpgsqlMultiHostDataSource)); internal override bool TryGetIdleConnector([NotNullWhen(true)] out NpgsqlConnector? connector) => throw new NpgsqlException("Npgsql bug: trying to get an idle connector from " + nameof(NpgsqlMultiHostDataSource)); internal override ValueTask OpenNewConnector(NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken) => throw new NpgsqlException("Npgsql bug: trying to open a new connector from " + nameof(NpgsqlMultiHostDataSource)); /// public override void Clear() { foreach (var pool in _pools) pool.Clear(); } /// /// Clears the database state (primary, secondary, offline...) for all data sources managed by this multi-host data source. /// Can be useful to make Npgsql retry a PostgreSQL instance which was previously detected to be offline. /// public void ClearDatabaseStates() { foreach (var pool in _pools) { pool.UpdateDatabaseState(default, default, default, ignoreTimeStamp: true); } } internal override (int Total, int Idle, int Busy) Statistics { get { var numConnectors = 0; var idleCount = 0; foreach (var pool in _pools) { var stat = pool.Statistics; numConnectors += stat.Total; idleCount += stat.Idle; } return (numConnectors, idleCount, numConnectors - idleCount); } } internal override bool TryRentEnlistedPending( Transaction transaction, NpgsqlConnection connection, [NotNullWhen(true)] out NpgsqlConnector? connector) { lock (_pendingEnlistedConnectors) { if (!_pendingEnlistedConnectors.TryGetValue(transaction, out var list)) { connector = null; return false; } var preferredType = GetTargetSessionAttributes(connection); // First try to get a valid preferred connector. if (TryGetValidConnector(list, preferredType, IsPreferred, out connector)) { return true; } // Can't get valid preferred connector. Try to get an unpreferred connector, if supported. if ((preferredType == TargetSessionAttributes.PreferPrimary || preferredType == TargetSessionAttributes.PreferStandby) && TryGetValidConnector(list, preferredType, IsOnline, out connector)) { return true; } connector = null; return false; } bool TryGetValidConnector(List list, TargetSessionAttributes preferredType, Func validationFunc, [NotNullWhen(true)] out NpgsqlConnector? connector) { for (var i = list.Count - 1; i >= 0; i--) { connector = list[i]; var lastKnownState = connector.DataSource.GetDatabaseState(ignoreExpiration: true); Debug.Assert(lastKnownState != DatabaseState.Unknown); if (validationFunc(lastKnownState, preferredType)) { list.RemoveAt(i); if (list.Count == 0) _pendingEnlistedConnectors.Remove(transaction); return true; } } connector = null; return false; } } static TargetSessionAttributes GetTargetSessionAttributes(NpgsqlConnection connection) => connection.Settings.TargetSessionAttributesParsed ?? (PostgresEnvironment.TargetSessionAttributes is { } s ? NpgsqlConnectionStringBuilder.ParseTargetSessionAttributes(s.ToLowerInvariant()) : TargetSessionAttributes.Any); }
X Tutup