#region License
// The PostgreSQL License
//
// Copyright (C) 2015 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics.Contracts;
using System.Threading;
using Npgsql.Logging;
namespace Npgsql
{
/// <summary>
/// Hands out and takes back <c>NpgsqlConnector</c> objects, keeping one pool
/// (<see cref="ConnectorQueue"/>) per connection string.
/// </summary>
/// <remarks>
/// Locking: <c>locker</c> guards the connection-string map; each <see cref="ConnectorQueue"/> is
/// guarded by a lock on the queue instance itself. When both are needed, <c>locker</c> is always
/// taken first.
/// </remarks>
internal class NpgsqlConnectorPool
{
    /// <summary>
    /// The pool for one connection string: idle connectors, leased connectors and the
    /// bookkeeping used by the pruning timer. Access its members only while holding a
    /// lock on the instance.
    /// </summary>
    private class ConnectorQueue
    {
        /// <summary>
        /// Connections available to the end user.
        /// </summary>
        public Queue<NpgsqlConnector> Available = new Queue<NpgsqlConnector>();

        /// <summary>
        /// Connections currently in use. Used as a set: only the keys matter, values are always null.
        /// </summary>
        public Dictionary<NpgsqlConnector, object> Busy = new Dictionary<NpgsqlConnector, object>();

        /// <summary>
        /// Number of timer ticks a surplus of idle connectors is tolerated before pruning starts.
        /// </summary>
        public Int32 ConnectionLifeTime;

        /// <summary>
        /// Number of timer ticks during which this queue has held idle connectors above
        /// <see cref="MinPoolSize"/>.
        /// </summary>
        public Int32 InactiveTime = 0;

        /// <summary>
        /// Number of connectors (idle + busy) the pruning timer will not go below.
        /// </summary>
        public Int32 MinPoolSize;
    }

    /// <summary>
    /// Unique static instance of the connector pool manager.
    /// </summary>
    internal static NpgsqlConnectorPool ConnectorPoolMgr;

    /// <summary>
    /// Maximum number of possible connections in the pool.
    /// </summary>
    internal const int PoolSizeLimit = 1024;

    /// <summary>
    /// Delay, in milliseconds, between arming the pruning timer and its callback firing.
    /// </summary>
    const int TimerInterval = 1000;

    /// <summary>
    /// Guards <see cref="PooledConnectors"/> and re-arming of <see cref="_timer"/>.
    /// </summary>
    private readonly object locker = new object();

    /// <summary>
    /// Map from connection string to the queue holding that connection string's
    /// available and busy connectors.
    /// </summary>
    private readonly Dictionary<string, ConnectorQueue> PooledConnectors;

    /// <summary>
    /// One-shot timer driving <see cref="TimerElapsedHandler"/>; it is re-armed explicitly.
    /// </summary>
    readonly Timer _timer;

    /// <summary>
    /// Creates an empty pool manager. The pruning timer is created disarmed and is
    /// started by the first successful <see cref="RequestConnector"/> call.
    /// </summary>
    internal NpgsqlConnectorPool()
    {
        PooledConnectors = new Dictionary<string, ConnectorQueue>();
        _timer = new Timer(TimerElapsedHandler, null, Timeout.Infinite, Timeout.Infinite);
    }

    /// <summary>
    /// (Re)arms the pruning timer to fire once, <see cref="TimerInterval"/> ms from now.
    /// </summary>
    private void StartTimer()
    {
        lock (locker)
        {
            _timer.Change(TimerInterval, Timeout.Infinite);
        }
    }

