X Tutup
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Npgsql.Logging;
using Npgsql.Util;

namespace Npgsql
{
    /// <summary>
    /// Connection pool for PostgreSQL physical connections. Attempts to allocate connections over MaxPoolSize will
    /// block until someone releases. The allocate/release paths are lock-free to avoid contention, and ensure FIFO
    /// for open attempts waiting (because the pool is at capacity).
    /// </summary>
    sealed class ConnectorPool : IDisposable
    {
        #region Implementation notes

        // General
        //
        // * When we're at capacity (Busy==Max) further open attempts wait until someone releases.
        //   This must happen in FIFO (first to block on open is the first to release), otherwise some attempts may get
        //   starved and time out. This is why we use a ConcurrentQueue.
        // * We must avoid a race condition whereby an open attempt starts waiting at the same time as another release
        //   puts a connector back into the idle list. This would potentially make the waiter wait forever/time out.
        //
        // Rules
        // * You *only* create a new connector if Total < Max.
        // * You *only* go into waiting if Busy == Max (which also implies Idle == 0)

        #endregion Implementation notes

        #region Fields

        internal NpgsqlConnectionStringBuilder Settings { get; }

        /// <summary>
        /// Contains the connection string returned to the user after the connection has been opened.
        /// Does not contain the password unless Persist Security Info=true.
        /// </summary>
        internal string UserFacingConnectionString { get; }

        readonly int _max;
        readonly int _min;

        // Both arrays are indexed by NpgsqlConnector.PoolIndex, which is assigned once when the connector is opened.
        readonly NpgsqlConnector?[] _idle;
        readonly NpgsqlConnector?[] _open;

        readonly ConcurrentQueue<(TaskCompletionSource<NpgsqlConnector?> TaskCompletionSource, bool IsAsync)> _waiting;

        /// <summary>
        /// Open and Idle counters overlaid on a single 64-bit value (<see cref="All"/>), so that both can be
        /// read or compare-exchanged together in one atomic operation.
        /// </summary>
        [StructLayout(LayoutKind.Explicit)]
        internal struct PoolState
        {
            [FieldOffset(0)]
            public int Open;

            [FieldOffset(4)]
            public int Idle;

            [FieldOffset(0)]
            public long All;

            // Busy can actually be read and written non atomically, it would introduce a benign race
            // between readers of Busy and the writer(s), connector Close, when Idle is close to zero.
            // The writer would first decrement Open then Idle to prevent readers racing and concluding Busy == _max.
            // However with that order a race of the Idle read and decrement could happen, having readers read and
            // conclude Idle > 0, causing readers to loop for a non existent connector until Idle is also decremented.
            public void Deconstruct(out int open, out int idle, out int busy)
            {
                // Take one atomic snapshot of both counters so the three outputs are mutually consistent.
                var copy = new PoolState { All = Volatile.Read(ref All) };
                open = copy.Open;
                idle = copy.Idle;
                busy = copy.Open - copy.Idle;
            }

            public override string ToString()
            {
                var (open, idle, busy) = this;
                return $"[{open} total, {idle} idle, {busy} busy]";
            }
        }

        // Mutable struct, do not make this readonly.
        internal PoolState State;

        /// <summary>
        /// Incremented every time this pool is cleared via <see cref="Clear"/>. Allows us to identify
        /// connections which were created before the clear.
        /// </summary>
        int _clearCounter;

        // TODO move all this out of the pool
        static readonly TimerCallback PruningTimerCallback = PruneIdleConnectors;
        readonly Timer _pruningTimer;
        readonly TimeSpan _pruningSamplingInterval;
        readonly int _pruningSampleSize;
        readonly int[] _pruningSamples;
        int _pruningSampleIndex;
        int _pruningMedianIndex;
        volatile bool _pruningTimerEnabled;

        /// <summary>
        /// Maximum number of possible connections in any pool.
        /// </summary>
        internal const int PoolSizeLimit = 1024;

        static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(ConnectorPool));

        #endregion

