"""Module defining EDGES-specific reading functions for weather and auxiliary data."""
from __future__ import annotations
import re
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import numpy as np
# Line layouts of the monitoring logs.
#
# Every line starts with a ``YYYY:DDD:HH:MM:SS`` timestamp, followed by labelled
# numeric fields. Each number is captured in a named group so that a match can be
# converted straight into a ``{field: number}`` mapping.
#
# The decimal point of every value is matched literally (``\.``). An unescaped
# ``.`` would accept any character there, letting corrupt values through to the
# numeric conversion where they raise instead of the line simply being skipped.

# Weather layout that also carries the frontend and LNA temperatures.
_NEW_WEATHER_PATTERN = re.compile(
    r"(?P<year>\d{4}):(?P<day>\d{3}):(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2}) "
    r"rack_temp (?P<rack_temp>\d{3}\.\d{2}) Kelvin, "
    r"ambient_temp (?P<ambient_temp>\d{3}\.\d{2}) Kelvin, "
    r"ambient_hum (?P<ambient_hum>[\d\- ]{3}\.\d{2}) percent, "
    r"frontend (?P<frontend_temp>\d{3}\.\d{2}) Kelvin, "
    r"rcv3_lna (?P<lna_temp>\d{3}\.\d{2}) Kelvin"
)

# Weather layout without the frontend/LNA fields. It only constrains the start of
# a line, so it also matches the leading part of a line in the newer layout.
_OLD_WEATHER_PATTERN = re.compile(
    r"(?P<year>\d{4}):(?P<day>\d{3}):(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2}) "
    r"rack_temp (?P<rack_temp>\d{3}\.\d{2}) Kelvin, "
    r"ambient_temp (?P<ambient_temp>\d{3}\.\d{2}) Kelvin, "
    r"ambient_hum (?P<ambient_hum>[\d\- ]{3}\.\d{2}) percent, "
)

# Thermlog layout: set-point, measured temperature and power, with variable-width
# (possibly negative or space-padded) integer parts.
_THERMLOG_PATTERN = re.compile(
    r"(?P<year>\d{4}):(?P<day>\d{3}):(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2}) "
    r"temp_set (?P<temp_set>[\d\- ]+\.\d{2}) deg_C "
    r"tmp (?P<receiver_temp>[\d\- ]+\.\d{2}) deg_C "
    r"pwr (?P<power_percent>[\d\- ]+\.\d{2}) percent"
)
def _parse_lines(text, pattern):
    """Yield one ``{group name: number}`` dict per match of ``pattern`` in ``text``.

    Parameters
    ----------
    text : str
        The text to scan.
    pattern : compiled regular expression
        Pattern whose named groups all capture numeric text.

    Yields
    ------
    dict
        Maps each named group to an ``int`` when the captured text parses as an
        integer, and to a ``float`` otherwise.

    Raises
    ------
    ValueError
        If a captured value parses as neither an ``int`` nor a ``float``.
    """

    def _to_number(token):
        # Integers are tried first so timestamp fields stay integral.
        try:
            return int(token)
        except ValueError:
            return float(token)

    for found in pattern.finditer(text):
        yield {name: _to_number(token) for name, token in found.groupdict().items()}