    /// <summary>
    /// Timer callback: prunes surplus idle connectors in every queue and re-arms the
    /// timer while any queue still holds a connector (idle or busy).
    /// </summary>
    /// <param name="sender">Timer state object; unused.</param>
    private void TimerElapsedHandler(object sender)
    {
        var activeConnectionsExist = false;
        lock (locker)
        {
            try
            {
                foreach (ConnectorQueue queue in PooledConnectors.Values)
                {
                    lock (queue)
                    {
                        PruneIdleConnectors(queue);

                        // Evaluated for every queue, including those whose connectors are all
                        // busy right now: otherwise the timer would stop and connectors released
                        // later would never be pruned until the next RequestConnector call.
                        if (queue.Available.Count > 0 || queue.Busy.Count > 0)
                            activeConnectionsExist = true;
                    }
                }
            }
            finally
            {
                if (activeConnectionsExist)
                    _timer.Change(TimerInterval, Timeout.Infinite);
            }
        }
    }

    /// <summary>
    /// Performs one pruning tick on a single queue. The caller must hold the lock on
    /// <paramref name="queue"/>.
    /// </summary>
    /// <remarks>
    /// While the queue has idle connectors and more connectors in total than MinPoolSize,
    /// InactiveTime is incremented per tick. Once it reaches ConnectionLifeTime, half of the
    /// surplus (rounded up, capped at the number of idle connectors) is closed and InactiveTime
    /// is reduced by ConnectionLifeTime / floor(log2(surplus)), so a larger surplus is revisited
    /// sooner. In any other state InactiveTime is reset to zero.
    /// </remarks>
    private static void PruneIdleConnectors(ConnectorQueue queue)
    {
        Int32 total = queue.Available.Count + queue.Busy.Count;
        if (queue.Available.Count == 0 || total <= queue.MinPoolSize)
        {
            queue.InactiveTime = 0;
            return;
        }

        if (queue.InactiveTime < queue.ConnectionLifeTime)
        {
            queue.InactiveTime++;
            return;
        }

        Int32 surplus = total - queue.MinPoolSize;
        Int32 toBeClosed = Math.Min((surplus + 1) / 2, queue.Available.Count);

        // Clamp to 2 so that log2 below is at least 1 and the division cannot be by zero.
        Int32 diff = Math.Max(surplus, 2);
        queue.InactiveTime -= queue.ConnectionLifeTime / (int)(Math.Log(diff) / Math.Log(2));

        for (Int32 i = 0; i < toBeClosed; ++i)
        {
            queue.Available.Dequeue().Close();
        }
    }

    /// <summary>
    /// Searches the pooled connector lists for a matching connector object or creates a new one.
    /// </summary>
    /// <remarks>
    /// If the pool is saturated, polls it at most once per second until
    /// <c>connection.Timeout</c> seconds have elapsed.
    /// </remarks>
    /// <param name="connection">The NpgsqlConnection that is requesting
    /// the connector. Its ConnectionString will be used to search the
    /// pool for available connectors.</param>
    /// <returns>A connector object, already associated with <paramref name="connection"/>.</returns>
    /// <exception cref="ArgumentException">MaxPoolSize is smaller than MinPoolSize.</exception>
    /// <exception cref="Exception">No connector became available within the timeout.</exception>
    internal NpgsqlConnector RequestConnector(NpgsqlConnection connection)
    {
        if (connection.MaxPoolSize < connection.MinPoolSize)
            throw new ArgumentException(
                $"Connection can't have MaxPoolSize {connection.MaxPoolSize} under MinPoolSize {connection.MinPoolSize}");
        Contract.Ensures(Contract.Result<NpgsqlConnector>().State == ConnectorState.Ready, "Pool returned a connector with state ");

        Int32 timeoutMilliseconds = connection.Timeout * 1000;
        NpgsqlConnector connector = GetPooledConnector(connection);

        while (connector == null && timeoutMilliseconds > 0)
        {
            // Sleep in slices of at most one second so a freed connector is picked up promptly.
            Int32 sleepTime = Math.Min(timeoutMilliseconds, 1000);
            Thread.Sleep(sleepTime);
            timeoutMilliseconds -= sleepTime;
            connector = GetPooledConnector(connection);
        }

        if (connector == null)
        {
            if (connection.Timeout > 0)
            {
                throw new Exception("Timeout while getting a connection from pool.");
            }
            else
            {
                throw new Exception("Connection pool exceeds maximum size.");
            }
        }

        connector.Connection = connection;
        StartTimer();
        return connector;
    }