        /// <summary>
        /// Creates a pool for the given settings.
        /// </summary>
        /// <param name="settings">Parsed connection string; supplies pool sizes and pruning configuration.</param>
        /// <param name="connString">
        /// The original connection string, exposed via <see cref="UserFacingConnectionString"/> only when
        /// PersistSecurityInfo is set.
        /// </param>
        /// <exception cref="ArgumentException">
        /// MaxPoolSize is below MinPoolSize, or ConnectionIdleLifetime is below ConnectionPruningInterval.
        /// </exception>
        internal ConnectorPool(NpgsqlConnectionStringBuilder settings, string connString)
        {
            Debug.Assert(PoolSizeLimit <= short.MaxValue,
                "PoolSizeLimit cannot be larger than short.MaxValue unless PoolState is refactored to hold larger values.");

            if (settings.MaxPoolSize < settings.MinPoolSize)
                throw new ArgumentException($"Connection can't have MaxPoolSize {settings.MaxPoolSize} under MinPoolSize {settings.MinPoolSize}");

            Settings = settings;

            _max = settings.MaxPoolSize;
            _min = settings.MinPoolSize;

            UserFacingConnectionString = settings.PersistSecurityInfo
                ? connString
                : settings.ToStringWithoutPassword();

            var connectionIdleLifetime = TimeSpan.FromSeconds(Settings.ConnectionIdleLifetime);
            _pruningSamplingInterval = TimeSpan.FromSeconds(Settings.ConnectionPruningInterval);
            if (connectionIdleLifetime < _pruningSamplingInterval)
                throw new ArgumentException($"Connection can't have ConnectionIdleLifetime {connectionIdleLifetime} under ConnectionPruningInterval {_pruningSamplingInterval}");

            _pruningTimer = new Timer(PruningTimerCallback, this, Timeout.Infinite, Timeout.Infinite);

            // The sample size must be known before the median index can be derived from it.
            _pruningSampleSize = Divide(Settings.ConnectionIdleLifetime, Settings.ConnectionPruningInterval);
            // - 1 to go from a length to an index; without it a sample size of 1 would index past the array.
            _pruningMedianIndex = Divide(_pruningSampleSize, 2) - 1;
            _pruningSamples = new int[_pruningSampleSize];
            _pruningTimerEnabled = false;

            _idle = new NpgsqlConnector[_max];
            _open = new NpgsqlConnector[_max];
            _waiting = new ConcurrentQueue<(TaskCompletionSource<NpgsqlConnector?> TaskCompletionSource, bool IsAsync)>();
        }

        /// <summary>
        /// Attempts to take an idle connector out of the pool without waiting or opening a new physical connection.
        /// </summary>
        /// <param name="conn">The connection that will own the connector on success.</param>
        /// <param name="connector">The allocated connector, or null if none was idle.</param>
        /// <returns>true if an idle connector was allocated; otherwise false.</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal bool TryAllocateFast(NpgsqlConnection conn, [NotNullWhen(true)] out NpgsqlConnector? connector)
        {
            Counters.SoftConnectsPerSecond.Increment();

            // Idle may indicate that there are idle connectors, with the subsequent scan failing to find any.
            // This can happen because of race conditions with Release(), which updates Idle before actually putting
            // the connector in the list, or because of other allocation attempts, which remove the connector from
            // the idle list before updating Idle.
            // Loop until either State.Idle is 0 or you manage to remove a connector.
            connector = null;
            var spinner = new SpinWait();
            var idle = _idle;

            while (Volatile.Read(ref State.Idle) > 0)
            {
                for (var i = 0; connector == null && i < idle.Length; i++)
                {
                    // First check without an Interlocked operation, it's faster
                    if (Volatile.Read(ref idle[i]) == null)
                        continue;

                    // If we saw a connector in this slot, atomically exchange it with a null.
                    // Either we get a connector out which we can use, or we get null because
                    // someone has taken it in the meanwhile. Either way put a null in its place.
                    connector = Interlocked.Exchange(ref idle[i], null);
                }

                if (connector == null)
                {
                    spinner.SpinOnce();
                    continue;
                }

                Counters.NumberOfFreeConnections.Decrement();

                // A connector could be broken because of a keepalive that occurred while it was
                // idling in the pool
                // TODO: Consider removing the pool from the keepalive code. The following branch is simply irrelevant
                // if keepalive isn't turned on.
                if (connector.IsBroken)
                {
                    CloseConnector(connector, true);

                    // Forget the broken connector so the next iteration scans the idle list again;
                    // otherwise the scan is skipped and the same connector would be closed (and counted) twice.
                    connector = null;
                    continue;
                }

                connector.Connection = conn;

                // We successfully extracted an idle connector, update state
                Counters.NumberOfActiveConnections.Increment();
                Interlocked.Decrement(ref State.Idle);
                CheckInvariants(State);
                return true;
            }

            connector = null;
            return false;
        }

