X Tutup
// Copyright (C) 2002 The Npgsql Development Team // npgsql-general@gborg.postgresql.org // http://gborg.postgresql.org/project/npgsql/projdisplay.php // // Permission to use, copy, modify, and distribute this software and its // documentation for any purpose, without fee, and without a written // agreement is hereby granted, provided that the above copyright notice // and this paragraph and the following two paragraphs appear in all copies. // // IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY // FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, // INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS // DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF // THE POSSIBILITY OF SUCH DAMAGE. // // THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES, // INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY // AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS // ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS // TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. // // ConnectorPool.cs // ------------------------------------------------------------------ // Status // 0.00.0000 - 06/17/2002 - ulrich sprick - creation // - 05/??/2004 - Glen Parker rewritten using // System.Queue. using System; using System.Collections.Generic; using System.Data; using System.Diagnostics.Contracts; using System.Threading; using Npgsql.Logging; namespace Npgsql { /// /// This class manages all connector objects, pooled AND non-pooled. /// internal class NpgsqlConnectorPool { /// /// A queue with an extra Int32 for keeping track of busy connections. /// private class ConnectorQueue { /// /// Connections available to the end user /// public Queue Available = new Queue(); /// /// Connections currently in use /// public Dictionary Busy = new Dictionary(); public Int32 ConnectionLifeTime; public Int32 InactiveTime = 0; public Int32 MinPoolSize; } /// Unique static instance of the connector pool /// mamager. internal static NpgsqlConnectorPool ConnectorPoolMgr; /// /// Maximum number of possible connections in the pool. /// internal const int PoolSizeLimit = 1024; static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger(); private object locker = new object(); public NpgsqlConnectorPool() { PooledConnectors = new Dictionary(); _timer = new Timer(TimerElapsedHandler, null, Timeout.Infinite, Timeout.Infinite); } private void StartTimer() { lock (locker) { _timer.Change(TimerInterval, Timeout.Infinite); } } private void TimerElapsedHandler(object sender) { NpgsqlConnector Connector; var activeConnectionsExist = false; lock (locker) { try { foreach (ConnectorQueue Queue in PooledConnectors.Values) { lock (Queue) { if (Queue.Available.Count > 0) { if (Queue.Available.Count + Queue.Busy.Count > Queue.MinPoolSize) { if (Queue.InactiveTime >= Queue.ConnectionLifeTime) { Int32 diff = Queue.Available.Count + Queue.Busy.Count - Queue.MinPoolSize; Int32 toBeClosed = (diff + 1) / 2; toBeClosed = Math.Min(toBeClosed, Queue.Available.Count); if (diff < 2) { diff = 2; } Queue.InactiveTime -= Queue.ConnectionLifeTime / (int)(Math.Log(diff) / Math.Log(2)); for (Int32 i = 0; i < toBeClosed; ++i) { Connector = Queue.Available.Dequeue(); Connector.Close(); } } else { Queue.InactiveTime++; } } else { Queue.InactiveTime = 0; } if (Queue.Available.Count > 0 || Queue.Busy.Count > 0) activeConnectionsExist = true; } else { Queue.InactiveTime = 0; } } } } finally { if (activeConnectionsExist) _timer.Change(TimerInterval, Timeout.Infinite); } } } /// Map of index to unused pooled connectors, avaliable to the /// next RequestConnector() call. /// This hashmap will be indexed by connection string. /// This key will hold a list of queues of pooled connectors available to be used. private readonly Dictionary PooledConnectors; readonly Timer _timer; const int TimerInterval = 1000; /// /// Searches the pooled connector lists for a matching connector object or creates a new one. /// /// The NpgsqlConnection that is requesting /// the connector. Its ConnectionString will be used to search the /// pool for available connectors. /// A connector object. public NpgsqlConnector RequestConnector(NpgsqlConnection connection) { if (connection.MaxPoolSize < connection.MinPoolSize) throw new ArgumentException(string.Format("Connection can't have MaxPoolSize {0} under MinPoolSize {1}", connection.MaxPoolSize, connection.MinPoolSize)); Contract.Ensures(Contract.Result().State == ConnectorState.Ready, "Pool returned a connector with state "); NpgsqlConnector connector; Int32 timeoutMilliseconds = connection.Timeout * 1000; // No need for this lock anymore //lock (this) { connector = GetPooledConnector(connection); } while (connector == null && timeoutMilliseconds > 0) { Int32 ST = timeoutMilliseconds > 1000 ? 1000 : timeoutMilliseconds; Thread.Sleep(ST); timeoutMilliseconds -= ST; //lock (this) { connector = GetPooledConnector(connection); } } if (connector == null) { if (connection.Timeout > 0) { throw new Exception("Timeout while getting a connection from pool."); } else { throw new Exception("Connection pool exceeds maximum size."); } } connector.Connection = connection; StartTimer(); return connector; } /// /// Releases a connector, possibly back to the pool for future use. /// /// /// Pooled connectors will be put back into the pool if there is room. /// /// Connection to which the connector is leased. /// The connector to release. public async void ReleaseConnector(NpgsqlConnection connection, NpgsqlConnector connector) { //We can only clean up a connector with a reader if the current thread hasn't been aborted //If it has then we need to just close it (ReleasePooledConnector will do this for an aborted thread) if (connector.CurrentReader != null && (Thread.CurrentThread.ThreadState & (ThreadState.Aborted | ThreadState.AbortRequested)) == 0) { // Consume the open reader asynchronously, returning to the user immediately. // However, synchronously "fake close" the reader, this emits the closed event and causes // NpgsqlDataReader.IsClosed to return true try { await connector.CurrentReader.CloseImmediate(); } catch (Exception e) { Log.Warn("Error while performing async close on connector", e); } } UngetConnector(connection, connector); } /// /// Find an available pooled connector in the pool, or create a new one if none found. /// private NpgsqlConnector GetPooledConnector(NpgsqlConnection Connection) { ConnectorQueue Queue = null; NpgsqlConnector Connector = null; // We only need to lock all pools when trying to get one pool or create one. lock (locker) { // Try to find a queue. if (!PooledConnectors.TryGetValue(Connection.ConnectionString, out Queue)) { Queue = new ConnectorQueue(); Queue.ConnectionLifeTime = Connection.ConnectionLifeTime; Queue.MinPoolSize = Connection.MinPoolSize; PooledConnectors[Connection.ConnectionString] = Queue; } } // Now we can simply lock on the pool itself. lock (Queue) { if (Queue.Available.Count > 0) { // Found a queue with connectors. Grab the top one. // Check if the connector is still valid. Connector = Queue.Available.Dequeue(); Queue.Busy.Add(Connector, null); } } if (Connector != null) return Connector; lock (Queue) { if (Queue.Available.Count + Queue.Busy.Count < Connection.MaxPoolSize) { Connector = new NpgsqlConnector(Connection); Queue.Busy.Add(Connector, null); } } if (Connector != null) { Connector.ProvideClientCertificatesCallback = Connection.ProvideClientCertificatesCallback; Connector.UserCertificateValidationCallback = Connection.UserCertificateValidationCallback; try { Connector.Open(); } catch { Contract.Assert(Connector.IsBroken); lock (Queue) { Queue.Busy.Remove(Connector); } throw; } // Meet the MinPoolSize requirement if needed. if (Connection.MinPoolSize > 1) { lock (Queue) { while (Queue.Available.Count + Queue.Busy.Count < Connection.MinPoolSize) { NpgsqlConnector spare = new NpgsqlConnector(Connection) { ProvideClientCertificatesCallback = Connection.ProvideClientCertificatesCallback, UserCertificateValidationCallback = Connection.UserCertificateValidationCallback }; spare.Open(); spare.ProvideClientCertificatesCallback = null; spare.UserCertificateValidationCallback = null; spare.Connection = null; Queue.Available.Enqueue(spare); } } } } return Connector; } /// /// Put a pooled connector into the pool queue. /// /// Connection is leased to. /// Connector to pool private void UngetConnector(NpgsqlConnection Connection, NpgsqlConnector Connector) { Contract.Requires(Connector.IsReady || Connector.IsClosed || Connector.IsBroken); ConnectorQueue queue; // Find the queue. // As we are handling all possible queues, we have to lock everything... lock (locker) { PooledConnectors.TryGetValue(Connection.ConnectionString, out queue); } if (queue == null) { Connector.Close(); // Release connection to postgres return; // Queue may be emptied by connection problems. See ClearPool below. } /*bool inQueue = false; lock (queue) { inQueue = queue.Busy.ContainsKey(Connector); queue.Busy.Remove(Connector); } */ bool inQueue = queue.Busy.ContainsKey(Connector); if (Connector.IsBroken || Connector.IsClosed) { if (Connector.InTransaction) { Connector.ClearTransaction(); } Connector.Close(); inQueue = false; } else { Contract.Assert(Connector.IsReady); //If thread is good if ((Thread.CurrentThread.ThreadState & (ThreadState.Aborted | ThreadState.AbortRequested)) == 0) { // Release all resources associated with this connector. try { Connector.Reset(); } catch { Connector.Close(); inQueue = false; } } else { //Thread is being aborted, this connection is possibly broken. So kill it rather than returning it to the pool inQueue = false; Connector.Close(); } } // Check if Connector should return to the queue of available connectors. If not, this connector is invalid and should // only be removed from the busy queue which effectvely removes it from the pool. if (inQueue) lock (queue) { queue.Busy.Remove(Connector); queue.Available.Enqueue(Connector); } else lock (queue) { queue.Busy.Remove(Connector); } Connector.ProvideClientCertificatesCallback = null; Connector.UserCertificateValidationCallback = null; } private static void ClearQueue(ConnectorQueue Queue) { if (Queue == null) { return; } lock (Queue) { while (Queue.Available.Count > 0) { NpgsqlConnector connector = Queue.Available.Dequeue(); try { connector.Close(); } catch { // Maybe we should log something here to say we got an exception while closing connector? } } //Clear the busy list so that the current connections don't get re-added to the queue Queue.Busy.Clear(); } } internal void ClearPool(NpgsqlConnection Connection) { // Prevent multithread access to connection pool count. lock (locker) { ConnectorQueue queue; // Try to find a queue. if (PooledConnectors.TryGetValue(Connection.ConnectionString, out queue)) { ClearQueue(queue); PooledConnectors.Remove(Connection.ConnectionString); } } } internal void ClearAllPools() { lock (locker) { foreach (ConnectorQueue Queue in PooledConnectors.Values) { ClearQueue(Queue); } PooledConnectors.Clear(); } } static NpgsqlConnectorPool() { ConnectorPoolMgr = new NpgsqlConnectorPool(); #if !DNXCORE50 AppDomain.CurrentDomain.DomainUnload += (sender, args) => { Thread.Sleep(3); ConnectorPoolMgr.ClearAllPools(); }; AppDomain.CurrentDomain.ProcessExit += (sender, args) => { Thread.Sleep(3); ConnectorPoolMgr.ClearAllPools(); }; #endif } } }
X Tutup