    /// <summary>
    /// Find an available pooled connector in the pool, or create a new one if none found.
    /// </summary>
    /// <param name="connection">Connection whose settings select and size the pool.</param>
    /// <returns>
    /// A connector registered as busy, or null when no connector is idle and the pool
    /// already holds MaxPoolSize connectors.
    /// </returns>
    private NpgsqlConnector GetPooledConnector(NpgsqlConnection connection)
    {
        ConnectorQueue queue;

        // The map lock is only needed to find or create the queue.
        lock (locker)
        {
            var key = connection.ConnectionString;
            if (!PooledConnectors.TryGetValue(key, out queue))
            {
                queue = new ConnectorQueue
                {
                    ConnectionLifeTime = connection.ConnectionLifeTime,
                    MinPoolSize = connection.MinPoolSize
                };
                PooledConnectors[key] = queue;
            }
        }

        NpgsqlConnector connector = null;

        // From here on the queue's own lock is sufficient.
        lock (queue)
        {
            if (queue.Available.Count > 0)
            {
                // Reuse the oldest idle connector.
                connector = queue.Available.Dequeue();
                queue.Busy.Add(connector, null);
                return connector;
            }

            if (queue.Available.Count + queue.Busy.Count < connection.MaxPoolSize)
            {
                // Reserve the slot now; the (slow) Open() happens outside the lock.
                connector = new NpgsqlConnector(connection);
                queue.Busy.Add(connector, null);
            }
        }

        if (connector == null)
        {
            // Pool saturated; the caller decides whether to wait and retry.
            return null;
        }

        try
        {
            connector.Open();
        }
        catch
        {
            // Give the reserved slot back before anything else can throw.
            lock (queue)
            {
                queue.Busy.Remove(connector);
            }
            Contract.Assert(connector.IsBroken);
            throw;
        }

        // Meet the MinPoolSize requirement if needed.
        if (connection.MinPoolSize > 1)
        {
            try
            {
                lock (queue)
                {
                    while (queue.Available.Count + queue.Busy.Count < connection.MinPoolSize)
                    {
                        NpgsqlConnector spare = new NpgsqlConnector(connection);
                        spare.Open();
                        spare.Connection = null;
                        queue.Available.Enqueue(spare);
                    }
                }
            }
            catch
            {
                // The caller will never receive the primary connector, so it must not stay
                // registered as busy (and open) forever.
                lock (queue)
                {
                    queue.Busy.Remove(connector);
                }
                try
                {
                    connector.Close();
                }
                catch
                {
                    // Best effort: the failure that matters to the caller is the one rethrown below.
                }
                throw;
            }
        }

        return connector;
    }

