forked from ej2/python-quickbooks
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch.py
More file actions
90 lines (62 loc) · 2.94 KB
/
batch.py
File metadata and controls
90 lines (62 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import uuid
from .client import QuickBooks
from .exceptions import QuickbooksException
from .objects.batchrequest import IntuitBatchRequest, BatchItemRequest, BatchOperation, BatchResponse, BatchItemResponse
class BatchManager(object):
    """Run one batch operation over a list of objects, in size-limited requests.

    A manager is bound to a single operation ("create", "update" or
    "delete"). ``save`` splits the input list into requests holding at most
    ``max_request_items`` objects, sends each through
    ``QuickBooks.batch_operation`` and merges the per-request results into a
    single ``BatchResponse``.
    """

    def __init__(self, operation, max_request_items=30):
        """Bind the manager to an operation and a per-request item limit.

        :param operation: one of "create", "update" or "delete".
        :param max_request_items: largest number of objects placed in one
            batch request. ``None`` puts all objects in a single request.
        :raises QuickbooksException: if ``operation`` is not supported or
            ``max_request_items`` is smaller than 1.
        """
        if operation not in ("create", "update", "delete"):
            raise QuickbooksException("Operation not supported.")

        # A limit below 1 can never drain the input list: ``save`` would
        # either loop forever or keep issuing empty requests. Fail early.
        if max_request_items is not None and max_request_items < 1:
            raise QuickbooksException("max_request_items must be at least 1.")

        self._max_request_items = max_request_items
        self._operation = operation

    @staticmethod
    def _chunked(obj_list, size):
        """Yield consecutive slices of ``obj_list`` with at most ``size`` items.

        Splitting is purely positional, so duplicate or equal-comparing
        objects are preserved and every item lands in exactly one slice.
        ``size=None`` yields the whole list as one slice. An empty list
        yields nothing.
        """
        total = len(obj_list)
        # max(..., 1) keeps range() happy (step must be non-zero) when the
        # list is empty and no limit was given.
        step = size if size is not None else max(total, 1)
        for start in range(0, total, step):
            yield obj_list[start:start + step]

    def save(self, obj_list):
        """Process ``obj_list`` in chunks and return the merged results.

        :param obj_list: sequence of objects to send; it is not modified.
        :return: a ``BatchResponse`` whose ``batch_responses``,
            ``original_list``, ``successes`` and ``faults`` are the
            concatenation of the corresponding lists of every chunk result.
        """
        batch_response = BatchResponse()

        for chunk in self._chunked(obj_list, self._max_request_items):
            result = self.process_batch(chunk)

            batch_response.batch_responses += result.batch_responses
            batch_response.original_list += result.original_list
            batch_response.successes += result.successes
            batch_response.faults += result.faults

        return batch_response

    def process_batch(self, obj_list):
        """Send one chunk as a single batch request and parse the reply.

        :param obj_list: the objects to include in this request.
        :return: the ``BatchResponse`` built by ``batch_results_to_list``.
        """
        qb = QuickBooks()

        batch = self.list_to_batch_request(obj_list)
        json_data = qb.batch_operation(batch.to_json())

        return self.batch_results_to_list(json_data, batch, obj_list)

    def list_to_batch_request(self, obj_list):
        """Wrap every object in a ``BatchItemRequest`` inside one batch request.

        Each item gets a fresh random ``bId`` (uuid4 string) which is later
        used to pair response items with the request that produced them.

        :param obj_list: the objects to wrap.
        :return: an ``IntuitBatchRequest`` with one item per object.
        """
        batch = IntuitBatchRequest()

        for obj in obj_list:
            batch_item = BatchItemRequest()
            batch_item.bId = str(uuid.uuid4())
            batch_item.operation = self._operation
            batch_item.set_object(obj)

            batch.BatchItemRequest.append(batch_item)

        return batch

    def batch_results_to_list(self, json_data, batch, original_list):
        """Turn the raw batch reply into a ``BatchResponse``.

        :param json_data: decoded reply; its ``'BatchItemResponse'`` entry is
            iterated, one element per response item.
        :param batch: the ``IntuitBatchRequest`` that was sent.
        :param original_list: the objects that were sent; stored as
            ``original_list`` on the result.
        :return: a ``BatchResponse`` where every response item is appended
            to ``batch_responses``; items carrying a ``Fault`` contribute
            that fault (tagged with the original object) to ``faults``, the
            others contribute a freshly parsed object to ``successes``.
        :raises QuickbooksException: if a response item carries a ``bId``
            that matches no request item.
        """
        response = BatchResponse()
        response.original_list = original_list

        # Index the requests by bId once instead of rescanning the whole
        # request list for every response item. setdefault keeps the first
        # request seen for a given id.
        requests_by_id = {}
        for request_item in batch.BatchItemRequest:
            requests_by_id.setdefault(request_item.bId, request_item)

        for data in json_data['BatchItemResponse']:
            response_item = BatchItemResponse.from_json(data)

            request_item = requests_by_id.get(response_item.bId)
            if request_item is None:
                raise QuickbooksException(
                    "Batch response contains unknown bId: {0}".format(
                        response_item.bId))

            response_item.set_object(request_item.get_object())
            response.batch_responses.append(response_item)

            if response_item.Fault:
                # Attach the object that was sent so callers can tell which
                # input the fault belongs to.
                response_item.Fault.original_object = response_item.get_object()
                response.faults.append(response_item.Fault)
            else:
                # Rebuild the object from the reply using the class of the
                # object that was originally sent.
                class_obj = type(response_item.get_object())
                new_object = class_obj.from_json(data[class_obj.qbo_object_name])
                response.successes.append(new_object)

        return response
def batch_create(obj_list):
    """Run a CREATE batch over ``obj_list``.

    Builds a ``BatchManager`` bound to ``BatchOperation.CREATE`` and returns
    the result of its ``save`` call for the given objects.
    """
    return BatchManager(BatchOperation.CREATE).save(obj_list)
def batch_update(obj_list):
    """Run an UPDATE batch over ``obj_list``.

    Builds a ``BatchManager`` bound to ``BatchOperation.UPDATE`` and returns
    the result of its ``save`` call for the given objects.
    """
    return BatchManager(BatchOperation.UPDATE).save(obj_list)
def batch_delete(obj_list):
    """Run a DELETE batch over ``obj_list``.

    Builds a ``BatchManager`` bound to ``BatchOperation.DELETE`` and returns
    the result of its ``save`` call for the given objects.
    """
    return BatchManager(BatchOperation.DELETE).save(obj_list)