Commit a127af5c authored by hazrmard's avatar hazrmard
Browse files

input buffer contains raw measurements, instead of smoothed inputs. Smoothing...

input buffer contains raw measurements, instead of smoothed inputs. Smoothing happens outside of buffer. Added try/catch for reading solcast API for HTTPErrors when rate limit is reached.
parent 7675e0cb
...@@ -164,22 +164,21 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen ...@@ -164,22 +164,21 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
dhi = 300 # caches last DHI value dhi = 300 # caches last DHI value
is_alumni_vars_updated = True # flag indicating that variables are updated properly is_alumni_vars_updated = True # flag indicating that variables are updated properly
log.info('Control loop thread started.') log.info('Control loop thread started.')
# Buffers for input/output variables:
# raw measurements -> [buffer_in] -> (optional smoothing) -> state \
# |-> NETWORK -> ctrl -> [ctrl_window] -> (optional smoothing) \
# |-> final_ctrl -> [buffer_out]
buffer_in = deque(maxlen=control_mean_window) # input measurements buffer_in = deque(maxlen=control_mean_window) # input measurements
ctrl_window = deque(maxlen=control_mean_window) # raw network outputs ctrl_window = deque(maxlen=control_mean_window) # raw network control outputs
buffer_out = deque(maxlen=1) # store previous control buffer_out = deque(maxlen=1) # store previous control supplied to system
while not signal_stop.is_set(): while not signal_stop.is_set():
start = time.perf_counter() start = time.perf_counter()
try: try:
cur_time = datetime.now() cur_time = datetime.now()
cur_time_utc = datetime.utcnow() cur_time_utc = datetime.utcnow()
# Get measurements from source: # Get measurements from source. Assume initial state of variables:
t = None t = None
# if source == '_demo_source':
# raw = np.genfromtxt(source) # read measurements as numpy arrays
# # This is the time format from the production data
# t = cur_time.strftime('%d-%b-%y %I:%M %p CDT')
# else:
raw = np.zeros(4) raw = np.zeros(4)
# Get DHI. DHI is pulled from a separate API and is restricted # Get DHI. DHI is pulled from a separate API and is restricted
# to updates every 30mins (PT30M). # to updates every 30mins (PT30M).
...@@ -201,6 +200,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen ...@@ -201,6 +200,7 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
# Ensure input shape is consistent. From now on, raw is a 2D array # Ensure input shape is consistent. From now on, raw is a 2D array
# of N rows of measurements of F columns of fields # of N rows of measurements of F columns of fields
# pylint: disable=unsubscriptable-object
if len(raw.shape) == 1: # convert 1d to 2d array if len(raw.shape) == 1: # convert 1d to 2d array
raw = np.reshape(raw, (-1, raw.shape[0])) raw = np.reshape(raw, (-1, raw.shape[0]))
log.info('Read %d measurements', len(raw)) log.info('Read %d measurements', len(raw))
...@@ -225,21 +225,24 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen ...@@ -225,21 +225,24 @@ def control_loop(source: str, store: str, save: str, valid: str, agent: DDPGAgen
.format(buffer_in[-1], raw[-1])) .format(buffer_in[-1], raw[-1]))
if len(buffer_in) >= control_mean_window: if len(buffer_in) >= control_mean_window:
arr = np.concatenate((np.asarray(buffer_in), raw)) arr = np.concatenate((np.asarray(buffer_in), raw))
raw = np.mean(arr, axis=0, keepdims=True) state = np.mean(arr, axis=0, keepdims=True)
log.debug(('Using averaged input of buffer and latest input:' log.debug(('Using averaged input of buffer and latest input:'
'\n{}\n=={}').format(arr, raw[-1])) '\n{}\n=={}').format(arr, state[-1]))
else: else:
raw = np.asarray([control_input_fallback]) state = np.asarray([control_input_fallback])
log.debug('Using fallback input: {}'.format(raw[-1])) log.debug('Using fallback input: {}'.format(state[-1]))
else:
state = raw
else:
state = raw
# Once input has been validated and transformed, add most recent # Once input has been validated and transformed, add most recent
# measurement to buffer. # raw measurement to buffer.
buffer_in.append(raw[-1]) buffer_in.append(raw[-1])
# Execute new control logic when a new measurement becomes available: # Execute new control logic when a new measurement becomes available:
# Applying control # Applying control
new_input = raw[-1, np.newaxis] # take only latest measurement for control ctrl = agent.select_action(state[-1, np.newaxis])[0]
ctrl = agent.select_action(new_input)[0]
ctrl_window.append(ctrl) # record action in history ctrl_window.append(ctrl) # record action in history
with open(save, 'w') as cfile: with open(save, 'w') as cfile:
final_ctrl = np.mean(ctrl_window) # final control is mean of window final_ctrl = np.mean(ctrl_window) # final control is mean of window
......
...@@ -3,6 +3,7 @@ This script extracts solar irradiation data from Solcast. ...@@ -3,6 +3,7 @@ This script extracts solar irradiation data from Solcast.
""" """
import os import os
from urllib.error import HTTPError
import pandas as pd import pandas as pd
...@@ -13,12 +14,14 @@ lat, lon = '36.147934', '-86.803386' ...@@ -13,12 +14,14 @@ lat, lon = '36.147934', '-86.803386'
type = ['forecasts', 'estimated_actuals'] type = ['forecasts', 'estimated_actuals']
def update_dhi_data(is_test_env, write_dir): def update_dhi_data(is_test_env, write_dir) -> bool:
""" """
Makes a call to Solcast API and stores the data in a local csv file. Makes a call to Solcast API and stores the data in a local csv file. Returns
False if update failed due to HTTPError. Otherwise returns True.
""" """
# Solcast api key to access the API # Solcast api key to access the API
# TODO: remove api keys from code and git history
if is_test_env: if is_test_env:
api_key = 'Mj1OX_LqWspibNMsPSy2T34Ti6WlpCyA' # for test environment api_key = 'Mj1OX_LqWspibNMsPSy2T34Ti6WlpCyA' # for test environment
else: else:
...@@ -30,9 +33,14 @@ def update_dhi_data(is_test_env, write_dir): ...@@ -30,9 +33,14 @@ def update_dhi_data(is_test_env, write_dir):
'&api_key='+api_key+ '&format=csv' '&api_key='+api_key+ '&format=csv'
# get the data from solcast # get the data from solcast
df = pd.read_csv(url) try:
df = pd.read_csv(url)
except HTTPError as e:
return False
# TODO: Data-frame needs to be verified for formatting.
df = df[['Ghi', 'period_end']].copy() df = df[['Ghi', 'period_end']].copy()
df.to_csv(os.path.join(write_dir, 'solcast_dhi.csv'), encoding='utf-8', index=False) df.to_csv(os.path.join(write_dir, 'solcast_dhi.csv'), encoding='utf-8', index=False)
return True
def get_current_DHI(t, is_test_env, read_dir): def get_current_DHI(t, is_test_env, read_dir):
...@@ -66,7 +74,7 @@ def get_current_DHI(t, is_test_env, read_dir): ...@@ -66,7 +74,7 @@ def get_current_DHI(t, is_test_env, read_dir):
# if data not found locally, update local data using solcast API call # if data not found locally, update local data using solcast API call
if df.empty: if df.empty:
update_dhi_data(is_test_env, read_dir) updated = update_dhi_data(is_test_env, read_dir)
df = pd.read_csv(fpath) df = pd.read_csv(fpath)
df = df.loc[df['period_end'] == t] df = df.loc[df['period_end'] == t]
...@@ -76,4 +84,4 @@ def get_current_DHI(t, is_test_env, read_dir): ...@@ -76,4 +84,4 @@ def get_current_DHI(t, is_test_env, read_dir):
else: else:
dhi = df.iloc[0]['Ghi'] dhi = df.iloc[0]['Ghi']
return dhi return dhi
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment