forked from quay/quay
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfields.py
More file actions
336 lines (237 loc) · 9.38 KB
/
fields.py
File metadata and controls
336 lines (237 loc) · 9.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import base64
import string
import json
from random import SystemRandom
import bcrypt
import resumablehashlib
from peewee import TextField, CharField, SmallIntegerField
from data.text import prefix_search
def random_string(length=16):
    """
    Returns a random string of `length` characters made of uppercase ASCII letters and digits.

    Characters are chosen with a `SystemRandom` instance created for each call.
    """
    alphabet = string.ascii_uppercase + string.digits
    rng = SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(length))
class _ResumableSHAField(TextField):
    """
    Base text field that persists a resumable hash object as a JSON-encoded state list.

    Subclasses provide the concrete hash implementation through `_create_sha`.
    """

    # Position in the state list of the byte-string entry that is base64-encoded for storage.
    _RAW_BYTES_SLOT = 3

    def _create_sha(self):
        """
        Returns a fresh hash object to load stored state into; subclasses must override this.
        """
        raise NotImplementedError

    def db_value(self, value):
        """
        Serializes the hash object's state to a JSON string; None is passed through.
        """
        if value is None:
            return None

        state = value.state()
        slot = self._RAW_BYTES_SLOT
        # This entry is a byte string; base64 encode it so it can be stored and fetched
        # regardless of the database's default collation.
        state[slot] = base64.b64encode(state[slot])
        return json.dumps(state)

    def python_value(self, value):
        """
        Rebuilds a hash object from the stored JSON state; None is passed through.
        """
        if value is None:
            return None

        state = json.loads(value)
        slot = self._RAW_BYTES_SLOT
        # Undo the base64 encoding applied when the state was stored.
        state[slot] = base64.b64decode(state[slot])

        resumed = self._create_sha()
        resumed.set_state(state)
        return resumed
class ResumableSHA256Field(_ResumableSHAField):
    """
    Resumable hash field whose hash objects come from `resumablehashlib.sha256`.
    """

    def _create_sha(self):
        fresh_hasher = resumablehashlib.sha256()
        return fresh_hasher
class ResumableSHA1Field(_ResumableSHAField):
    """
    Resumable hash field whose hash objects come from `resumablehashlib.sha1`.
    """

    def _create_sha(self):
        fresh_hasher = resumablehashlib.sha1()
        return fresh_hasher
class JSONField(TextField):
    """
    Text field whose values are stored as JSON documents.
    """

    def db_value(self, value):
        """
        Encodes the value as a JSON string for storage.
        """
        encoded = json.dumps(value)
        return encoded

    def python_value(self, value):
        """
        Decodes the stored JSON; a None or empty-string column value yields an empty dict.
        """
        is_blank = value is None or value == ""
        if not is_blank:
            return json.loads(value)
        return {}
class Base64BinaryField(TextField):
    """
    Text field that stores its value base64-encoded and decodes it again when read.
    """

    def db_value(self, value):
        """
        Returns the base64 encoding of the value; None is passed through.
        """
        return None if value is None else base64.b64encode(value)

    def python_value(self, value):
        """
        Returns the base64-decoded column value; None is passed through.
        """
        return None if value is None else base64.b64decode(value)
class DecryptedValue(object):
    """
    Wrapper around an already decrypted value to be placed into an encrypted field.
    """

    # `basestring` only exists on Python 2. Fall back to `str` so that constructing a
    # DecryptedValue does not fail with a NameError on Python 3.
    try:
        _STRING_TYPES = (basestring,)  # noqa: F821
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, decrypted_value):
        """
        Stores the given plaintext value.

        Raises AssertionError if `decrypted_value` is None or is not a string.
        """
        assert decrypted_value is not None
        assert isinstance(decrypted_value, self._STRING_TYPES)
        self.value = decrypted_value

    def decrypt(self):
        """
        Returns the wrapped value; it is already decrypted, so no work is performed.
        """
        return self.value

    def matches(self, unencrypted_value):
        """
        Returns whether the value of this field matches the unencrypted_value.
        """
        return self.decrypt() == unencrypted_value
class LazyEncryptedValue(object):
    """
    Holds the encrypted contents of an encrypted field and decrypts them only on request.
    """

    def __init__(self, encrypted_value, field):
        self.encrypted_value = encrypted_value
        self._field = field

    def decrypt(self, encrypter=None):
        """
        Returns the decrypted value, using `encrypter` when one is supplied and otherwise the
        encrypter found on the metadata of the owning field's model.
        """
        if not encrypter:
            encrypter = self._field.model._meta.encrypter
        return encrypter.decrypt_value(self.encrypted_value)

    def matches(self, unencrypted_value):
        """
        Returns whether the decrypted value equals the given unencrypted_value.
        """
        decrypted = self.decrypt()
        return decrypted == unencrypted_value

    def _disallowed(self, _):
        raise Exception("Disallowed operation; use `matches`")

    # Comparison and string-style lookups would act on the encrypted contents, so every one of
    # them is rejected in favour of `matches`.
    __eq__ = __mod__ = __pow__ = __contains__ = _disallowed
    contains = startswith = endswith = _disallowed

    # Only the aliases above are part of the class.
    del _disallowed
def _add_encryption(field_class, requires_length_check=True):
"""
Adds support for encryption and decryption to the given field class.
"""
class indexed_class(field_class):
def __init__(self, default_token_length=None, *args, **kwargs):
def _generate_default():
return DecryptedValue(random_string(default_token_length))
if default_token_length is not None:
kwargs["default"] = _generate_default
field_class.__init__(self, *args, **kwargs)
assert not self.index
def db_value(self, value):
if value is None:
return None
if isinstance(value, LazyEncryptedValue):
return value.encrypted_value
if isinstance(value, DecryptedValue):
value = value.value
meta = self.model._meta
return meta.encrypter.encrypt_value(
value, self.max_length if requires_length_check else None
)
def python_value(self, value):
if value is None:
return None
return LazyEncryptedValue(value, self)
def __eq__(self, _):
raise Exception("Disallowed operation; use `matches`")
def __mod__(self, _):
raise Exception("Disallowed operation; use `matches`")
def __pow__(self, _):
raise Exception("Disallowed operation; use `matches`")
def __contains__(self, _):
raise Exception("Disallowed operation; use `matches`")
def contains(self, _):
raise Exception("Disallowed operation; use `matches`")
def startswith(self, _):
raise Exception("Disallowed operation; use `matches`")
def endswith(self, _):
raise Exception("Disallowed operation; use `matches`")
return indexed_class
# Encrypted variant of CharField; the field's max_length is passed to the encrypter.
EncryptedCharField = _add_encryption(CharField)
# Encrypted variant of TextField; None is passed to the encrypter in place of a max length.
EncryptedTextField = _add_encryption(TextField, requires_length_check=False)
class EnumField(SmallIntegerField):
    """
    Small integer column that stores members of `enum_type` by their integer value.

    The column is always created with `index=True`; an `index` keyword supplied by the caller is
    discarded.
    """

    def __init__(self, enum_type, *args, **kwargs):
        kwargs.pop("index", None)
        super(EnumField, self).__init__(index=True, *args, **kwargs)
        self.enum_type = enum_type

    def db_value(self, value):
        """
        Convert the python value for storage in the database.

        None is passed through unchanged, mirroring `python_value`, instead of failing with an
        AttributeError on `value.value`.
        """
        if value is None:
            return None

        return int(value.value)

    def python_value(self, value):
        """
        Convert the database value to a pythonic value.
        """
        return self.enum_type(value) if value is not None else None

    def clone_base(self, **kwargs):
        """
        Clones the field, carrying `enum_type` over to the copy.
        """
        return super(EnumField, self).clone_base(enum_type=self.enum_type, **kwargs)
def _add_fulltext(field_class):
"""
Adds support for full text indexing and lookup to the given field class.
"""
class indexed_class(field_class):
# Marker used by SQLAlchemy translation layer to add the proper index for full text searching.
__fulltext__ = True
def __init__(self, match_function, *args, **kwargs):
field_class.__init__(self, *args, **kwargs)
self.match_function = match_function
def match(self, query):
return self.match_function(self, query)
def match_prefix(self, query):
return prefix_search(self, query)
def __mod__(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
def __pow__(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
def __contains__(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
def contains(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
def startswith(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
def endswith(self, _):
raise Exception("Unsafe operation: Use `match` or `match_prefix`")
return indexed_class
# Variants of CharField and TextField built by `_add_fulltext`; each takes a match function as its
# first constructor argument.
FullIndexedCharField = _add_fulltext(CharField)
FullIndexedTextField = _add_fulltext(TextField)
class Credential(object):
    """
    Credential represents a hashed credential.
    """

    def __init__(self, hashed):
        self.hashed = hashed

    def matches(self, value):
        """
        Returns true if this credential matches the unhashed value given.
        """
        # The stored hash is passed as the salt argument; the result equals the stored hash when
        # the value matches.
        return bcrypt.hashpw(value.encode("utf-8"), self.hashed) == self.hashed

    @classmethod
    def from_string(cls, string_value):
        """
        Returns a Credential object from an unhashed string value.

        The instance is built through `cls`, so subclasses get instances of their own type.
        """
        return cls(bcrypt.hashpw(string_value.encode("utf-8"), bcrypt.gensalt()))

    @classmethod
    def generate(cls, length=20):
        """
        Generates a new credential and returns it, along with its unhashed form.
        """
        token = random_string(length)
        return cls.from_string(token), token
class CredentialField(CharField):
    """
    A character field that stores cryptographically hashed credentials that should never be
    available to the user in plaintext after initial creation.

    This field automatically provides verification.
    """

    # `basestring` only exists on Python 2. Fall back to `str` so that `db_value` does not fail
    # with a NameError on Python 3.
    try:
        _STRING_TYPES = (basestring,)  # noqa: F821
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, *args, **kwargs):
        """
        Builds the field.

        Raises AssertionError if a `default` keyword is supplied or the field is indexed.
        """
        CharField.__init__(self, *args, **kwargs)
        assert "default" not in kwargs
        assert not self.index

    def db_value(self, value):
        """
        Returns the hash held by the given Credential for storage; None is passed through.

        Raises Exception when handed a plain string rather than a Credential.
        """
        if value is None:
            return None

        if isinstance(value, self._STRING_TYPES):
            raise Exception(
                "A string cannot be given to a CredentialField; please wrap in a Credential"
            )

        return value.hashed

    def python_value(self, value):
        """
        Wraps the stored hash in a Credential; None is passed through.
        """
        if value is None:
            return None

        return Credential(value)