    /// <summary>
    /// Releases a connector, possibly back to the pool for future use.
    /// </summary>
    /// <remarks>
    /// A connector goes back to the pool only if it is ready, was successfully reset, the
    /// releasing thread is not being aborted, and it is still registered as busy in its queue.
    /// In every other case it is closed.
    /// </remarks>
    /// <param name="connection">Connection to which the connector is leased.</param>
    /// <param name="connector">The connector to release.</param>
    internal void ReleaseConnector(NpgsqlConnection connection, NpgsqlConnector connector)
    {
        Contract.Requires(connector.IsReady || connector.IsClosed || connector.IsBroken);

        ConnectorQueue queue;
        lock (locker)
        {
            PooledConnectors.TryGetValue(connection.ConnectionString, out queue);
        }

        if (queue == null)
        {
            connector.Close(); // Release connection to postgres
            return; // Queue may be emptied by connection problems. See ClearPool below.
        }

        // Busy is mutated by other threads under lock (queue), so it must be read under it too.
        bool returnToPool;
        lock (queue)
        {
            returnToPool = queue.Busy.ContainsKey(connector);
        }

        if (connector.IsBroken || connector.IsClosed)
        {
            if (connector.InTransaction)
            {
                connector.ClearTransaction();
            }
            returnToPool = false;
        }
        else
        {
            Contract.Assert(connector.IsReady);
            var abortFlags = ThreadState.Aborted | ThreadState.AbortRequested;
            if ((Thread.CurrentThread.ThreadState & abortFlags) != 0)
            {
                // Thread is being aborted, this connection is possibly broken.
                // So kill it rather than returning it to the pool.
                returnToPool = false;
            }
            else if (returnToPool)
            {
                // Release all resources associated with this connector.
                try
                {
                    connector.Reset();
                }
                catch
                {
                    // A connector that cannot be reset is not safe to reuse.
                    returnToPool = false;
                }
            }
        }

        lock (queue)
        {
            // Remove() also tells us whether the lease still existed: ClearQueue may have
            // wiped Busy while Reset() was running, in which case the connector must not
            // be put back into the cleared queue.
            bool wasLeased = queue.Busy.Remove(connector);
            returnToPool = returnToPool && wasLeased;
            if (returnToPool)
            {
                queue.Available.Enqueue(connector);
            }
        }

        // Every connector that does not go back to the pool is closed, after its busy slot
        // has been freed so that a failing Close() cannot leak the slot.
        if (!returnToPool)
        {
            connector.Close();
        }
    }

    /// <summary>
    /// Closes every idle connector of <paramref name="queue"/> and forgets its busy ones,
    /// so that they are closed instead of pooled when they are released.
    /// </summary>
    /// <param name="queue">Queue to clear; null is ignored.</param>
    private static void ClearQueue(ConnectorQueue queue)
    {
        if (queue == null)
        {
            return;
        }

        lock (queue)
        {
            while (queue.Available.Count > 0)
            {
                NpgsqlConnector connector = queue.Available.Dequeue();
                try
                {
                    connector.Close();
                }
                catch
                {
                    // Deliberately best effort: one connector failing to close must not
                    // prevent the remaining ones from being closed.
                    // Maybe we should log something here to say we got an exception while closing connector?
                }
            }

            // Clear the busy list so that the current connections don't get re-added to the queue
            queue.Busy.Clear();
        }
    }

    /// <summary>
    /// Clears and removes the pool belonging to the connection string of
    /// <paramref name="Connection"/>, if there is one.
    /// </summary>
    /// <param name="Connection">Connection whose ConnectionString identifies the pool.</param>
    internal void ClearPool(NpgsqlConnection Connection)
    {
        // Prevent multithread access to connection pool count.
        lock (locker)
        {
            ConnectorQueue queue;
            if (PooledConnectors.TryGetValue(Connection.ConnectionString, out queue))
            {
                ClearQueue(queue);
                PooledConnectors.Remove(Connection.ConnectionString);
            }
        }
    }

    /// <summary>
    /// Clears and removes the pools of all connection strings.
    /// </summary>
    internal void ClearAllPools()
    {
        lock (locker)
        {
            foreach (ConnectorQueue queue in PooledConnectors.Values)
            {
                ClearQueue(queue);
            }
            PooledConnectors.Clear();
        }
    }

    /// <summary>
    /// Creates the singleton and, on the full framework targets, arranges for all pools
    /// to be cleared when the application domain unloads or the process exits.
    /// </summary>
    static NpgsqlConnectorPool()
    {
        ConnectorPoolMgr = new NpgsqlConnectorPool();
#if NET45 || NET452 || DNX452
        AppDomain.CurrentDomain.DomainUnload += (sender, args) => { Thread.Sleep(3); ConnectorPoolMgr.ClearAllPools(); };
        AppDomain.CurrentDomain.ProcessExit += (sender, args) => { Thread.Sleep(3); ConnectorPoolMgr.ClearAllPools(); };
#endif
    }
}
}