forked from slackapi/bolt-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_request.py
More file actions
63 lines (58 loc) · 2.39 KB
/
async_request.py
File metadata and controls
63 lines (58 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import Dict, Optional, Union, Any, Sequence
from slack_bolt.context.async_context import AsyncBoltContext
from slack_bolt.error import BoltError
from slack_bolt.request.async_internals import build_async_context
from slack_bolt.request.internals import (
parse_query,
parse_body,
build_normalized_headers,
extract_content_type,
error_message_raw_body_required_in_http_mode,
error_message_unknown_request_body_type,
)
class AsyncBoltRequest:
raw_body: str
body: Dict[str, Any]
query: Dict[str, Sequence[str]]
headers: Dict[str, Sequence[str]]
content_type: Optional[str]
context: AsyncBoltContext
lazy_only: bool
lazy_function_name: Optional[str]
mode: str # either "http" or "socket_mode"
def __init__(
self,
*,
body: Union[str, dict],
query: Optional[Union[str, Dict[str, str], Dict[str, Sequence[str]]]] = None,
headers: Optional[Dict[str, Union[str, Sequence[str]]]] = None,
context: Optional[Dict[str, str]] = None,
mode: str = "http", # either "http" or "socket_mode"
):
"""Request to a Bolt app.
:param body: The raw request body (only plain text is supported for "http" mode)
:param query: The query string data in any data format.
:param headers: The request headers.
:param context: The context in this request.
:param mode: The mode used for this request. (either "http" or "socket_mode")
"""
if mode == "http" and not isinstance(body, str):
raise BoltError(error_message_raw_body_required_in_http_mode())
self.raw_body = body if mode == "http" else ""
self.query = parse_query(query)
self.headers = build_normalized_headers(headers)
self.content_type = extract_content_type(self.headers)
if isinstance(body, str):
self.body = parse_body(self.raw_body, self.content_type)
elif isinstance(body, dict):
self.body = body
else:
raise BoltError(error_message_unknown_request_body_type())
self.context = build_async_context(
AsyncBoltContext(context if context else {}), self.body
)
self.lazy_only = self.headers.get("x-slack-bolt-lazy-only", [False])[0]
self.lazy_function_name = self.headers.get(
"x-slack-bolt-lazy-function-name", [None]
)[0]
self.mode = mode