        /// <summary>
        /// Slow allocation path, used when no idle connector was found: opens a new physical connection if the
        /// pool is under capacity, otherwise waits (FIFO) until a connector is released.
        /// </summary>
        /// <param name="conn">The connection that will own the connector.</param>
        /// <param name="timeout">Bounds both the physical open and the wait for a released connector.</param>
        /// <param name="async">Whether to wait asynchronously or block the calling thread.</param>
        /// <param name="cancellationToken">Cancels an asynchronous wait.</param>
        /// <returns>A connector assigned to <paramref name="conn"/>.</returns>
        /// <exception cref="NpgsqlException">The pool stayed exhausted until the timeout expired.</exception>
        internal async ValueTask<NpgsqlConnector> AllocateLong(NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
        {
            // No idle connector was found in the pool.
            // We now loop until one of three things happen:
            // 1. The pool isn't at max capacity (Open < Max), so we can create a new physical connection.
            // 2. The pool is at maximum capacity and there are no idle connectors (Open - Idle == Max),
            //    so we enqueue an open attempt into the waiting queue, so that the next release will unblock it.
            // 3. A connector makes it into the idle list (race condition with another Release()).
            while (true)
            {
                NpgsqlConnector? connector;
                var (openCount, idleCount, busyCount) = State;

                if (openCount < _max)
                {
                    // We're under the pool's max capacity, "allocate" a slot for a new physical connection.
                    // Don't spin for this https://github.com/dotnet/coreclr/pull/21437
                    var prevOpenCount = openCount;
                    while (true)
                    {
                        var currentOpenCount = prevOpenCount;
                        prevOpenCount = Interlocked.CompareExchange(ref State.Open, currentOpenCount + 1, currentOpenCount);

                        // Either we succeeded or someone else did and we're at max opens, break.
                        if (prevOpenCount == currentOpenCount || prevOpenCount == _max)
                            break;
                    }

                    // Restart the outer loop if we're at max opens.
                    if (prevOpenCount == _max)
                        continue;

                    openCount = prevOpenCount + 1;

                    try
                    {
                        // We've managed to increase the open counter, open a physical connection.
                        connector = new NpgsqlConnector(conn) { ClearCounter = _clearCounter };
                        await connector.Open(timeout, async, cancellationToken);
                    }
                    catch
                    {
                        // Physical open failed, decrement the open and busy counter back down.
                        conn.Connector = null;
                        Interlocked.Decrement(ref State.Open);
                        ReleaseOneWaiter();
                        throw;
                    }

                    // We immediately store the connector as well, assigning it an index
                    // that will be used during the lifetime of the connector for both _idle and _open.
                    for (var i = 0; i < _open.Length; i++)
                    {
                        if (Interlocked.CompareExchange(ref _open[i], connector, null) == null)
                        {
                            connector.PoolIndex = i;
                            break;
                        }
                    }

                    Debug.Assert(connector.PoolIndex != int.MaxValue);

                    // Only when we are the ones that incremented the open count past _min do we start the timer.
                    // openCount is the value *after* our increment, so "past _min" is _min + 1. This mirrors
                    // CloseConnector, which stops the timer when its decrement brings Open back to _min.
                    if (openCount == _min + 1)
                        EnablePruning();

                    Counters.NumberOfPooledConnections.Increment();
                    Counters.NumberOfActiveConnections.Increment();
                    CheckInvariants(State);

                    return connector;
                }

                if (busyCount == _max)
                {
                    // Pool is exhausted.
                    // Enqueue an allocate attempt into the waiting queue so that the next release will unblock us.
                    var tcs = new TaskCompletionSource<NpgsqlConnector?>(TaskCreationOptions.RunContinuationsAsynchronously);
                    _waiting.Enqueue((tcs, async));

                    // Scenario: pre-empted waiter
                    // Say there's a pre-emption of the thread right between our State.Busy read and our Enqueue.
                    // If that happens and the waiter queue is empty before we enqueue we couldn't signal to any
                    // releases we are a new waiter, causing any to add their connectors back into the idle pool.
                    // We do a correction for that right here after our own enqueue by re-checking Idle.
                    // We also check Open as we may have raced a connector close.
                    var (racedOpen, racedIdle, _) = State;
                    if (racedIdle > 0 || racedOpen < _max)
                    {
                        // If setting this fails we have been raced to completion by a Release().
                        // Otherwise we have an idle connector or open slot to try and race to.
                        if (tcs.TrySetCanceled())
                            continue;

                        connector = tcs.Task.Result;

                        // Our task completion may contain a null in order to unblock us, allowing us to try
                        // allocating again.
                        if (connector == null)
                            continue;

                        // Note that we don't update counters or any state since the connector is being
                        // handed off from one open connection to another.
                        connector.Connection = conn;
                        return connector;
                    }

                    try
                    {
                        if (async)
                        {
                            if (timeout.IsSet)
                            {
                                // Use Task.Delay to implement the timeout, but cancel the timer if we actually
                                // do complete successfully
                                var delayCancellationToken = new CancellationTokenSource();
                                using (cancellationToken.Register(s => ((CancellationTokenSource)s!).Cancel(), delayCancellationToken))
                                {
                                    var timeLeft = timeout.TimeLeft;
                                    if (timeLeft <= TimeSpan.Zero ||
                                        await Task.WhenAny(tcs.Task, Task.Delay(timeLeft, delayCancellationToken.Token)) != tcs.Task)
                                    {
                                        // Delay task completed first, either because of a user cancellation or an actual timeout
                                        cancellationToken.ThrowIfCancellationRequested();
                                        throw new NpgsqlException(
                                            $"The connection pool has been exhausted, either raise MaxPoolSize (currently {_max}) or Timeout (currently {Settings.Timeout} seconds)");
                                    }
                                }

                                delayCancellationToken.Cancel();
                            }
                            else
                            {
                                // TrySetCanceled rather than SetCanceled: a Release() may complete the task at the
                                // same moment the token fires, and SetCanceled would then throw from the callback.
                                using (cancellationToken.Register(s => ((TaskCompletionSource<NpgsqlConnector?>)s!).TrySetCanceled(), tcs))
                                    await tcs.Task;
                            }
                        }
                        else
                        {
                            if (timeout.IsSet)
                            {
                                var timeLeft = timeout.TimeLeft;
                                if (timeLeft <= TimeSpan.Zero || !tcs.Task.Wait(timeLeft))
                                    throw new NpgsqlException(
                                        $"The connection pool has been exhausted, either raise MaxPoolSize (currently {_max}) or Timeout (currently {Settings.Timeout} seconds)");
                            }
                            else
                                tcs.Task.Wait();
                        }
                    }
                    catch
                    {
                        // We're here if the timeout expired or the cancellation token was triggered.
                        // Transition our Task to cancelled, so that the next time someone releases
                        // a connection they'll skip over it.
                        tcs.TrySetCanceled();

                        // There's still a chance of a race condition, whereby the task was transitioned to
                        // completed in the meantime.
                        if (tcs.Task.Status != TaskStatus.RanToCompletion)
                            throw;
                    }

                    // Note that we don't update counters since the connector is being
                    // handed off from one open connection to another.
                    Debug.Assert(tcs.Task.IsCompleted);
                    connector = tcs.Task.Result;

                    // A null result only means "wake up and try again".
                    if (connector == null)
                        continue;

                    connector.Connection = conn;
                    return connector;
                }

                // We didn't create a new connector or start waiting, which means there's a new idle connector,
                // or we raced a connector close, loop again as we could potentially open a new connector.
                if (idleCount > 0 && TryAllocateFast(conn, out connector))
                    return connector;
            }

            // Cannot be here
        }

        /// <summary>
        /// Returns a connector to the pool: closes it if it is broken or predates the last clear, hands it
        /// directly to a pending waiter if there is one, and otherwise puts it back in the idle list.
        /// </summary>
        /// <param name="connector">The connector being released by its connection.</param>
        internal void Release(NpgsqlConnector connector)
        {
            Counters.SoftDisconnectsPerSecond.Increment();
            Counters.NumberOfActiveConnections.Decrement();

            // If Clear/ClearAll has been called since this connector was first opened,
            // throw it away. The same if it's broken (in which case CloseConnector is only
            // used to update state/perf counter).
            if (connector.ClearCounter < _clearCounter || connector.IsBroken)
            {
                CloseConnector(connector, false);
                return;
            }

            connector.Reset();

            // If there are any pending waiters we hand the connector off to them directly.
            while (_waiting.TryDequeue(out var waitingOpenAttempt))
            {
                var tcs = waitingOpenAttempt.TaskCompletionSource;

                // We have a pending waiter. "Complete" it, handing off the connector.
                if (tcs.TrySetResult(connector))
                    return;

                // If the open attempt timed out, the Task's state will be set to Canceled and our
                // TrySetResult fails. Try again.
                Debug.Assert(tcs.Task.IsCanceled);
            }

            // Scenario: pre-empted release
            // Right here between our check for waiters and our signalling decrement for storing
            // a connector there could have been a new waiter enqueueing, we compensate at the end.

            // If we're here, we put the connector back in the idle list
            // We increment Idle, any allocate that is racing us will not match Busy == _max
            // and will not enqueue but try to get our connector.
            Interlocked.Increment(ref State.Idle);
            Volatile.Write(ref _idle[connector.PoolIndex], connector);
            CheckInvariants(State);

            // Scenario: pre-empted release
            // We checked at the start of release if there were any waiters.
            // Unblock any new waiter that raced us by handing it a null result.
            // We try to complete exactly one waiter as long as there are any in the queue, if any came in at all.
            // The performance of trying this after each _idle release is fine as the queue is very uncontended.
            // In the .Net Core BCL, 3.0 as of writing, TryDequeue for the empty path is as fast as doing IsEmpty.
            ReleaseOneWaiter();

            // Scenario: pre-empted waiter
            // Could have a pre-empted waiter, that didn't enqueue yet it wakes up right after
            // our correcting dequeue, it will do its own check after that Enqueue for Idle > 0.
        }

        /// <summary>
        /// Closes the physical connection and removes the connector from the pool's bookkeeping.
        /// </summary>
        /// <param name="connector">The connector to close.</param>
        /// <param name="wasIdle">
        /// true if the connector was taken from the idle list, in which case Idle is decremented together with Open.
        /// </param>
        void CloseConnector(NpgsqlConnector connector, bool wasIdle)
        {
            try
            {
                connector.Close();
            }
            catch (Exception e)
            {
                // Best effort: the slot must be freed below even if the close itself failed.
                Log.Warn("Exception while closing outdated connector", e, connector.Id);
            }

            _open[connector.PoolIndex] = null;

            int openCount;
            if (wasIdle)
            {
                // Decrement Open and Idle together through the overlaid 64-bit value, retrying until the
                // compare-exchange observes the state we based the new value on.
                var prevAll = Volatile.Read(ref State.All);
                var prevState = new PoolState { All = prevAll };
                while (true)
                {
                    var state = new PoolState { Open = prevState.Open - 1, Idle = prevState.Idle - 1 };
                    prevAll = Interlocked.CompareExchange(ref State.All, state.All, prevState.All);
                    if (prevAll == prevState.All)
                        break;
                    prevState = new PoolState { All = prevAll };
                }

                openCount = prevState.Open - 1;
            }
            else
                openCount = Interlocked.Decrement(ref State.Open);

            // Unblock a single waiter, if any, to get the slot that just opened up.
            ReleaseOneWaiter();

            // Only turn off the timer one time, when it was this Close that brought Open back to _min.
            if (openCount == _min)
                DisablePruning();

            Counters.NumberOfPooledConnections.Decrement();
            CheckInvariants(State);
        }

        /// <summary>
        /// Dequeues a single waiter and signals that it should re-attempt to allocate again. Needed in various
        /// race conditions.
        /// </summary>
        void ReleaseOneWaiter()
        {
            // Waiters whose task is already completed (e.g. cancelled) are skipped.
            while (_waiting.TryDequeue(out var waiter))
                if (waiter.TaskCompletionSource.TrySetResult(null))
                    break;
        }

        // Manual reactivation of timer happens in callback
        void EnablePruning()
        {
            lock (_pruningTimer)
            {
                _pruningTimerEnabled = true;
                _pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan);
            }
        }

