X Tutup
// Copyright (C) 2002 The Npgsql Development Team // npgsql-general@gborg.postgresql.org // http://gborg.postgresql.org/project/npgsql/projdisplay.php // // Permission to use, copy, modify, and distribute this software and its // documentation for any purpose, without fee, and without a written // agreement is hereby granted, provided that the above copyright notice // and this paragraph and the following two paragraphs appear in all copies. // // IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY // FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, // INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS // DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF // THE POSSIBILITY OF SUCH DAMAGE. // // THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES, // INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY // AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS // ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS // TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. // // ConnectorPool.cs // ------------------------------------------------------------------ // Status // 0.00.0000 - 06/17/2002 - ulrich sprick - creation // - 05/??/2004 - Glen Parker rewritten using // System.Queue. using System; using System.Collections.Generic; using System.Data; using System.Threading; using System.Timers; using Timer = System.Timers.Timer; namespace Npgsql { /// /// This class manages all connector objects, pooled AND non-pooled. /// internal class NpgsqlConnectorPool { /// /// A queue with an extra Int32 for keeping track of busy connections. /// private class ConnectorQueue { /// /// Connections available to the end user /// public Queue Available = new Queue(); /// /// Connections currently in use /// public Dictionary Busy = new Dictionary(); public Int32 ConnectionLifeTime; public Int32 InactiveTime = 0; public Int32 MinPoolSize; } /// Unique static instance of the connector pool /// mamager. internal static NpgsqlConnectorPool ConnectorPoolMgr = new NpgsqlConnectorPool(); private object locker = new object(); public NpgsqlConnectorPool() { PooledConnectors = new Dictionary(); Timer = new Timer(1000); Timer.AutoReset = false; Timer.Elapsed += new ElapsedEventHandler(TimerElapsedHandler); } private void StartTimer() { lock (locker) { Timer.Start(); } } private void TimerElapsedHandler(object sender, ElapsedEventArgs e) { NpgsqlConnector Connector; var activeConnectionsExist = false; lock (locker) { try { foreach (ConnectorQueue Queue in PooledConnectors.Values) { lock (Queue) { if (Queue.Available.Count > 0) { if (Queue.Available.Count + Queue.Busy.Count > Queue.MinPoolSize) { if (Queue.InactiveTime >= Queue.ConnectionLifeTime) { Int32 diff = Queue.Available.Count + Queue.Busy.Count - Queue.MinPoolSize; Int32 toBeClosed = (diff + 1) / 2; toBeClosed = Math.Min(toBeClosed, Queue.Available.Count); if (diff < 2) { diff = 2; } Queue.InactiveTime -= Queue.ConnectionLifeTime / (int)(Math.Log(diff) / Math.Log(2)); for (Int32 i = 0; i < toBeClosed; ++i) { Connector = Queue.Available.Dequeue(); Connector.Close(); } } else { Queue.InactiveTime++; } } else { Queue.InactiveTime = 0; } if (Queue.Available.Count > 0 || Queue.Busy.Count > 0) activeConnectionsExist = true; } else { Queue.InactiveTime = 0; } } } } finally { if (activeConnectionsExist) Timer.Start(); else Timer.Stop(); } } } /// Map of index to unused pooled connectors, avaliable to the /// next RequestConnector() call. /// This hashmap will be indexed by connection string. /// This key will hold a list of queues of pooled connectors available to be used. private readonly Dictionary PooledConnectors; /// Timer for tracking unused connections in pools. // I used System.Timers.Timer because of bad experience with System.Threading.Timer // on Windows - it's going mad sometimes and don't respect interval was set. private Timer Timer; /// /// Searches the pooled connector lists for a matching connector object or creates a new one. /// /// The NpgsqlConnection that is requesting /// the connector. Its ConnectionString will be used to search the /// pool for available connectors. /// A connector object. public NpgsqlConnector RequestConnector(NpgsqlConnection Connection) { NpgsqlConnector Connector; Int32 timeoutMilliseconds = Connection.Timeout * 1000; // No need for this lock anymore //lock (this) { Connector = GetPooledConnector(Connection); } while (Connector == null && timeoutMilliseconds > 0) { Int32 ST = timeoutMilliseconds > 1000 ? 1000 : timeoutMilliseconds; Thread.Sleep(ST); timeoutMilliseconds -= ST; //lock (this) { Connector = GetPooledConnector(Connection); } } if (Connector == null) { if (Connection.Timeout > 0) { throw new Exception("Timeout while getting a connection from pool."); } else { throw new Exception("Connection pool exceeds maximum size."); } } StartTimer(); return Connector; } private delegate void CleanUpConnectorDel(NpgsqlConnection Connection, NpgsqlConnector Connector); private void CleanUpConnectorMethod(NpgsqlConnection Connection, NpgsqlConnector Connector) { try { Connector.CurrentReader.Close(); Connector.CurrentReader = null; ReleaseConnector(Connection, Connector); } catch { } } private void CleanUpConnector(NpgsqlConnection Connection, NpgsqlConnector Connector) { new CleanUpConnectorDel(CleanUpConnectorMethod).BeginInvoke(Connection, Connector, null, null); } /// /// Releases a connector, possibly back to the pool for future use. /// /// /// Pooled connectors will be put back into the pool if there is room. /// /// Connection to which the connector is leased. /// The connector to release. public void ReleaseConnector(NpgsqlConnection Connection, NpgsqlConnector Connector) { //We can only clean up a connector with a reader if the current thread hasn't been aborted //If it has then we need to just close it (ReleasePooledConnector will do this for an aborted thread) if (Connector.CurrentReader != null && (Thread.CurrentThread.ThreadState & (ThreadState.Aborted | ThreadState.AbortRequested)) == 0) { CleanUpConnector(Connection, Connector); } else { //lock (this) { UngetConnector(Connection, Connector); } } } /// /// Find an available pooled connector in the pool, or create a new one if none found. /// private NpgsqlConnector GetPooledConnector(NpgsqlConnection Connection) { ConnectorQueue Queue = null; NpgsqlConnector Connector = null; do { if (Connector != null) { //This means Connector was found to be invalid at the end of the loop lock (Queue) { Queue.Busy.Remove(Connector); } Connector.Close(); Connector = null; } // We only need to lock all pools when trying to get one pool or create one. lock (locker) { // Try to find a queue. if (!PooledConnectors.TryGetValue(Connection.ConnectionString, out Queue)) { Queue = new ConnectorQueue(); Queue.ConnectionLifeTime = Connection.ConnectionLifeTime; Queue.MinPoolSize = Connection.MinPoolSize; PooledConnectors[Connection.ConnectionString] = Queue; } } // Now we can simply lock on the pool itself. lock (Queue) { if (Queue.Available.Count > 0) { // Found a queue with connectors. Grab the top one. // Check if the connector is still valid. Connector = Queue.Available.Dequeue(); Queue.Busy.Add(Connector, null); } } } while (Connector != null && !Connector.IsValid()); if (Connector != null) return Connector; lock (Queue) { if (Queue.Available.Count + Queue.Busy.Count < Connection.MaxPoolSize) { Connector = new NpgsqlConnector(Connection); Queue.Busy.Add(Connector, null); } } if (Connector != null) { Connector.ProvideClientCertificatesCallback += Connection.ProvideClientCertificatesCallbackDelegate; Connector.CertificateSelectionCallback += Connection.CertificateSelectionCallbackDelegate; Connector.CertificateValidationCallback += Connection.CertificateValidationCallbackDelegate; Connector.PrivateKeySelectionCallback += Connection.PrivateKeySelectionCallbackDelegate; Connector.ValidateRemoteCertificateCallback += Connection.ValidateRemoteCertificateCallbackDelegate; try { Connector.Open(); } catch { lock (Queue) { Queue.Busy.Remove(Connector); } Connector.Close(); throw; } // Meet the MinPoolSize requirement if needed. if (Connection.MinPoolSize > 1) { lock (Queue) { while (Queue.Available.Count + Queue.Busy.Count < Connection.MinPoolSize) { NpgsqlConnector Spare = new NpgsqlConnector(Connection); Spare.ProvideClientCertificatesCallback += Connection.ProvideClientCertificatesCallbackDelegate; Spare.CertificateSelectionCallback += Connection.CertificateSelectionCallbackDelegate; Spare.CertificateValidationCallback += Connection.CertificateValidationCallbackDelegate; Spare.PrivateKeySelectionCallback += Connection.PrivateKeySelectionCallbackDelegate; Spare.ValidateRemoteCertificateCallback += Connection.ValidateRemoteCertificateCallbackDelegate; Spare.Open(); Spare.ProvideClientCertificatesCallback -= Connection.ProvideClientCertificatesCallbackDelegate; Spare.CertificateSelectionCallback -= Connection.CertificateSelectionCallbackDelegate; Spare.CertificateValidationCallback -= Connection.CertificateValidationCallbackDelegate; Spare.PrivateKeySelectionCallback -= Connection.PrivateKeySelectionCallbackDelegate; Spare.ValidateRemoteCertificateCallback -= Connection.ValidateRemoteCertificateCallbackDelegate; Queue.Available.Enqueue(Spare); } } } } return Connector; } /// /// Put a pooled connector into the pool queue. /// /// Connection is leased to. /// Connector to pool private void UngetConnector(NpgsqlConnection Connection, NpgsqlConnector Connector) { ConnectorQueue queue; // Find the queue. // As we are handling all possible queues, we have to lock everything... lock (locker) { PooledConnectors.TryGetValue(Connection.ConnectionString, out queue); } if (queue == null) { Connector.Close(); // Release connection to postgres return; // Queue may be emptied by connection problems. See ClearPool below. } Connector.ProvideClientCertificatesCallback -= Connection.ProvideClientCertificatesCallbackDelegate; Connector.CertificateSelectionCallback -= Connection.CertificateSelectionCallbackDelegate; Connector.CertificateValidationCallback -= Connection.CertificateValidationCallbackDelegate; Connector.PrivateKeySelectionCallback -= Connection.PrivateKeySelectionCallbackDelegate; Connector.ValidateRemoteCertificateCallback -= Connection.ValidateRemoteCertificateCallbackDelegate; /*bool inQueue = false; lock (queue) { inQueue = queue.Busy.ContainsKey(Connector); queue.Busy.Remove(Connector); } */ if (!Connector.IsConnected) { if (Connector.Transaction != null) { Connector.Transaction.Cancel(); } Connector.Close(); } else { if (Connector.Transaction != null) { try { Connector.Transaction.Rollback(); } catch { Connector.Close(); } } } bool inQueue = queue.Busy.ContainsKey(Connector); if (Connector.State == ConnectorState.Ready) { //If thread is good if ((Thread.CurrentThread.ThreadState & (ThreadState.Aborted | ThreadState.AbortRequested)) == 0) { // Release all resources associated with this connector. try { Connector.ReleaseResources(); } catch (Exception) { //If the connector fails to release its resources then it is probably broken, so make sure we don't add it to the queue. // Usually it already won't be in the queue as it would of broken earlier inQueue = false; Connector.Close(); } } else { //Thread is being aborted, this connection is possibly broken. So kill it rather than returning it to the pool inQueue = false; Connector.Close(); } } // Check if Connector should return to the queue of available connectors. If not, this connector is invalid and should // only be removed from the busy queue which effectvely removes it from the pool. if (inQueue) lock (queue) { queue.Busy.Remove(Connector); queue.Available.Enqueue(Connector); } else lock (queue) { queue.Busy.Remove(Connector); } } private static void ClearQueue(ConnectorQueue Queue) { if (Queue == null) { return; } lock (Queue) { while (Queue.Available.Count > 0) { NpgsqlConnector connector = Queue.Available.Dequeue(); try { connector.Close(); } catch { // Maybe we should log something here to say we got an exception while closing connector? } } //Clear the busy list so that the current connections don't get re-added to the queue Queue.Busy.Clear(); } } internal void ClearPool(NpgsqlConnection Connection) { // Prevent multithread access to connection pool count. lock (locker) { ConnectorQueue queue; // Try to find a queue. if (PooledConnectors.TryGetValue(Connection.ConnectionString, out queue)) { ClearQueue(queue); PooledConnectors.Remove(Connection.ConnectionString); } } } internal void ClearAllPools() { lock (locker) { foreach (ConnectorQueue Queue in PooledConnectors.Values) { ClearQueue(Queue); } PooledConnectors.Clear(); } } } }
X Tutup