X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,18 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
LogMessages.OpeningPhysicalConnection(ConnectionLogger, Host, Port, Database, UserFacingConnectionString);
var startOpenTimestamp = Stopwatch.GetTimestamp();

Activity? activity = null;

try
{
await OpenCore(this, Settings.SslMode, timeout, async, cancellationToken).ConfigureAwait(false);
var username = await GetUsernameAsync(async, cancellationToken).ConfigureAwait(false);

activity = NpgsqlActivitySource.ConnectionOpen(this);

await OpenCore(this, username, Settings.SslMode, timeout, async, cancellationToken).ConfigureAwait(false);

if (activity is not null)
NpgsqlActivitySource.Enrich(activity, this);

await DataSource.Bootstrap(this, timeout, forceReload: false, async, cancellationToken).ConfigureAwait(false);

Expand All @@ -510,6 +519,8 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
// It is intentionally not awaited and will run as long as the connector is alive.
// The CommandsInFlightWriter channel is completed in Cleanup, which should cause this task
// to complete.
// Make sure we do not flow AsyncLocals like Activity.Current
using var __ = ExecutionContext.SuppressFlow();
_ = Task.Run(MultiplexingReadLoop, CancellationToken.None)
.ContinueWith(t =>
{
Expand Down Expand Up @@ -540,7 +551,7 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
{
if (async)
await DataSource.ConnectionInitializerAsync(tempConnection).ConfigureAwait(false);
else if (!async)
else
DataSource.ConnectionInitializer(tempConnection);
}
finally
Expand All @@ -553,26 +564,31 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
}
}

if (activity is not null)
NpgsqlActivitySource.CommandStop(activity);

LogMessages.OpenedPhysicalConnection(
ConnectionLogger, Host, Port, Database, UserFacingConnectionString, (long)Stopwatch.GetElapsedTime(startOpenTimestamp).TotalMilliseconds, Id);
ConnectionLogger, Host, Port, Database, UserFacingConnectionString,
(long)Stopwatch.GetElapsedTime(startOpenTimestamp).TotalMilliseconds, Id);
}
catch (Exception e)
{
if (activity is not null)
NpgsqlActivitySource.SetException(activity, e);
Break(e);
throw;
}

static async Task OpenCore(
NpgsqlConnector conn,
string username,
SslMode sslMode,
NpgsqlTimeout timeout,
bool async,
CancellationToken cancellationToken)
{
await conn.RawOpen(sslMode, timeout, async, cancellationToken).ConfigureAwait(false);

var username = await conn.GetUsernameAsync(async, cancellationToken).ConfigureAwait(false);

timeout.CheckAndApply(conn);
conn.WriteStartupMessage(username);
await conn.Flush(async, cancellationToken).ConfigureAwait(false);
Expand All @@ -595,6 +611,7 @@ static async Task OpenCore(
// If Allow was specified and we failed (without SSL), retry with SSL
await OpenCore(
conn,
username,
sslMode == SslMode.Prefer ? SslMode.Disable : SslMode.Require,
timeout,
async,
Expand Down Expand Up @@ -754,6 +771,8 @@ async Task RawOpen(SslMode sslMode, NpgsqlTimeout timeout, bool async, Cancellat
else
Connect(timeout);

ConnectionLogger.LogTrace("Socket connected to {Host}:{Port}", Host, Port);

_baseStream = new NetworkStream(_socket, true);
_stream = _baseStream;

Expand Down Expand Up @@ -810,8 +829,6 @@ async Task RawOpen(SslMode sslMode, NpgsqlTimeout timeout, bool async, Cancellat
if (ReadBuffer.ReadBytesLeft > 0)
throw new NpgsqlException("Additional unencrypted data received after SSL negotiation - this should never happen, and may be an indication of a man-in-the-middle attack.");
}

ConnectionLogger.LogTrace("Socket connected to {Host}:{Port}", Host, Port);
}
catch
{
Expand Down
27 changes: 21 additions & 6 deletions src/Npgsql/MultiplexingDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ internal MultiplexingDataSource(
_connectionLogger = dataSourceConfig.LoggingConfiguration.ConnectionLogger;
_commandLogger = dataSourceConfig.LoggingConfiguration.CommandLogger;

// Make sure we do not flow AsyncLocals like Activity.Current
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Though a part of me thinks we should maybe start the activity only after the data source has been set up - that really isn't part of physical connection open. What do you think?

using var _ = ExecutionContext.SuppressFlow();
_multiplexWriteLoop = Task.Run(MultiplexingWriteLoop, CancellationToken.None)
.ContinueWith(t =>
{
Expand Down Expand Up @@ -106,15 +108,28 @@ async Task MultiplexingWriteLoop()
break;
}

connector = await OpenNewConnector(
command.InternalConnection!,
new NpgsqlTimeout(TimeSpan.FromSeconds(Settings.Timeout)),
async: true,
CancellationToken.None).ConfigureAwait(false);
// At no point should we ever have an activity here
Debug.Assert(Activity.Current is null);
// Set current activity as the one from the command
// So child activities from physical open are bound to it
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thinking here.

Though at some point we should have the conversation about whether we want to keep multiplexing, given that it's known to be non-scalable (e.g. and is no longer used in the TE benchmarks).

Activity.Current = command.CurrentActivity;

try
{
connector = await OpenNewConnector(
command.InternalConnection!,
new NpgsqlTimeout(TimeSpan.FromSeconds(Settings.Timeout)),
async: true,
CancellationToken.None).ConfigureAwait(false);
}
finally
{
Activity.Current = null;
}

if (connector != null)
{
// Managed to created a new connector
// Managed to create a new connector
connector.Connection = null;

// See increment under over-capacity mode below
Expand Down
18 changes: 17 additions & 1 deletion src/Npgsql/NpgsqlActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Npgsql;

static class NpgsqlActivitySource
{
static readonly ActivitySource Source = new("Npgsql", "0.1.0");
static readonly ActivitySource Source = new("Npgsql", "0.2.0");

internal static bool IsEnabled => Source.HasListeners();

Expand Down Expand Up @@ -61,6 +61,22 @@ static class NpgsqlActivitySource
return activity;
}

internal static Activity? ConnectionOpen(NpgsqlConnector connector)
{
if (!connector.DataSource.Configuration.TracingOptions.EnablePhysicalOpenTracing)
return null;

var dbName = connector.Settings.Database ?? connector.InferredUserName;
var activity = Source.StartActivity(dbName, ActivityKind.Client);
if (activity is not { IsAllDataRequested: true })
return activity;

activity.SetTag("db.system", "postgresql");
activity.SetTag("db.connection_string", connector.UserFacingConnectionString);

return activity;
}

internal static void Enrich(Activity activity, NpgsqlConnector connector)
{
if (!activity.IsAllDataRequested)
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/NpgsqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class NpgsqlCommand : DbCommand, ICloneable, IComponent

internal List<NpgsqlBatchCommand> InternalBatchCommands { get; }

Activity? CurrentActivity;
internal Activity? CurrentActivity { get; private set; }

/// <summary>
/// Returns details about each statement that this command has executed.
Expand Down
15 changes: 14 additions & 1 deletion src/Npgsql/NpgsqlTracingOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public sealed class NpgsqlTracingOptionsBuilder
Func<NpgsqlCommand, string?>? _commandSpanNameProvider;
Func<NpgsqlBatch, string?>? _batchSpanNameProvider;
bool _enableFirstResponseEvent = true;
bool _enablePhysicalOpenTracing = true;

internal NpgsqlTracingOptionsBuilder()
{
Expand Down Expand Up @@ -88,6 +89,16 @@ public NpgsqlTracingOptionsBuilder EnableFirstResponseEvent(bool enable = true)
return this;
}

/// <summary>
/// Gets or sets a value indicating whether to trace physical connection open.
/// Default is true to preserve existing behavior.
/// </summary>
public NpgsqlTracingOptionsBuilder EnablePhysicalOpenTracing(bool enable = true)
{
_enablePhysicalOpenTracing = enable;
return this;
}

internal NpgsqlTracingOptions Build() => new()
{
CommandFilter = _commandFilter,
Expand All @@ -96,7 +107,8 @@ public NpgsqlTracingOptionsBuilder EnableFirstResponseEvent(bool enable = true)
BatchEnrichmentCallback = _batchEnrichmentCallback,
CommandSpanNameProvider = _commandSpanNameProvider,
BatchSpanNameProvider = _batchSpanNameProvider,
EnableFirstResponseEvent = _enableFirstResponseEvent
EnableFirstResponseEvent = _enableFirstResponseEvent,
EnablePhysicalOpenTracing = _enablePhysicalOpenTracing
};
}

Expand All @@ -109,4 +121,5 @@ sealed class NpgsqlTracingOptions
internal Func<NpgsqlCommand, string?>? CommandSpanNameProvider { get; init; }
internal Func<NpgsqlBatch, string?>? BatchSpanNameProvider { get; init; }
internal bool EnableFirstResponseEvent { get; init; }
internal bool EnablePhysicalOpenTracing { get; init; }
}
1 change: 1 addition & 0 deletions src/Npgsql/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Npgsql.NpgsqlTracingOptionsBuilder.ConfigureCommandEnrichmentCallback(System.Act
Npgsql.NpgsqlTracingOptionsBuilder.ConfigureCommandFilter(System.Func<Npgsql.NpgsqlCommand!, bool>? commandFilter) -> Npgsql.NpgsqlTracingOptionsBuilder!
Npgsql.NpgsqlTracingOptionsBuilder.ConfigureCommandSpanNameProvider(System.Func<Npgsql.NpgsqlCommand!, string?>? commandSpanNameProvider) -> Npgsql.NpgsqlTracingOptionsBuilder!
Npgsql.NpgsqlTracingOptionsBuilder.EnableFirstResponseEvent(bool enable = true) -> Npgsql.NpgsqlTracingOptionsBuilder!
Npgsql.NpgsqlTracingOptionsBuilder.EnablePhysicalOpenTracing(bool enable = true) -> Npgsql.NpgsqlTracingOptionsBuilder!
Npgsql.NpgsqlTypeLoadingOptionsBuilder
Npgsql.NpgsqlTypeLoadingOptionsBuilder.EnableTableCompositesLoading(bool enable = true) -> Npgsql.NpgsqlTypeLoadingOptionsBuilder!
Npgsql.NpgsqlTypeLoadingOptionsBuilder.EnableTypeLoading(bool enable = true) -> Npgsql.NpgsqlTypeLoadingOptionsBuilder!
Expand Down
Loading
X Tutup