def _get_chunk_pos_and_size(
    fname: str | Path,
    start_time: tuple[int, int, int, int],
    end_time: tuple[int, int, int, int] | None = None,
    n_hours: int | None = None,
):
    """Locate the part of a time-stamped log file that covers a time range.

    The file is scanned line by line, comparing the first 14 characters of each
    line (``YYYY:DDD:HH:MM``) as strings against the requested start and end
    stamps. This relies on the lines being in chronological order.

    Parameters
    ----------
    fname : path
        File to read.
    start_time : tuple
        Tuple of (year, day, hour, minute) at which to start reading data.
    end_time : tuple
        Tuple of (year, day, hour, minute) at which to end reading data. This is
        exclusive, so that if `start_time` is (2020, 1, 0, 0) and `end_time` is
        (2020, 2, 0, 0), you get a whole day. Takes precedence over `n_hours`.
    n_hours : int
        Number of hours to cover, counted from `start_time`. Only used when
        `end_time` is not given. If neither is given, the range runs to the end
        of the starting day.

    Returns
    -------
    int :
        Starting position in file.
    nlines :
        Number of lines required to read for this chunk.
    nchar :
        Number of characters spanned by those lines.

    Raises
    ------
    ValueError
        If the file contains no line at or after `start_time`.
    """

    def _stamp(yr, dy, hr, mn):
        # Fixed-width fields make plain string comparison chronological.
        return f"{yr:04}:{dy:03}:{hr:02}:{mn:02}"

    year, day, hour, minute = start_time[:4]

    if end_time is not None:
        end_stamp = _stamp(*end_time[:4])
    elif n_hours is None:
        # Midnight of the following day. ``day + 1`` may exceed the length of the
        # year, which is harmless: any line of the next year still sorts after it.
        end_stamp = _stamp(year, day + 1, 0, 0)
    else:
        # Do the arithmetic on real dates so that carrying over midnight and over
        # a year boundary are both handled.
        # NOTE(review): this treats ``day`` as a 1-based day-of-year - confirm.
        begin = datetime(
            year, 1, 1, hour=hour, minute=minute, tzinfo=timezone.utc
        ) + timedelta(days=day - 1)
        end = begin + timedelta(hours=n_hours)
        end_stamp = _stamp(end.year, end.timetuple().tm_yday, end.hour, end.minute)

    start_stamp = _stamp(year, day, hour, minute)

    fname = Path(fname)
    # Sentinel that sorts before any real stamp, so the first loop always reads.
    line = "0000:000:00:00"

    with fname.open("r") as fl:
        # Get our starting position in the file.
        while line and line[:14] < start_stamp:
            line = fl.readline()

        # Got to the end of the file without finding our year/day
        if not line:
            raise ValueError(
                f"The file provided [{fname}] does not contain the year/day desired "
                f"[{year}/{day}]."
            )

        # The first line of the chunk has just been consumed, so step back over it.
        # This assumes one character per byte and no newline translation.
        start_pos = fl.tell() - len(line)

        # Count the lines up to (not including) the first one at/after the end
        # stamp. The counter is incremented once for the read that terminates the
        # loop, hence the ``- 1`` in the return value.
        n_lines = 1
        while line and line[:14] < end_stamp:
            line = fl.readline()
            n_lines += 1

        # ``line`` is either the first line past the range or "" at end of file.
        end_pos = fl.tell() - len(line)

    return start_pos, n_lines - 1, end_pos - start_pos
[docs]
def read_weather_file(
    weather_file: str | Path,
    year: int,
    day: int,
    hour: int = 0,
    minute: int = 0,
    n_hours: int | None = None,
    end_time: tuple[int, int, int, int] | None = None,
):
    """Read (a chunk of) the weather file maintained by the on-site (MRO) monitoring.

    The primary location of this file is on the enterprise cluster at
    ``/data5/edges/data/2014_February_Boolardy/weather2.txt``, but the function
    requires you to pass in the filename manually, as you may have copied the file
    to your own system or elsewhere.

    Parameters
    ----------
    weather_file : path or str
        The path to the file on the system.
    year : int
        The year defining the start of the chunk of times to return.
    day : int
        The day defining the start of the chunk of times to return.
    hour : int
        The hour defining the start of the chunk of times to return.
    minute : int
        The minute defining the start of the chunk of times to return.
    n_hours : int
        Number of hours of data to return. Default is to return the rest of the day.
    end_time : tuple of int
        The (year, day, hour, minute) defining the end of the returned data (exclusive).
        Default is to return the rest of the starting day.

    Returns
    -------
    structured array :
        A numpy structured array with one entry per parsed line and the fields:

        * ``year``, ``day``, ``hour``, ``minute``, ``second``: timestamp of the line.
        * ``rack_temp``: temperature of the rack (K)
        * ``ambient_temp``: ambient temperature on site (K)
        * ``ambient_hum``: ambient humidity on site (%)
        * ``frontend_temp``: temperature of the frontend (K)
        * ``lna_temp``: temperature of the LNA (K).

        The last two fields are NaN when the file uses the older line layout,
        which does not record them.

    Warns
    -----
    UserWarning
        If fewer lines could be parsed than lie in the requested range; the
        returned array then only holds the parsed lines.
    """
    weather_file = Path(weather_file)

    # The line layout is decided once, from the first line of the file.
    with weather_file.open("r") as fl:
        first_line = fl.readline()
    has_receiver_temps = _NEW_WEATHER_PATTERN.match(first_line) is not None
    pattern = _NEW_WEATHER_PATTERN if has_receiver_temps else _OLD_WEATHER_PATTERN

    start_line, n_lines, nchar = _get_chunk_pos_and_size(
        weather_file, (year, day, hour, minute), end_time=end_time, n_hours=n_hours
    )

    weather = np.zeros(
        n_lines,
        dtype=[
            ("year", int),
            ("day", int),
            ("hour", int),
            ("minute", int),
            ("second", int),
            ("rack_temp", float),
            ("ambient_temp", float),
            ("ambient_hum", float),
            ("frontend_temp", float),
            ("lna_temp", float),
        ],
    )

    # Pull the whole requested range into memory in one read.
    with weather_file.open("r") as fl:
        fl.seek(start_line)
        chunk = fl.read(nchar)

    n_parsed = 0
    for n_parsed, record in enumerate(_parse_lines(chunk, pattern), start=1):
        if has_receiver_temps:
            receiver_temps = (record["frontend_temp"], record["lna_temp"])
        else:
            receiver_temps = (np.nan, np.nan)

        weather[n_parsed - 1] = (
            record["year"],
            record["day"],
            record["hour"],
            record["minute"],
            record["second"],
            record["rack_temp"],
            record["ambient_temp"],
            record["ambient_hum"],
            *receiver_temps,
        )

    # Lines that did not match the pattern leave unused trailing rows; drop them.
    if n_parsed < len(weather):
        warnings.warn(
            f"Only {n_parsed}/{n_lines} lines of {weather_file} were able to be parsed.",
            stacklevel=2,
        )
        weather = weather[:n_parsed]

    return weather
