#region License
// The PostgreSQL License
//
// Copyright (C) 2015 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using AsyncRewriter;
using Npgsql.BackendMessages;
using Npgsql.FrontendMessages;
using Npgsql.TypeHandlers;
using NpgsqlTypes;
using Npgsql.Logging;
namespace Npgsql
{
///
/// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are
/// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool.
///
internal partial class NpgsqlConnector
{
#region Fields and Properties
///
/// The physical connection socket to the backend.
///
Socket _socket;
///
/// The physical connection stream to the backend, without anything on top.
///
NetworkStream _baseStream;
///
/// The physical connection stream to the backend, layered with an SSL/TLS stream if in secure mode.
///
Stream _stream;
readonly NpgsqlConnectionStringBuilder _settings;
///
/// Contains the clear text password which was extracted from the user-provided connection string.
/// If non-cleartext authentication is requested from the server, this is set to null.
///
readonly string _password;
///
/// Buffer used for reading data.
///
internal NpgsqlBuffer Buffer { get; private set; }
///
/// Version of backend server this connector is connected to.
///
internal Version ServerVersion { get; set; }
///
/// The secret key of the backend for this connector, used for query cancellation.
///
int _backendSecretKey;
///
/// The process ID of the backend for this connector.
///
internal int BackendProcessId { get; private set; }
///
/// A unique ID identifying this connector, used for logging. Currently mapped to BackendProcessId
///
internal int Id { get { return BackendProcessId; } }
internal TypeHandlerRegistry TypeHandlerRegistry { get; set; }
///
/// The current transaction status for this connector.
///
internal TransactionStatus TransactionStatus { get; set; }
///
/// The transaction currently in progress, if any.
///
///
///
/// Note that this doesn't mean a transaction request has actually been sent to the backend - for
/// efficiency we defer sending the request to the first query after BeginTransaction is called.
/// See for the actual transaction status.
///
///
/// Also, the user can initiate a transaction in SQL (i.e. BEGIN), in which case there will be no
/// NpgsqlTransaction instance. As a result, never check to know whether
/// a transaction is in progress, check instead.
///
///
internal NpgsqlTransaction Transaction { get; set; }
///
/// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't
/// owned (i.e. idle in the pool)
///
internal NpgsqlConnection Connection { get; set; }
///
/// The number of messages that were prepended to the current message chain, but not yet sent.
/// Note that this only tracks messages which produce a ReadyForQuery message
///
byte _pendingRfqPrependedMessages;
///
/// The number of messages that were prepended and sent to the last message chain.
/// Note that this only tracks messages which produce a ReadyForQuery message
///
byte _sentRfqPrependedMessages;
///
/// A chain of messages to be sent to the backend.
///
readonly List _messagesToSend;
internal NpgsqlDataReader CurrentReader;
///
/// If the connector is currently in COPY mode, holds a reference to the importer/exporter object.
/// Otherwise null.
///
internal ICancelable CurrentCopyOperation;
///
/// Holds all run-time parameters received from the backend (via ParameterStatus messages)
///
internal Dictionary BackendParams;
#if !DNXCORE50
internal SSPIHandler SSPI { get; set; }
#endif
///
/// The frontend timeout for reading messages that are part of the user's command
/// (i.e. which aren't internal prepended commands).
///
internal int UserCommandFrontendTimeout { get; set; }
///
/// Contains the current value of the statement_timeout parameter at the backend,
/// used to determine whether we need to change it when commands are sent.
///
///
/// 0 means means no timeout.
/// -1 means that the current value is unknown.
///
int _backendTimeout;
///
/// Contains the current value of the socket's ReceiveTimeout, used to determine whether
/// we need to change it when commands are received.
///
int _frontendTimeout;
///
/// A lock that's taken while a user action is in progress, e.g. a command being executed.
///
SemaphoreSlim _userLock;
///
/// A lock that's taken while a non-user-triggered async action is in progress, e.g. handling of an
/// asynchronous notification or a connection keepalive. Does not get taken for a user async
/// action such as .
///
SemaphoreSlim _asyncLock;
///
/// A lock that's taken while a cancellation is being delivered; new queries are blocked until the
/// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
/// command will accidentally cancel a later one, see #615.
///
readonly object _cancelLock;
readonly UserAction _userAction;
readonly Timer _keepAliveTimer;
static readonly byte[] EmptyBuffer = new byte[0];
static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger();
#endregion
#region Constants
///
/// Number of seconds added as a margin to the backend timeout to yield the frontend timeout.
/// We prefer the backend to timeout - it's a clean error which doesn't break the connector.
///
const int FrontendTimeoutMargin = 3;
///
/// The minimum timeout that can be set on internal commands such as COMMIT, ROLLBACK.
///
internal const int MinimumInternalCommandTimeout = 3;
#endregion
#region Reusable Message Objects
// Frontend. Note that these are only used for single-query commands.
internal readonly ParseMessage ParseMessage = new ParseMessage();
internal readonly BindMessage BindMessage = new BindMessage();
internal readonly DescribeMessage DescribeMessage = new DescribeMessage();
internal readonly ExecuteMessage ExecuteMessage = new ExecuteMessage();
// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new CommandCompleteMessage();
readonly ReadyForQueryMessage _readyForQueryMessage = new ReadyForQueryMessage();
readonly ParameterDescriptionMessage _parameterDescriptionMessage = new ParameterDescriptionMessage();
readonly DataRowSequentialMessage _dataRowSequentialMessage = new DataRowSequentialMessage();
readonly DataRowNonSequentialMessage _dataRowNonSequentialMessage = new DataRowNonSequentialMessage();
// Since COPY is rarely used, allocate these lazily
CopyInResponseMessage _copyInResponseMessage;
CopyOutResponseMessage _copyOutResponseMessage;
CopyDataMessage _copyDataMessage;
#endregion
#region Constructors
internal NpgsqlConnector(NpgsqlConnection connection)
: this(connection.Settings, connection.Password)
{
Connection = connection;
Connection.Connector = this;
}
///
/// Creates a new connector with the given connection string.
///
/// The connection string.
/// The clear-text password or null if not using a password.
NpgsqlConnector(NpgsqlConnectionStringBuilder connectionString, string password)
{
State = ConnectorState.Closed;
TransactionStatus = TransactionStatus.Idle;
_settings = connectionString;
_password = password;
BackendParams = new Dictionary();
_messagesToSend = new List();
_preparedStatementIndex = 0;
_portalIndex = 0;
_userLock = new SemaphoreSlim(1, 1);
_asyncLock = new SemaphoreSlim(1, 1);
_userAction = new UserAction(this);
_cancelLock = new object();
if (KeepAlive > 0) {
_keepAliveTimer = new Timer(PerformKeepAlive, null, Timeout.Infinite, Timeout.Infinite);
}
}
#endregion
#region Configuration settings
internal string ConnectionString { get { return _settings.ConnectionString; } }
internal string Host { get { return _settings.Host; } }
internal int Port { get { return _settings.Port; } }
internal string Database { get { return _settings.Database; } }
internal string UserName { get { return _settings.Username; } }
internal string KerberosServiceName { get { return _settings.KerberosServiceName; } }
internal SslMode SslMode { get { return _settings.SslMode; } }
internal bool UseSslStream { get { return _settings.UseSslStream; } }
internal int BufferSize { get { return _settings.BufferSize; } }
internal int ConnectionTimeout { get { return _settings.Timeout; } }
internal bool BackendTimeouts { get { return _settings.BackendTimeouts; } }
internal int KeepAlive { get { return _settings.KeepAlive; } }
internal bool Enlist { get { return _settings.Enlist; } }
internal bool IntegratedSecurity { get { return _settings.IntegratedSecurity; } }
internal bool ConvertInfinityDateTime { get { return _settings.ConvertInfinityDateTime; } }
internal bool ContinuousProcessing { get { return _settings.ContinuousProcessing; } }
internal int ActualInternalCommandTimeout
{
get
{
Contract.Ensures(Contract.Result() == 0 || Contract.Result() >= MinimumInternalCommandTimeout);
var internalTimeout = _settings.InternalCommandTimeout;
if (internalTimeout == -1) {
return Math.Max(_settings.CommandTimeout, MinimumInternalCommandTimeout);
}
Contract.Assert(internalTimeout == 0 || internalTimeout >= MinimumInternalCommandTimeout);
return internalTimeout;
}
}
#endregion Configuration settings
#region State management
int _state;
///
/// Gets the current state of the connector
///
internal ConnectorState State
{
get { return (ConnectorState)_state; }
set
{
var newState = (int)value;
if (newState == _state)
return;
Interlocked.Exchange(ref _state, newState);
}
}
///
/// Returns whether the connector is open, regardless of any task it is currently performing
///
internal bool IsConnected
{
get
{
switch (State)
{
case ConnectorState.Ready:
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.Copy:
return true;
case ConnectorState.Closed:
case ConnectorState.Connecting:
case ConnectorState.Broken:
return false;
default:
throw new ArgumentOutOfRangeException("State", "Unknown state: " + State);
}
}
}
///
/// Returns whether the connector is open and performing a task, i.e. not ready for a query
///
internal bool IsBusy
{
get
{
switch (State)
{
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.Copy:
return true;
case ConnectorState.Ready:
case ConnectorState.Closed:
case ConnectorState.Connecting:
case ConnectorState.Broken:
return false;
default:
throw new ArgumentOutOfRangeException("State", "Unknown state: " + State);
}
}
}
internal bool IsReady { get { return State == ConnectorState.Ready; } }
internal bool IsClosed { get { return State == ConnectorState.Closed; } }
internal bool IsBroken { get { return State == ConnectorState.Broken; } }
#endregion
#region Open
///
/// Totally temporary until the connection pool is rewritten with timeout support
///
internal void Open()
{
Open(new NpgsqlTimeout(TimeSpan.Zero));
}
///
/// Opens the physical connection to the server.
///
/// Usually called by the RequestConnector
/// Method of the connection pool manager.
[RewriteAsync]
internal void Open(NpgsqlTimeout timeout)
{
Contract.Requires(Connection != null && Connection.Connector == this);
Contract.Requires(State == ConnectorState.Closed);
State = ConnectorState.Connecting;
try {
RawOpen(timeout);
WriteStartupMessage();
Buffer.Flush();
timeout.Check();
HandleAuthentication(timeout);
TypeHandlerRegistry.Setup(this, timeout);
State = ConnectorState.Ready;
if (ContinuousProcessing) {
HandleAsyncMessages();
}
}
catch
{
try { Break(); }
catch {
// ignored
}
throw;
}
}
void WriteStartupMessage()
{
var startupMessage = new StartupMessage();
startupMessage["client_encoding"] = "UTF8";
startupMessage["user"] = UserName;
if (!string.IsNullOrEmpty(Database))
{
startupMessage["database"] = Database;
}
if (!string.IsNullOrEmpty(_settings.ApplicationName))
{
startupMessage["application_name"] = _settings.ApplicationName;
}
if (!string.IsNullOrEmpty(_settings.SearchPath))
{
startupMessage["search_path"] = _settings.SearchPath;
}
if (_settings.BackendTimeouts && _settings.CommandTimeout != 0)
{
startupMessage["statement_timeout"] = (_settings.CommandTimeout * 1000).ToString();
_backendTimeout = _settings.CommandTimeout;
}
if (IsSecure && !IsRedshift)
{
startupMessage["ssl_renegotiation_limit"] = "0";
}
if (startupMessage.Length > Buffer.Size)
{ // Should really never happen, just in case
throw new Exception("Startup message bigger than buffer");
}
startupMessage.Write(Buffer);
}
[RewriteAsync]
public void RawOpen(NpgsqlTimeout timeout)
{
try
{
Connect(timeout);
Contract.Assert(_socket != null);
_baseStream = new NetworkStream(_socket, true);
_stream = _baseStream;
Buffer = new NpgsqlBuffer(_stream, BufferSize, PGUtil.UTF8Encoding);
if (SslMode == SslMode.Require || SslMode == SslMode.Prefer)
{
SSLRequestMessage.Instance.Write(Buffer);
Buffer.Flush();
Buffer.Ensure(1);
var response = (char)Buffer.ReadByte();
timeout.Check();
switch (response)
{
default:
throw new Exception(string.Format("Received unknown response {0} for SSLRequest (expecting S or N)", response));
case 'N':
if (SslMode == SslMode.Require)
{
throw new InvalidOperationException("SSL connection requested. No SSL enabled connection from this host is configured.");
}
break;
case 'S':
var clientCertificates = new X509CertificateCollection();
if (Connection.ProvideClientCertificatesCallback != null)
{
Connection.ProvideClientCertificatesCallback(clientCertificates);
}
RemoteCertificateValidationCallback certificateValidationCallback;
if (_settings.TrustServerCertificate)
{
certificateValidationCallback = (sender, certificate, chain, errors) => true;
}
else if (Connection.UserCertificateValidationCallback != null)
{
certificateValidationCallback = Connection.UserCertificateValidationCallback;
}
else
{
certificateValidationCallback = DefaultUserCertificateValidationCallback;
}
if (!UseSslStream)
{
#if DNXCORE50
throw new NotSupportedException("TLS implementation not yet supported with .NET Core, specify UseSslStream=true for now");
#else
var sslStream = new TlsClientStream.TlsClientStream(_stream);
sslStream.PerformInitialHandshake(Host, clientCertificates, certificateValidationCallback, false);
_stream = sslStream;
#endif
}
else
{
var sslStream = new SslStream(_stream, false, certificateValidationCallback);
sslStream.AuthenticateAsClient(Host, clientCertificates, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false);
_stream = sslStream;
}
timeout.Check();
Buffer.Underlying = _stream;
IsSecure = true;
break;
}
}
Log.Debug(String.Format("Connected to {0}:{1}", Host, Port));
}
catch
{
if (_stream != null)
{
try { _stream.Dispose(); } catch {
// ignored
}
_stream = null;
}
if (_baseStream != null)
{
try { _baseStream.Dispose(); } catch {
// ignored
}
_baseStream = null;
}
if (_socket != null)
{
try { _socket.Dispose(); } catch {
// ignored
}
_socket = null;
}
throw;
}
}
void Connect(NpgsqlTimeout timeout)
{
#if DNXCORE50
// .NET Core doesn't appear to have sync DNS methods (yet?)
var ips = Dns.GetHostAddressesAsync(Host).Result;
#else
// Note that there aren't any timeoutable DNS methods, and we want to use sync-only
// methods (not to rely on any TP threads etc.)
var ips = Dns.GetHostAddresses(Host);
#endif
timeout.Check();
// Give each IP an equal share of the remaining time
var perIpTimeout = timeout.IsSet ? (int)((timeout.TimeLeft.Ticks / ips.Length) / 10) : -1;
for (var i = 0; i < ips.Length; i++)
{
Log.Trace("Attempting to connect to " + ips[i], Id);
var ep = new IPEndPoint(ips[i], Port);
var socket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
{
Blocking = false
};
try
{
try
{
socket.Connect(ep);
}
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
{
throw;
}
}
var write = new List { socket };
var error = new List { socket };
Socket.Select(null, write, error, perIpTimeout);
var errorCode = (int) socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error);
if (errorCode != 0) {
throw new SocketException((int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error));
}
if (!write.Any())
{
Log.Warn(string.Format("Timeout after {0} seconds when connecting to {1}",
new TimeSpan(perIpTimeout * 10).TotalSeconds, ips[i]));
try { socket.Dispose(); }
catch
{
// ignored
}
if (i == ips.Length - 1)
{
throw new TimeoutException();
}
continue;
}
socket.Blocking = true;
_socket = socket;
return;
}
catch (TimeoutException) { throw; }
catch
{
try { socket.Dispose(); }
catch
{
// ignored
}
Log.Warn("Failed to connect to " + ips[i]);
if (i == ips.Length - 1)
{
throw;
}
}
}
}
async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken)
{
// Note that there aren't any timeoutable or cancellable DNS methods
var ips = await Dns.GetHostAddressesAsync(Host).WithCancellation(cancellationToken);
// Give each IP an equal share of the remaining time
var perIpTimespan = timeout.IsSet ? new TimeSpan(timeout.TimeLeft.Ticks / ips.Length) : TimeSpan.Zero;
var perIpTimeout = timeout.IsSet ? new NpgsqlTimeout(perIpTimespan) : timeout;
for (var i = 0; i < ips.Length; i++)
{
Log.Trace("Attempting to connect to " + ips[i], Id);
var ep = new IPEndPoint(ips[i], Port);
var socket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
var connectTask = Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, ep, null);
try
{
try
{
await connectTask.WithCancellationAndTimeout(perIpTimeout, cancellationToken);
}
catch (OperationCanceledException)
{
#pragma warning disable 4014
// ReSharper disable once MethodSupportsCancellation
connectTask.ContinueWith(t => socket.Dispose());
#pragma warning restore 4014
if (timeout.HasExpired)
{
Log.Warn(string.Format("Timeout after {0} seconds when connecting to {1}",
perIpTimespan.TotalSeconds, ips[i]));
if (i == ips.Length - 1)
{
throw new TimeoutException();
}
continue;
}
// We're here if an actual cancellation was requested (not a timeout)
throw;
}
_socket = socket;
return;
}
catch (TimeoutException) { throw; }
catch (OperationCanceledException) { throw; }
catch
{
try { socket.Dispose(); }
catch
{
// ignored
}
Log.Warn("Failed to connect to " + ips[i]);
if (i == ips.Length - 1)
{
throw;
}
}
}
}
[RewriteAsync]
void HandleAuthentication(NpgsqlTimeout timeout)
{
Log.Debug("Authenticating...", Id);
while (true)
{
var msg = ReadSingleMessage();
timeout.Check();
switch (msg.Code)
{
case BackendMessageCode.AuthenticationRequest:
var passwordMessage = ProcessAuthenticationMessage((AuthenticationRequestMessage)msg);
if (passwordMessage != null)
{
passwordMessage.Write(Buffer);
Buffer.Flush();
timeout.Check();
}
continue;
case BackendMessageCode.BackendKeyData:
var backendKeyDataMsg = (BackendKeyDataMessage) msg;
BackendProcessId = backendKeyDataMsg.BackendProcessId;
_backendSecretKey = backendKeyDataMsg.BackendSecretKey;
continue;
case BackendMessageCode.ReadyForQuery:
State = ConnectorState.Ready;
return;
default:
throw new Exception("Unexpected message received while authenticating: " + msg.Code);
}
}
}
///
/// Performs a step in the PostgreSQL authentication protocol
///
/// A message read from the server, instructing us on the required response
/// a PasswordMessage to be sent, or null if authentication has completed successfully
PasswordMessage ProcessAuthenticationMessage(AuthenticationRequestMessage msg)
{
switch (msg.AuthRequestType)
{
case AuthenticationRequestType.AuthenticationOk:
return null;
case AuthenticationRequestType.AuthenticationCleartextPassword:
if (_password == null) {
throw new Exception("No password has been provided but the backend requires one (in cleartext)");
}
return PasswordMessage.CreateClearText(_password);
case AuthenticationRequestType.AuthenticationMD5Password:
if (_password == null) {
throw new Exception("No password has been provided but the backend requires one (in MD5)");
}
return PasswordMessage.CreateMD5(_password, UserName, ((AuthenticationMD5PasswordMessage)msg).Salt);
case AuthenticationRequestType.AuthenticationGSS:
if (!IntegratedSecurity) {
throw new Exception("GSS authentication but IntegratedSecurity not enabled");
}
#if DNXCORE50
throw new NotSupportedException("SSPI not yet supported in .NET Core");
#else
// For GSSAPI we have to use the supplied hostname
SSPI = new SSPIHandler(Host, KerberosServiceName, true);
return new PasswordMessage(SSPI.Continue(null));
#endif
case AuthenticationRequestType.AuthenticationSSPI:
if (!IntegratedSecurity) {
throw new Exception("SSPI authentication but IntegratedSecurity not enabled");
}
#if DNXCORE50
throw new NotSupportedException("SSPI not yet supported in .NET Core");
#else
SSPI = new SSPIHandler(Host, KerberosServiceName, false);
return new PasswordMessage(SSPI.Continue(null));
#endif
case AuthenticationRequestType.AuthenticationGSSContinue:
#if DNXCORE50
throw new NotSupportedException("SSPI not yet supported in .NET Core");
#else
var passwdRead = SSPI.Continue(((AuthenticationGSSContinueMessage)msg).AuthenticationData);
if (passwdRead.Length != 0)
{
return new PasswordMessage(passwdRead);
}
return null;
#endif
default:
throw new NotSupportedException(String.Format("Authentication method not supported (Received: {0})", msg.AuthRequestType));
}
}
#endregion
#region Frontend message processing
internal void AddMessage(FrontendMessage msg)
{
_messagesToSend.Add(msg);
}
///
/// Prepends a message to be sent at the beginning of the next message chain.
///
internal void PrependInternalMessage(FrontendMessage msg, bool withTimeout=true)
{
// Set backend timeout if needed.
if (withTimeout) {
PrependBackendTimeoutMessage(ActualInternalCommandTimeout);
}
if (msg is QueryMessage || msg is PregeneratedMessage || msg is SyncMessage)
{
// These messages produce a ReadyForQuery response, which we will be looking for when
// processing the message chain results
checked { _pendingRfqPrependedMessages++; }
}
_messagesToSend.Add(msg);
}
internal void PrependBackendTimeoutMessage(int timeout)
{
if (_backendTimeout == timeout || !BackendTimeouts) {
return;
}
_backendTimeout = timeout;
checked { _pendingRfqPrependedMessages++; }
switch (timeout) {
case 10:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout10Sec);
return;
case 20:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout20Sec);
return;
case 30:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout30Sec);
return;
case 60:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout60Sec);
return;
case 90:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout90Sec);
return;
case 120:
_messagesToSend.Add(PregeneratedMessage.SetStmtTimeout120Sec);
return;
default:
_messagesToSend.Add(new QueryMessage(string.Format("SET statement_timeout = {0}", timeout * 1000)));
return;
}
}
[RewriteAsync]
internal void SendAllMessages()
{
if (!_messagesToSend.Any()) {
return;
}
// If a cancellation is in progress, wait for it to "complete" before proceeding (#615)
lock (_cancelLock) { }
_sentRfqPrependedMessages = _pendingRfqPrependedMessages;
_pendingRfqPrependedMessages = 0;
try
{
foreach (var msg in _messagesToSend)
{
SendMessage(msg);
}
Buffer.Flush();
}
catch
{
Break();
throw;
}
finally
{
_messagesToSend.Clear();
}
}
///
/// Sends a single frontend message, used for simple messages such as rollback, etc.
/// Note that additional prepend messages may be previously enqueued, and will be sent along
/// with this message.
///
///
internal void SendSingleMessage(FrontendMessage msg)
{
AddMessage(msg);
SendAllMessages();
}
[RewriteAsync]
void SendMessage(FrontendMessage msg)
{
Log.Trace(String.Format("Sending: {0}", msg), Id);
var asSimple = msg as SimpleFrontendMessage;
if (asSimple != null)
{
if (asSimple.Length > Buffer.WriteSpaceLeft)
{
Buffer.Flush();
}
Contract.Assume(Buffer.WriteSpaceLeft >= asSimple.Length);
asSimple.Write(Buffer);
return;
}
var asComplex = msg as ChunkingFrontendMessage;
if (asComplex != null)
{
var directBuf = new DirectBuffer();
while (!asComplex.Write(Buffer, ref directBuf))
{
Buffer.Flush();
// The following is an optimization hack for writing large byte arrays without passing
// through our buffer
if (directBuf.Buffer != null)
{
Buffer.Underlying.Write(directBuf.Buffer, directBuf.Offset, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size);
directBuf.Buffer = null;
directBuf.Size = 0;
}
}
return;
}
throw PGUtil.ThrowIfReached();
}
#endregion
#region Backend message processing
[RewriteAsync]
internal IBackendMessage ReadSingleMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool returnNullForAsyncMessage = false)
{
// First read the responses of any prepended messages.
// Exceptions shouldn't happen here, we break the connector if they do
if (_sentRfqPrependedMessages > 0)
{
try
{
SetFrontendTimeout(ActualInternalCommandTimeout);
while (_sentRfqPrependedMessages > 0)
{
var msg = DoReadSingleMessage(DataRowLoadingMode.Skip, isPrependedMessage: true);
if (msg is ReadyForQueryMessage)
{
_sentRfqPrependedMessages--;
}
}
}
catch
{
Break();
throw;
}
}
// Now read a non-prepended message
try
{
SetFrontendTimeout(UserCommandFrontendTimeout);
return DoReadSingleMessage(dataRowLoadingMode, returnNullForAsyncMessage);
}
catch (NpgsqlException)
{
if (CurrentReader != null)
{
// The reader cleanup will call EndUserAction
CurrentReader.Cleanup();
}
else
{
EndUserAction();
}
throw;
}
catch
{
Break();
throw;
}
}
[RewriteAsync]
IBackendMessage DoReadSingleMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential,
bool returnNullForAsyncMessage = false,
bool isPrependedMessage = false)
{
NpgsqlException error = null;
while (true)
{
var buf = Buffer;
Buffer.Ensure(5);
var messageCode = (BackendMessageCode) Buffer.ReadByte();
Contract.Assume(Enum.IsDefined(typeof(BackendMessageCode), messageCode), "Unknown message code: " + messageCode);
var len = Buffer.ReadInt32() - 4; // Transmitted length includes itself
if ((messageCode == BackendMessageCode.DataRow && dataRowLoadingMode != DataRowLoadingMode.NonSequential) ||
messageCode == BackendMessageCode.CopyData)
{
if (dataRowLoadingMode == DataRowLoadingMode.Skip)
{
Buffer.Skip(len);
continue;
}
}
else if (len > Buffer.ReadBytesLeft)
{
buf = buf.EnsureOrAllocateTemp(len);
}
var msg = ParseServerMessage(buf, messageCode, len, dataRowLoadingMode, isPrependedMessage);
switch (messageCode) {
case BackendMessageCode.ErrorResponse:
Contract.Assert(msg == null);
// An ErrorResponse is (almost) always followed by a ReadyForQuery. Save the error
// and throw it as an exception when the ReadyForQuery is received (next).
error = new NpgsqlException(buf);
if (State == ConnectorState.Connecting) {
// During the startup/authentication phase, an ErrorResponse isn't followed by
// an RFQ. Instead, the server closes the connection immediately
throw error;
}
continue;
case BackendMessageCode.ReadyForQuery:
if (error != null) {
throw error;
}
break;
// Asynchronous messages
case BackendMessageCode.NoticeResponse:
case BackendMessageCode.NotificationResponse:
case BackendMessageCode.ParameterStatus:
Contract.Assert(msg == null);
if (!returnNullForAsyncMessage) {
continue;
}
return null;
}
Contract.Assert(msg != null, "Message is null for code: " + messageCode);
return msg;
}
}
IBackendMessage ParseServerMessage(NpgsqlBuffer buf, BackendMessageCode code, int len, DataRowLoadingMode dataRowLoadingMode, bool isPrependedMessage)
{
switch (code)
{
case BackendMessageCode.RowDescription:
// TODO: Recycle
var rowDescriptionMessage = new RowDescriptionMessage();
return rowDescriptionMessage.Load(buf, TypeHandlerRegistry);
case BackendMessageCode.DataRow:
Contract.Assert(dataRowLoadingMode == DataRowLoadingMode.NonSequential || dataRowLoadingMode == DataRowLoadingMode.Sequential);
return dataRowLoadingMode == DataRowLoadingMode.Sequential
? _dataRowSequentialMessage.Load(buf)
: _dataRowNonSequentialMessage.Load(buf);
case BackendMessageCode.CompletedResponse:
return _commandCompleteMessage.Load(buf, len);
case BackendMessageCode.ReadyForQuery:
var rfq = _readyForQueryMessage.Load(buf);
if (!isPrependedMessage) {
// Transaction status on prepended messages should never be processed - it could be a timeout
// on a begin new transaction, or even a rollback enqueued from a previous connection (pooled).
ProcessNewTransactionStatus(rfq.TransactionStatusIndicator);
}
return rfq;
case BackendMessageCode.EmptyQueryResponse:
return EmptyQueryMessage.Instance;
case BackendMessageCode.ParseComplete:
return ParseCompleteMessage.Instance;
case BackendMessageCode.ParameterDescription:
return _parameterDescriptionMessage.Load(buf);
case BackendMessageCode.BindComplete:
return BindCompleteMessage.Instance;
case BackendMessageCode.NoData:
return NoDataMessage.Instance;
case BackendMessageCode.CloseComplete:
return CloseCompletedMessage.Instance;
case BackendMessageCode.ParameterStatus:
HandleParameterStatus(buf.ReadNullTerminatedString(), buf.ReadNullTerminatedString());
return null;
case BackendMessageCode.NoticeResponse:
FireNotice(new NpgsqlNotice(buf));
return null;
case BackendMessageCode.NotificationResponse:
FireNotification(new NpgsqlNotificationEventArgs(buf));
return null;
case BackendMessageCode.AuthenticationRequest:
var authType = (AuthenticationRequestType)buf.ReadInt32();
Log.Trace("Received AuthenticationRequest of type " + authType, Id);
switch (authType)
{
case AuthenticationRequestType.AuthenticationOk:
return AuthenticationOkMessage.Instance;
case AuthenticationRequestType.AuthenticationCleartextPassword:
return AuthenticationCleartextPasswordMessage.Instance;
case AuthenticationRequestType.AuthenticationMD5Password:
return AuthenticationMD5PasswordMessage.Load(buf);
case AuthenticationRequestType.AuthenticationGSS:
return AuthenticationGSSMessage.Instance;
case AuthenticationRequestType.AuthenticationSSPI:
return AuthenticationSSPIMessage.Instance;
case AuthenticationRequestType.AuthenticationGSSContinue:
return AuthenticationGSSContinueMessage.Load(buf, len);
default:
throw new NotSupportedException(String.Format("Authentication method not supported (Received: {0})", authType));
}
case BackendMessageCode.BackendKeyData:
return new BackendKeyDataMessage(buf);
case BackendMessageCode.CopyInResponse:
if (_copyInResponseMessage == null) {
_copyInResponseMessage = new CopyInResponseMessage();
}
return _copyInResponseMessage.Load(Buffer);
case BackendMessageCode.CopyOutResponse:
if (_copyOutResponseMessage == null) {
_copyOutResponseMessage = new CopyOutResponseMessage();
}
return _copyOutResponseMessage.Load(Buffer);
case BackendMessageCode.CopyData:
if (_copyDataMessage == null) {
_copyDataMessage = new CopyDataMessage();
}
return _copyDataMessage.Load(len);
case BackendMessageCode.CopyDone:
return CopyDoneMessage.Instance;
case BackendMessageCode.PortalSuspended:
throw new NotImplementedException("Unimplemented message: " + code);
case BackendMessageCode.ErrorResponse:
return null;
case BackendMessageCode.FunctionCallResponse:
// We don't use the obsolete function call protocol
throw new Exception("Unexpected backend message: " + code);
default:
throw PGUtil.ThrowIfReached("Unknown backend message code: " + code);
}
}
///
/// Given a user timeout in seconds, sets the socket's ReceiveTimeout (if needed).
/// Note that if backend timeouts are enabled, we add a few seconds of margin to allow
/// the backend timeout to happen first.
///
void SetFrontendTimeout(int userTimeout)
{
// TODO: Socket.ReceiveTimeout doesn't work for async.
int timeout;
if (userTimeout == 0)
timeout = 0;
else if (BackendTimeouts)
timeout = (userTimeout + FrontendTimeoutMargin) * 1000;
else
timeout = userTimeout * 1000;
if (timeout != _frontendTimeout) {
_socket.ReceiveTimeout = _frontendTimeout = timeout;
}
}
bool HasDataInBuffers
{
get
{
return Buffer.ReadBytesLeft > 0 ||
(_stream is NetworkStream && ((NetworkStream) _stream).DataAvailable)
#if !DNXCORE50
|| (_stream is TlsClientStream.TlsClientStream && ((TlsClientStream.TlsClientStream) _stream).HasBufferedReadData(false))
#endif
;
}
}
///
/// Reads and processes any messages that are already in our buffers (either Npgsql or TCP).
/// Handles asynchronous messages (Notification, Notice, ParameterStatus) that may after a
/// ReadyForQuery, as well as async notification mode.
///
void DrainBufferedMessages()
{
while (HasDataInBuffers)
{
var msg = ReadSingleMessage(DataRowLoadingMode.NonSequential, true);
if (msg != null)
{
Break();
throw new Exception(string.Format("Got unexpected non-async message with code {0} while draining: {1}", msg.Code, msg));
}
}
}
///
/// Reads backend messages and discards them, stopping only after a message of the given type has
/// been seen.
///
[RewriteAsync]
internal IBackendMessage SkipUntil(BackendMessageCode stopAt)
{
Contract.Requires(stopAt != BackendMessageCode.DataRow, "Shouldn't be used for rows, doesn't know about sequential");
while (true)
{
var msg = ReadSingleMessage(DataRowLoadingMode.Skip);
Contract.Assert(!(msg is DataRowMessage));
if (msg.Code == stopAt) {
return msg;
}
}
}
///
/// Reads backend messages and discards them, stopping only after a message of the given types has
/// been seen.
///
[RewriteAsync]
internal IBackendMessage SkipUntil(BackendMessageCode stopAt1, BackendMessageCode stopAt2)
{
Contract.Requires(stopAt1 != BackendMessageCode.DataRow, "Shouldn't be used for rows, doesn't know about sequential");
Contract.Requires(stopAt2 != BackendMessageCode.DataRow, "Shouldn't be used for rows, doesn't know about sequential");
while (true) {
var msg = ReadSingleMessage(DataRowLoadingMode.Skip);
Contract.Assert(!(msg is DataRowMessage));
if (msg.Code == stopAt1 || msg.Code == stopAt2) {
return msg;
}
}
}
///
/// Reads a single message, expecting it to be of type .
/// Any other message causes an exception to be raised and the connector to be broken.
/// Asynchronous messages (e.g. Notice) are treated and ignored. ErrorResponses raise an
/// exception but do not cause the connector to break.
///
internal T ReadExpecting() where T : class, IBackendMessage
{
var msg = ReadSingleMessage();
var asExpected = msg as T;
if (asExpected == null)
{
Break();
throw new Exception(string.Format("Received backend message {0} while expecting {1}. Please file a bug.",
msg.Code, typeof(T).Name));
}
return asExpected;
}
#endregion Backend message processing
#region Transactions
internal bool InTransaction
{
get
{
switch (TransactionStatus)
{
case TransactionStatus.Idle:
return false;
case TransactionStatus.Pending:
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
return true;
default:
throw PGUtil.ThrowIfReached();
}
}
}
///
/// Handles a new transaction indicator received on a ReadyForQuery message
///
void ProcessNewTransactionStatus(TransactionStatus newStatus)
{
if (newStatus == TransactionStatus) { return; }
switch (newStatus) {
case TransactionStatus.Idle:
ClearTransaction();
break;
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
break;
case TransactionStatus.Pending:
throw new Exception("Invalid TransactionStatus (should be frontend-only)");
default:
throw PGUtil.ThrowIfReached();
}
TransactionStatus = newStatus;
}
internal void ClearTransaction()
{
if (TransactionStatus == TransactionStatus.Idle) { return; }
// We may not have an NpgsqlTransaction for the transaction (i.e. user executed BEGIN)
if (Transaction != null)
{
Transaction.Connection = null;
Transaction = null;
}
TransactionStatus = TransactionStatus.Idle;
}
#endregion
#region Notifications
///
/// Occurs on NoticeResponses from the PostgreSQL backend.
///
internal event NoticeEventHandler Notice;
///
/// Occurs on NotificationResponses from the PostgreSQL backend.
///
internal event NotificationEventHandler Notification;
internal void FireNotice(NpgsqlNotice e)
{
var notice = Notice;
if (notice != null)
{
try
{
notice(this, new NpgsqlNoticeEventArgs(e));
}
catch
{
// Ignore all exceptions bubbling up from the user's event handler
}
}
}
internal void FireNotification(NpgsqlNotificationEventArgs e)
{
var notification = Notification;
if (notification != null)
{
try
{
notification(this, e);
}
catch
{
// Ignore all exceptions bubbling up from the user's event handler
}
}
}
#endregion Notifications
#region SSL
///
/// Returns whether SSL is being used for the connection
///
internal bool IsSecure { get; private set; }
static bool DefaultUserCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return sslPolicyErrors == SslPolicyErrors.None;
}
#endregion SSL
#region Cancel
///
/// Creates another connector and sends a cancel request through it for this connector.
///
internal void CancelRequest()
{
lock (_cancelLock)
{
var cancelConnector = new NpgsqlConnector(_settings, _password);
cancelConnector.DoCancelRequest(BackendProcessId, _backendSecretKey, cancelConnector.ConnectionTimeout);
}
}
void DoCancelRequest(int backendProcessId, int backendSecretKey, int connectionTimeout)
{
Contract.Requires(State == ConnectorState.Closed);
Log.Debug("Performing cancel", Id);
try
{
RawOpen(new NpgsqlTimeout(TimeSpan.FromSeconds(connectionTimeout)));
SendSingleMessage(new CancelRequestMessage(backendProcessId, backendSecretKey));
Contract.Assert(Buffer.ReadPosition == 0);
// Now wait for the server to close the connection, better chance of the cancellation
// actually being delivered.
var count = _stream.Read(Buffer._buf, 0, 1);
if (count != -1)
{
Log.Error("Received response after sending cancel request, shouldn't happen! First byte: " + Buffer._buf[0]);
}
}
finally
{
Cleanup();
}
}
#endregion Cancel
#region Close / Reset
///
/// Closes the physical connection to the server.
///
internal void Close()
{
Log.Debug("Close connector", Id);
if (IsReady)
{
try { SendSingleMessage(TerminateMessage.Instance); }
catch (Exception e)
{
Log.Error("Exception while closing connector", e, Id);
Contract.Assert(IsBroken);
}
}
switch (State)
{
case ConnectorState.Broken:
case ConnectorState.Closed:
return;
}
State = ConnectorState.Closed;
Cleanup();
}
///
/// Called when an unexpected message has been received during an action. Breaks the
/// connector and returns the appropriate message.
///
internal Exception UnexpectedMessageReceived(BackendMessageCode received)
{
Break();
return new Exception(string.Format("Received unexpected backend message {0}. Please file a bug.", received));
}
internal void Break()
{
Contract.Requires(!IsClosed);
if (State == ConnectorState.Broken)
return;
Log.Trace("Break connector", Id);
var prevState = State;
State = ConnectorState.Broken;
var conn = Connection;
Cleanup();
// We have no connection if we're broken by a keepalive occuring while the connector is in the pool
if (conn != null)
{
if (prevState != ConnectorState.Connecting)
{
// A break during a connection attempt puts the connection in state closed, not broken
conn.WasBroken = true;
}
conn.ReallyClose();
}
}
///
/// Closes the socket and cleans up client-side resources associated with this connector.
///
void Cleanup()
{
Log.Trace("Cleanup connector", Id);
try { if (_stream != null) _stream.Dispose(); } catch {
// ignored
}
if (CurrentReader != null) {
CurrentReader.Command.State = CommandState.Idle;
try { CurrentReader.Close(); } catch {
// ignored
}
CurrentReader = null;
}
ClearTransaction();
_stream = null;
_baseStream = null;
Buffer = null;
Connection = null;
BackendParams.Clear();
ServerVersion = null;
_userLock.Dispose();
_userLock = null;
_asyncLock.Dispose();
_asyncLock = null;
}
///
/// Called when a pooled connection is closed, and its connector is returned to the pool.
/// Resets the connector back to its initial state, releasing server-side sources
/// (e.g. prepared statements), resetting parameters to their defaults, and resetting client-side
/// state
///
internal void Reset()
{
Contract.Requires(State == ConnectorState.Ready);
Connection = null;
switch (State)
{
case ConnectorState.Ready:
break;
case ConnectorState.Closed:
case ConnectorState.Broken:
Log.Warn(String.Format("Reset() called on connector with state {0}, ignoring", State), Id);
return;
case ConnectorState.Connecting:
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.Copy:
throw new InvalidOperationException("Reset() called on connector with state " + State);
default:
throw PGUtil.ThrowIfReached();
}
if (IsInUserAction)
{
EndUserAction();
}
// If a begin transaction is pending (i.e. not yet sent to the server), remove it
if (PregeneratedMessage.BeginTransactionMessages.Contains(_messagesToSend.LastOrDefault()))
{
_messagesToSend.RemoveAt(_messagesToSend.Count - 1);
checked { _pendingRfqPrependedMessages--; }
ClearTransaction();
}
// If a DISCARD ALL is already pending (#736), don't reenqueue rollback/discard
var lastEnqueued = _messagesToSend.LastOrDefault();
if (lastEnqueued == PregeneratedMessage.DiscardAll || lastEnqueued == PregeneratedMessage.UnlistenAll)
{
return;
}
// Must rollback transaction before sending DISCARD ALL
if (InTransaction)
{
// If we're in a failed transaction we can't set the timeout
var withTimeout = TransactionStatus != TransactionStatus.InFailedTransactionBlock;
PrependInternalMessage(PregeneratedMessage.RollbackTransaction, withTimeout);
ClearTransaction();
}
if (SupportsDiscard)
{
PrependInternalMessage(PregeneratedMessage.DiscardAll);
}
else
{
PrependInternalMessage(PregeneratedMessage.UnlistenAll);
/*
TODO: Problem: we can't just deallocate for all the below since some may have already been deallocated
Not sure if we should keep supporting this for < 8.3. If so fix along with #483
if (_preparedStatementIndex > 0) {
for (var i = 1; i <= _preparedStatementIndex; i++) {
PrependMessage(new QueryMessage(String.Format("DEALLOCATE \"{0}{1}\";", PreparedStatementNamePrefix, i)));
}
}*/
_portalIndex = 0;
_preparedStatementIndex = 0;
}
// DISCARD ALL has reset statement_timeout to the default specified on the connection string.
if (_settings.BackendTimeouts)
{
_backendTimeout = _settings.CommandTimeout;
}
}
#endregion Close
#region Locking
internal IDisposable StartUserAction(ConnectorState newState=ConnectorState.Executing)
{
Contract.Requires(newState != ConnectorState.Ready);
Contract.Requires(newState != ConnectorState.Closed);
Contract.Requires(newState != ConnectorState.Broken);
Contract.Requires(newState != ConnectorState.Connecting);
Contract.Ensures(State == newState);
Contract.Ensures(IsInUserAction);
if (!_userLock.Wait(0))
{
switch (State) {
case ConnectorState.Closed:
case ConnectorState.Broken:
case ConnectorState.Connecting:
throw new InvalidOperationException("The connection is still connecting.");
case ConnectorState.Ready:
// Can happen in a race condition as the connector is transitioning to Ready
case ConnectorState.Executing:
case ConnectorState.Fetching:
throw new InvalidOperationException("An operation is already in progress.");
case ConnectorState.Copy:
throw new InvalidOperationException("A COPY operation is in progress and must complete first.");
default:
throw PGUtil.ThrowIfReached("Unknown connector state: " + State);
}
}
// If an async operation happens to be in progress (async notification, keepalive),
// wait until it's done
_asyncLock.Wait();
// The connection might not have been open, or the async operation which just finished
// may have led to the connection being broken
if (!IsConnected)
{
_asyncLock.Release();
_userLock.Release();
throw new InvalidOperationException();
}
// Disable keepalive
if (KeepAlive > 0) {
_keepAliveTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
Contract.Assume(IsReady);
Contract.Assume(Buffer.ReadBytesLeft == 0, "The read buffer should be read completely before sending Parse message");
Contract.Assume(Buffer.WritePosition == 0, "WritePosition should be 0");
State = newState;
return _userAction;
}
internal void EndUserAction()
{
Contract.Requires(CurrentReader == null);
Contract.Ensures(!IsInUserAction);
Contract.EnsuresOnThrow(!IsInUserAction);
Contract.EnsuresOnThrow(!IsInUserAction);
if (!IsInUserAction)
{
// Allows us to call EndUserAction twice. This makes it easier to write code that
// always ends the user action with using(), whether an exception was thrown or not.
return;
}
if (!IsConnected)
{
// A breaking exception was thrown or the connector was closed
return;
}
try
{
// Asynchronous messages (Notification, Notice, ParameterStatus) may have arrived
// during the user action, and may be in our buffer. Since we have one buffer for
// both reading and writing, and since we want to process these messages early,
// we drain any remaining buffered messages.
DrainBufferedMessages();
if (KeepAlive > 0)
{
var keepAlive = KeepAlive*1000;
_keepAliveTimer.Change(keepAlive, keepAlive);
}
State = ConnectorState.Ready;
}
finally
{
if (_asyncLock != null) { _asyncLock.Release(); }
if (_userLock != null) { _userLock.Release(); }
}
}
internal bool IsInUserAction
{
get { return _userLock != null && _userLock.CurrentCount == 0; }
}
///
/// An IDisposable wrapper around and
/// .
///
class UserAction : IDisposable
{
readonly NpgsqlConnector _connector;
internal UserAction(NpgsqlConnector connector)
{
_connector = connector;
}
public void Dispose()
{
_connector.EndUserAction();
}
}
#endregion
#region Async message handling
internal async void HandleAsyncMessages()
{
try
{
while (true)
{
await _baseStream.ReadAsync(EmptyBuffer, 0, 0);
if (_asyncLock == null) {
return;
}
// If the semaphore is disposed while we're (async) waiting on it, the continuation
// never gets called and this method doesn't continue
await _asyncLock.WaitAsync();
try
{
DrainBufferedMessages();
}
finally
{
if (_asyncLock != null) {
_asyncLock.Release();
}
}
}
}
catch (Exception e)
{
Log.Error("Exception while handling async messages", e, Id);
}
}
#endregion
#region Keepalive
void PerformKeepAlive(object state)
{
if (_asyncLock == null) { return; }
if (!_asyncLock.Wait(0)) {
// The async semaphore has already been acquired, either by a user action,
// or an async notification being handled, or, improbably, by a previous keepalive.
// Whatever the case, exit immediately, no need to perform a keepalive.
return;
}
if (!IsConnected) { return; }
try
{
// Note: we can't use a regular command to execute the SQL query, because that would
// acquire the user lock (and prevent real user queries from running).
if (TransactionStatus != TransactionStatus.InFailedTransactionBlock)
{
PrependBackendTimeoutMessage(ActualInternalCommandTimeout);
}
SendSingleMessage(PregeneratedMessage.KeepAlive);
SkipUntil(BackendMessageCode.ReadyForQuery);
_asyncLock.Release();
}
catch (Exception e)
{
Log.Fatal("Keepalive failure", e, Id);
Break();
}
}
#endregion
#region Supported features
internal bool SupportsApplicationName { get; private set; }
internal bool SupportsExtraFloatDigits3 { get; private set; }
internal bool SupportsExtraFloatDigits { get; private set; }
internal bool SupportsSslRenegotiationLimit { get; private set; }
internal bool SupportsSavepoint { get; private set; }
internal bool SupportsDiscard { get; private set; }
internal bool SupportsEStringPrefix { get; private set; }
internal bool SupportsHexByteFormat { get; private set; }
internal bool SupportsRangeTypes { get; private set; }
internal bool UseConformantStrings { get; private set; }
///
/// This method is required to set all the version dependent features flags.
/// SupportsPrepare means the server can use prepared query plans (7.3+)
///
void ProcessServerVersion(string value)
{
var versionString = value.Trim();
for (var idx = 0; idx != versionString.Length; ++idx)
{
var c = value[idx];
if (!char.IsDigit(c) && c != '.')
{
versionString = versionString.Substring(0, idx);
break;
}
}
ServerVersion = new Version(versionString);
SupportsSavepoint = (ServerVersion >= new Version(8, 0, 0));
SupportsDiscard = (ServerVersion >= new Version(8, 3, 0));
SupportsApplicationName = (ServerVersion >= new Version(9, 0, 0));
SupportsExtraFloatDigits3 = (ServerVersion >= new Version(9, 0, 0));
SupportsExtraFloatDigits = (ServerVersion >= new Version(7, 4, 0));
SupportsSslRenegotiationLimit = ((ServerVersion >= new Version(8, 4, 3)) ||
(ServerVersion >= new Version(8, 3, 10) && ServerVersion < new Version(8, 4, 0)) ||
(ServerVersion >= new Version(8, 2, 16) && ServerVersion < new Version(8, 3, 0)) ||
(ServerVersion >= new Version(8, 1, 20) && ServerVersion < new Version(8, 2, 0)) ||
(ServerVersion >= new Version(8, 0, 24) && ServerVersion < new Version(8, 1, 0)) ||
(ServerVersion >= new Version(7, 4, 28) && ServerVersion < new Version(8, 0, 0)));
// Per the PG documentation, E string literal prefix support appeared in PG version 8.1.
// Note that it is possible that support for this prefix will vanish in some future version
// of Postgres, in which case this test will need to be revised.
// At that time it may also be necessary to set UseConformantStrings = true here.
SupportsEStringPrefix = (ServerVersion >= new Version(8, 1, 0));
// Per the PG documentation, hex string encoding format support appeared in PG version 9.0.
SupportsHexByteFormat = (ServerVersion >= new Version(9, 0, 0));
// Range data types
SupportsRangeTypes = (ServerVersion >= new Version(9, 2, 0));
}
///
/// Whether the backend is an AWS Redshift instance
///
internal bool IsRedshift
{
get { return _settings.ServerCompatibilityMode == ServerCompatibilityMode.Redshift; }
}
#endregion Supported features
#region Execute internal command
internal void ExecuteInternalCommand(string query, bool withTimeout=true)
{
ExecuteInternalCommand(new QueryMessage(query), withTimeout);
}
internal void ExecuteInternalCommand(SimpleFrontendMessage message, bool withTimeout=true)
{
using (StartUserAction())
{
if (withTimeout) {
PrependBackendTimeoutMessage(ActualInternalCommandTimeout);
}
AddMessage(message);
SendAllMessages();
ReadExpecting();
ReadExpecting();
}
}
#endregion
#region Misc
///
/// Called in various cases to indicate that the backend's statement_timeout parameter
/// may have changed and is currently unknown. It will be set the next time a command
/// is sent.
///
internal void SetBackendTimeoutToUnknown() { _backendTimeout = -1; }
void HandleParameterStatus(string name, string value)
{
BackendParams[name] = value;
switch (name)
{
case "server_version":
ProcessServerVersion(value);
return;
case "standard_conforming_strings":
UseConformantStrings = (value == "on");
return;
}
}
///
/// Returns next portal index.
///
internal String NextPortalName()
{
return _portalNamePrefix + (++_portalIndex);
}
int _portalIndex;
const String _portalNamePrefix = "p";
///
/// Returns next plan index.
///
internal string NextPreparedStatementName()
{
return PreparedStatementNamePrefix + (++_preparedStatementIndex);
}
int _preparedStatementIndex;
const string PreparedStatementNamePrefix = "s";
#endregion Misc
#region Invariants
[ContractInvariantMethod]
void ObjectInvariants()
{
Contract.Invariant(!IsReady || !IsInUserAction);
Contract.Invariant(!IsReady || !HasDataInBuffers, "Connector in Ready state but has data in buffer");
Contract.Invariant((KeepAlive == 0 && _keepAliveTimer == null) || (KeepAlive > 0 && _keepAliveTimer != null));
}
#endregion
}
#region Enums
///
/// Expresses the exact state of a connector.
///
internal enum ConnectorState
{
///
/// The connector has either not yet been opened or has been closed.
///
Closed,
///
/// The connector is currently connecting to a Postgresql server.
///
Connecting,
///
/// The connector is connected and may be used to send a new query.
///
Ready,
///
/// The connector is waiting for a response to a query which has been sent to the server.
///
Executing,
///
/// The connector is currently fetching and processing query results.
///
Fetching,
///
/// The connection was broken because an unexpected error occurred which left it in an unknown state.
/// This state isn't implemented yet.
///
Broken,
///
/// The connector is engaged in a COPY operation.
///
Copy,
}
internal enum TransactionStatus : byte
{
///
/// Currently not in a transaction block
///
Idle = (byte)'I',
///
/// Currently in a transaction block
///
InTransactionBlock = (byte)'T',
///
/// Currently in a failed transaction block (queries will be rejected until block is ended)
///
InFailedTransactionBlock = (byte)'E',
///
/// A new transaction has been requested but not yet transmitted to the backend. It will be transmitted
/// prepended to the next query.
/// This is a client-side state option only, and is never transmitted from the backend.
///
Pending = byte.MaxValue,
}
///
/// Specifies how to load/parse DataRow messages as they're received from the backend.
///
internal enum DataRowLoadingMode
{
///
/// Load DataRows in non-sequential mode
///
NonSequential,
///
/// Load DataRows in sequential mode
///
Sequential,
///
/// Skip DataRow messages altogether
///
Skip
}
#endregion
}