Commit

erdl#119 update extract_hobo to use orm_lonoa
instead of orm_hobo
matthew-schultz committed Dec 22, 2019
1 parent df275ed commit 62652ac
Showing 1 changed file with 32 additions and 28 deletions.
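
For reference, the core of this change is that extract_hobo.py now imports the shared orm_lonoa module from the repository root (two directories above the script) instead of the script-local orm_hobo module. Below is a minimal sketch of that sys.path pattern, assuming orm_lonoa.py sits at the repository root; it uses os.path.dirname() for brevity in place of the commit's nested os.path.join() calls:

import os
import sys

# resolve the directory two levels above this script (../../), i.e. the repository root
grandparent_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
# make the repository root importable so the shared ORM module can be loaded
sys.path.append(grandparent_directory)

import orm_lonoa  # shared ORM models used below (SensorInfo, Reading, ErrorLog, ErrorLogDetails)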
60 changes: 32 additions & 28 deletions hobo/script/extract_hobo.py
@@ -9,11 +9,15 @@
import glob
import logging
import logging.handlers
import orm_hobo
import os
import pandas
import pendulum
import sys

# use grandparent_directory with sys.path.append() to import orm_lonoa from the ../../ directory relative to this script's location
grandparent_directory = os.path.abspath(os.path.join((os.path.join(os.path.join(__file__, os.pardir), os.pardir)), os.pardir))
sys.path.append(grandparent_directory)
import orm_lonoa

SCRIPT_NAME = os.path.basename(__file__)

@@ -112,8 +116,8 @@ def get_csv_from_folder_not_in_db(conn, csv_filename):
# create a list of sensor_info_rows, used to iterate through each data_sensor_info_mapping for every csv_reading row in the insert...() function
sensor_info_rows = []
for data_sensor_info_mapping in csv_readings.columns[1:]:
purpose_id, last_updated_datetime, unit = conn.query(orm_hobo.SensorInfo.purpose_id, orm_hobo.SensorInfo.last_updated_datetime, orm_hobo.SensorInfo.unit).filter_by(query_string=query_string, data_sensor_info_mapping=data_sensor_info_mapping, is_active=True).first()[:3]
sensor_info_rows.append(orm_hobo.SensorInfo(data_sensor_info_mapping=data_sensor_info_mapping, purpose_id=purpose_id, last_updated_datetime=last_updated_datetime, unit=unit))
purpose_id, last_updated_datetime, unit = conn.query(orm_lonoa.SensorInfo.purpose_id, orm_lonoa.SensorInfo.last_updated_datetime, orm_lonoa.SensorInfo.unit).filter_by(query_string=query_string, data_sensor_info_mapping=data_sensor_info_mapping, is_active=True).first()[:3]
sensor_info_rows.append(orm_lonoa.SensorInfo(data_sensor_info_mapping=data_sensor_info_mapping, purpose_id=purpose_id, last_updated_datetime=last_updated_datetime, unit=unit))
# # Units
# timezone_units = [x.split(', ')[1] for x in timezone_units]
# #Timezone needs no further pre-processing
@@ -132,23 +136,23 @@ def get_csv_from_folder_not_in_db(conn, csv_filename):
earliest_csv_timestamp = pendulum.instance(csv_readings.iloc[0]['Date Time, GMT-10:00'], 'Pacific/Honolulu')
latest_csv_timestamp = pendulum.instance(csv_readings.iloc[csv_readings.shape[0]-1]['Date Time, GMT-10:00'], 'Pacific/Honolulu')
for sensor_info_row in sensor_info_rows:
error_log_row = orm_hobo.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_hobo.ErrorLog.PipelineStageEnum.data_acquisition)
error_log_row = orm_lonoa.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
csv_filename_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
csv_modified_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
earliest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
latest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
# check if earliest and latest file_timestamps are already in db and set new_readings variable
# assume that if first or last timestamps in csv were already inserted for that given timestamp and query_string, then all were already inserted
earliest_csv_timestamp_is_in_db = conn.query(orm_hobo.Reading).filter_by(datetime=earliest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
latest_csv_timestamp_is_in_db = conn.query(orm_hobo.Reading).filter_by(datetime=latest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
earliest_csv_timestamp_is_in_db = conn.query(orm_lonoa.Reading).filter_by(datetime=earliest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
latest_csv_timestamp_is_in_db = conn.query(orm_lonoa.Reading).filter_by(datetime=latest_csv_timestamp, purpose_id=sensor_info_rows[0].purpose_id).first()
if not earliest_csv_timestamp_is_in_db and not latest_csv_timestamp_is_in_db:
new_readings = True
return csv_readings, (new_readings, earliest_csv_timestamp, csv_modified_timestamp, query_string, latest_csv_timestamp, sensor_info_rows)
@@ -182,32 +186,32 @@ def insert_csv_readings_into_db(conn, csv_readings, csv_metadata, csv_filename):
if rows_returned > 0:
for csv_reading in csv_readings.itertuples():
for i in range(0, len(sensor_info_rows)):
reading_row = orm_hobo.Reading(datetime=csv_reading[1], purpose_id=sensor_info_rows[i].purpose_id, reading=csv_reading[i+2], units=sensor_info_rows[i].unit, upload_timestamp=current_time)
reading_row = orm_lonoa.Reading(datetime=csv_reading[1], purpose_id=sensor_info_rows[i].purpose_id, reading=csv_reading[i+2], units=sensor_info_rows[i].unit, upload_timestamp=current_time)
conn.add(reading_row)
last_reading_row_datetime = csv_reading[1]
#update last_updated_datetime column for relevant rows in sensor_info table
for sensor_info_row in sensor_info_rows:
logging.info('attempting to insert ' + str(rows_returned) + ' reading(s) for purpose_id ' + str(sensor_info_row.purpose_id))
# account for csv files uploaded out of order by checking whether last_reading_row_datetime is later than last_updated_datetime
if not sensor_info_row.last_updated_datetime or sensor_info_row.last_updated_datetime < last_reading_row_datetime:
conn.query(orm_hobo.SensorInfo.purpose_id).filter(orm_hobo.SensorInfo.purpose_id == sensor_info_row.purpose_id).update({"last_updated_datetime": last_reading_row_datetime})
error_log_row = orm_hobo.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_hobo.ErrorLog.PipelineStageEnum.database_insertion)
conn.query(orm_lonoa.SensorInfo.purpose_id).filter(orm_lonoa.SensorInfo.purpose_id == sensor_info_row.purpose_id).update({"last_updated_datetime": last_reading_row_datetime})
error_log_row = orm_lonoa.ErrorLog(was_success=True, purpose_id=sensor_info_row.purpose_id, datetime=current_time, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
csv_filename_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
csv_modified_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
earliest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
latest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
# update current sensor_info_row's set of readings with related log_id
conn.query(orm_hobo.Reading.log_id).\
filter(orm_hobo.Reading.purpose_id == sensor_info_row.purpose_id,
orm_hobo.Reading.upload_timestamp == current_time).\
conn.query(orm_lonoa.Reading.log_id).\
filter(orm_lonoa.Reading.purpose_id == sensor_info_row.purpose_id,
orm_lonoa.Reading.upload_timestamp == current_time).\
update({'log_id': error_log_row.log_id})
conn.commit()

@@ -216,12 +220,12 @@ def log_failure_to_get_csv_readings_from_folder_not_in_db(conn, csv_filename, exception):
current_time = pendulum.now('Pacific/Honolulu')
current_time = current_time.set(microsecond=current_time.microsecond - (current_time.microsecond % 10000))
logging.exception('log_failure_to_get_csv_readings_from_folder_not_in_db')
error_log_row = orm_hobo.ErrorLog(was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_hobo.ErrorLog.PipelineStageEnum.data_acquisition)
error_log_row = orm_lonoa.ErrorLog(was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
csv_filename_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
conn.commit()

@@ -236,21 +240,21 @@ def log_failure_to_insert_csv_readings_into_db(conn, csv_filename, csv_metadata,
for sensor_info_row in sensor_info_rows:
# set was_success to "" if readings were already inserted
if not new_readings:
error_log_row = orm_hobo.ErrorLog(purpose_id=sensor_info_row.purpose_id, datetime=current_time, error_type=exception.__class__.__name__,pipeline_stage=orm_hobo.ErrorLog.PipelineStageEnum.database_insertion)
error_log_row = orm_lonoa.ErrorLog(purpose_id=sensor_info_row.purpose_id, datetime=current_time, error_type=exception.__class__.__name__,pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
# set was_success to False if readings were new but an error was thrown
else:
error_log_row = orm_hobo.ErrorLog(purpose_id=sensor_info_row.purpose_id, was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_hobo.ErrorLog.PipelineStageEnum.database_insertion)
error_log_row = orm_lonoa.ErrorLog(purpose_id=sensor_info_row.purpose_id, was_success=False, datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
csv_filename_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
csv_filename_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_filename", information_value=csv_filename)
conn.add(csv_filename_row)
csv_modified_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
csv_modified_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="csv_modified_timestamp", information_value=csv_modified_timestamp)
conn.add(csv_modified_timestamp_row)
earliest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
earliest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="earliest_csv_timestamp", information_value=earliest_csv_timestamp)
conn.add(earliest_csv_timestamp_row)
latest_csv_timestamp_row = orm_hobo.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
latest_csv_timestamp_row = orm_lonoa.ErrorLogDetails(log_id=error_log_row.log_id, information_type="latest_csv_timestamp", information_value=latest_csv_timestamp)
conn.add(latest_csv_timestamp_row)
conn.commit()
