Skip to content

Commit

Permalink
erdl#119 update api_egauge to use orm_lonoa
Browse files Browse the repository at this point in the history
instead of orm_egauge
  • Loading branch information
matthew-schultz committed Dec 22, 2019
1 parent ac6cff8 commit 2b69595
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions egauge/script/api_egauge.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,14 +15,17 @@
import configparser
import logging
import logging.handlers
import orm_egauge
import os
import pandas
import pendulum
import requests
# import sqlalchemy #used for errors like sqlalchemy.exc.InternalError, sqlalchemy.exc.OperationalError
# import sys
import sys

# use grandparent_directory with sys.path.append() to import orm_lonoa from the ../../ directory relative to this script's location
grandparent_directory = os.path.abspath(os.path.join((os.path.join(os.path.join(__file__, os.pardir), os.pardir)), os.pardir))
sys.path.append(grandparent_directory)
import orm_lonoa

SCRIPT_NAME = os.path.basename(__file__)

Expand Down Expand Up @@ -110,7 +113,7 @@ def get_data_from_api(conn, query_string):
# truncate time to hundredths of a second
current_time = current_time.set(microsecond=current_time.microsecond - (current_time.microsecond % 10000))
# The next lines of code before setting api_start_time used to be in their own function get_most_recent_timestamp_from_db()
purpose_sensors = conn.query(orm_egauge.SensorInfo.purpose_id, orm_egauge.SensorInfo.data_sensor_info_mapping, orm_egauge.SensorInfo.last_updated_datetime, orm_egauge.SensorInfo.unit).\
purpose_sensors = conn.query(orm_lonoa.SensorInfo.purpose_id, orm_lonoa.SensorInfo.data_sensor_info_mapping, orm_lonoa.SensorInfo.last_updated_datetime, orm_lonoa.SensorInfo.unit).\
filter_by(query_string=query_string,is_active=True)
last_updated_datetime = purpose_sensors[0].last_updated_datetime
if last_updated_datetime:
Expand Down Expand Up @@ -139,7 +142,7 @@ def get_data_from_api(conn, query_string):
# readings.to_csv(path_or_buf=output_file, index=False, header=False, mode='a+')
# # readings.to_csv(path_or_buf=output_file, mode='a+')
for purpose_sensor in purpose_sensors:
error_log_row = orm_egauge.ErrorLog(purpose_id=purpose_sensor.purpose_id, datetime=current_time, was_success=True, pipeline_stage=orm_egauge.ErrorLog.PipelineStageEnum.data_acquisition)
error_log_row = orm_lonoa.ErrorLog(purpose_id=purpose_sensor.purpose_id, datetime=current_time, was_success=True, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition)
conn.add(error_log_row)
conn.commit()
return readings, purpose_sensors
Expand Down Expand Up @@ -184,21 +187,21 @@ def insert_readings_into_database(conn, readings, purpose_sensors):
column_name = columns[i+1]
# only insert column's reading if data_sensor_info_mapping matches column_name
if purpose_sensor.data_sensor_info_mapping == column_name:
reading_row = orm_egauge.Reading(purpose_id=purpose_sensor.purpose_id, datetime=row_datetime, reading=row_reading, units=purpose_sensor.unit, upload_timestamp=current_time)
reading_row = orm_lonoa.Reading(purpose_id=purpose_sensor.purpose_id, datetime=row_datetime, reading=row_reading, units=purpose_sensor.unit, upload_timestamp=current_time)
conn.add(reading_row)
rows_inserted += 1
new_last_updated_datetime = row_datetime
if rows_inserted > 0:
conn.query(orm_egauge.SensorInfo.purpose_id).filter(orm_egauge.SensorInfo.purpose_id == purpose_sensor.purpose_id).update({"last_updated_datetime": new_last_updated_datetime})
error_log_row = orm_egauge.ErrorLog(purpose_id=purpose_sensor.purpose_id, datetime=current_time, pipeline_stage=orm_egauge.ErrorLog.PipelineStageEnum.database_insertion, was_success=True)
conn.query(orm_lonoa.SensorInfo.purpose_id).filter(orm_lonoa.SensorInfo.purpose_id == purpose_sensor.purpose_id).update({"last_updated_datetime": new_last_updated_datetime})
error_log_row = orm_lonoa.ErrorLog(purpose_id=purpose_sensor.purpose_id, datetime=current_time, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion, was_success=True)
conn.add(error_log_row)
# need to flush and refresh to get error_log_row.log_id
conn.flush()
conn.refresh(error_log_row)
# update current set of readings with related log_id
conn.query(orm_egauge.Reading.log_id).\
filter(orm_egauge.Reading.purpose_id == purpose_sensor.purpose_id,
orm_egauge.Reading.upload_timestamp == current_time).\
conn.query(orm_lonoa.Reading.log_id).\
filter(orm_lonoa.Reading.purpose_id == purpose_sensor.purpose_id,
orm_lonoa.Reading.upload_timestamp == current_time).\
update({'log_id':error_log_row.log_id})
logging.info(str(rows_inserted) + ' readings(s) attempted to be inserted by ' + SCRIPT_NAME + ' for purpose id ' + str(purpose_sensor.purpose_id))
conn.commit()
Expand All @@ -209,10 +212,10 @@ def log_failure_to_connect_to_api(conn, exception, query_string):
current_time = pendulum.now('Pacific/Honolulu')
current_time = current_time.set(microsecond=current_time.microsecond - (current_time.microsecond % 10000))
# get all purpose_ids associated with query_string
purpose_ids = [purpose_id[0] for purpose_id in conn.query(orm_egauge.SensorInfo.purpose_id).filter_by(query_string=query_string, is_active=True)]
purpose_ids = [purpose_id[0] for purpose_id in conn.query(orm_lonoa.SensorInfo.purpose_id).filter_by(query_string=query_string, is_active=True)]
logging.exception('Egauge API data request error')
for purpose_id in purpose_ids:
error_log_row = orm_egauge.ErrorLog(datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_egauge.ErrorLog.PipelineStageEnum.data_acquisition, purpose_id=purpose_id, was_success=False)
error_log_row = orm_lonoa.ErrorLog(datetime=current_time, error_type=exception.__class__.__name__, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.data_acquisition, purpose_id=purpose_id, was_success=False)
conn.add(error_log_row)
conn.commit()

