-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathtest_binary_copy.py
More file actions
117 lines (100 loc) · 3.06 KB
/
test_binary_copy.py
File metadata and controls
117 lines (100 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import typing
from io import BytesIO
import pytest
from pgpq import ArrowToPostgresBinaryEncoder
from psqlpy import ConnectionPool
from pyarrow import parquet
# Run every coroutine test in this module under pytest's anyio plugin.
pytestmark = pytest.mark.anyio
async def test_binary_copy_to_table_in_connection(
    psql_pool: ConnectionPool,
) -> None:
    """Binary COPY of a parquet fixture into a table via a plain connection."""
    table_name: typing.Final = "cars"

    connection = await psql_pool.connection()
    await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
    await connection.execute(
        """
        CREATE TABLE IF NOT EXISTS cars (
            model VARCHAR,
            mpg FLOAT8,
            cyl INTEGER,
            disp FLOAT8,
            hp INTEGER,
            drat FLOAT8,
            wt FLOAT8,
            qsec FLOAT8,
            vs INTEGER,
            am INTEGER,
            gear INTEGER,
            carb INTEGER
        );
        """,
    )

    # Fixture lives next to this test module.
    test_dir = os.path.dirname(os.path.abspath(__file__))  # noqa: PTH120, PTH100
    arrow_table = parquet.read_table(f"{test_dir}/test_data/MTcars.parquet")

    # Encode the Arrow table into PostgreSQL's binary COPY wire format:
    # header, then one chunk per record batch, then the trailer.
    binary_encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    copy_buffer = BytesIO()
    copy_buffer.write(binary_encoder.write_header())
    for record_batch in arrow_table.to_batches():
        copy_buffer.write(binary_encoder.write_batch(record_batch))
    copy_buffer.write(binary_encoder.finish())
    copy_buffer.seek(0)

    inserted_rows = await connection.binary_copy_to_table(
        source=copy_buffer,
        table_name=table_name,
    )

    # MTcars has 32 rows; the COPY return value and the table itself
    # must both agree on that count.
    expected_inserted_row: typing.Final = 32
    assert inserted_rows == expected_inserted_row

    real_table_rows: typing.Final = await connection.execute(
        f"SELECT COUNT(*) AS rows_count FROM {table_name}",
    )
    assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
async def test_binary_copy_to_table_in_transaction(
    psql_pool: ConnectionPool,
) -> None:
    """Binary COPY of a parquet fixture via a pool-acquired connection.

    NOTE(review): despite the name, no explicit transaction object is
    visible here — the COPY runs inside ``psql_pool.acquire()``; confirm
    that acquire() provides the intended transactional scope.
    """
    table_name: typing.Final = "cars"

    connection = await psql_pool.connection()
    await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
    await connection.execute(
        """
        CREATE TABLE IF NOT EXISTS cars (
            model VARCHAR,
            mpg FLOAT8,
            cyl INTEGER,
            disp FLOAT8,
            hp INTEGER,
            drat FLOAT8,
            wt FLOAT8,
            qsec FLOAT8,
            vs INTEGER,
            am INTEGER,
            gear INTEGER,
            carb INTEGER
        );
        """,
    )

    # Fixture lives next to this test module.
    test_dir = os.path.dirname(os.path.abspath(__file__))  # noqa: PTH120, PTH100
    arrow_table = parquet.read_table(f"{test_dir}/test_data/MTcars.parquet")

    # Encode the Arrow table into PostgreSQL's binary COPY wire format:
    # header, then one chunk per record batch, then the trailer.
    binary_encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    copy_buffer = BytesIO()
    copy_buffer.write(binary_encoder.write_header())
    for record_batch in arrow_table.to_batches():
        copy_buffer.write(binary_encoder.write_batch(record_batch))
    copy_buffer.write(binary_encoder.finish())
    copy_buffer.seek(0)

    expected_inserted_row: typing.Final = 32

    # Perform the COPY on a connection scoped by the pool's context manager.
    async with psql_pool.acquire() as connection:
        inserted_rows = await connection.binary_copy_to_table(
            source=copy_buffer,
            table_name=table_name,
        )
        assert inserted_rows == expected_inserted_row

    # Verify the row count on a fresh connection, outside the acquire scope.
    connection = await psql_pool.connection()
    real_table_rows: typing.Final = await connection.execute(
        f"SELECT COUNT(*) AS rows_count FROM {table_name}",
    )
    assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row