Commit ddd1b769 authored by hazrmard's avatar hazrmard
Browse files

Converted demo process to thread, added halt_on_error option, store file name is YYYY-WW-store.csv

parent 16e744c8
......@@ -41,7 +41,7 @@ from keras.models import load_model
from agent import get_agent, train_agent
from HVAC_Environment import Env
from modelretrain import retrain
from ah_api import *
from ah_api import weeklyrelearndata
from solar_irradiance import get_current_DHI
from util_datetime import convert_to_ISO8601, convert_to_local_time, abs_diff_in_sec
......@@ -77,7 +77,8 @@ parser.add_argument('read_from', type=str, nargs='?',
help='File to read Niagra data from.')
parser.add_argument('store_at', type=str, nargs='?',
default=PROD_WRITE_DIR,
help='Directory to store measurements to.')
help=('Directory to store measurements to. Files are named: '
'YEAR-WEEK-store.csv'))
parser.add_argument('save_to', type=str, nargs='?',
default=os.path.join(PROD_WRITE_DIR, 'SAT_Setpoint.csv'),
help='File to write control setpoint to.')
......@@ -85,15 +86,17 @@ parser.add_argument('is_valid', type=str, nargs='?',
default=os.path.join(PROD_WRITE_DIR, 'is_valid.csv'),
help='File to write the setpoint validity indicator')
parser.add_argument('--use_control', '-c', default='agent_weights.h5f', type=str,
help='Use existing control policy from file (.h5f)')
help='Prefix of *_[actor|critic].h5f parameters file to use.')
parser.add_argument('--control_mean_window', '-m', default=CTRL_MEAN_WINDOW, type=int,
help='Window of past actions over which to average control.')
parser.add_argument('--control_output_threshold', default=OUTPUT_THRES, type=float,
help='Maximum amount output can change between intervals.')
parser.add_argument('--control_input_threshold', default=DIFF_THRESH, type=float, nargs=4,
parser.add_argument('--control_input_threshold', default=DIFF_THRESH, type=float,
nargs=4, metavar='T',
help=('Maximum amount each input can change between intervals '
'to be considered normal.'))
parser.add_argument('--control_input_fallback', default=FALLBACK_INPUTS, type=float, nargs=4,
parser.add_argument('--control_input_fallback', default=FALLBACK_INPUTS, type=float,
nargs=4, metavar='F',
help=('Default input values to infer control from if access '
'to valid source fails.'))
parser.add_argument('--period', '-p', type=float, default=PERIOD,
......@@ -111,13 +114,15 @@ parser.add_argument('--dev_server', '-x', action='store_true', default=False,
parser.add_argument('--logging', '-l', choices=(10, 20, 30, 40, 50), type=int,
help=('Logging level (50-critical, 40-error, 30-warning, '
'20-info, 10-debug).'), default=logging.INFO)
parser.add_argument('--halt_on_error', action='store_true', default=False,
help='Stop controller loop when error occurs.')
def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgent, policy: str,
lock_policy: Lock, signal_stop: Event, signal_reload: Event,
period: float, control_mean_window: int, control_thresh: float,
input_thresh: np.ndarray, dev_server: bool):
input_thresh: np.ndarray, dev_server: bool, halt_on_error: bool):
"""
Main control loop for the script. Runs in a separate thread until a stop
signal is set by the main thread.
......@@ -143,6 +148,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
intervals to be considered normal.
dev_server {bool} -- Whether environment is a development or a production
server.
halt_on_error {bool} -- Stop control loop if exception occurs.
"""
dhi = 300 # caches last DHI value
is_alumni_vars_updated = True # flag indicating that variables are updated properly
......@@ -167,7 +173,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
# to updates every 30mins (PT30M).
cur_dhi_pull_time = convert_to_ISO8601(cur_time_utc)
temp = get_current_DHI(cur_dhi_pull_time, dev_server)
dhi = temp if temp!=None else dhi
dhi = temp if temp is not None else dhi
# Get other measurements and put them all together into the `raw`
# input array.
......@@ -233,18 +239,19 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
# the confidence in the control action. It can be 0 because
# measurements are outdated or some variables could not be read.
with open(valid, 'w') as vfile:
if dhi == None or is_alumni_vars_updated == False:
if dhi is None or not is_alumni_vars_updated:
vfile.write(str(0))
else:
vfile.write(str(1))
vfile.close()
log.info('New control setpoint %s', final_ctrl)
# saving historical data
# Saving historical data. Data storage is aggregated on a calendar-
# week basis.
year = datetime.datetime.today().isocalendar()[0]
week = datetime.datetime.today().isocalendar()[1]
dfile_path = os.path.join(store, # a new file every week
'{}-{}-{}'.format(week, year, os.path.basename(store)))
'{}-{}-{}'.format(year, week, 'store.csv'))
with open(dfile_path, 'a') as dfile:
dfile.seek(0, 2) # go to end of destination file...
# np.savetxt(dfile, raw) # ...and write all read measurements
......@@ -267,6 +274,8 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
vfile.close()
log.error('Control loop error: %s', str(e))
log.debug(e, exc_info=True)
if halt_on_error:
signal_stop.set()
# Do this whether or not an exception has occurred:
finally:
......@@ -296,7 +305,8 @@ def learn_control(source: str, save_to: str, duration: int, signal_stop: Event,
policy.
Args:
source {str} -- Directory containing history of measurements (.csv) to learn new model,
source {str} -- Directory containing history of measurements (.csv) to learn
new model,
save_to {str} -- Filename (.h5f) where to save new policy,
duration {int} -- Number of steps/actions to train over,
signal_stop {Event} -- Event to indicate that script is ending,
......@@ -307,8 +317,9 @@ def learn_control(source: str, save_to: str, duration: int, signal_stop: Event,
# TODO: Currently this function is disabled. In production, it will use
# history of measurements to learn new models of the system and use them
# to develop newer control policies.
# TODO: Have to handle edge case when there is simultaneous writing and reading
# from last week's file.
try:
# TODO: Have to handle edge case when there is simultaneous writing and reading from last week's file.
# get the new traindata
relearndf = weeklyrelearndata(solardatapath=source, weeks=4)
......@@ -317,6 +328,9 @@ def learn_control(source: str, save_to: str, duration: int, signal_stop: Event,
# Train it with the newly initialized environment
log.info('Adapting control policy')
with lock_policy:
if signal_stop.is_set():
log.info('Stopping re-learning.')
return
# retrain on the same model
lstm = load_model('weights.best.hdf5')
......@@ -333,7 +347,7 @@ def learn_control(source: str, save_to: str, duration: int, signal_stop: Event,
signal_reload.set()
except Exception as e:
log.error('Could not generate new control policy: %s', str(e))
log.error('Control re-learning error: %s', str(e))
log.debug(e, exc_info=True)
......@@ -432,7 +446,7 @@ if __name__ == '__main__':
# start demo process to create dummy data
if args.demo:
pr_demo = Process(target=demo_source,
pr_demo = Thread(target=demo_source,
args=('_demo_source',
signal_stop,
args.period,
......@@ -455,7 +469,8 @@ if __name__ == '__main__':
'control_mean_window': args.control_mean_window,
'dev_server': args.dev_server,
'control_thresh': args.control_output_threshold,
'input_thresh': np.asarray(args.control_input_threshold)
'input_thresh': np.asarray(args.control_input_threshold),
'halt_on_error': args.halt_on_error
})
th_ctrl.start()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment