forked from dabeaz-course/practical-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfileparse.py
More file actions
83 lines (67 loc) · 2.85 KB
/
fileparse.py
File metadata and controls
83 lines (67 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# fileparse.py (Original file: '03_fileparse_10.py')
#
# Exercise 3.3
import os
import csv
import datetime
def convert_str_to_date_object(data):
    """Convert a month/day/year string into a ``datetime.date``.

    Args:
        data: Text in ``'%m/%d/%Y'`` format, for example ``'6/11/2007'``.

    Returns:
        The ``datetime.date`` for the parsed year, month and day.

    Raises:
        ValueError: If ``data`` does not match the ``'%m/%d/%Y'`` format.
    """
    # strptime yields a full datetime; only the calendar part is wanted.
    return datetime.datetime.strptime(data, '%m/%d/%Y').date()
def convert_str_to_time_object(data):
    """Convert a 12-hour clock string into a ``datetime.time``.

    Args:
        data: Text in ``'%I:%M%p'`` format, for example ``'9:36am'``.

    Returns:
        The ``datetime.time`` for the parsed hour and minute. The format has
        no seconds field, so the seconds component is always zero.

    Raises:
        ValueError: If ``data`` does not match the ``'%I:%M%p'`` format.
    """
    # strptime yields a full datetime; only the time-of-day part is wanted.
    return datetime.datetime.strptime(data, '%I:%M%p').time()
def parse_csv(
        filename: str,
        file_data_delimiter: str = ",",
        file_has_header: bool = True,
        select: list = None,
        types: list = None,
        silence_errors: bool = False) -> list:
    """Parse a CSV file into a list of records.

    Args:
        filename: Name of the file to read. It is joined onto the ``Data``
            directory under the current working directory; an absolute path
            replaces that prefix (``os.path.join`` semantics).
        file_data_delimiter: Field delimiter passed to ``csv.reader``.
        file_has_header: Whether the first row holds the column names.
        select: Optional list of column names to keep, in the given order.
            Requires ``file_has_header`` to be true.
        types: Optional list of callables applied position by position to the
            (possibly selected) fields of each row.
        silence_errors: When true, rows that fail are dropped without
            printing a diagnostic.

    Returns:
        A list with one entry per successfully processed non-empty row:
        a ``dict`` keyed by header name when the file has a header, otherwise
        a ``tuple`` of field values. An empty file yields an empty list.

    Raises:
        RuntimeError: If ``select`` is given while ``file_has_header`` is false.
        ValueError: If a name in ``select`` is not present in the header row.
    """
    if select and not file_has_header:
        raise RuntimeError("'select' argument requires column headers!")

    filenamepath = os.path.join(os.getcwd(), 'Data', filename)
    records = []

    # newline='' lets the csv module do its own newline handling, which is
    # required for quoted fields containing embedded line breaks.
    with open(filenamepath, newline='') as f:
        rows = csv.reader(f, delimiter=file_data_delimiter)

        start_rowno = 1
        selected_indices = []
        selected_headers = None
        if file_has_header:
            # A completely empty file has no header row; nothing to parse.
            selected_headers = next(rows, None)
            if selected_headers is None:
                return records
            start_rowno += 1

            # If a column selector was given, find the indices of the chosen
            # columns and narrow the headers used for the resulting dicts.
            if select:
                selected_indices = [
                    selected_headers.index(colname) for colname in select
                ]
                selected_headers = select

        for rowno, row in enumerate(rows, start=start_rowno):
            if not row:    # Skip row with no data
                continue
            try:
                if selected_indices:
                    row = [row[index] for index in selected_indices]
                if types:
                    row = [func(data) for func, data in zip(types, row)]
                if file_has_header:
                    record = dict(zip(selected_headers, row))
                else:
                    record = tuple(row)
                records.append(record)
            except ValueError as ve:
                # Typically a failed type conversion; the row is dropped.
                if not silence_errors:
                    print(f" >>> ValueError:: Bad Data >> Row #: {rowno}, Data: '{row}', '{ve}'")
            except Exception as ex:
                # Deliberate best-effort: any other per-row failure (e.g. a
                # short row hitting IndexError) is reported and skipped.
                if not silence_errors:
                    print(f" >>> Exception:: Catch ALL Exceptions >> Type: '{type(ex).__name__}', '{ex}', Row #: {rowno}, Data: '{row}'")

    return records