forked from madelson/DistributedLock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathOracleDbmsLock.cs
More file actions
140 lines (124 loc) · 6.2 KB
/
OracleDbmsLock.cs
File metadata and controls
140 lines (124 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using Medallion.Threading.Internal;
using Medallion.Threading.Internal.Data;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Medallion.Threading.Oracle
{
/// <summary>
/// Implements <see cref="IDbSynchronizationStrategy{TLockCookie}"/> using Oracle's DBMS_LOCK package
/// </summary>
/// <remarks>
/// The constructor is private; callers use one of the pre-built strategies (<see cref="SharedLock"/>,
/// <see cref="UpdateLock"/>, <see cref="ExclusiveLock"/>, <see cref="UpgradeLock"/>). Every successful acquire
/// returns the same shared cookie object, so the cookie carries no per-lock state.
/// </remarks>
internal class OracleDbmsLock : IDbSynchronizationStrategy<object>
{
    // https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_lock.htm#i1002309
    // Timeout (in seconds) sent to DBMS_LOCK when the caller requests an infinite wait.
    private const int MaxWaitSeconds = 32767;
    // Largest finite timeout (in seconds) we accept; one less than the value reserved for infinite waits.
    private const int MaxTimeoutSeconds = MaxWaitSeconds - 1;

    // The only instances of this strategy. UpgradeLock is an exclusive lock obtained via CONVERT rather than REQUEST.
    public static readonly OracleDbmsLock SharedLock = new OracleDbmsLock(Mode.Shared),
        UpdateLock = new OracleDbmsLock(Mode.Update),
        ExclusiveLock = new OracleDbmsLock(Mode.Exclusive),
        UpgradeLock = new OracleDbmsLock(Mode.Exclusive, isUpgrade: true);

    // Shared, stateless token returned from every successful acquire.
    private static readonly object Cookie = new();

    private readonly Mode _mode;
    private readonly bool _isUpgrade;

    /// <summary>
    /// Creates a strategy for the given <paramref name="mode"/>. An upgrade strategy
    /// (<paramref name="isUpgrade"/> = true) is only valid in combination with <see cref="Mode.Exclusive"/>.
    /// </summary>
    private OracleDbmsLock(Mode mode, bool isUpgrade = false)
    {
        Invariant.Require(!isUpgrade || mode == Mode.Exclusive);

        this._mode = mode;
        this._isUpgrade = isUpgrade;
    }

    /// <summary>
    /// True only for the <see cref="Mode.Update"/> strategy.
    /// </summary>
    public bool IsUpgradeable => this._mode == Mode.Update;

    /// <summary>
    /// The name of the SYS.DBMS_LOCK mode constant (e.g. "SYS.DBMS_LOCK.X_MODE") matching this strategy's mode.
    /// This text is spliced directly into the acquire command.
    /// </summary>
    /// <exception cref="InvalidOperationException">The mode is not one of the known <see cref="Mode"/> values</exception>
    private string ModeSqlConstant
    {
        get
        {
            var modeCode = this._mode switch
            {
                Mode.Shared => "SS",
                Mode.Update => "SSX",
                Mode.Exclusive => "X",
                _ => throw new InvalidOperationException(),
            };
            return $"SYS.DBMS_LOCK.{modeCode}_MODE";
        }
    }

    /// <summary>
    /// Releases the lock named <paramref name="resourceName"/> on <paramref name="connection"/> by calling
    /// SYS.DBMS_LOCK.RELEASE. For the upgrade strategy this does nothing.
    /// </summary>
    /// <param name="connection">The connection on which the release command is executed</param>
    /// <param name="resourceName">The lock name passed to SYS.DBMS_LOCK.ALLOCATE_UNIQUE</param>
    /// <param name="lockCookie">The cookie returned from acquire; not used by this implementation</param>
    /// <exception cref="InvalidOperationException">SYS.DBMS_LOCK.RELEASE returned a non-zero code</exception>
    public async ValueTask ReleaseAsync(DatabaseConnection connection, string resourceName, object lockCookie)
    {
        // Since we don't allow downgrading and therefore "releasing" an upgrade only happens on disposal of the
        // original handle, this can safely be a noop.
        if (this._isUpgrade) { return; }

        using var command = connection.CreateCommand();
        command.SetCommandText(@"
DECLARE
lockHandle VARCHAR2(128);
BEGIN
SYS.DBMS_LOCK.ALLOCATE_UNIQUE(:lockName, lockHandle);
:returnValue := SYS.DBMS_LOCK.RELEASE(lockHandle);
END;"
        );
        // note: parameters bind by position by default!
        command.AddParameter("lockName", resourceName);
        var returnValueParameter = command.AddParameter("returnValue", type: DbType.Int32, direction: ParameterDirection.Output);

        // the release command is always run to completion: no cancellation token is flowed here
        await command.ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false);

        var returnValue = (int)returnValueParameter.Value;
        if (returnValue != 0)
        {
            // we don't enumerate the error codes here because release shouldn't ever fail unless the user really messes things up
            throw new InvalidOperationException($"SYS.DBMS_LOCK.RELEASE returned error code {returnValue}");
        }
    }

    /// <summary>
    /// Attempts to take the lock named <paramref name="resourceName"/> on <paramref name="connection"/> using
    /// SYS.DBMS_LOCK.REQUEST (or SYS.DBMS_LOCK.CONVERT for the upgrade strategy).
    /// </summary>
    /// <param name="connection">The connection on which the acquire command is executed</param>
    /// <param name="resourceName">The lock name passed to SYS.DBMS_LOCK.ALLOCATE_UNIQUE</param>
    /// <param name="timeout">
    /// How long to wait. Infinite timeouts are sent as <see cref="MaxWaitSeconds"/>; finite timeouts must not exceed
    /// <see cref="MaxTimeoutSeconds"/> seconds.
    /// </param>
    /// <param name="cancellationToken">Flowed to the command execution and to the fallback delay</param>
    /// <returns>A non-null cookie if the lock was acquired; null if it was not acquired within the timeout</returns>
    /// <exception cref="ArgumentOutOfRangeException">A finite <paramref name="timeout"/> exceeds the supported maximum</exception>
    /// <exception cref="DeadlockException">
    /// DBMS_LOCK reported a deadlock, or reported code 4 for a request with an infinite timeout
    /// </exception>
    /// <exception cref="InvalidOperationException">DBMS_LOCK reported a parameter error, an illegal lock handle, or an unknown code</exception>
    public async ValueTask<object?> TryAcquireAsync(DatabaseConnection connection, string resourceName, TimeoutValue timeout, CancellationToken cancellationToken)
    {
        var acquireFunction = this._isUpgrade ? "CONVERT" : "REQUEST";

        using var command = connection.CreateCommand();
        command.SetCommandText($@"
DECLARE
lockHandle VARCHAR2(128);
BEGIN
SYS.DBMS_LOCK.ALLOCATE_UNIQUE(:lockName, lockHandle);
:returnValue := SYS.DBMS_LOCK.{acquireFunction}(lockhandle => lockHandle, lockmode => {this.ModeSqlConstant}, timeout => :timeout);
END;"
        );
        // note: parameters bind by position by default!
        command.AddParameter("lockName", resourceName);
        var returnValueParameter = command.AddParameter("returnValue", type: DbType.Int32, direction: ParameterDirection.Output);
        command.AddParameter(
            "timeout",
            timeout.IsInfinite ? MaxWaitSeconds
                // we could support longer timeouts via looping lock requests, but this doesn't feel particularly valuable and isn't a true longer wait
                // since by looping you fall out of the wait queue
                // (paramName, message) overload: the single-string constructor would treat the text as the parameter name
                : timeout.TimeSpan.TotalSeconds > MaxTimeoutSeconds ? throw new ArgumentOutOfRangeException(nameof(timeout), $"Requested non-infinite timeout value '{timeout}' is longer than Oracle's allowed max of '{TimeSpan.FromSeconds(MaxTimeoutSeconds)}'")
                : timeout.TimeSpan.TotalSeconds
        );

        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

        var returnValue = (int)returnValueParameter.Value;
        return returnValue switch
        {
            0 => Cookie, // success
            1 => null, // timeout
            2 => throw new DeadlockException(GetErrorMessage("deadlock")),
            3 => throw new InvalidOperationException(GetErrorMessage("parameter error")),
            // Code 4 is treated as "this connection already holds the lock": that can never succeed, so we mimic
            // a normal wait (immediate failure, deadlock for an infinite wait, or sleep out the timeout).
            // NOTE(review): Oracle's docs appear to define code 4 for CONVERT as "don't own lock" rather than
            // "already own lock"; confirm whether the upgrade path should report this differently.
            4 => timeout.IsZero ? null
                : timeout.IsInfinite ? throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection")
                : await WaitThenReturnNullAsync().ConfigureAwait(false),
            5 => throw new InvalidOperationException(GetErrorMessage("illegal lock handle")),
            _ => throw new InvalidOperationException(GetErrorMessage("unknown error code")),
        };

        // Formats a message naming the DBMS_LOCK function that was called and the code it returned.
        string GetErrorMessage(string description) =>
            $"SYS.DBMS_LOCK.{acquireFunction} returned error code {returnValue} ({description})";

        // Waits out the requested timeout (honoring cancellation) and then reports failure to acquire.
        async ValueTask<object?> WaitThenReturnNullAsync()
        {
            await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false);
            return null;
        }
    }

    /// <summary>
    /// The lock modes supported by this strategy; see <see cref="ModeSqlConstant"/> for the DBMS_LOCK constant each maps to.
    /// </summary>
    private enum Mode
    {
        Shared,
        Update,
        Exclusive,
    }
}
}