// Copyright (C) 2002 The Npgsql Development Team
// npgsql-general@gborg.postgresql.org
// http://gborg.postgresql.org/project/npgsql/projdisplay.php
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Reflection;
using System.Resources;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using Common.Logging;
using Mono.Security.Protocol.Tls;
using Npgsql.Localization;
using Npgsql.BackendMessages;
using Npgsql.TypeHandlers;
using NpgsqlTypes;
using System.Text;
using System.Text.RegularExpressions;
using Npgsql.FrontendMessages;
using SecurityProtocolType = Mono.Security.Protocol.Tls.SecurityProtocolType;
namespace Npgsql
{
///
/// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are
/// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool.
///
internal partial class NpgsqlConnector
{
readonly NpgsqlConnectionStringBuilder _settings;
///
/// The physical connection socket to the backend.
///
internal Socket Socket { get; set; }
///
/// The physical connection stream to the backend.
///
internal NpgsqlNetworkStream BaseStream { get; set; }
// The top level stream to the backend.
// This is a BufferedStream.
// With SSL, this stream sits on top of the SSL stream, which sits on top of _baseStream.
// Otherwise, this stream sits directly on top of _baseStream.
internal Stream Stream { get; set; }
///
/// Buffer used for reading data.
///
internal NpgsqlBuffer Buffer { get; private set; }
///
/// The connection mediator.
///
internal NpgsqlMediator Mediator { get; private set; }
///
/// Version of backend server this connector is connected to.
///
internal Version ServerVersion { get; set; }
///
/// The secret key of the backend for this connector, used for query cancellation.
///
internal int BackendSecretKey { get; set; }
///
/// The process ID of the backend for this connector.
///
internal int BackendProcessId { get; set; }
internal bool Pooled { get; private set; }
internal TypeHandlerRegistry TypeHandlerRegistry { get; set; }
TransactionStatus _txStatus;
NpgsqlTransaction _tx;
///
/// The number of messages that were prepended to the current message chain.
///
byte _prependedMessages;
///
/// A chain of messages to be sent to the backend.
///
List _messagesToSend;
internal NpgsqlDataReader CurrentReader;
///
/// Holds all run-time parameters received from the backend (via ParameterStatus messages)
///
internal Dictionary BackendParams;
///
/// Commands to be executed when the reader is done.
/// Current usage is for when a prepared command is disposed while its reader is still open; the
/// actual DEALLOCATE message must be deferred.
///
List _deferredCommands;
///
/// Whether the backend is an AWS Redshift instance
///
bool? _isRedshift;
// For IsValid test
readonly RNGCryptoServiceProvider _rng = new RNGCryptoServiceProvider();
string _initQueries;
internal SSPIHandler SSPI { get; set; }
static readonly ILog _log = LogManager.GetCurrentClassLogger();
SemaphoreSlim _notificationSemaphore;
byte[] _emptyBuffer = new byte[0];
int _notificationBlockRecursionDepth;
#region Reusable Message Objects
// Frontend. Note that these are only used for single-query commands.
internal readonly ParseMessage ParseMessage = new ParseMessage();
internal readonly BindMessage BindMessage = new BindMessage();
internal readonly DescribeMessage DescribeMessage = new DescribeMessage();
internal readonly ExecuteMessage ExecuteMessage = new ExecuteMessage();
// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new CommandCompleteMessage();
readonly ReadyForQueryMessage _readyForQueryMessage = new ReadyForQueryMessage();
readonly ParameterDescriptionMessage _parameterDescriptionMessage = new ParameterDescriptionMessage();
readonly DataRowSequentialMessage _dataRowSequentialMessage = new DataRowSequentialMessage();
readonly DataRowNonSequentialMessage _dataRowNonSequentialMessage = new DataRowNonSequentialMessage();
#endregion
public NpgsqlConnector(NpgsqlConnection connection)
: this(connection.CopyConnectionStringBuilder(), connection.Pooling)
{}
///
/// Constructor.
///
/// Connection string.
/// Pooled
public NpgsqlConnector(NpgsqlConnectionStringBuilder connectionString, bool pooled)
{
State = ConnectorState.Closed;
_txStatus = TransactionStatus.Idle;
_settings = connectionString;
Pooled = pooled;
BackendParams = new Dictionary();
Mediator = new NpgsqlMediator();
_messagesToSend = new List();
_preparedStatementIndex = 0;
_portalIndex = 0;
_deferredCommands = new List();
}
#region Configuration settings
///
/// Return Connection String.
///
internal static bool UseSslStream = true;
internal string ConnectionString { get { return _settings.ConnectionString; } }
internal string Host { get { return _settings.Host; } }
internal int Port { get { return _settings.Port; } }
internal string Database { get { return _settings.ContainsKey(Keywords.Database) ? _settings.Database : _settings.UserName; } }
internal string UserName { get { return _settings.UserName; } }
internal string Password { get { return _settings.Password; } }
internal bool SSL { get { return _settings.SSL; } }
internal SslMode SslMode { get { return _settings.SslMode; } }
internal int BufferSize { get { return _settings.BufferSize; } }
internal bool UseMonoSsl { get { return ValidateRemoteCertificateCallback == null; } }
internal int ConnectionTimeout { get { return _settings.Timeout; } }
internal int DefaultCommandTimeout { get { return _settings.CommandTimeout; } }
internal bool Enlist { get { return _settings.Enlist; } }
internal bool IntegratedSecurity { get { return _settings.IntegratedSecurity; } }
internal Version CompatVersion { get { return _settings.Compatible; } }
#endregion Configuration settings
#region State management
volatile int _state;
///
/// Gets the current state of the connector
///
internal ConnectorState State
{
get { return (ConnectorState)_state; }
set
{
var newState = (int) value;
if (newState == _state)
return;
Interlocked.Exchange(ref _state, newState);
switch (value)
{
case ConnectorState.Ready:
ExecuteDeferredCommands();
if (CurrentReader != null) {
CurrentReader.Command.State = CommandState.Idle;
CurrentReader = null;
}
break;
case ConnectorState.Closed:
case ConnectorState.Broken:
if (CurrentReader != null) {
CurrentReader.Command.State = CommandState.Idle;
CurrentReader = null;
}
ClearTransaction();
break;
case ConnectorState.Connecting:
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.CopyIn:
case ConnectorState.CopyOut:
break;
default:
throw new ArgumentOutOfRangeException("value");
}
}
}
///
/// Returns whether the connector is open, regardless of any task it is currently performing
///
internal bool IsConnected
{
get
{
switch (State)
{
case ConnectorState.Ready:
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.CopyIn:
case ConnectorState.CopyOut:
return true;
case ConnectorState.Closed:
case ConnectorState.Connecting:
case ConnectorState.Broken:
return false;
default:
throw new ArgumentOutOfRangeException("State", "Unknown state: " + State);
}
}
}
///
/// Returns whether the connector is open and performing a task, i.e. not ready for a query
///
internal bool IsBusy
{
get
{
switch (State)
{
case ConnectorState.Executing:
case ConnectorState.Fetching:
case ConnectorState.CopyIn:
case ConnectorState.CopyOut:
return true;
case ConnectorState.Ready:
case ConnectorState.Closed:
case ConnectorState.Connecting:
case ConnectorState.Broken:
return false;
default:
throw new ArgumentOutOfRangeException("State", "Unknown state: " + State);
}
}
}
internal void CheckReadyState()
{
switch (State)
{
case ConnectorState.Ready:
return;
case ConnectorState.Closed:
case ConnectorState.Broken:
case ConnectorState.Connecting:
throw new InvalidOperationException(L10N.ConnectionNotOpen);
case ConnectorState.Executing:
case ConnectorState.Fetching:
throw new InvalidOperationException("There is already an open DataReader associated with this Connection which must be closed first.");
case ConnectorState.CopyIn:
case ConnectorState.CopyOut:
throw new InvalidOperationException("A COPY operation is in progress and must complete first.");
default:
throw new ArgumentOutOfRangeException("Connector.State", "Unknown state: " + State);
}
}
#endregion
#region Open
///
/// Opens the physical connection to the server.
///
/// Usually called by the RequestConnector
/// Method of the connection pool manager.
internal void Open()
{
if (State != ConnectorState.Closed) {
throw new InvalidOperationException("Can't open, state is " + State);
}
State = ConnectorState.Connecting;
ServerVersion = null;
// Keep track of time remaining; Even though there may be multiple timeout-able calls,
// this allows us to still respect the caller's timeout expectation.
var connectTimeRemaining = ConnectionTimeout * 1000;
// Get a raw connection, possibly SSL...
RawOpen(connectTimeRemaining);
try
{
var startupMessage = new StartupMessage(Database, UserName);
if (!string.IsNullOrEmpty(_settings.ApplicationName)) {
startupMessage.ApplicationName = _settings.ApplicationName;
}
if (!string.IsNullOrEmpty(_settings.SearchPath)) {
startupMessage.SearchPath = _settings.SearchPath;
}
var len = startupMessage.Length;
if (len >= Buffer.Size) { // Should really never happen, just in case
throw new Exception(String.Format("Buffer ({0} bytes) not big enough to contain Startup message ({1} bytes)", Buffer.Size, len));
}
startupMessage.Prepare();
if (startupMessage.Length > Buffer.Size) {
throw new Exception("Startup message bigger than buffer");
}
startupMessage.Write(Buffer);
// TODO: Possible optimization: send settings like ssl_renegotiation in the same packet,
// reduce one roundtrip
Buffer.Flush();
HandleAuthentication();
}
catch
{
if (Stream != null)
{
try {
Stream.Dispose();
}
catch {}
}
throw;
}
// After attachment, the stream will close the connector (this) when the stream gets disposed.
BaseStream.AttachConnector(this);
// Fall back to the old way, SELECT VERSION().
// This should not happen for protocol version 3+.
if (ServerVersion == null)
{
//NpgsqlCommand command = new NpgsqlCommand("set DATESTYLE TO ISO;select version();", this);
//ServerVersion = new Version(PGUtil.ExtractServerVersion((string) command.ExecuteScalar()));
using (var command = new NpgsqlCommand("set DATESTYLE TO ISO;select version();", this))
{
ServerVersion = new Version(PGUtil.ExtractServerVersion((string)command.ExecuteScalar()));
}
}
ProcessServerVersion();
var sbInitQueries = new StringWriter();
// Some connection parameters for protocol 3 had been sent in the startup packet.
// The rest will be setted here.
if (SupportsSslRenegotiationLimit) {
sbInitQueries.WriteLine("SET ssl_renegotiation_limit=0;");
}
_initQueries = sbInitQueries.ToString();
ExecuteBlind(_initQueries);
TypeHandlerRegistry.Setup(this);
// Make a shallow copy of the type mapping that the connector will own.
// It is possible that the connector may add types to its private
// mapping that will not be valid to another connector, even
// if connected to the same backend version.
//NativeToBackendTypeConverterOptions.OidToNameMapping = NpgsqlTypesHelper.CreateAndLoadInitialTypesMapping(this).Clone();
State = ConnectorState.Ready;
if (_settings.SyncNotification)
{
AddNotificationListener();
}
}
public void RawOpen(int timeout)
{
// Keep track of time remaining; Even though there may be multiple timeout-able calls,
// this allows us to still respect the caller's timeout expectation.
var attemptStart = DateTime.Now;
var result = Dns.BeginGetHostAddresses(Host, null, null);
if (!result.AsyncWaitHandle.WaitOne(timeout, true))
{
// Timeout was used up attempting the Dns lookup
throw new TimeoutException(L10N.DnsLookupTimeout);
}
timeout -= Convert.ToInt32((DateTime.Now - attemptStart).TotalMilliseconds);
var ips = Dns.EndGetHostAddresses(result);
Socket socket = null;
Exception lastSocketException = null;
// try every ip address of the given hostname, use the first reachable one
// make sure not to exceed the caller's timeout expectation by splitting the
// time we have left between all the remaining ip's in the list.
for (var i = 0; i < ips.Length; i++)
{
_log.Trace("Attempting to connect to " + ips[i]);
var ep = new IPEndPoint(ips[i], Port);
socket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
attemptStart = DateTime.Now;
try
{
result = socket.BeginConnect(ep, null, null);
if (!result.AsyncWaitHandle.WaitOne(timeout / (ips.Length - i), true))
{
throw new TimeoutException(L10N.ConnectionTimeout);
}
socket.EndConnect(result);
// connect was successful, leave the loop
break;
}
catch (Exception e)
{
_log.Warn("Failed to connect to " + ips[i]);
timeout -= Convert.ToInt32((DateTime.Now - attemptStart).TotalMilliseconds);
lastSocketException = e;
socket.Close();
socket = null;
}
}
if (socket == null)
{
throw lastSocketException;
}
var baseStream = new NpgsqlNetworkStream(socket, true);
Stream sslStream = null;
// If the PostgreSQL server has SSL connectors enabled Open SslClientStream if (response == 'S') {
if (SSL || (SslMode == SslMode.Require) || (SslMode == SslMode.Prefer))
{
baseStream
.WriteInt32(8)
.WriteInt32(80877103);
// Receive response
var response = (Char)baseStream.ReadByte();
if (response != 'S')
{
if (SslMode == SslMode.Require) {
throw new InvalidOperationException(L10N.SslRequestError);
}
}
else
{
//create empty collection
var clientCertificates = new X509CertificateCollection();
//trigger the callback to fetch some certificates
DefaultProvideClientCertificatesCallback(clientCertificates);
//if (context.UseMonoSsl)
if (!UseSslStream)
{
var sslStreamPriv = new SslClientStream(baseStream, Host, true, SecurityProtocolType.Default, clientCertificates)
{
ClientCertSelectionDelegate = DefaultCertificateSelectionCallback,
ServerCertValidationDelegate = DefaultCertificateValidationCallback,
PrivateKeyCertSelectionDelegate = DefaultPrivateKeySelectionCallback
};
sslStream = sslStreamPriv;
IsSecure = true;
}
else
{
var sslStreamPriv = new SslStream(baseStream, true, DefaultValidateRemoteCertificateCallback);
sslStreamPriv.AuthenticateAsClient(Host, clientCertificates, System.Security.Authentication.SslProtocols.Default, false);
sslStream = sslStreamPriv;
IsSecure = true;
}
}
}
Socket = socket;
BaseStream = baseStream;
//Stream = new BufferedStream(sslStream ?? baseStream, 8192);
Stream = BaseStream;
Buffer = new NpgsqlBuffer(Stream, BufferSize, PGUtil.UTF8Encoding);
_log.DebugFormat("Connected to {0}:{1 }", Host, Port);
}
void HandleAuthentication()
{
_log.Trace("Authenticating...");
while (true)
{
var msg = ReadSingleMessage();
switch (msg.Code)
{
case BackendMessageCode.ReadyForQuery:
State = ConnectorState.Ready;
return;
case BackendMessageCode.AuthenticationRequest:
ProcessAuthenticationMessage((AuthenticationRequestMessage)msg);
continue;
default:
throw new Exception("Unexpected message received while authenticating: " + msg.Code);
}
}
}
void ProcessAuthenticationMessage(AuthenticationRequestMessage msg)
{
PasswordMessage passwordMessage;
switch (msg.AuthRequestType)
{
case AuthenticationRequestType.AuthenticationOk:
return;
case AuthenticationRequestType.AuthenticationCleartextPassword:
passwordMessage = PasswordMessage.CreateClearText(Password);
break;
case AuthenticationRequestType.AuthenticationMD5Password:
passwordMessage = PasswordMessage.CreateMD5(Password, UserName, ((AuthenticationMD5PasswordMessage)msg).Salt);
break;
case AuthenticationRequestType.AuthenticationGSS:
if (!IntegratedSecurity) {
throw new Exception("GSS authentication but IntegratedSecurity not enabled");
}
// For GSSAPI we have to use the supplied hostname
SSPI = new SSPIHandler(Host, "POSTGRES", true);
passwordMessage = new PasswordMessage(SSPI.Continue(null));
break;
case AuthenticationRequestType.AuthenticationSSPI:
if (!IntegratedSecurity) {
throw new Exception("SSPI authentication but IntegratedSecurity not enabled");
}
// For SSPI we have to get the IP-Address (hostname doesn't work)
var ipAddressString = ((IPEndPoint)Socket.RemoteEndPoint).Address.ToString();
SSPI = new SSPIHandler(ipAddressString, "POSTGRES", false);
passwordMessage = new PasswordMessage(SSPI.Continue(null));
break;
case AuthenticationRequestType.AuthenticationGSSContinue:
var passwdRead = SSPI.Continue(((AuthenticationGSSContinueMessage)msg).AuthenticationData);
if (passwdRead.Length != 0)
{
passwordMessage = new PasswordMessage(passwdRead);
break;
}
return;
default:
throw new NotSupportedException(String.Format(L10N.AuthenticationMethodNotSupported, msg.AuthRequestType));
}
passwordMessage.Prepare();
passwordMessage.Write(Buffer);
Buffer.Flush();
}
#endregion
#region Frontend message processing
internal void AddMessage(FrontendMessage msg)
{
_messagesToSend.Add(msg);
}
///
/// Prepends a message to be sent at the beginning of the next message chain.
///
internal void PrependMessage(FrontendMessage msg)
{
_prependedMessages++;
_messagesToSend.Add(msg);
}
[GenerateAsync]
internal void SendAllMessages()
{
try
{
foreach (var msg in _messagesToSend)
{
SendMessage(msg);
}
Buffer.Flush();
}
finally
{
_messagesToSend.Clear();
}
}
///
/// Sends a single frontend message, used for simple messages such as rollback, etc.
/// Note that additional prepend messages may be previously enqueued, and will be sent along
/// with this message.
///
///
void SendSingleMessage(FrontendMessage msg)
{
AddMessage(msg);
SendAllMessages();
}
[GenerateAsync]
void SendMessage(FrontendMessage msg)
{
try
{
_log.DebugFormat("Sending: {0}", msg);
var asSimple = msg as SimpleFrontendMessage;
if (asSimple != null)
{
if (asSimple.Length > Buffer.WriteSpaceLeft)
{
Buffer.Flush();
}
Contract.Assume(Buffer.WriteSpaceLeft >= asSimple.Length);
asSimple.Write(Buffer);
return;
}
var asComplex = msg as ChunkingFrontendMessage;
if (asComplex != null)
{
var directBuf = new DirectBuffer();
while (!asComplex.Write(Buffer, ref directBuf))
{
Buffer.Flush();
// The following is an optimization hack for writing large byte arrays without passing
// through our buffer
if (directBuf.Buffer != null)
{
Buffer.Underlying.Write(directBuf.Buffer, 0, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size);
directBuf.Buffer = null;
directBuf.Size = 0;
}
}
return;
}
throw PGUtil.ThrowIfReached();
}
catch
{
State = ConnectorState.Broken;
throw;
}
}
#endregion
#region Backend message processing
[GenerateAsync]
internal BackendMessage ReadSingleMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool ignoreNotifications = true)
{
try
{
return DoReadSingleMessage(dataRowLoadingMode, ignoreNotifications);
}
catch (NpgsqlException)
{
throw;
}
catch
{
State = ConnectorState.Broken;
throw;
}
}
[GenerateAsync]
BackendMessage DoReadSingleMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool ignoreNotifications = true)
{
NpgsqlError error = null;
while (true)
{
var buf = Buffer;
Buffer.Ensure(5);
var messageCode = (BackendMessageCode) Buffer.ReadByte();
Contract.Assume(Enum.IsDefined(typeof(BackendMessageCode), messageCode), "Unknown message code: " + messageCode);
var len = Buffer.ReadInt32() - 4; // Transmitted length includes itself
if (messageCode == BackendMessageCode.DataRow && dataRowLoadingMode != DataRowLoadingMode.NonSequential)
{
if (dataRowLoadingMode == DataRowLoadingMode.Skip)
{
Buffer.Skip(len);
continue;
}
}
else if (len > Buffer.ReadBytesLeft)
{
buf = buf.EnsureOrAllocateTemp(len);
}
var msg = ParseServerMessage(buf, messageCode, len, dataRowLoadingMode);
if (msg != null || !ignoreNotifications && (messageCode == BackendMessageCode.NoticeResponse || messageCode == BackendMessageCode.NotificationResponse))
{
if (error != null)
{
Contract.Assert(messageCode == BackendMessageCode.ReadyForQuery, "Expected ReadyForQuery after ErrorResponse");
throw new NpgsqlException(error);
}
return msg;
}
else if (messageCode == BackendMessageCode.ErrorResponse)
{
// An ErrorResponse is (almost) always followed by a ReadyForQuery. Save the error
// and throw it as an exception when the ReadyForQuery is received (next)
// The exception is during the startup/authentication phase, where the server closes
// the connection after an ErrorResponse
error = new NpgsqlError(buf);
if (State == ConnectorState.Connecting) {
throw new NpgsqlException(error);
}
}
}
}
BackendMessage ParseServerMessage(NpgsqlBuffer buf, BackendMessageCode code, int len, DataRowLoadingMode dataRowLoadingMode)
{
switch (code)
{
case BackendMessageCode.RowDescription:
// TODO: Recycle
var rowDescriptionMessage = new RowDescriptionMessage();
return rowDescriptionMessage.Load(buf, TypeHandlerRegistry);
case BackendMessageCode.DataRow:
Contract.Assert(dataRowLoadingMode == DataRowLoadingMode.NonSequential || dataRowLoadingMode == DataRowLoadingMode.Sequential);
return dataRowLoadingMode == DataRowLoadingMode.Sequential
? _dataRowSequentialMessage.Load(buf)
: _dataRowNonSequentialMessage.Load(buf);
case BackendMessageCode.CompletedResponse:
return _commandCompleteMessage.Load(buf, len);
case BackendMessageCode.ReadyForQuery:
var rfq = _readyForQueryMessage.Load(buf);
TransactionStatus = rfq.TransactionStatusIndicator;
return rfq;
case BackendMessageCode.EmptyQueryResponse:
return EmptyQueryMessage.Instance;
case BackendMessageCode.ParseComplete:
return ParseCompleteMessage.Instance;
case BackendMessageCode.ParameterDescription:
return _parameterDescriptionMessage.Load(buf);
case BackendMessageCode.BindComplete:
return BindCompleteMessage.Instance;
case BackendMessageCode.NoData:
return NoDataMessage.Instance;
case BackendMessageCode.CloseComplete:
return CloseCompletedMessage.Instance;
case BackendMessageCode.ParameterStatus:
HandleParameterStatus(buf.ReadNullTerminatedString(), buf.ReadNullTerminatedString());
return null;
case BackendMessageCode.NoticeResponse:
// TODO: Recycle
FireNotice(new NpgsqlError(buf));
return null;
case BackendMessageCode.NotificationResponse:
FireNotification(new NpgsqlNotificationEventArgs(buf));
return null;
case BackendMessageCode.AuthenticationRequest:
var authType = (AuthenticationRequestType)buf.ReadInt32();
_log.Trace("Received AuthenticationRequest of type " + authType);
switch (authType)
{
case AuthenticationRequestType.AuthenticationOk:
return AuthenticationOkMessage.Instance;
case AuthenticationRequestType.AuthenticationCleartextPassword:
return AuthenticationCleartextPasswordMessage.Instance;
case AuthenticationRequestType.AuthenticationMD5Password:
return AuthenticationMD5PasswordMessage.Load(buf);
case AuthenticationRequestType.AuthenticationGSS:
return AuthenticationGSSMessage.Instance;
case AuthenticationRequestType.AuthenticationSSPI:
return AuthenticationSSPIMessage.Instance;
case AuthenticationRequestType.AuthenticationGSSContinue:
return AuthenticationGSSContinueMessage.Load(buf, len);
default:
throw new NotSupportedException(String.Format(L10N.AuthenticationMethodNotSupported, authType));
}
case BackendMessageCode.BackendKeyData:
BackendProcessId = buf.ReadInt32();
BackendSecretKey = buf.ReadInt32();
return null;
case BackendMessageCode.CopyData:
case BackendMessageCode.CopyDone:
case BackendMessageCode.CancelRequest:
case BackendMessageCode.CopyDataRows:
case BackendMessageCode.CopyInResponse:
case BackendMessageCode.CopyOutResponse:
throw new NotImplementedException();
case BackendMessageCode.PortalSuspended:
case BackendMessageCode.IO_ERROR:
Debug.Fail("Unimplemented message: " + code);
throw new NotImplementedException("Unimplemented message: " + code);
case BackendMessageCode.ErrorResponse:
return null;
case BackendMessageCode.FunctionCallResponse:
// We don't use the obsolete function call protocol
throw new Exception("Unexpected backend message: " + code);
default:
throw PGUtil.ThrowIfReached("Unknown backend message code: " + code);
}
}
///
/// Reads backend messages and discards them, stopping only after a message of the given types has
/// been seen. Note that when this method is called, the buffer position must be properly set at
/// the start of the next message.
///
internal BackendMessage SkipUntil(params BackendMessageCode[] stopAt)
{
Contract.Requires(!stopAt.Any(c => c == BackendMessageCode.DataRow), "Shouldn't be used for rows, doesn't know about sequential");
while (true)
{
var msg = ReadSingleMessage(DataRowLoadingMode.Skip);
Contract.Assert(!(msg is DataRowMessage));
if (stopAt.Contains(msg.Code)) {
return msg;
}
}
}
///
/// Processes the response from any messages that were prepended to the message chain.
/// These messages are currently assumed to be simple queries with no result sets.
///
[GenerateAsync]
internal void ReadPrependedMessageResponses()
{
for (; _prependedMessages > 0; _prependedMessages--)
{
SkipUntil(BackendMessageCode.ReadyForQuery);
}
}
#endregion Backend message processing
#region Transactions
///
/// The current transaction status for this connector.
///
internal TransactionStatus TransactionStatus
{
get { return _txStatus; }
private set
{
if (value == _txStatus) { return; }
switch (value) {
case TransactionStatus.Idle:
ClearTransaction();
break;
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
break;
case TransactionStatus.Pending:
throw new Exception("Invalid TransactionStatus (should be frontend-only)");
default:
throw PGUtil.ThrowIfReached();
}
_txStatus = value;
}
}
///
/// The transaction currently in progress, if any.
/// Note that this doesn't mean a transaction request has actually been sent to the backend - for
/// efficiency we defer sending the request to the first query after BeginTransaction is called.
/// See for the actual backend transaction status.
///
internal NpgsqlTransaction Transaction
{
get { return _tx; }
set
{
Contract.Requires(TransactionStatus == TransactionStatus.Idle);
_tx = value;
_txStatus = TransactionStatus.Pending;
}
}
internal void ClearTransaction()
{
if (_txStatus == TransactionStatus.Idle) { return; }
_tx.Connection = null;
_tx = null;
_txStatus = TransactionStatus.Idle;
}
#endregion
#region Copy In / Out
internal NpgsqlCopyFormat CopyFormat { get; private set; }
// TODO: Currently unused
NpgsqlCopyFormat ReadCopyHeader()
{
var copyFormat = (byte)Buffer.ReadByte();
var numCopyFields = Buffer.ReadInt16();
var copyFieldFormats = new short[numCopyFields];
for (short i = 0; i < numCopyFields; i++)
{
copyFieldFormats[i] = Buffer.ReadInt16();
}
return new NpgsqlCopyFormat(copyFormat, copyFieldFormats);
}
///
/// Called from NpgsqlState.ProcessBackendResponses upon CopyInResponse.
/// If CopyStream is already set, it is used to read data to push to server, after which the copy is completed.
/// Otherwise CopyStream is set to a writable NpgsqlCopyInStream that calls SendCopyData each time it is written to.
///
internal void StartCopyIn(NpgsqlCopyFormat copyFormat)
{
CopyFormat = copyFormat;
var userFeed = Mediator.CopyStream;
if (userFeed == null)
{
Mediator.CopyStream = new NpgsqlCopyInStream(this);
}
else
{
// copy all of user feed to server at once
var bufsiz = Mediator.CopyBufferSize;
var buf = new byte[bufsiz];
int len;
while ((len = userFeed.Read(buf, 0, bufsiz)) > 0)
{
SendCopyInData(buf, 0, len);
}
SendCopyInDone();
}
}
///
/// Sends given packet to server as a CopyData message.
/// Does not check for notifications! Use another thread for that.
///
internal void SendCopyInData(byte[] buf, int off, int len)
{
throw new NotImplementedException();
/*
Buffer.EnsureWrite(5);
Buffer.WriteByte((byte)FrontendMessageCode.CopyData);
Buffer.WriteInt32(len + 4);
Buffer.Write(buf, off, len);
*/
}
///
/// Sends CopyDone message to server. Handles responses, ie. may throw an exception.
///
internal void SendCopyInDone()
{
throw new NotImplementedException();
/*
Buffer.WriteByte((byte)FrontEndMessageCode.CopyDone);
Buffer.WriteInt32(4); // message without data
ConsumeAll();
*/
}
///
/// Sends CopyFail message to server. Handles responses, ie. should always throw an exception:
/// in CopyIn state the server responds to CopyFail with an error response;
/// outside of a CopyIn state the server responds to CopyFail with an error response;
/// without network connection or whatever, there's going to eventually be a failure, timeout or user intervention.
///
internal void SendCopyInFail(String message)
{
throw new NotImplementedException();
/*
Buffer.WriteByte((byte)FrontEndMessageCode.CopyFail);
var buf = BackendEncoding.UTF8Encoding.GetBytes((message ?? string.Empty) + '\x00');
Buffer.WriteInt32(4 + buf.Length);
Buffer.Write(buf, 0, buf.Length);
ConsumeAll();
*/
}
///
/// Called from NpgsqlState.ProcessBackendResponses upon CopyOutResponse.
/// If CopyStream is already set, it is used to write data received from server, after which the copy ends.
/// Otherwise CopyStream is set to a readable NpgsqlCopyOutStream that receives data from server.
///
internal void StartCopyOut(NpgsqlCopyFormat copyFormat)
{
throw new NotImplementedException();
/*
CopyFormat = copyFormat;
var userFeed = Mediator.CopyStream;
if (userFeed == null)
{
Mediator.CopyStream = new NpgsqlCopyOutStream(this);
}
else
{
byte[] buf;
while ((buf = GetCopyOutData()) != null)
{
userFeed.Write(buf, 0, buf.Length);
}
userFeed.Close();
}
*/
}
///
/// Called from NpgsqlOutStream.Read to read copy data from server.
///
internal byte[] GetCopyOutData()
{
throw new NotImplementedException();
/*
// polling in COPY would take seconds on Windows
ConsumeAll();
return Mediator.ReceivedCopyData;
*/
}
#endregion Copy In / Out
#region Notifications
///
/// Occurs on NoticeResponses from the PostgreSQL backend.
///
internal event NoticeEventHandler Notice;
///
/// Occurs on NotificationResponses from the PostgreSQL backend.
///
internal event NotificationEventHandler Notification;
internal void FireNotice(NpgsqlError e)
{
if (Notice != null)
{
try
{
Notice(this, new NpgsqlNoticeEventArgs(e));
}
catch
{
} //Eat exceptions from user code.
}
}
internal void FireNotification(NpgsqlNotificationEventArgs e)
{
if (Notification != null)
{
try
{
Notification(this, e);
}
catch
{
} //Eat exceptions from user code.
}
}
#endregion Notifications
#region SSL
///
/// Default SSL CertificateSelectionCallback implementation.
///
internal X509Certificate DefaultCertificateSelectionCallback(X509CertificateCollection clientCertificates,
X509Certificate serverCertificate, string targetHost,
X509CertificateCollection serverRequestedCertificates)
{
return CertificateSelectionCallback != null
? CertificateSelectionCallback(clientCertificates, serverCertificate, targetHost, serverRequestedCertificates)
: null;
}
///
/// Default SSL CertificateValidationCallback implementation.
///
internal bool DefaultCertificateValidationCallback(X509Certificate certificate, int[] certificateErrors)
{
return CertificateValidationCallback == null || CertificateValidationCallback(certificate, certificateErrors);
}
///
/// Default SSL PrivateKeySelectionCallback implementation.
///
internal AsymmetricAlgorithm DefaultPrivateKeySelectionCallback(X509Certificate certificate, string targetHost)
{
return PrivateKeySelectionCallback != null
? PrivateKeySelectionCallback(certificate, targetHost)
: null;
}
///
/// Default SSL ProvideClientCertificatesCallback implementation.
///
internal void DefaultProvideClientCertificatesCallback(X509CertificateCollection certificates)
{
if (ProvideClientCertificatesCallback != null) {
ProvideClientCertificatesCallback(certificates);
}
}
///
/// Default SSL ValidateRemoteCertificateCallback implementation.
///
internal bool DefaultValidateRemoteCertificateCallback(object sender, X509Certificate cert, X509Chain chain, SslPolicyErrors errors)
{
return ValidateRemoteCertificateCallback != null && ValidateRemoteCertificateCallback(cert, chain, errors);
}
///
/// Returns whether SSL is being used for the connection
///
internal bool IsSecure { get; private set; }
///
/// Called to provide client certificates for SSL handshake.
///
internal event ProvideClientCertificatesCallback ProvideClientCertificatesCallback;
///
/// Mono.Security.Protocol.Tls.CertificateSelectionCallback delegate.
///
internal event CertificateSelectionCallback CertificateSelectionCallback;
///
/// Mono.Security.Protocol.Tls.CertificateValidationCallback delegate.
///
internal event CertificateValidationCallback CertificateValidationCallback;
///
/// Mono.Security.Protocol.Tls.PrivateKeySelectionCallback delegate.
///
internal event PrivateKeySelectionCallback PrivateKeySelectionCallback;
///
/// Called to validate server's certificate during SSL handshake
///
internal event ValidateRemoteCertificateCallback ValidateRemoteCertificateCallback;
#endregion SSL
#region Cancel
///
/// Creates another connector and sends a cancel request through it for this connector.
///
internal void CancelRequest()
{
var cancelConnector = new NpgsqlConnector(_settings, false);
try
{
cancelConnector.RawOpen(cancelConnector.ConnectionTimeout*1000);
cancelConnector.SendSingleMessage(new CancelRequestMessage(BackendProcessId, BackendSecretKey));
}
finally
{
cancelConnector.Close();
}
}
#endregion Cancel
#region Close
///
/// Closes the physical connection to the server.
///
internal void Close()
{
_log.Debug("Close connector");
switch (State)
{
case ConnectorState.Closed:
return;
case ConnectorState.Ready:
try
{
SendSingleMessage(TerminateMessage.Instance);
}
catch { }
break;
}
try
{
Stream.Close();
}
catch { }
try
{
RemoveNotificationListener();
}
catch { }
Stream = null;
BaseStream = null;
Buffer = null;
BackendParams.Clear();
ServerVersion = null;
State = ConnectorState.Closed;
}
///
/// This method is responsible for releasing all resources associated with this Connector.
///
internal void ReleaseResources()
{
if (State != ConnectorState.Closed)
{
if (SupportsDiscard)
{
ReleaseWithDiscard();
}
else
{
ReleasePlansPortals();
ReleaseRegisteredListen();
}
}
}
internal void ReleaseWithDiscard()
{
ExecuteBlind(PregeneratedMessage.DiscardAll);
// The initial connection parameters will be restored via IsValid() when get connector from pool later
}
internal void ReleaseRegisteredListen()
{
ExecuteBlind(PregeneratedMessage.UnlistenAll);
}
///
/// This method is responsible to release all portals used by this Connector.
///
internal void ReleasePlansPortals()
{
if (_preparedStatementIndex > 0)
{
for (var i = 1; i <= _preparedStatementIndex; i++)
{
try
{
ExecuteBlind(String.Format("DEALLOCATE \"{0}{1}\";", PreparedStatementNamePrefix, i));
}
// Ignore any error which may occur when releasing portals as this portal name may not be valid anymore. i.e.: the portal name was used on a prepared query which had errors.
catch { }
}
}
_portalIndex = 0;
_preparedStatementIndex = 0;
}
#endregion Close
#region Sync notification
internal class NotificationBlock : IDisposable
{
NpgsqlConnector _connector;
public NotificationBlock(NpgsqlConnector connector)
{
_connector = connector;
}
public void Dispose()
{
if (_connector != null)
{
if (--_connector._notificationBlockRecursionDepth == 0)
{
while (_connector.Buffer.ReadBytesLeft > 0)
{
var msg = _connector.ReadSingleMessage(DataRowLoadingMode.NonSequential, false);
if (msg != null)
{
Contract.Assert(msg == null, "Expected null after processing a notification");
}
}
if (_connector._notificationSemaphore != null)
{
_connector._notificationSemaphore.Release();
}
}
}
_connector = null;
}
}
[GenerateAsync]
internal NotificationBlock BlockNotifications()
{
var n = new NotificationBlock(this);
if (++_notificationBlockRecursionDepth == 1 && _notificationSemaphore != null)
_notificationSemaphore.Wait();
return n;
}
internal void AddNotificationListener()
{
_notificationSemaphore = new SemaphoreSlim(1);
var task = BaseStream.ReadAsync(_emptyBuffer, 0, 0);
task.ContinueWith(NotificationHandler);
}
internal void RemoveNotificationListener()
{
_notificationSemaphore = null;
}
internal void NotificationHandler(System.Threading.Tasks.Task task)
{
if (task.Exception != null || task.Result != 0)
{
// The stream is dead
return;
}
var semaphore = _notificationSemaphore; // To avoid problems when closing the connection
if (semaphore != null)
{
semaphore.WaitAsync().ContinueWith(t => {
try
{
while (BaseStream.DataAvailable || Buffer.ReadBytesLeft > 0)
{
var msg = ReadSingleMessage(DataRowLoadingMode.NonSequential, false);
if (msg != null)
{
Contract.Assert(msg == null, "Expected null after processing a notification");
}
}
}
catch
{
}
finally
{
semaphore.Release();
try
{
BaseStream.ReadAsync(_emptyBuffer, 0, 0).ContinueWith(NotificationHandler);
}
catch { }
}
});
}
}
#endregion Sync notification
#region Supported features
internal bool SupportsApplicationName { get; private set; }
internal bool SupportsExtraFloatDigits3 { get; private set; }
internal bool SupportsExtraFloatDigits { get; private set; }
internal bool SupportsSslRenegotiationLimit { get; private set; }
internal bool SupportsSavepoint { get; private set; }
internal bool SupportsDiscard { get; private set; }
internal bool SupportsEStringPrefix { get; private set; }
internal bool SupportsHexByteFormat { get; private set; }
internal bool SupportsRangeTypes { get; private set; }
internal bool UseConformantStrings { get; private set; }
///
/// This method is required to set all the version dependent features flags.
/// SupportsPrepare means the server can use prepared query plans (7.3+)
///
void ProcessServerVersion()
{
SupportsSavepoint = (ServerVersion >= new Version(8, 0, 0));
SupportsDiscard = (ServerVersion >= new Version(8, 3, 0));
SupportsApplicationName = (ServerVersion >= new Version(9, 0, 0));
SupportsExtraFloatDigits3 = (ServerVersion >= new Version(9, 0, 0));
SupportsExtraFloatDigits = (ServerVersion >= new Version(7, 4, 0));
SupportsSslRenegotiationLimit = ((ServerVersion >= new Version(8, 4, 3)) ||
(ServerVersion >= new Version(8, 3, 10) && ServerVersion < new Version(8, 4, 0)) ||
(ServerVersion >= new Version(8, 2, 16) && ServerVersion < new Version(8, 3, 0)) ||
(ServerVersion >= new Version(8, 1, 20) && ServerVersion < new Version(8, 2, 0)) ||
(ServerVersion >= new Version(8, 0, 24) && ServerVersion < new Version(8, 1, 0)) ||
(ServerVersion >= new Version(7, 4, 28) && ServerVersion < new Version(8, 0, 0)));
// Per the PG documentation, E string literal prefix support appeared in PG version 8.1.
// Note that it is possible that support for this prefix will vanish in some future version
// of Postgres, in which case this test will need to be revised.
// At that time it may also be necessary to set UseConformantStrings = true here.
SupportsEStringPrefix = (ServerVersion >= new Version(8, 1, 0));
// Per the PG documentation, hex string encoding format support appeared in PG version 9.0.
SupportsHexByteFormat = (ServerVersion >= new Version(9, 0, 0));
// Range data types
SupportsRangeTypes = (ServerVersion >= new Version(9, 2, 0));
}
///
/// Whether the backend is an AWS Redshift instance
///
internal bool IsRedshift
{
get
{
if (!_isRedshift.HasValue)
{
using (var cmd = new NpgsqlCommand("SELECT version()", this))
{
var versionStr = (string)cmd.ExecuteScalar();
_isRedshift = versionStr.ToLower().Contains("redshift");
}
}
return _isRedshift.Value;
}
}
#endregion Supported features
#region Execute blind
///
/// Internal query shortcut for use in cases where the number
/// of affected rows is of no interest.
///
[GenerateAsync]
internal void ExecuteBlind(string query)
{
using (BlockNotifications())
{
SetBackendCommandTimeout(20);
SendSingleMessage(new QueryMessage(query));
ReadPrependedMessageResponses();
SkipUntil(BackendMessageCode.ReadyForQuery);
State = ConnectorState.Ready;
}
}
[GenerateAsync]
internal void ExecuteBlind(SimpleFrontendMessage message)
{
using (BlockNotifications())
{
SetBackendCommandTimeout(20);
SendSingleMessage(message);
ReadPrependedMessageResponses();
SkipUntil(BackendMessageCode.ReadyForQuery);
State = ConnectorState.Ready;
}
}
[GenerateAsync]
internal void ExecuteBlindSuppressTimeout(string query)
{
using (BlockNotifications())
{
SendSingleMessage(new QueryMessage(query));
ReadPrependedMessageResponses();
SkipUntil(BackendMessageCode.ReadyForQuery);
State = ConnectorState.Ready;
}
}
[GenerateAsync]
internal void ExecuteBlindSuppressTimeout(PregeneratedMessage message)
{
// Block the notification thread before writing anything to the wire.
using (BlockNotifications())
{
SendSingleMessage(message);
ReadPrependedMessageResponses();
SkipUntil(BackendMessageCode.ReadyForQuery);
State = ConnectorState.Ready;
}
}
///
/// Special adaptation of ExecuteBlind() that sets statement_timeout.
/// This exists to prevent Connector.SetBackendCommandTimeout() from calling Command.ExecuteBlind(),
/// which will cause an endless recursive loop.
///
/// Timeout in seconds.
[GenerateAsync]
internal void ExecuteSetStatementTimeoutBlind(int timeout)
{
// Optimize for a few common timeout values.
switch (timeout)
{
case 10:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout10Sec);
break;
case 20:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout20Sec);
break;
case 30:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout30Sec);
break;
case 60:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout60Sec);
break;
case 90:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout90Sec);
break;
case 120:
SendSingleMessage(PregeneratedMessage.SetStmtTimeout120Sec);
break;
default:
SendSingleMessage(new QueryMessage(string.Format("SET statement_timeout = {0}", timeout * 1000)));
break;
}
ReadPrependedMessageResponses();
SkipUntil(BackendMessageCode.ReadyForQuery);
State = ConnectorState.Ready;
}
#endregion Execute blind
#region Misc
void HandleParameterStatus(string name, string value)
{
BackendParams[name] = value;
if (name == "server_version")
{
// Deal with this here so that if there are
// changes in a future backend version, we can handle it here in the
// protocol handler and leave everybody else put of it.
var versionString = value.Trim();
for (var idx = 0; idx != versionString.Length; ++idx)
{
var c = value[idx];
if (!char.IsDigit(c) && c != '.')
{
versionString = versionString.Substring(0, idx);
break;
}
}
ServerVersion = new Version(versionString);
return;
}
if (name == "standard_conforming_strings") {
UseConformantStrings = (value == "on");
}
}
///
/// Modify the backend statement_timeout value if needed.
///
/// New timeout
[GenerateAsync]
internal void SetBackendCommandTimeout(int timeout)
{
if (Mediator.BackendCommandTimeout == -1 || Mediator.BackendCommandTimeout != timeout)
{
ExecuteSetStatementTimeoutBlind(timeout);
Mediator.BackendCommandTimeout = timeout;
}
}
///
/// Returns next portal index.
///
internal String NextPortalName()
{
return _portalNamePrefix + (++_portalIndex);
}
int _portalIndex;
const String _portalNamePrefix = "p";
///
/// Returns next plan index.
///
internal string NextPreparedStatementName()
{
return PreparedStatementNamePrefix + (++_preparedStatementIndex);
}
int _preparedStatementIndex;
const string PreparedStatementNamePrefix = "s";
///
/// This method checks if the connector is still ok.
/// We try to send a simple query text, select 1 as ConnectionTest;
///
internal Boolean IsValid()
{
try
{
// Here we use a fake NpgsqlCommand, just to send the test query string.
// Get random test value.
var testBytes = new Byte[2];
_rng.GetNonZeroBytes(testBytes);
var testValue = String.Format("Npgsql{0}{1}", testBytes[0], testBytes[1]);
//Query(new NpgsqlCommand("select 1 as ConnectionTest", this));
var compareValue = string.Empty;
var sql = "select '" + testValue + "'";
// restore initial connection parameters resetted by "Discard ALL"
sql = _initQueries + sql;
using (var cmd = new NpgsqlCommand(sql, this))
{
compareValue = (string)cmd.ExecuteScalar();
}
if (compareValue != testValue)
{
return false;
}
}
catch
{
return false;
}
return true;
}
///
/// Executes the command immediately if the connector is ready, otherwise schedules the command for
/// execution at the earliest possible convenience.
///
///
internal void ExecuteOrDefer(string cmd)
{
if (State == ConnectorState.Ready) {
ExecuteBlind(cmd);
} else {
_deferredCommands.Add(cmd);
}
}
internal void ExecuteDeferredCommands()
{
if (!_deferredCommands.Any()) { return; }
// TODO: Not optimal, but with the current state implementation ExecuteBlind() below sets the state
// back to Ready, which recursively attempts to... execute deferred commands
var deferredCommands = _deferredCommands.ToArray();
_deferredCommands.Clear();
foreach (var cmd in deferredCommands)
{
try {
ExecuteBlind(cmd);
} catch (Exception e) {
_log.Error(String.Format("Error executing deferred command {0}", cmd), e);
}
}
}
// Unused, can be deleted?
internal void TestConnector()
{
SyncMessage.Instance.Write(Buffer);
Buffer.Flush();
var buffer = new Queue();
//byte[] compareBuffer = new byte[6];
int[] messageSought = { 'Z', 0, 0, 0, 5 };
for (; ; )
{
var newByte = (int)Buffer.ReadByte();
switch (newByte)
{
case -1:
throw new EndOfStreamException();
case 'E':
case 'I':
case 'T':
if (buffer.Count > 4)
{
bool match = true;
int i = 0;
foreach (byte cmp in buffer)
{
if (cmp != messageSought[i++])
{
match = false;
break;
}
}
if (match)
{
return;
}
}
break;
default:
buffer.Enqueue(newByte);
if (buffer.Count > 5)
{
buffer.Dequeue();
}
break;
}
}
}
#endregion Misc
#region Invariants
[ContractInvariantMethod]
void ObjectInvariants()
{
Contract.Invariant(TransactionStatus == TransactionStatus.Idle || Transaction != null);
Contract.Invariant(TransactionStatus != TransactionStatus.Idle || Transaction == null);
Contract.Invariant(Transaction == null || Transaction.Connection.Connector == this);
}
#endregion
}
///
/// Expresses the exact state of a connector.
///
internal enum ConnectorState
{
///
/// The connector has either not yet been opened or has been closed.
///
Closed,
///
/// The connector is currently connecting to a Postgresql server.
///
Connecting,
///
/// The connector is connected and may be used to send a new query.
///
Ready,
///
/// The connector is waiting for a response to a query which has been sent to the server.
///
Executing,
///
/// The connector is currently fetching and processing query results.
///
Fetching,
///
/// The connection was broken because an unexpected error occurred which left it in an unknown state.
/// This state isn't implemented yet.
///
Broken,
///
/// The connector is engaged in a COPY IN operation.
///
CopyIn,
///
/// The connector is engaged in a COPY OUT operation.
///
CopyOut,
}
internal enum TransactionStatus : byte
{
///
/// Currently not in a transaction block
///
Idle = (byte)'I',
///
/// Currently in a transaction block
///
InTransactionBlock = (byte)'T',
///
/// Currently in a failed transaction block (queries will be rejected until block is ended)
///
InFailedTransactionBlock = (byte)'E',
///
/// A new transaction has been requested but not yet transmitted to the backend. It will be transmitted
/// prepended to the next query.
/// This is a client-side state option only, and is never transmitted from the backend.
///
Pending = Byte.MaxValue,
}
///
/// Specifies how to load/parse DataRow messages as they're received from the backend.
///
internal enum DataRowLoadingMode
{
///
/// Load DataRows in non-sequential mode
///
NonSequential,
///
/// Load DataRows in sequential mode
///
Sequential,
///
/// Skip DataRow messages altogether
///
Skip
}
///
/// Represents the method that allows the application to provide a certificate collection to be used for SSL clien authentication
///
/// A X509CertificateCollection to be filled with one or more client certificates.
public delegate void ProvideClientCertificatesCallback(X509CertificateCollection certificates);
///
/// Represents the method that is called to validate the certificate provided by the server during an SSL handshake
///
/// The server's certificate
/// The certificate chain containing the certificate's CA and any intermediate authorities
/// Any errors that were detected
public delegate bool ValidateRemoteCertificateCallback(X509Certificate cert, X509Chain chain, SslPolicyErrors errors);
}