erdl#119 extract_hobo rename orm_lonoa to orm
matthew-schultz committed Dec 23, 2019
1 parent 6247239 commit 09530a8
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions hobo/script/extract_hobo.py
@@ -14,10 +14,10 @@
import pendulum
import sys

-# use grandparent_directory with sys.path.append() to import orm_lonoa from ../../ directory relative to this script's location
+# use grandparent_directory with sys.path.append() to import orm from ../../ directory relative to this script's location
grandparent_directory = os.path.abspath(os.path.join((os.path.join(os.path.join(__file__, os.pardir), os.pardir)), os.pardir))
sys.path.append(grandparent_directory)
-import orm_lonoa
+import orm

SCRIPT_NAME = os.path.basename(__file__)

@@ -116,8 +116,8 @@ def get_csv_from_folder_not_in_db(conn, csv_filename):
# create a list of sensor_info_rows which will be used to iterate through each data_sensor_info_mapping in each csv_reading row in the insert...() function
sensor_info_rows = []
for data_sensor_info_mapping in csv_readings.columns[1:]:
-purpose_id, last_updated_datetime, unit = conn.query(orm_lonoa.SensorInfo.purpose_id, orm_lonoa.SensorInfo.last_updated_datetime, orm_lonoa.SensorInfo.unit).filter_by(query_string=query_string, data_sensor_info_mapping=data_sensor_info_mapping, is_active=True).first()[:3]
-sensor_info_rows.append(orm_lonoa.SensorInfo(data_sensor_info_mapping=data_sensor_info_mapping, purpose_id=purpose_id, last_updated_datetime=last_updated_datetime, unit=unit))
+purpose_id, last_updated_datetime, unit = conn.query(orm.SensorInfo.purpose_id, orm.SensorInfo.last_updated_datetime, orm.SensorInfo.unit).filter_by(query_string=query_string, data_sensor_info_mapping=data_sensor_info_mapping, is_active=True).first()[:3]
+sensor_info_rows.append(orm.SensorInfo(data_sensor_info_mapping=data_sensor_info_mapping, purpose_id=purpose_id, last_updated_datetime=last_updated_datetime, unit=unit))
# # Units
# timezone_units = [x.split(', ')[1] for x in timezone_units]
# #Timezone needs no further pre-processing
@@ -136,23 +136,23 @@ def get_csv_from_folder_not_in_db(conn, csv_filename):
earliest_csv_timestamp = pendulum.instance(csv_readings.iloc[0]['Date Time, GMT-10:00'], 'Pacific/Honolulu')
latest_csv_timestamp = pendulum.instance(csv_readings.iloc[csv_readings.shape[0]-1]['Date Time, GMT-10:00'], 'Pacific/Honolulu')
for sensor_info_row in sensor_info_rows:
-error_log_row = orm_lonoa.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition)
+error_log_row = orm.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm.ErrorLog.PipelineStageEnum.data_acquisition)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
-csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
+csv_filename_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
-csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
+csv_modified_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
-earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
+earliest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
-latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
+latest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
# check if earliest and latest file_timestamps are already in db and set new_readings variable
# assume that if first or last timestamps in csv were already inserted for that given timestamp and query_string, then all were already inserted
-earliest_csv_timestamp_is_in_db = conn.query(orm_lonoa.Reading).filter_by(datetime=earliest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
-latest_csv_timestamp_is_in_db = conn.query(orm_lonoa.Reading).filter_by(datetime=latest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
+earliest_csv_timestamp_is_in_db = conn.query(orm.Reading).filter_by(datetime=earliest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
+latest_csv_timestamp_is_in_db = conn.query(orm.Reading).filter_by(datetime=latest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
if not earliest_csv_timestamp_is_in_db and not latest_csv_timestamp_is_in_db:
new_readings = True
return csv_readings, (new_readings, earliest_csv_timestamp, csv_modified_timestamp, query_string, latest_csv_timestamp, sensor_info_rows)
@@ -186,32 +186,32 @@ def insert_csv_readings_into_db(conn, csv_readings, csv_metadata, csv_filename):
if rows_returned > 0:
for csv_reading in csv_readings.itertuples():
for i in range(0, len(sensor_info_rows)):
-reading_row = orm_lonoa.Reading(datetime=csv_reading[1], purpose_id=sensor_info_rows[i].purpose_id, reading=csv_reading[i+2], units=sensor_info_rows[i].unit, upload_timestamp=current_time)
+reading_row = orm.Reading(datetime=csv_reading[1], purpose_id=sensor_info_rows[i].purpose_id, reading=csv_reading[i+2], units=sensor_info_rows[i].unit, upload_timestamp=current_time)
conn.add(reading_row)
last_reading_row_datetime = csv_reading[1]
#update last_updated_datetime column for relevant rows in sensor_info table
for sensor_info_row in sensor_info_rows:
logging.info('attempting to insert ' + str(rows_returned) + ' reading(s) for purpose_id ' + str(sensor_info_row.purpose_id))
# account for if csv files uploaded out of order by checking if last_reading_row_datetime is later than last_updated_datetime
if not sensor_info_row.last_updated_datetime or sensor_info_row.last_updated_datetime < last_reading_row_datetime:
-conn.query(orm_lonoa.SensorInfo.purpose_id).filter(orm_lonoa.SensorInfo.purpose_id == sensor_info_row.purpose_id).update({"last_updated_datetime": last_reading_row_datetime})
-error_log_row = orm_lonoa.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
+conn.query(orm.SensorInfo.purpose_id).filter(orm.SensorInfo.purpose_id == sensor_info_row.purpose_id).update({"last_updated_datetime": last_reading_row_datetime})
+error_log_row = orm.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm.ErrorLog.PipelineStageEnum.database_insertion)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
-csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
+csv_filename_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
-csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
+csv_modified_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
-earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
+earliest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
-latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
+latest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
# update current sensor_info_row's set of readings with related log_id
-conn.query(orm_lonoa.Reading.log_id).\
-    filter(orm_lonoa.Reading.purpose_id == sensor_info_row.purpose_id,
-           orm_lonoa.Reading.upload_timestamp == current_time).\
+conn.query(orm.Reading.log_id).\
+    filter(orm.Reading.purpose_id == sensor_info_row.purpose_id,
+           orm.Reading.upload_timestamp == current_time).\
update({'log_id': error_log_row.log_id})
conn.commit()

@@ -220,12 +220,12 @@ def log_failure_to_get_csv_readings_from_folder_not_in_db(conn, csv_filename, ex
current_time = pendulum.now('Pacific/Honolulu')
current_time = current_time.set(microsecond=current_time.microsecond - (current_time.microsecond % 10000))
logging.exception('log_failure_to_get_csv_readings_from_folder_not_in_db')
-error_log_row = orm_lonoa.ErrorLog(was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition)
+error_log_row = orm.ErrorLog(was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm.ErrorLog.PipelineStageEnum.data_acquisition)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
-csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
+csv_filename_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
conn.commit()

@@ -240,21 +240,21 @@ def log_failure_to_insert_csv_readings_into_db(conn, csv_filename, csv_metadata,
for sensor_info_row in sensor_info_rows:
# set was_success to "" if readings were already inserted
if not new_readings:
-error_log_row = orm_lonoa.ErrorLog(purpose_id=sensor_info_row.purpose_id, datetime=current_time, error_type=exception.__class__.__name__,pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
+error_log_row = orm.ErrorLog(purpose_id=sensor_info_row.purpose_id, datetime=current_time, error_type=exception.__class__.__name__,pipeline_stage=orm.ErrorLog.PipelineStageEnum.database_insertion)
# set was_success to False if readings were new but an error was thrown
else:
-error_log_row = orm_lonoa.ErrorLog(purpose_id=sensor_info_row.purpose_id, was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
+error_log_row = orm.ErrorLog(purpose_id=sensor_info_row.purpose_id, was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm.ErrorLog.PipelineStageEnum.database_insertion)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
-csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
+csv_filename_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
-csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
+csv_modified_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
-earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
+earliest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
-latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
+latest_csv_timestamp_row = orm.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
conn.commit()
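
The repeated "need to flush and refresh to get error_log_row.log_id" comments above describe a standard SQLAlchemy pattern: flushing sends the pending INSERT so the database assigns the autoincrement key, and refreshing reloads the row so that key can be used for child rows before the final commit. A minimal, self-contained sketch of that pattern, using a hypothetical one-table model and an in-memory SQLite database rather than the repository's orm module:

    # illustrative sketch only; import spelling assumes SQLAlchemy 1.4+
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    # hypothetical stand-in for orm.ErrorLog; only the autoincrement key matters here
    class ErrorLog(Base):
        __tablename__ = 'error_log'
        log_id = Column(Integer, primary_key=True)
        error_type = Column(String)

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    conn = sessionmaker(bind=engine)()

    error_log_row = ErrorLog(error_type='ValueError')
    conn.add(error_log_row)
    conn.flush()                  # INSERT is emitted; the database assigns log_id
    conn.refresh(error_log_row)   # reload the row so log_id is populated on the object
    print(error_log_row.log_id)   # now usable as the foreign key for ErrorLogDetails-style rows
    conn.commit()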

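For context on the import being renamed: extract_hobo.py lives in hobo/script/, while the shared models module sits two directories above it, so the script appends its grandparent directory to sys.path before importing. A minimal sketch of that pattern, assuming the same repository layout (the nested os.path.join calls from the diff are flattened here for readability):

    import os
    import sys

    # resolve the directory two levels above this script's directory (../../ from the script)
    grandparent_directory = os.path.abspath(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir))
    sys.path.append(grandparent_directory)

    import orm  # provides SensorInfo, Reading, ErrorLog, ErrorLogDetails used in the diff above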