[docs]
def read_thermlog_file(
    filename: str | Path,
    year: int,
    day: int,
    hour: int = 0,
    minute: int = 0,
    n_hours: int | None = None,
    end_time: tuple[int, int, int, int] | None = None,
):
    """Read (a chunk of) the thermlog file maintained by the on-site (MRO) monitoring.

    The primary location of this file is on the enterprise cluster at
    ``/data5/edges/data/2014_February_Boolardy/thermlog_{band}.txt``, but the function
    requires you to pass in the filename manually, as you may have copied the file
    to your own system or elsewhere.

    Parameters
    ----------
    filename : path or str
        The path to the file on the system.
    year : int
        The year defining the start of the chunk of times to return.
    day : int
        The day defining the start of the chunk of times to return.
    hour : int
        The hour defining the start of the chunk of times to return.
    minute : int
        The minute defining the start of the chunk of times to return.
    n_hours : int
        Number of hours of data to return. Default is to return the rest of the day.
    end_time : tuple of int
        The (year, day, hour, minute) defining the end of the returned data (exclusive).
        Default is to return the rest of the starting day.

    Returns
    -------
    structured array :
        A numpy structured array with one entry per parsed line and the fields:

        * ``year``, ``day``, ``hour``, ``minute``, ``second``: timestamp of the line.
        * ``temp_set``: value logged as ``temp_set`` (deg C)
        * ``receiver_temp``: value logged as ``tmp`` (deg C)
        * ``power_percent``: value logged as ``pwr`` (%)

    Warns
    -----
    UserWarning
        If fewer lines could be parsed than lie in the requested range; the
        returned array then only holds the parsed lines.
    """
    start_line, n_lines, nchar = _get_chunk_pos_and_size(
        filename, (year, day, hour, minute), end_time=end_time, n_hours=n_hours
    )

    fields = [
        ("year", int),
        ("day", int),
        ("hour", int),
        ("minute", int),
        ("second", int),
        ("temp_set", float),
        ("receiver_temp", float),
        ("power_percent", float),
    ]
    therm = np.zeros(n_lines, dtype=fields)

    # Pull the whole requested range into memory in one read.
    with Path(filename).open("r") as fl:
        fl.seek(start_line)
        chunk = fl.read(nchar)

    n_parsed = 0
    for n_parsed, record in enumerate(_parse_lines(chunk, _THERMLOG_PATTERN), start=1):
        therm[n_parsed - 1] = (
            record["year"],
            record["day"],
            record["hour"],
            record["minute"],
            record["second"],
            record["temp_set"],
            record["receiver_temp"],
            record["power_percent"],
        )

    # Lines that did not match the pattern leave unused trailing rows; drop them.
    if n_parsed < len(therm):
        warnings.warn(
            f"Only {n_parsed}/{n_lines} lines of {filename} were able to be parsed.",
            stacklevel=2,
        )
        therm = therm[:n_parsed]

    return therm
[docs]
def auxiliary_data(
    weather_file: str | Path,
    thermlog_file: str | Path,
    year: int,
    day: int,
    hour: int = 0,
    minute: int = 0,
    n_hours: int | None = None,
    end_time: tuple[int, int, int, int] | None = None,
):
    """Read both weather and thermlog files for a given time range.

    The same time range is applied to both files; the weather file is read first.

    Parameters
    ----------
    weather_file : path or str
        The file containing the weather information.
    thermlog_file : path or str
        The file containing the thermlog information.
    year : int
        The year defining the start of the chunk of times to return.
    day : int
        The day defining the start of the chunk of times to return.
    hour : int
        The hour defining the start of the chunk of times to return.
    minute : int
        The minute defining the start of the chunk of times to return.
    n_hours : int
        Number of hours of data to return. Default is to return the rest of the day.
    end_time : tuple of int
        The (year, day, hour, minute) defining the end of the returned data (exclusive).
        Default is to return the rest of the starting day.

    Returns
    -------
    structured array :
        The weather data (see :func:`read_weather_file`).
    structured array :
        The thermlog data (see :func:`read_thermlog_file`)
    """
    # Both readers take the same time-range keywords.
    time_range = {
        "hour": hour,
        "minute": minute,
        "n_hours": n_hours,
        "end_time": end_time,
    }

    weather = read_weather_file(weather_file, year, day, **time_range)
    thermlog = read_thermlog_file(thermlog_file, year, day, **time_range)
    return weather, thermlog