Commit fe0941a9 authored by hazrmard's avatar hazrmard
Browse files

removed ah_api spaghetti code, no hardcoded handling of certain csv files...

removed ah_api spaghetti code, no hardcoded handling of certain csv files needed. Made relearn window (weeks) a commandline parameter.
parent dee55040
......@@ -15,7 +15,7 @@ import numpy as np
import pytz
import requests
from bs4 import BeautifulSoup
from pandas import *
import pandas as pd
from helperfunctions import droprows, merge_df_columns, merge_df_rows
......@@ -35,7 +35,7 @@ def weeklybdxdata(weeks = 4):
trendvaluesURL = baseURL + 'trendview/api/data-values'
sess = requests.Session()
xml = BeautifulSoup(sess.get(loginURL).content)
xml = BeautifulSoup(sess.get(loginURL).content, 'html.parser')
view_state = None
for i in xml.find_all('input'):
if i.get('name') == 'javax.faces.ViewState':
......@@ -65,7 +65,7 @@ def weeklybdxdata(weeks = 4):
r =, json=payload)
vars_json = json.loads(r.content)
dates = [to_datetime(i['time']) for i in vars_json[0]['dataValues']]
dates = [pd.to_datetime(i['time']) for i in vars_json[0]['dataValues']]
column_dict = {'Kissam_Global_OAH value': 'OAH',
'AHU_1 outdoorAirTemp': 'OAT',
'AHU_1 supplyAirTemp': 'SAT',
......@@ -77,7 +77,7 @@ def weeklybdxdata(weeks = 4):
column_values.append([j['realValue'] for j in vars_json[k]['dataValues']])
column_values = np.array(column_values).T
localdata = DataFrame(column_values, columns=column_names, index=dates)
localdata = pd.DataFrame(column_values, columns=column_names, index=dates)
# Dropping duplicated time points that may exist in the data
localdata = localdata[~localdata.index.duplicated()]
......@@ -92,6 +92,8 @@ def weeklybdxdata(weeks = 4):
return localdata
def weeklysolardata(datafolder: str = './energyfreqanalysis/', weeks: int = 4):
Extract solar irradiance data from database created by the control loop. This
......@@ -103,25 +105,19 @@ def weeklysolardata(datafolder: str = './energyfreqanalysis/', weeks: int = 4):
weeks {int} -- Number of weeks to read (default: {4})
pandas.DataFrame -- Solar irradiance data in a dataframe
pandas.pd.DataFrame -- Solar irradiance data in a dataframe
# TODO: Instead of loading by weeks, check number of records loaded per file,
# then load additional csv files as needed. This is because main control loop
# aggregates by calendar week. So if the script was started on a sunday
# the controller would create a new file to store measurements for the week
# beginning on monday. The sunday data will belong to the earlier calandar week.
data_files = glob(os.path.join(datafolder, '*store.csv'))
#flist = sorted(data_files, key=os.path.getctime, reverse=True)[:weeks+1]
t1 = [re.findall(r'\d+', t) for t in data_files]
cod = {}
for i in range(len(t1)):
cod[int(t1[i][0]+t1[i][1])]= data_files[i]
for i in [392019,402019,412019, 422019, 432019,442019]:
del cod[i]
t2 = list(cod.keys())
t3 = sorted(t2,reverse=True)
flist = [cod[i] for i in t3[:weeks+1]]
# Get the most recently created data files for the past `weeks+1` weeks
# including current week.
# This assumes *store.csv files are named in a YYYY-UU format or any scheme
# where the names when sorted alphabetically are put in chronological order.
flist = sorted(glob(os.path.join(datafolder, '*store.csv')), reverse=True)[:weeks+1]
if len(flist) == 0:
warnings.warn('No *store.csv files found containing historical measurements.')
......@@ -132,17 +128,19 @@ def weeklysolardata(datafolder: str = './energyfreqanalysis/', weeks: int = 4):
dflist = []
for filename in flist:
dfr = read_csv(filename,
names=['Dates', 'OAT', 'ORH', 'Ghi', 'Alumni_SAT', 'RL_SAT'],
na_values=['nan', ' nan'],
dfr = pd.read_csv(filename,
names=['Dates', 'OAT', 'ORH', 'Ghi', 'Alumni_SAT', 'RL_SAT'],
na_values=['nan', ' nan'],
timelist = []
# TODO: Instead of manually iterating over rows to parse timezones,
# ignore timezone info altogether.
for i in dfr['Dates']:
temp = to_datetime(i,format='%d-%b-%y %I:%M %p CDT')
temp = pd.to_datetime(i,format='%d-%b-%y %I:%M %p CDT')
temp = to_datetime(i,format='%d-%b-%y %I:%M %p CST')
temp = pd.to_datetime(i,format='%d-%b-%y %I:%M %p CST')
dfr['Dates'] = np.array(timelist)
......@@ -158,9 +156,9 @@ def weeklysolardata(datafolder: str = './energyfreqanalysis/', weeks: int = 4):
def weeklyrelearndata(solardatapath: str, weeks: int = 4, log: Logger=None):
df1 = weeklybdxdata(weeks = weeks)
df2 = weeklysolardata(datafolder=solardatapath, weeks = weeks)
def weeklyrelearndata(solardatapath: str, weeks: int=4):
df1 = weeklybdxdata(weeks=weeks)
df2 = weeklysolardata(datafolder=solardatapath, weeks=weeks)
if df2 is None:
raise FileNotFoundError('Solar data could not be extracted.')
dfrelearn = merge_df_columns([df1, df2])
......@@ -6,19 +6,22 @@ import time
import signal
import sys
import os
from argparse import ArgumentParser
import logging
from argparse import ArgumentParser
from datetime import datetime
from importlib import import_module
from collections import deque
from threading import Thread, Lock
from multiprocessing import Process, Event
# Set up logging
# Set up logging with a global variable "log"
log = logging.getLogger(__name__)
_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
_handler = logging.StreamHandler(sys.stderr)
# Initially, before the code in __main__ guard executes, logging is set to DEBUG
# to print out all messages to console.
......@@ -34,7 +37,6 @@ for dep in dependencies:
import numpy as np
from numpy.random import randn
import datetime
from rl.agents import DDPGAgent
from keras.models import load_model
......@@ -42,7 +44,6 @@ from agent import get_agent, train_agent
from HVAC_Environment import Env
from modelretrain import retrain
from ah_api import weeklyrelearndata
from solar_irradiance import get_current_DHI
from util_datetime import convert_to_ISO8601, convert_to_local_time, abs_diff_in_sec
......@@ -68,6 +69,8 @@ PERIOD = 300.0
# Time between learning new policy from history of measurements. Interval is in
# units of PERIODs (i.e. for PERIOD=10s, INTERVAL=5 means 5 PERIODs=50s)
# Number of weeks over which to re-learn control
# Controller argument definitions
......@@ -85,6 +88,7 @@ parser.add_argument('save_to', type=str, nargs='?',
parser.add_argument('is_valid', type=str, nargs='?',
default=os.path.join(PROD_WRITE_DIR, 'is_valid.csv'),
help='File to write the setpoint validity indicator')
parser.add_argument('--use_control', '-c', default='agent_weights.h5f', type=str,
help='Prefix of *_[actor|critic].h5f parameters file to use.')
parser.add_argument('--control_mean_window', '-m', default=CTRL_MEAN_WINDOW, type=int,
......@@ -99,6 +103,7 @@ parser.add_argument('--control_input_fallback', default=FALLBACK_INPUTS, type=fl
nargs=4, metavar='F',
help=('Default input values to infer control from if access '
'to valid source fails.'))
parser.add_argument('--period', '-p', type=float, default=PERIOD,
help='Time interval (s) for control loop application.')
parser.add_argument('--interval', '-i', type=int, default=INTERVAL,
......@@ -106,6 +111,9 @@ parser.add_argument('--interval', '-i', type=int, default=INTERVAL,
parser.add_argument('--training_duration', '-t', type=int, default=1e5,
help='Number of steps to use when re-learning control.')
parser.add_argument('--relearn_window', '-w', type=int, default=RELEARN_WINDOW,
help='Number of past weeks over which to re-learn control.')
parser.add_argument('--demo', '-d', action='store_true', default=False,
help='Run a demo with fake input and output streams.')
parser.add_argument('--dev_server', '-x', action='store_true', default=False,
......@@ -163,13 +171,14 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
while not signal_stop.is_set():
start = time.perf_counter()
cur_time =
cur_time_utc = datetime.datetime.utcnow()
cur_time =
cur_time_utc = datetime.utcnow()
# Get measurements from source:
t = None
if source == '_demo_source':
raw = np.genfromtxt(source) # read measurements as numpy arrays
t = cur_time.isoformat() # demo source is always on time
# This is the time format from the production data
t = cur_time.strftime('%d-%b-%y %I:%M %p CDT')
raw = np.zeros(4)
# Get DHI. DHI is pulled from a separate API and is restricted
......@@ -180,6 +189,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
# Get other measurements and put them all together into the `raw`
# input array.
# TODO: Document the format of the input file.
with open(source) as f:
lines =
t = lines[2] # timestamp
......@@ -231,7 +241,6 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
new_input = raw[-1] # take only latest measurement for control
ctrl = agent.select_action(new_input[np.newaxis, :])[0]
ctrl_window.append(ctrl) # record action in history
#ctrl = unnormalize_temp(ctrl)
with open(save, 'w') as cfile:
final_ctrl = np.mean(ctrl_window) # final control is mean of window
if len(buffer_out) > 0: # final control cannot change above threshold
......@@ -258,10 +267,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
# Saving historical data. Data storage is aggregated on a calendar-
# week basis.
year =[0]
week =[1]
dfile_path = os.path.join(store, # a new file every week
'{}-{}-{}'.format(year, week, 'store.csv'))
dfile_path = os.path.join(store,'%Y-%U-store.csv'))
with open(dfile_path, 'a') as dfile:, 2) # go to end of destination file...
# np.savetxt(dfile, raw) # ...and write all read measurements
......@@ -294,45 +300,29 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
def unnormalize_temp(tanhoutput):
Transforms normalized temperature to unnormalized.
highth = 91.66
lowth = 48.98
deltath = highth - lowth
tanhdelta = 1 - (-1)
actualaction = lowth + (tanhoutput+1)* deltath/tanhdelta
return actualaction[0]
def learn_control(source: str, save_to: str, duration: int, signal_stop: Event,
signal_reload: Event, lock_policy: Lock):
def learn_control(source: str, save_to: str, relearn_window: int, duration: int,
signal_stop: Event, signal_reload: Event, lock_policy: Lock):
Run reinforcement learning on accumulated experience and save new control
source {str} -- Directory containing history of measurements (.csv) to learn
new model,
save_to {str} -- Filename (.h5f) where to save new policy,
new model. Directory should contain csv files with names YYYY-UU-store.csv.
save_to {str} -- Filename (.h5f) where to save new policy.
relearn_window {str} -- Number of weeks of data over which to learn control.
duration {int} -- Number of steps/actions to train over,
signal_stop {Event} -- Event to indicate that script is ending,
signal_reload {Event} -- Event to indicate control loop should reload new control,
lock_policy {Lock} -- Lock to ensure control loop and learning functions do not
simultaneously access policy file.
# TODO: Currently this function is disabled. In production, it will use
# history of measurements to learn new models of the system and use them
# to develop newer control policies.
# TODO: Have to handle edge case when there simulataneous writing and reading
# from last week file.
# get the new traindata
relearndf = weeklyrelearndata(solardatapath=source, weeks=4, log=log)
relearndf = weeklyrelearndata(solardatapath=source, weeks=relearn_window)
# Train it with the newly initialized environment
......@@ -429,8 +419,9 @@ if __name__ == '__main__':
# Specifing the log file name
write_dir = DEV_WRITE_DIR if args.dev_server else PROD_WRITE_DIR
_logfile_handler = logging.FileHandler(filename=os.path.join(
DEV_WRITE_DIR if args.dev_server else PROD_WRITE_DIR, 'log.txt'))
write_dir, 'log.txt'))
_logfile_handler.setLevel(logging.DEBUG) # DEBUG is the lowest severity. It means print all messages.
_logfile_handler.setFormatter(_formatter) # Set up the format of log messages
log.addHandler(_logfile_handler) # add this handler to the logger
......@@ -499,8 +490,9 @@ if __name__ == '__main__':
time.sleep(max(0, args.period * args.interval - (end - start)))
start = time.perf_counter()
learn_control(source=args.store_at, save_to=args.use_control,
duration=args.training_duration, signal_stop=signal_stop,
signal_reload=signal_reload, lock_policy=lock_policy)
duration=args.training_duration, signal_stop=signal_stop,
signal_reload=signal_reload, lock_policy=lock_policy)
end = time.perf_counter()
except KeyboardInterrupt:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment