-
Notifications
You must be signed in to change notification settings - Fork 874
Expand file tree
/
Copy pathSafeReplicationTestBase.cs
More file actions
175 lines (152 loc) · 7.55 KB
/
SafeReplicationTestBase.cs
File metadata and controls
175 lines (152 loc) · 7.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Npgsql.Replication;
namespace Npgsql.Tests.Replication;
/// <summary>
/// Base class for replication tests. It generates replication slot, publication and table names from the
/// calling test's name and drops any leftover objects with those names both before and after the test runs.
/// </summary>
/// <typeparam name="TConnection">The replication connection type instantiated by the helpers of this class.</typeparam>
public abstract class SafeReplicationTestBase<TConnection> : TestBase
    where TConnection : ReplicationConnection, new()
{
    static readonly Version Pg10Version = new(10, 0);

    // Assigned in OneTimeSetUp; read by the static AssertReplicationCancellation helper.
    static Version CurrentServerVersion = null!;

    // Value of the server's max_identifier_length setting, read in OneTimeSetUp.
    int _maxIdentifierLength;

    /// <summary>
    /// Suffix appended (after an underscore) to the calling member name when generating object names.
    /// Presumably used to keep names of different derived fixtures apart — confirm against derived classes.
    /// </summary>
    protected abstract string Postfix { get; }

    /// <summary>
    /// Records the server version and the server's <c>max_identifier_length</c> once per fixture.
    /// </summary>
    [OneTimeSetUp]
    public async Task OneTimeSetUp()
    {
        await using var conn = await OpenConnectionAsync();
        CurrentServerVersion = conn.PostgreSqlVersion;
        _maxIdentifierLength = int.Parse((string)(await conn.ExecuteScalarAsync("SHOW max_identifier_length"))!);
    }

    /// <summary>
    /// Checks that the server is configured for logical replication (<c>wal_level = logical</c>) and has at
    /// least 50 <c>max_wal_senders</c>. Each unmet condition is reported through
    /// <c>TestUtil.IgnoreExceptOnBuildServer</c>.
    /// </summary>
    [SetUp]
    public async Task Setup()
    {
        await using var conn = await OpenConnectionAsync();

        var walLevel = (string)(await conn.ExecuteScalarAsync("SHOW wal_level"))!;
        if (walLevel != "logical")
            TestUtil.IgnoreExceptOnBuildServer("wal_level needs to be set to 'logical' in the PostgreSQL conf");

        var maxWalSenders = int.Parse((string)(await conn.ExecuteScalarAsync("SHOW max_wal_senders"))!);
        if (maxWalSenders < 50)
        {
            TestUtil.IgnoreExceptOnBuildServer(
                $"max_wal_senders is too low ({maxWalSenders}) and could lead to transient failures. Skipping replication tests");
        }
    }

    /// <summary>
    /// Creates and opens a replication connection using the connection string built by <paramref name="csb"/>.
    /// </summary>
    private protected Task<TConnection> OpenReplicationConnectionAsync(
        NpgsqlConnectionStringBuilder csb,
        CancellationToken cancellationToken = default)
        => OpenReplicationConnectionAsync(csb.ToString(), cancellationToken);

    /// <summary>
    /// Creates and opens a replication connection.
    /// </summary>
    /// <param name="connectionString">
    /// The connection string to use; when <c>null</c>, the fixture's default <c>ConnectionString</c> is used.
    /// </param>
    /// <param name="cancellationToken">Token passed to <c>Open</c>.</param>
    private protected async Task<TConnection> OpenReplicationConnectionAsync(
        string? connectionString = null,
        CancellationToken cancellationToken = default)
    {
        var c = new TConnection { ConnectionString = connectionString ?? ConnectionString };
        await c.Open(cancellationToken);
        return c;
    }

    /// <summary>
    /// Asserts that advancing <paramref name="enumerator"/> fails with an <see cref="OperationCanceledException"/>.
    /// </summary>
    /// <param name="enumerator">The replication message enumerator expected to be cancelled.</param>
    /// <param name="streamingStarted">
    /// When <c>true</c> and the server is PostgreSQL 10 or later, the exception must wrap a
    /// <see cref="PostgresException"/> with SQLSTATE <c>query_canceled</c>. Otherwise it must have no inner exception.
    /// </param>
    private protected static async Task AssertReplicationCancellation<T>(IAsyncEnumerator<T> enumerator, bool streamingStarted = true)
    {
        bool succeeded;
        try
        {
            succeeded = await enumerator.MoveNextAsync();
        }
        catch (Exception e)
        {
            Assert.That(e, streamingStarted && CurrentServerVersion >= Pg10Version
                ? Is.AssignableTo<OperationCanceledException>()
                    .With.InnerException.InstanceOf<PostgresException>()
                    .And.InnerException.Property(nameof(PostgresException.SqlState))
                    .EqualTo(PostgresErrorCodes.QueryCanceled)
                : Is.AssignableTo<OperationCanceledException>()
                    .With.InnerException.Null);
            return;
        }

        // MoveNextAsync completed normally, which is the failure case. Failing outside the try block keeps the
        // assertion exception from being caught above and re-checked against the cancellation constraint.
        Assert.Fail(succeeded
            ? $"Expected replication cancellation but got message: {enumerator.Current}"
            : "Expected replication cancellation but reached enumeration end instead");
    }

    /// <summary>
    /// Runs <paramref name="testAction"/> with a generated slot name and a single generated table name.
    /// Leftover objects with those names are dropped before and after the action.
    /// </summary>
    private protected Task SafeReplicationTest(Func<string, string, Task> testAction, [CallerMemberName] string memberName = "")
        => SafeReplicationTestCore((slotName, tableNames, publicationName) => testAction(slotName, tableNames[0]), 1, memberName);

    /// <summary>
    /// Runs <paramref name="testAction"/> with a generated slot name, a single generated table name and a
    /// generated publication name. Leftover objects with those names are dropped before and after the action.
    /// </summary>
    private protected Task SafeReplicationTest(Func<string, string, string, Task> testAction, [CallerMemberName] string memberName = "")
        => SafeReplicationTestCore((slotName, tableNames, publicationName) => testAction(slotName, tableNames[0], publicationName), 1, memberName);

    /// <summary>
    /// Runs <paramref name="testAction"/> with a generated slot name, <paramref name="tableCount"/> generated
    /// table names and a generated publication name. Leftover objects with those names are dropped before and
    /// after the action.
    /// </summary>
    private protected Task SafeReplicationTest(Func<string, string[], string, Task> testAction, int tableCount, [CallerMemberName] string memberName = "")
        => SafeReplicationTestCore(testAction, tableCount, memberName);

    async Task SafeReplicationTestCore(Func<string, string[], string, Task> testAction, int tableCount, string memberName)
    {
        // If the supplied name is too long, we create one from a GUID instead. The headroom of 4 characters
        // leaves room for the short prefixes ("s_", "p_", "t_", "t<n>_") added below.
        var baseName = $"{memberName}_{Postfix}";
        var name = (baseName.Length > _maxIdentifierLength - 4 ? Guid.NewGuid().ToString("N") : baseName).ToLowerInvariant();

        // `name` is already lower-case and the prefixes are lower-case ASCII, so no further lowercasing is needed.
        var slotName = $"s_{name}";
        var publicationName = $"p_{name}";
        var tableNames = new string[tableCount];
        for (var i = 0; i < tableNames.Length; i++)
            tableNames[i] = $"t{(tableCount == 1 ? "" : i.ToString())}_{name}";

        // Clean up before running too, in case a previous run left objects behind.
        await Cleanup();
        try
        {
            await testAction(slotName, tableNames, publicationName);
        }
        finally
        {
            await Cleanup();
        }

        async Task Cleanup()
        {
            await using var c = await OpenConnectionAsync();
            try
            {
                await DropSlot();
            }
            catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.ObjectInUse && e.Message.Contains(slotName))
            {
                // The slot is still in use, probably because we didn't terminate the streaming replication properly.
                // The following is ugly, but let's try to clean up after us if we can.
                var pid = Regex.Match(e.MessageText, "PID (?<pid>\\d+)", RegexOptions.IgnoreCase).Groups["pid"];
                if (pid.Success)
                {
                    await c.ExecuteNonQueryAsync($"SELECT pg_terminate_backend ({pid.Value})");

                    // Check the retry bound first so no extra query is sent once we've stopped waiting.
                    for (var i = 0; i < 20 && (bool)(await c.ExecuteScalarAsync($"SELECT EXISTS(SELECT * FROM pg_stat_replication where pid = {pid.Value})"))!; i++)
                        await Task.Delay(TimeSpan.FromSeconds(1));
                }
                else
                {
                    // Old backends don't report the PID, so wait while pg_stat_replication has any rows at all.
                    for (var i = 0; i < 20 && (bool)(await c.ExecuteScalarAsync("SELECT EXISTS(SELECT * FROM pg_stat_replication)"))!; i++)
                        await Task.Delay(TimeSpan.FromSeconds(1));
                }

                try
                {
                    await DropSlot();
                }
                catch (PostgresException e2) when (e2.SqlState == PostgresErrorCodes.ObjectInUse && e2.Message.Contains(slotName))
                {
                    // We failed to drop the slot, even after waiting up to 20 seconds. Swallow the exception to avoid
                    // failing the test; we'll likely drop it the next time the test is executed (Cleanup also runs
                    // before the test). Note that returning here also skips dropping the publication and tables.
                    return;
                }
            }

            // Publications only exist on PostgreSQL 10 and later.
            if (c.PostgreSqlVersion >= Pg10Version)
                await c.ExecuteNonQueryAsync($"DROP PUBLICATION IF EXISTS {publicationName}");

            for (var i = tableNames.Length - 1; i >= 0; i--)
                await c.ExecuteNonQueryAsync($"DROP TABLE IF EXISTS {tableNames[i]} CASCADE;");

            async Task DropSlot()
            {
                try
                {
                    await c.ExecuteNonQueryAsync($"SELECT pg_drop_replication_slot('{slotName}')");
                }
                catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UndefinedObject && ex.Message.Contains(slotName))
                {
                    // Temporary slots might already have been deleted.
                    // We don't care as long as it's gone and we don't have to clean it up.
                }
            }
        }
    }

    /// <summary>
    /// Returns a <see cref="CancellationToken"/> that is already in the cancelled state.
    /// </summary>
    private protected static CancellationToken GetCancelledCancellationToken() => new(canceled: true);
}