Commit b9da82be authored by hazrmard's avatar hazrmard
Browse files

Made demo data source the same format as production source. Now control_loop...

Made demo data source the same format as production source. Now control_loop function is agnostic to whether data source is demo or not.
parent bf7d5ec3
......@@ -19,6 +19,7 @@ relearn.pkl
_demo_source
results*/
.data/
rl_results_local/
rl_perf_plot.py
modeltrainnb/
......
......@@ -3,7 +3,6 @@ This script runs the main control algorithm loop.
"""
import time
import signal
import sys
import os
import logging
......@@ -49,7 +48,7 @@ from util_datetime import convert_to_ISO8601, convert_to_local_time, abs_diff_in
# Setting the read and write directories
DEV_READ_DIR = './'
DEV_WRITE_DIR = './'
DEV_WRITE_DIR = './.data/'
PROD_READ_DIR = '/app001/niagara/Niagara4.2/vykon/stations/VUWS/shared'
PROD_WRITE_DIR = '/app001/niagara/Niagara4.2/vykon/shared'
......@@ -118,7 +117,8 @@ parser.add_argument('--demo', '-d', action='store_true', default=False,
help='Run a demo with fake input and output streams.')
parser.add_argument('--dev_server', '-x', action='store_true', default=False,
help=('Flag to indicate if it is a development server. This '
'changes read/write paths'))
'overrides read_from/store_at and changes read/write paths:\n'
'READ: {}\nWRITE: {}').format(DEV_READ_DIR, DEV_WRITE_DIR))
parser.add_argument('--logging', '-l', choices=(10, 20, 30, 40, 50), type=int,
help=('Logging level (50-critical, 40-error, 30-warning, '
'20-info, 10-debug).'), default=logging.INFO)
......@@ -175,29 +175,29 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
cur_time_utc = datetime.utcnow()
# Get measurements from source:
t = None
if source == '_demo_source':
raw = np.genfromtxt(source) # read measurements as numpy arrays
# This is the time format from the production data
t = cur_time.strftime('%d-%b-%y %I:%M %p CDT')
else:
raw = np.zeros(4)
# Get DHI. DHI is pulled from a separate API and is restricted
# to updates every 30mins (PT30M).
cur_dhi_pull_time = convert_to_ISO8601(cur_time_utc)
temp = get_current_DHI(cur_dhi_pull_time, dev_server)
dhi = temp if temp is not None else dhi
# Get other measurements and put them all together into the `raw`
# input array.
# TODO: Document the format of the input file.
with open(source) as f:
lines = f.read().splitlines()
t = lines[2] # timestamp
raw[0] = lines[5].split(',')[1].replace('"', '') # Alumni_OAT
raw[2] = dhi # Irradiance (DHI)
raw[1] = lines[6].split(',')[1].replace('"', '') # Alumni_RH (OAH)
raw[3] = lines[7].split(',')[1].replace('"', '') # Alumni_SAT
f.close()
# if source == '_demo_source':
# raw = np.genfromtxt(source) # read measurements as numpy arrays
# # This is the time format from the production data
# t = cur_time.strftime('%d-%b-%y %I:%M %p CDT')
# else:
raw = np.zeros(4)
# Get DHI. DHI is pulled from a separate API and is restricted
# to updates every 30mins (PT30M).
cur_dhi_pull_time = convert_to_ISO8601(cur_time_utc)
temp = get_current_DHI(cur_dhi_pull_time, dev_server, store)
dhi = temp if temp is not None else dhi
# Get other measurements and put them all together into the `raw`
# input array.
# TODO: Document the format of the input file.
with open(source) as f:
lines = f.read().splitlines()
t = lines[2] # timestamp
raw[0] = lines[5].split(',')[1].replace('"', '') # Alumni_OAT
raw[2] = dhi # Irradiance (DHI)
raw[1] = lines[6].split(',')[1].replace('"', '') # Alumni_RH (OAH)
raw[3] = lines[7].split(',')[1].replace('"', '') # Alumni_SAT
f.close()
# Ensure input shape is consistent. From now on, raw is a 2D array
# of N rows of measurements of F columns of fields
......@@ -406,7 +406,13 @@ def demo_source(dest: str, signal_stop: Event, period: float, mean: np.ndarray,
while not signal_stop.is_set():
readings = sigma * randn(NUM_INPUT_VARS) + mean
with open(dest, 'w') as file:
file.write('{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\n'.format(*readings))
file.write('AlumniVariables\n')
file.write('slot:/MumboJumbo\n')
file.write(datetime.now().strftime('%d-%b-%y %I:%M %p CDT\n\n'))
file.write('Name,Variable\n')
file.write('Alumni_OAT, {:.2f}\n'.format(readings[0]))
file.write('Alumni_RH, {:.2f}\n'.format(readings[1]))
file.write('Alumni_SAT, {:.2f}\n'.format(readings[2]))
log.info('Demo values written to source.')
signal_stop.wait(timeout=abs(period + 0.2 * randn()))
except Exception as e:
......@@ -423,16 +429,20 @@ if __name__ == '__main__':
try:
args = parser.parse_args()
    # Change arguments if script is in a development server:
    # Change arguments if script is in a development server. A --dev_server
# flag is a shortcut for specifying read/write directories. It ignores the
# positional arguments read_from/store_at/save_to/is_valid and instead
# uses these:
if args.dev_server:
os.makedirs(DEV_WRITE_DIR, exist_ok=True)
args.read_from = os.path.join(DEV_READ_DIR, 'AlumniVariables.csv')
args.store_at = DEV_WRITE_DIR
args.save_to = os.path.join(DEV_WRITE_DIR, 'SAT_Setpoint.csv')
args.is_valid = os.path.join(DEV_WRITE_DIR, 'is_valid.csv')
args.is_valid = os.path.join(DEV_WRITE_DIR, 'is_valid.csv')
    # Specifying the log file name
write_dir = DEV_WRITE_DIR if args.dev_server else PROD_WRITE_DIR
write_dir = DEV_WRITE_DIR if args.dev_server else args.store_at
_logfile_handler = logging.FileHandler(filename=os.path.join(
write_dir, 'log.txt'))
_logfile_handler.setLevel(logging.DEBUG) # DEBUG is the lowest severity. It means print all messages.
......@@ -466,7 +476,7 @@ if __name__ == '__main__':
# start demo process to create dummy data
if args.demo:
pr_demo = Thread(target=demo_source,
args=('_demo_source',
args=(args.read_from,
signal_stop,
args.period,
np.asarray(args.control_input_fallback),
......@@ -475,7 +485,7 @@ if __name__ == '__main__':
# start control loop thread
th_ctrl = Thread(target=control_loop, daemon=False, kwargs={
'source': '_demo_source' if args.demo else args.read_from,
'source': args.read_from,
'store': args.store_at,
'save': args.save_to,
'valid': args.is_valid,
......
......@@ -2,6 +2,8 @@
This script extracts solar irradiation data from Solcast.
"""
import os
import pandas as pd
# Define the lat, long of the location
......@@ -11,7 +13,7 @@ lat, lon = '36.147934', '-86.803386'
type = ['forecasts', 'estimated_actuals']
def update_dhi_data(is_test_env):
def update_dhi_data(is_test_env, write_dir):
"""
Makes a call to Solcast API and stores the data in a local csv file.
"""
......@@ -30,29 +32,45 @@ def update_dhi_data(is_test_env):
# get the data from solcast
df = pd.read_csv(url)
df = df[['Ghi', 'period_end']].copy()
df.to_csv('solcast_dhi.csv', encoding='utf-8', index=False)
df.to_csv(os.path.join(write_dir, 'solcast_dhi.csv'), encoding='utf-8', index=False)
def get_current_DHI(t, is_test_env):
def get_current_DHI(t, is_test_env, read_dir):
"""
Looks for DHI data locally. If not found locally, downloads the data
using solcast API. Returns the DHI value for the requested time stamp.
Returns `None` when no data found.
Parameters
----------
t : str
Time at which to get solar irradiance. In format %Y-%m-%dT%H:%M:%SZ
is_test_env : bool
Whether called from a test environment. Changes API key used.
read_dir : str
Directory in which the solar data is stored/read from
Returns
-------
float
The solar irradiance at that time. If time stamp does not exist in stored
data, None is returned.
"""
# TODO: Remove solcast_dhi.csv from repo. Currently that will give an error
# since this function doesn't check if the file doesn't exist and create a new
# one. Correct behavior: check if csv file exists, if not, create an empty one.
fpath = os.path.join(read_dir, 'solcast_dhi.csv')
if not os.path.isfile(fpath):
update_dhi_data(is_test_env, read_dir)
# retrieve dhi data locally
df = pd.read_csv('solcast_dhi.csv')
# retrieve dhi data locally for the latest time stamp
df = pd.read_csv(fpath)
df = df.loc[df['period_end'] == t]
# if data not found locally, update local data using solcast API call
if df.empty:
update_dhi_data(is_test_env)
df = pd.read_csv('solcast_dhi.csv')
update_dhi_data(is_test_env, read_dir)
df = pd.read_csv(fpath)
df = df.loc[df['period_end'] == t]
# If data is empty even after updating for latest time, return none
if df.empty:
dhi = None
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment