"""
SAS PARSER
file: sas_parser2_mp.py
author: g.cattabriga
date: 2023.05.27
2023.06.06
update to use dataframes and 3 tables
2023.06.07
update by swapping out the xfer dataframe / csv for a dimension table of function descriptions. Keep summary and detail dataframes / csv's
2023.06.18
add multiprocessing - significantly faster, by about 5 times
version: 2.0
purpose: python script (main function) to execute one or more parse functions on one or more sas (text) files in a specified directory
this creates 2 files, a summary_yymmddhhmmss.csv file and a summary_yymmddhhmmss.csv file
the summary file contains the list of file names, directories and date attributes (create, modified) evaluated
the detail file contains the results of each parse function performed on each file in the summary
example use: python sas_parser.py -i 'test_data' -t 'sas' -o 'results'
where 'test_data' is the directory of text data to be parsed, 'sas' is the file type (.sas.) and
'results' is the directory the summary and details will be saved.
notes: the parsing / evaluation functions are in the parse_functions.py file
todo: function that returns key elements of a SQL statement (e.g. table names, column names)
todo: pass the contents of the evaluated file to the parse functions instead of having it open the file each time
todo: better error handling
"""
import argparse
import os
import inspect
import pandas as pd
from tqdm import tqdm
import datetime
from parse_functions import * # import all the parse functions
from multiprocessing import Pool, cpu_count
import time
# This function recursively scans the specified directory, finding all files of the specified type
def get_file_list(directory, file_type):
    return [os.path.join(root, file)
            for root, dirs, files in os.walk(directory)
            for file in files if file.endswith(file_type)]

# List of functions to apply to each file
functions_to_apply = [
    get_file_info,
    count_lines,
    count_sql,
    get_sql_code,
    get_libname_lines,
    get_password_lines,
    count_exports,
    count_null_ds,
    find_date_lines
    # ,find_file_references
]
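
# A minimal sketch (not part of the original parse_functions.py) of the return
# contract these parse functions are assumed to follow: each takes a file path
# (and optionally the full file list) and returns a (name, values) pair, since
# process_file below unpacks results as `_, vals = func(file_path)`. The function
# name below is hypothetical and included only for illustration.
def count_lines_example(file_path):
    """Hypothetical parse function: return its name and a one-element list of values."""
    with open(file_path, 'r', errors='ignore') as f:
        return 'count_lines_example', [sum(1 for _ in f)]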

# populate the dimension dataframe with the parse functions to be used and assign a primary key
dim_func_df = pd.DataFrame({
    'func_idx': range(1, len(functions_to_apply) + 1),
    'func_name': [func.__name__ for func in functions_to_apply]
})
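# e.g. the resulting dimension table has one row per parse function:
#   func_idx  func_name
#   1         get_file_info
#   2         count_lines
#   ...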

# Encapsulates the logic of processing a single file.
# This function is applied in parallel to all file paths.
def process_file(args):
    file_path, functions_to_apply, file_list, summary_index = args
    print(f"processing file: {file_path}")
    summary_df = pd.DataFrame(columns=['summ_idx', 'f_name', 'dir_path', 'create_dt', 'modified_dt'])
    detail_df = pd.DataFrame(columns=['summ_idx', 'func_idx', 'func_value'])
    dir_name, file_name = os.path.split(file_path)  # get directory and filename
    # build the summary row(s) for this file from its file system attributes
    file_info_name, file_info_values = get_file_info(file_path)
    for file_info in file_info_values:
        file_info.update({'summ_idx': summary_index, 'f_name': file_name, 'dir_path': dir_name})
        summary_df = pd.concat([summary_df, pd.DataFrame(file_info, index=[0])], ignore_index=True)
    # run each parse function against the file and record its results under that function's key
    for func in functions_to_apply:
        func_idx = dim_func_df[dim_func_df['func_name'] == func.__name__]['func_idx'].values[0]
        num_args = len(inspect.signature(func).parameters)
        if num_args == 1:
            _, vals = func(file_path)
        elif num_args == 2:
            _, vals = func(file_path, file_list)
        detail_df = pd.concat([detail_df, pd.DataFrame({'summ_idx': summary_index, 'func_idx': func_idx,
                                                        'func_value': vals})], ignore_index=True)
    return summary_df, detail_df

def main(input_dir, output_dir, file_type):
    file_list = get_file_list(input_dir, file_type)
    start_time = time.time()  # Start the timer
    summary_df = pd.DataFrame(columns=['summ_idx', 'f_name', 'dir_path', 'create_dt', 'modified_dt'])
    detail_df = pd.DataFrame(columns=['summ_idx', 'func_idx', 'func_value'])
    # Create a multiprocessing Pool with a number of processes equal to the number of CPUs
    with Pool(processes=cpu_count()) as pool:
        # Use the Pool's map method to apply process_file to each file;
        # pool.map handles the creation of processes and the assignment of work
        results = pool.map(process_file,
                           [(file_path, functions_to_apply, file_list, i + 1) for i, file_path in enumerate(file_list)])
    # After all processes have completed, concatenate all the resulting dataframes
    for result in results:
        summary_df = pd.concat([summary_df, result[0]])
        detail_df = pd.concat([detail_df, result[1]])
    elapsed_time = time.time() - start_time  # Calculate the elapsed time
    # write the summary, dimension and detail tables as timestamped csv files
    dt_string = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    summary_df.to_csv(os.path.join(output_dir, 'summary_' + dt_string + '.csv'), index=False, lineterminator='\r\n')
    dim_func_df.to_csv(os.path.join(output_dir, 'dim_func_' + dt_string + '.csv'), index=False, lineterminator='\r\n')
    detail_df.to_csv(os.path.join(output_dir, 'detail_' + dt_string + '.csv'), index=False, lineterminator='\r\n')
    print(f"Total time elapsed: {elapsed_time} seconds")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Parse text files and output the results as normalized csv files.')
    parser.add_argument('-i', '--input_dir', required=True, help='The input directory for files to be processed')
    parser.add_argument('-o', '--output_dir', required=True, help='The output directory for processed files')
    parser.add_argument('-t', '--file_type', required=True, help='The file type to be processed')
    args = parser.parse_args()
    main(args.input_dir, args.output_dir, args.file_type)