        void DisablePruning()
        {
            lock (_pruningTimer)
            {
                _pruningTimer.Change(Timeout.Infinite, Timeout.Infinite);
                _pruningSampleIndex = 0;
                _pruningTimerEnabled = false;
            }
        }

        /// <summary>
        /// Timer callback. Records one Idle sample per tick; once the sample buffer is full, closes up to the
        /// median sampled number of idle connectors, never taking Open below the pool minimum.
        /// </summary>
        /// <param name="state">The <see cref="ConnectorPool"/> that owns the timer.</param>
        static void PruneIdleConnectors(object? state)
        {
            var pool = (ConnectorPool)state!;
            var samples = pool._pruningSamples;
            int toPrune;
            lock (pool._pruningTimer)
            {
                // Check if we might have been contending with DisablePruning.
                if (!pool._pruningTimerEnabled)
                    return;

                var sampleIndex = pool._pruningSampleIndex;
                if (sampleIndex < pool._pruningSampleSize)
                {
                    samples[sampleIndex] = pool.State.Idle;
                    pool._pruningSampleIndex = sampleIndex + 1;
                    pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
                    return;
                }

                // Calculate median value for pruning, reset index and timer, and release the lock.
                Array.Sort(samples);
                toPrune = samples[pool._pruningMedianIndex];
                pool._pruningSampleIndex = 0;
                pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
            }

            var idle = pool._idle;
            for (var i = 0; i < idle.Length; i++)
            {
                if (Volatile.Read(ref pool.State.Open) <= pool._min || toPrune == 0)
                    return;

                var connector = Interlocked.Exchange(ref idle[i], null);
                if (connector == null)
                    continue;

                toPrune -= 1;
                pool.CloseConnector(connector, true);
            }
        }

