forked from dabeaz-course/practical-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfileparse.py
More file actions
86 lines (63 loc) · 2.27 KB
/
fileparse.py
File metadata and controls
86 lines (63 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# fileparse.py
#
# Exercise 3.3
import csv
import gzip
def parse_csv(source, select=None, types=None, has_headers=True, delimiter=',', silence_errors=True):
    """
    Parse an iterable of CSV lines into a list of records.

    Args:
        source: Any iterable that yields lines of CSV text (an open text
            file, a list of strings, ...). A plain string is rejected,
            because iterating it would yield single characters.
        select: Optional list of column names to keep. Requires
            has_headers=True; the records contain only these columns, in
            the order given.
        types: Optional list of callables applied positionally to the
            (already selected) values of each row, e.g. [str, int, float].
        has_headers: When True, the first row is consumed as column names
            and each record is a dict; when False, each record is a tuple.
        delimiter: Field separator handed to csv.reader.
        silence_errors: When False, rows whose conversion raises ValueError
            are reported with print(). Such rows are skipped either way.

    Returns:
        A list of dicts (has_headers=True) or a list of tuples
        (has_headers=False). An empty list if source yields no lines.

    Raises:
        RuntimeError: If source is a str, or if select is given while
            has_headers is False.
        ValueError: If a name in select is not one of the file's headers.
    """
    # isinstance (not type() ==) so that str subclasses are rejected too.
    if isinstance(source, str):
        raise RuntimeError("source must be iterable")
    if select and not has_headers:
        raise RuntimeError("select argument requires column headers")

    rows = csv.reader(source, delimiter=delimiter)

    # Read the file headers. 'start' is the line number of the first data
    # row and is only used in the error messages below.
    if has_headers:
        headers = next(rows, None)
        if headers is None:
            # Completely empty input: nothing to parse. Without the default
            # passed to next(), StopIteration would escape to the caller.
            return []
        start = 2
    else:
        headers = []
        start = 1

    # If a column selector was given, find the indices of those names and
    # narrow the set of headers to the selected ones.
    if select:
        indices = [headers.index(colname) for colname in select]
        headers = select
    else:
        indices = []

    records = []
    for rownum, row in enumerate(rows, start=start):
        if not row:     # Skip rows with no data
            continue
        try:
            # Filter the row if specific columns were selected
            if indices:
                row = [row[index] for index in indices]
            # Convert types if a list of types is provided
            if types:
                row = [func(val) for func, val in zip(types, row)]
        except ValueError as e:
            # A row that cannot be converted is dropped; it is only
            # reported when the caller asked not to silence errors.
            if not silence_errors:
                print(f"Row {rownum}: Could not convert: {row}")
                print(f"Row {rownum}: Reason: {e}")
            continue

        if has_headers:
            record = dict(zip(headers, row))
        else:
            record = tuple(row)
        records.append(record)

    return records
# with open('.\\Data\\missing.csv') as f:
# d = parse_csv(f, types=[str, int, float])
# print(d)
# with gzip.open('.\\Data\\portfolio.csv.gz', 'rt') as f:
# d = parse_csv(f, types=[str, int, float])
# print(d)
# lines = ['name,shares,price', 'AA,100,34.23', 'IBM,50,91.1', 'HPE,75,45.1']
# pf = parse_csv(lines, types=[str,int,float])
# print(pf)