-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstore.py
More file actions
97 lines (83 loc) · 2.71 KB
/
store.py
File metadata and controls
97 lines (83 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import requests
import json
from .common import API_URL
class _Store():
def __init__(self, interceptor):
self.path = '/store'
self.interceptor = interceptor
def insert(self, collection, document):
r = requests.post('{}{}/{}'.format(API_URL, self.path, collection), auth=self.interceptor, json=document)
data = r.json()
if not 200 <= r.status_code < 300:
raise StoreError(data['code'], data['message'])
return data
def get(self, collection, id):
r = requests.get('{}{}/{}/{}'.format(API_URL, self.path, collection, id), auth=self.interceptor)
data = r.json()
if not 200 <= r.status_code < 300:
raise StoreError(data['code'], data['message'])
return data
def search(self,
collection,
query=None,
query_string='*',
sort=None,
size=None,
from_=None):
params = {
'query': query,
'query_string': query_string,
'sort': sort,
'size': size,
'from': from_
}
r = requests.post('{}{}/{}/search'.format(API_URL, self.path, collection), auth=self.interceptor, json=params)
data = r.json()
if not 200 <= r.status_code < 300:
raise StoreError(data['code'], data['message'])
return data
def update(self, collection, id, document):
r = requests.patch('{}{}/{}/{}'.format(API_URL, self.path, collection, id), auth=self.interceptor, json=document)
if not 200 <= r.status_code < 300:
data = r.json()
raise StoreError(data['code'], data['message'])
def delete(self, collection, id):
r = requests.delete('{}{}/{}/{}'.format(API_URL, self.path, collection, id), auth=self.interceptor)
if not 200 <= r.status_code < 300:
data = r.json()
raise StoreError(data['code'], data['message'])
class StoreEntry():
def __init__(self, key=None, value=None):
self.key = key
self.value = value
self.created_at = None
self.updated_at = None
def to_json(self):
return json.dumps({
'key': self.key,
'value': self.value
})
@classmethod
def from_json(self, entry):
se = StoreEntry(entry['key'], entry['value'])
se.created_at = entry['created_at']
se.updated_at = entry['updated_at']
return se
# class StoreSearchResponse():
# def __init__(self):
# self.total = 0
# self.documents = []
# @classmethod
# def from_json(self, response):
# qr = StoreSearchResponse()
# qr.total = response['total']
# for entry in response['entries']:
# qr.documents.append(StoreEntry.from_json(entry))
# return qr
class StoreError(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
def __str__(self):
return self.message