Expand All @@ -225,7 +228,7 @@ def log_failure_to_connect_to_database(conn, exception, purpose_sensors):
logging.exception('Egauge reading insertion error')
conn.rollback()
for purpose_sensor in purpose_sensors:
error_log_row = orm_egauge.ErrorLog(datetime=current_time, error_type=exception.__class__.__name__, purpose_id=purpose_sensor.purpose_id, pipeline_stage=orm_egauge.ErrorLog.PipelineStageEnum.database_insertion, was_success=False)
error_log_row = orm_lonoa.ErrorLog(datetime=current_time, error_type=exception.__class__.__name__, purpose_id=purpose_sensor.purpose_id, pipeline_stage=orm_lonoa.ErrorLog.PipelineStageEnum.database_insertion, was_success=False)
conn.add(error_log_row)
conn.commit()

Expand All @@ -235,7 +238,7 @@ def log_failure_to_connect_to_database(conn, exception, purpose_sensors):
# start the database connection
conn = get_db_handler()
# get a list of all unique query_string's for active egauges from sensor_info table
query_strings = [query_string[0] for query_string in conn.query(orm_egauge.SensorInfo.query_string).filter_by(script_folder=orm_egauge.SensorInfo.ScriptFolderEnum.egauge, is_active=True).distinct()]
query_strings = [query_string[0] for query_string in conn.query(orm_lonoa.SensorInfo.query_string).filter_by(script_folder=orm_lonoa.SensorInfo.ScriptFolderEnum.egauge, is_active=True).distinct()]
for query_string in query_strings:
try:
# readings is a pandas dataframe
Expand Down

0 comments on commit 2b69595

Please sign in to comment.