Main render scriptΒΆ

  • Set up the geometry

  • Load the data

  • Find the streamline start points

  • Propagate the streamlines

  • Draw the text along the streamlines

  • Add a land-sea mask

#!/usr/bin/env python

# Plot a wind field using text strings from the shipping forecast

import os
import sys
import iris
import iris.coords
import iris.coord_systems
import iris.fileformats
import iris.util

iris.FUTURE.datum_support = True  # I don't care about datums
import numpy as np
from scipy.stats.qmc import PoissonDisk
from Met_palettes import MET_PALETTES
import matplotlib
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from matplotlib.patches import PathPatch
from copy import deepcopy
from datetime import datetime

import random

from cube import plot_cube
from streamlines import wind_vectors
from textPath import map_path, string_to_path, curve_length, path_length, trim_curve
from collisions import collision_check, collision_cut

# Parameters
plot_width = 3200
plot_height = 2100
plot_x = [-18.3 / 0.6, 3 / 0.6]  # plot range lat & lon
plot_y = [48, 62]
data_x = [-22.5 / 0.6, 8 / 0.6]
data_y = [43, 67]
data_resolution = 0.05  # degrees (units of u wind)
collision_resolution = 0.05  # same
# Cube for the wind field
data_cube = plot_cube(
    data_resolution, xmin=data_x[0], xmax=data_x[1], ymin=data_y[0], ymax=data_y[1]
)
# Cube for the collision detection
collision_cube = plot_cube(
    collision_resolution, xmin=data_x[0], xmax=data_x[1], ymin=data_y[0], ymax=data_y[1]
)
collision_cube.data = np.zeros(collision_cube.data.shape)

iterations = 250 * 4  # How far the streamlines propagate (max)
epsilon = 0.005 / 4

poisson_radius = 0.005  # Figure fraction - separation of streamline start points

scheme = MET_PALETTES["Hokusai2"]
bgcol = (1, 1, 0.95, 1)  # "#ffe6b7"  # (255 / 255, 255 / 255, 255 / 255, 1.0)

font_size = 0.35  # Scaling for the TextPath - 4 is about right to match the degrees
fprop = "Serif"  # font choice

# Load the land mask
land_mask = iris.load_cube(
    "%s/fixed_fields/land_mask/opfc_global_2019.nc" % os.getenv("DATADIR")
)
land_mask = land_mask.regrid(data_cube, iris.analysis.Linear())

# Load the wind field
fctime = datetime(2024, 12, 7, 6)
vwnd = iris.load_cube(
    "%s/opfc/glm/2024/12/07.pp" % os.getenv("SCRATCH"),
    iris.AttributeConstraint(STASH=iris.fileformats.pp.STASH(1, 3, 226))
    & iris.Constraint(forecast_period=0)
    & iris.Constraint(forecast_reference_time=fctime),
)
uwnd = iris.load_cube(
    "%s/opfc/glm/2024/12/07.pp" % os.getenv("SCRATCH"),
    iris.AttributeConstraint(STASH=iris.fileformats.pp.STASH(1, 3, 225))
    & iris.Constraint(forecast_period=0)
    & iris.Constraint(forecast_reference_time=fctime),
)
u10m = uwnd.regrid(data_cube, iris.analysis.Linear())
# iris.save(u10m, "u10m.nc")
v10m = vwnd.regrid(data_cube, iris.analysis.Linear())
# v10m.data *= -1  # Want +ve going north
# iris.save(v10m, "v10m.nc")

# Load the shipping forecast
with open("./forecast_20241207.txt", "r") as file:
    # Read all lines into a list of strings
    lines = file.readlines()

# Remove blank lines from the list
lines = [line.strip() for line in lines if line.strip()]

# Strip out the lines with assigned start points
alines = [line for line in lines if line.startswith("[")]
lines = [line for line in lines if not line.startswith("[")]


# Find longest line shorter than some given curve
def line_shorter_than(linePaths, x, y):
    max_length = curve_length(x, y)
    short_line = None
    for line in linePaths:
        pl = path_length(linePaths[line])
        if pl < max_length:
            if short_line is None or pl > path_length(linePaths[short_line]):
                short_line = line
    return short_line


# Set up the plot
fig = Figure(
    figsize=(int(plot_width / 100), int(plot_height / 100)),
    dpi=100,
    facecolor=bgcol,
    edgecolor=None,
    linewidth=0.0,
    frameon=True,
    subplotpars=None,
    tight_layout=None,
)
canvas = FigureCanvas(fig)
ax = fig.add_axes([0, 0, 1, 1])
ax.set_axis_off()  # Don't want surrounding x and y axis
ax.set_xlim(plot_x[0], plot_x[1])
ax.set_ylim(plot_y[0], plot_y[1])
ax.set_aspect("auto")

