forked from madelson/DistributedLock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProgram.cs
More file actions
162 lines (153 loc) · 8.46 KB
/
Program.cs
File metadata and controls
162 lines (153 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
using System;
using Medallion.Threading.SqlServer;
using Medallion.Threading.WaitHandles;
using Medallion.Threading.Postgres;
using Medallion.Threading.Tests;
using Medallion.Threading.Azure;
using Azure.Storage.Blobs;
using Medallion.Threading.FileSystem;
using System.IO;
using Medallion.Threading.Redis;
using StackExchange.Redis;
using System.Linq;
using Medallion.Threading;
using System.Drawing.Text;
using System.Collections.Generic;
using Medallion.Threading.ZooKeeper;
#if NET471
using System.Data.SqlClient;
#elif NETCOREAPP3_1
using Microsoft.Data.SqlClient;
#endif
namespace DistributedLockTaker
{
/// <summary>
/// Console entry point that acquires one distributed synchronization primitive, selected by a
/// type key passed on the command line, reports success on standard output, and then waits for
/// a line on standard input before either releasing or abandoning the acquired handle.
/// </summary>
internal static class Program
{
    /// <summary>Exit code returned when the type key does not match any case in <see cref="Main"/>.</summary>
    private const int UnknownTypeExitCode = 123;

    /// <summary>Exit code returned when fewer than two command-line arguments are supplied.</summary>
    private const int InvalidArgumentsExitCode = 122;

    /// <summary>
    /// Acquires the primitive identified by <c>args[0]</c> using the name in <c>args[1]</c>, writes
    /// "Acquired" to standard output, and blocks on <see cref="Console.ReadLine"/>.
    /// </summary>
    /// <param name="args">
    /// <c>args[0]</c> is the type key (for example <c>nameof(SqlDistributedLock)</c>); <c>args[1]</c>
    /// is the lock name. Any further arguments are ignored.
    /// </param>
    /// <returns>
    /// 0 once the line has been read; <see cref="UnknownTypeExitCode"/> when the type key is not
    /// recognized; <see cref="InvalidArgumentsExitCode"/> when fewer than two arguments are given.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The type key starts with a Redis/ZooKeeper semaphore type name but does not end in
    /// "1AsMutex" or "5AsMutex".
    /// </exception>
    public static int Main(string[] args)
    {
        // Fail with a readable message instead of an unhandled IndexOutOfRangeException.
        if (args.Length < 2)
        {
            Console.Error.WriteLine($"expected 2 arguments (<type> <name>) but received {args.Length}");
            return InvalidArgumentsExitCode;
        }

        var type = args[0];
        var name = args[1];
        IDisposable? handle;
        switch (type)
        {
            case nameof(SqlDistributedLock):
                handle = new SqlDistributedLock(name, SqlServerCredentials.ConnectionString).Acquire();
                break;
            case "Write" + nameof(SqlDistributedReaderWriterLock):
                handle = new SqlDistributedReaderWriterLock(name, SqlServerCredentials.ConnectionString).AcquireWriteLock();
                break;
            case nameof(SqlDistributedSemaphore) + "1AsMutex":
                handle = new SqlDistributedSemaphore(name, maxCount: 1, connectionString: SqlServerCredentials.ConnectionString).Acquire();
                break;
            case nameof(SqlDistributedSemaphore) + "5AsMutex":
                handle = new SqlDistributedSemaphore(name, maxCount: 5, connectionString: SqlServerCredentials.ConnectionString).Acquire();
                break;
            case nameof(PostgresDistributedLock):
                handle = new PostgresDistributedLock(new PostgresAdvisoryLockKey(name), PostgresCredentials.GetConnectionString(Environment.CurrentDirectory)).Acquire();
                break;
            case "Write" + nameof(PostgresDistributedReaderWriterLock):
                handle = new PostgresDistributedReaderWriterLock(new PostgresAdvisoryLockKey(name), PostgresCredentials.GetConnectionString(Environment.CurrentDirectory)).AcquireWriteLock();
                break;
            case nameof(EventWaitHandleDistributedLock):
                handle = new EventWaitHandleDistributedLock(name).Acquire();
                break;
            case nameof(WaitHandleDistributedSemaphore) + "1AsMutex":
                handle = new WaitHandleDistributedSemaphore(name, maxCount: 1).Acquire();
                break;
            case nameof(WaitHandleDistributedSemaphore) + "5AsMutex":
                handle = new WaitHandleDistributedSemaphore(name, maxCount: 5).Acquire();
                break;
            case nameof(AzureBlobLeaseDistributedLock):
                handle = new AzureBlobLeaseDistributedLock(
                        new BlobClient(AzureCredentials.ConnectionString, AzureCredentials.DefaultBlobContainerName, name),
                        o => o.Duration(TimeSpan.FromSeconds(15))
                    )
                    .Acquire();
                break;
            case nameof(FileDistributedLock):
                handle = new FileDistributedLock(new FileInfo(name)).Acquire();
                break;
            case nameof(RedisDistributedLock) + "1":
                handle = AcquireRedisLock(name, serverCount: 1);
                break;
            case nameof(RedisDistributedLock) + "3":
                handle = AcquireRedisLock(name, serverCount: 3);
                break;
            case nameof(RedisDistributedLock) + "2x1":
                handle = AcquireRedisLock(name, serverCount: 2); // we know the last will fail; don't bother (we also don't know its port)
                break;
            case nameof(RedisDistributedLock) + "1WithPrefix":
                handle = AcquireRedisLock("distributed_locks:" + name, serverCount: 1);
                break;
            case "Write" + nameof(RedisDistributedReaderWriterLock) + "1":
                handle = AcquireRedisWriteLock(name, serverCount: 1);
                break;
            case "Write" + nameof(RedisDistributedReaderWriterLock) + "3":
                handle = AcquireRedisWriteLock(name, serverCount: 3);
                break;
            case "Write" + nameof(RedisDistributedReaderWriterLock) + "2x1":
                handle = AcquireRedisWriteLock(name, serverCount: 2); // we know the last will fail; don't bother (we also don't know its port)
                break;
            case "Write" + nameof(RedisDistributedReaderWriterLock) + "1WithPrefix":
                handle = AcquireRedisWriteLock("distributed_locks:" + name, serverCount: 1);
                break;
            // The type key is a machine identifier, so match it ordinally rather than by current culture.
            case string _ when type.StartsWith(nameof(RedisDistributedSemaphore), StringComparison.Ordinal):
                {
                    // Parse the suffix first so a malformed key fails before any Redis connection is opened.
                    var maxCount = ParseSemaphoreMaxCount(type);
                    handle = new RedisDistributedSemaphore(
                        name,
                        maxCount,
                        GetRedisDatabases(serverCount: 1).Single(),
                        // in order to see abandonment work in a reasonable timeframe, use very short expiry
                        options => options.Expiry(TimeSpan.FromSeconds(1))
                            .BusyWaitSleepTime(TimeSpan.FromSeconds(.1), TimeSpan.FromSeconds(.3))
                    ).Acquire();
                    break;
                }
            case nameof(ZooKeeperDistributedLock):
                handle = new ZooKeeperDistributedLock(new ZooKeeperPath(name), ZooKeeperPorts.DefaultConnectionString, options: ZooKeeperOptions).AcquireAsync().Result;
                break;
            case "Write" + nameof(ZooKeeperDistributedReaderWriterLock):
                handle = new ZooKeeperDistributedReaderWriterLock(new ZooKeeperPath(name), ZooKeeperPorts.DefaultConnectionString, options: ZooKeeperOptions).AcquireWriteLockAsync().Result;
                break;
            case string _ when type.StartsWith(nameof(ZooKeeperDistributedSemaphore), StringComparison.Ordinal):
                {
                    // Parse the suffix before constructing the path, matching the original evaluation order.
                    var maxCount = ParseSemaphoreMaxCount(type);
                    handle = new ZooKeeperDistributedSemaphore(
                        new ZooKeeperPath(name),
                        maxCount,
                        ZooKeeperPorts.DefaultConnectionString,
                        options: ZooKeeperOptions
                    ).AcquireAsync().Result;
                    break;
                }
            default:
                Console.Error.WriteLine($"type: {type}");
                return UnknownTypeExitCode;
        }

        // Report the acquisition on stdout and flush it before blocking on stdin.
        Console.WriteLine("Acquired");
        Console.Out.Flush();

        // Any input other than "abandon" (including end-of-stream, which reads as null) releases the
        // handle; "abandon" returns with the handle left undisposed.
        if (Console.ReadLine() != "abandon")
        {
            handle.Dispose();
        }

        return 0;
    }

    /// <summary>
    /// Maps the suffix of a semaphore type key to its max count: "1AsMutex" gives 1, "5AsMutex" gives 5.
    /// </summary>
    /// <param name="type">The full type key passed on the command line.</param>
    /// <returns>1 or 5, depending on the suffix.</returns>
    /// <exception cref="ArgumentException">The key ends with neither recognized suffix.</exception>
    private static int ParseSemaphoreMaxCount(string type) =>
        type.EndsWith("1AsMutex", StringComparison.Ordinal) ? 1
        : type.EndsWith("5AsMutex", StringComparison.Ordinal) ? 5
        : throw new ArgumentException(type);

    /// <summary>
    /// Acquires a <see cref="RedisDistributedLock"/> named <paramref name="name"/> across the databases
    /// returned by <see cref="GetRedisDatabases"/>, configured with <see cref="RedisOptions"/>.
    /// </summary>
    /// <param name="name">The lock name.</param>
    /// <param name="serverCount">How many of the default Redis ports to connect to.</param>
    /// <returns>The handle for the acquired lock.</returns>
    private static IDistributedSynchronizationHandle AcquireRedisLock(string name, int serverCount) =>
        new RedisDistributedLock(name, GetRedisDatabases(serverCount), RedisOptions).Acquire();

    /// <summary>
    /// Acquires the write lock of a <see cref="RedisDistributedReaderWriterLock"/> named
    /// <paramref name="name"/> across the databases returned by <see cref="GetRedisDatabases"/>,
    /// configured with <see cref="RedisOptions"/>.
    /// </summary>
    /// <param name="name">The lock name.</param>
    /// <param name="serverCount">How many of the default Redis ports to connect to.</param>
    /// <returns>The handle for the acquired write lock.</returns>
    private static IDistributedSynchronizationHandle AcquireRedisWriteLock(string name, int serverCount) =>
        new RedisDistributedReaderWriterLock(name, GetRedisDatabases(serverCount), RedisOptions).AcquireWriteLock();

    /// <summary>
    /// Builds a deferred sequence with one <see cref="IDatabase"/> for each of the first
    /// <paramref name="serverCount"/> entries of <c>RedisPorts.DefaultPorts</c>, connecting to
    /// <c>localhost</c> on that port.
    /// </summary>
    /// <param name="serverCount">The number of ports to take.</param>
    /// <returns>
    /// A lazily evaluated sequence; a new <see cref="ConnectionMultiplexer"/> is created per port
    /// each time the sequence is enumerated, and it is not disposed by this class.
    /// </returns>
    private static IEnumerable<IDatabase> GetRedisDatabases(int serverCount) => RedisPorts.DefaultPorts.Take(serverCount)
        .Select(port => ConnectionMultiplexer.Connect($"localhost:{port}").GetDatabase());

    /// <summary>
    /// Options callback shared by the Redis lock and reader-writer lock cases: a 0.5 second expiry
    /// and busy-wait sleep time arguments of 0.1 and 0.3 seconds.
    /// </summary>
    /// <param name="options">The options builder to configure.</param>
    private static void RedisOptions(RedisDistributedSynchronizationOptionsBuilder options) =>
        options.Expiry(TimeSpan.FromSeconds(.5)) // short expiry for abandonment testing
            .BusyWaitSleepTime(TimeSpan.FromSeconds(.1), TimeSpan.FromSeconds(.3));

    /// <summary>
    /// Options callback shared by the ZooKeeper cases: sets a 0.1 second session timeout.
    /// </summary>
    /// <param name="options">The options builder to configure.</param>
    private static void ZooKeeperOptions(ZooKeeperDistributedSynchronizationOptionsBuilder options) =>
        // use a very short session timeout to support abandonment
        options.SessionTimeout(TimeSpan.FromSeconds(.1));
}
}