        /// <summary>
        /// Closes all currently idle connectors and bumps the clear counter, so that connectors which are
        /// busy right now get closed when they are released.
        /// </summary>
        internal void Clear()
        {
            for (var i = 0; i < _idle.Length; i++)
            {
                var connector = Interlocked.Exchange(ref _idle[i], null);
                if (connector != null)
                    CloseConnector(connector, true);
            }

            _clearCounter++;
        }

        #region Pending Enlisted Connections

        /// <summary>
        /// Registers a connector as pending for the given transaction.
        /// </summary>
        internal void AddPendingEnlistedConnector(NpgsqlConnector connector, Transaction transaction)
        {
            lock (_pendingEnlistedConnectors)
            {
                if (!_pendingEnlistedConnectors.TryGetValue(transaction, out var list))
                    list = _pendingEnlistedConnectors[transaction] = new List<NpgsqlConnector>();
                list.Add(connector);
            }
        }

        /// <summary>
        /// Removes a connector from the pending list of the given transaction, if it is there. The
        /// transaction's entry is dropped once its list becomes empty.
        /// </summary>
        internal void TryRemovePendingEnlistedConnector(NpgsqlConnector connector, Transaction transaction)
        {
            lock (_pendingEnlistedConnectors)
            {
                if (!_pendingEnlistedConnectors.TryGetValue(transaction, out var list))
                    return;
                list.Remove(connector);
                if (list.Count == 0)
                    _pendingEnlistedConnectors.Remove(transaction);
            }
        }