# Add the assigned lines
for line in alines:
    udx = line.find(",")
    x = float(line[1:udx])
    udx2 = line.find(",", udx + 1)
    y = float(line[udx + 1 : udx2])
    udx3 = line.find("]", udx2 + 1)
    fonts = float(line[udx2 + 1 : udx3])
    line = line[udx3 + 1 :]
    wv = wind_vectors(
        u10m,
        v10m,
        np.array([x]),
        np.array([y]),
        epsilon=epsilon,
        iterations=iterations * 2,
    )
    lp = string_to_path(line, fsize=fonts, fprop=fprop, sy=1)
    npath = map_path(lp, wv[0, 0, :], wv[0, 1, :])
    npatch = PathPatch(
        npath,
        facecolor="red",
        edgecolor="black",
        linewidth=0.25,
        zorder=30,
    )
    npatch2 = PathPatch(
        npath,
        facecolor="red",
        edgecolor="black",
        linewidth=0.25,
        alpha=0.25,
        zorder=80,
    )
    ax.add_artist(npatch)
    ax.add_artist(npatch2)
    collision_check(collision_cube, npath.vertices[:, 0], npath.vertices[:, 1])

if True:
    # Generate a set of origin points for the text strings
    engine = PoissonDisk(d=2, radius=poisson_radius)
    sample = engine.fill_space()
    sample = sample * (data_x[1] - data_x[0])
    sample[:, 0] += data_x[0]
    sample[:, 1] += data_y[0]
    sample = sample[(sample[:, 1] > data_y[0]) & (sample[:, 1] < data_y[1])]
    wpx = sample[:, 0]
    wpy = sample[:, 1]

    text_points = wind_vectors(
        u10m,
        v10m,
        wpx,
        wpy,
        epsilon=epsilon,
        iterations=iterations,
    )

    count = 0
    for fontSize in [0.5, 0.45, 0.4, 0.35, 0.3, 0.25, 0.2]:
        # # Add the streamlines in random order
        line_sequence = random.sample(
            list(range(text_points.shape[0])), text_points.shape[0]
        )
        # Make a TextPath for each line (to get the size)
        linePaths = {}
        for line in lines:
            linePaths[line] = string_to_path(line, fsize=fontSize, fprop=fprop, sy=1)
        olinePaths = deepcopy(linePaths)

        for streamline in line_sequence:
            try:
                x = text_points[streamline, 0, :].copy()
                y = text_points[streamline, 1, :].copy()
                x, y = collision_cut(collision_cube, x, y)
                short_line = line_shorter_than(linePaths, x, y)
                if short_line is None:
                    continue
                # random_line = random.choice(short_lines)
                x, y = trim_curve(x, y, path_length(linePaths[short_line]))
                if len(x) < 4:  # Too short - try again
                    print("short x")
                    continue
                npath = map_path(linePaths[short_line], x, y)
                fcol = facecolor = scheme["colors"][count % len(scheme["colors"])]
                npatch = PathPatch(
                    npath,
                    facecolor=fcol,
                    edgecolor="black",
                    linewidth=0.25,
                    zorder=30,
                )
                npatch2 = PathPatch(
                    npath,
                    facecolor=fcol,
                    edgecolor="black",
                    linewidth=0.25,
                    alpha=0.25,
                    zorder=80,
                )
                if not collision_check(
                    collision_cube, npath.vertices[:, 0], npath.vertices[:, 1]
                ):
                    ax.add_artist(npatch)
                    ax.add_artist(npatch2)
                    count += 1
                    # del linePaths[random_line]
                    line_sequence.remove(streamline)
                    if len(linePaths) == 0:
                        linePaths = deepcopy(olinePaths)
                    continue
            except Exception as e:
                print(e)
                continue

# Add the land mask - temporary addition
lats = land_mask.coord("latitude").points
lons = land_mask.coord("longitude").points
mask_img = ax.pcolorfast(
    lons,
    lats,
    land_mask.data,
    cmap=matplotlib.colors.ListedColormap(
        ((1.0, 1.0, 0.9, 0), (0.975, 0.975, 0.95 * 0.975, 1.0))
    ),
    vmin=0,
    vmax=1,
    alpha=1.0,
    zorder=50,
)


fig.savefig("wind_words.webp")