Assemble data for the SyCLoPS video¶
I got the data from Malcolm Roberts. I’m not including it here, but it comes as a CSV file like this:
,TID,LON,LAT,ISOTIME,MSLP,WS,Full_Name,Short_Label,Tropical_Flag,Transition_Zone,Track_Info,LPSAREA,IKE
7259781,353897,30.25,8.0,2020-02-01 00:00:00,100961.4,4.639237,Thermal Low,THL,1.0,0.0,Track,0,0.0
7259782,353897,30.0,7.0,2020-02-01 03:00:00,101083.2,4.169771,Thermal Low,THL,1.0,0.0,Track,0,0.0
...
So it’s straightforward to write a function to load into a python dictionary. Because we’re making a video, each frame will need to plot a point in time, and we’ll need a higher time-frequency for frames than the SyCLoPS data provides. So we add an `interpolate_track`
function to get an event at an arbitrary time.
# Load the cyclone tracks from Malcolm's dataset
tracks_file = (
"/data/users/malcolm.roberts/SyCLoPS_REV/ERA5_SyCLoPS_classified_2020-21_subset.csv"
)
# Load the contents of the tracks file using the csv module
import csv
import datetime
import iris.analysis.cartography
import numpy as np
# Convert the list of lists into a list of dictionaries
# (assuming the first row contains the column names)
def load_tracks():
with open(tracks_file, "r") as f:
reader = csv.DictReader(f)
# Read the rest of the rows into a list
data = list(reader)
# Convert the dates to datetime objects
for row in data:
row["datetime"] = datetime.datetime.strptime(
row["ISOTIME"], "%Y-%m-%d %H:%M:%S"
)
# Convert numeric values to floats
for row in data:
for key in ("LON", "LAT", "MSLP", "WS", "Tropical_Flag", "Transition_Zone"):
try:
row[key] = float(row[key])
except ValueError:
pass
# Mke sure the longitude is in the range -180 to 180
for row in data:
if row["LON"] > 180:
row["LON"] -= 360
elif row["LON"] < -180:
row["LON"] += 360
# Subset points by cyclone ID
tracks_by_tid = {}
for point in data:
tid = point["TID"] # Assuming 'TID' is the column name
if tid not in tracks_by_tid:
tracks_by_tid[tid] = []
tracks_by_tid[tid].append(point)
return tracks_by_tid
# Rotate the pole
def rotate_pole(tracks, pole_longitude, pole_latitude, npg_longitude):
for tid, track in tracks.items():
for point in track:
rp = iris.analysis.cartography.rotate_pole(
np.array(point["LON"]),
np.array(point["LAT"]),
pole_longitude,
pole_latitude,
)
point["LON"] = rp[0][0] + npg_longitude
if point["LON"] > 180:
point["LON"] -= 360
elif point["LON"] < -180:
point["LON"] += 360
point["LAT"] = rp[1][0]
return tracks
# Find the start and end times of a track
def last_date(track):
end_time = None
for point in track:
if end_time is None or point["datetime"] > end_time:
end_time = point["datetime"]
return end_time
def first_date(track):
start_time = None
for point in track:
if start_time is None or point["datetime"] < start_time:
start_time = point["datetime"]
return start_time
# Interpolate a track to a given time
def interpolate_track(track, target_time):
# Assuming track is a list of dictionaries with 'datetime' and other fields
# Find the two points surrounding the target time
before = None
after = None
for point in track:
if point["datetime"] <= target_time:
before = point
elif point["datetime"] >= target_time and after is None:
after = point
break
if before is None or after is None:
return None # Cannot interpolate if we don't have both points
if before["datetime"] == after["datetime"]:
return before
# Interpolate the numeric values
interpolated_point = after.copy()
for key in ("LON", "LAT", "MSLP", "WS", "Tropical_Flag", "Transition_Zone"):
if key == "LON" and (after[key] - before[key]) > 180:
after[key] -= 360
if key == "LON" and (before[key] - after[key]) > 180:
before[key] -= 360
interpolated_point[key] = before[key] + (after[key] - before[key]) * (
(target_time - before["datetime"]).total_seconds()
/ (after["datetime"] - before["datetime"]).total_seconds()
)
if key == "LON":
if interpolated_point[key] > 180:
interpolated_point[key] -= 360
elif interpolated_point[key] < -180:
interpolated_point[key] += 360
interpolated_point["datetime"] = target_time
return interpolated_point
# Make the trail of a cyclone
def make_trail(tracks, tid, target_time):
track = tracks[tid]
start_time = target_time
for point in track:
if point["datetime"] < start_time:
start_time = point["datetime"]
interpolated_track = []
current_time = target_time
while current_time >= start_time:
interpolated_point = interpolate_track(track, current_time)
if interpolated_point is not None:
interpolated_track.append(interpolated_point)
current_time -= datetime.timedelta(hours=1)
return interpolated_track