        /// <summary>
        /// Takes the most recently added pending connector for the given transaction.
        /// </summary>
        /// <returns>The connector, or null if the transaction has no pending connectors.</returns>
        internal NpgsqlConnector? TryAllocateEnlistedPending(Transaction transaction)
        {
            lock (_pendingEnlistedConnectors)
            {
                if (!_pendingEnlistedConnectors.TryGetValue(transaction, out var list))
                    return null;

                // Lists are never left empty in the dictionary, so there is always a last element here.
                var connector = list[list.Count - 1];
                list.RemoveAt(list.Count - 1);
                if (list.Count == 0)
                    _pendingEnlistedConnectors.Remove(transaction);
                return connector;
            }
        }

        // All access to the dictionary and to the lists it contains happens under a lock on the dictionary.
        readonly Dictionary<Transaction, List<NpgsqlConnector>> _pendingEnlistedConnectors =
            new Dictionary<Transaction, List<NpgsqlConnector>>();

        #endregion

        #region Misc

        // Integer division rounding up, for positive value and divisor.
        static int Divide(int value, int divisor) => 1 + (value - 1) / divisor;

        /// <summary>
        /// Debug-only sanity checks on a snapshot of the pool counters.
        /// </summary>
        [Conditional("DEBUG")]
        void CheckInvariants(PoolState state)
        {
            if (state.Open > _max)
                throw new NpgsqlException($"Pool is over capacity (Total={state.Open}, Max={_max})");
            if (state.Open < 0)
                throw new NpgsqlException("Open is negative");
            if (state.Idle < 0)
                throw new NpgsqlException("Idle is negative");
            if (state.Open - state.Idle < 0)
                throw new NpgsqlException("Busy is negative");
        }

        public void Dispose() => _pruningTimer?.Dispose();

        public override string ToString() => State.ToString();

        #endregion Misc
    }
}
X Tutup