Data functions for stripes plottingΒΆ

# Utility functions for the Stripes plots

import re
import numpy as np
from astropy.convolution import convolve

rng = np.random.default_rng()

import warnings

warnings.filterwarnings("ignore", message=".*NaN values detected post convolution.*")


# Longitude reduction
def longitude_reduce(choice, ndata, mask=None):
    nd2d = np.squeeze(ndata)
    result = np.ma.masked_array(np.full([nd2d.shape[0], 1], np.nan))
    result[:] = np.ma.masked
    if mask is None:
        mask = np.full(nd2d.shape, False)
    if choice == "sample":
        for i in range(nd2d.shape[0]):  # Iterate over latitudes
            alat = nd2d[i, :]
            if len(alat) > 0:
                alat = alat[~mask[i, :]]
                if len(alat) > 0:
                    result[i, 0] = rng.choice(alat, size=1)[0]
                    if ~np.isnan(result[i, 0]):
                        result.mask[i, 0] = False
        return result
    if choice == "mean":
        nd2d.mask = np.ma.mask_or(nd2d.mask, mask)
        for i in range(nd2d.shape[0]):  # Iterate over latitudes
            alat = nd2d[i, :].compressed()
            if len(alat) > 0:
                result[i, 0] = np.mean(alat)
                result.mask[i, 0] = False
        return result
    raise Exception("Unsupported reduction choice %s" % choice)


# Paired Longitude reduction - both datasets sample from the same grid point
def longitude_reduce_paired(choice, ndata1, ndata2, mask=None):
    nd2d1 = np.squeeze(ndata1)
    result1 = np.ma.masked_array(np.full([nd2d1.shape[0], 1], np.nan))
    result1[:] = np.ma.masked
    nd2d2 = np.squeeze(ndata2)
    result2 = result1.copy()
    if mask is None:
        mask = np.full(nd2d1.shape, False)
    if choice == "sample":
        for i in range(nd2d1.shape[0]):  # Iterate over latitudes
            alat1 = nd2d1[i, :]
            alat2 = nd2d2[i, :]
            if len(alat1) > 0:
                alat1 = alat1[~mask[i, :]]
                alat2 = alat2[~mask[i, :]]
                if len(alat1) > 0:
                    random_idx = rng.integers(0, len(alat1))
                    result1[i, 0] = alat1.data[random_idx]
                    result2[i, 0] = alat2.data[random_idx]
                    if ~np.isnan(result1[i, 0]):
                        result1.mask[i, 0] = False
                    if ~np.isnan(result2[i, 0]):
                        result2.mask[i, 0] = False
        return (result1, result2)
    raise Exception("Unsupported reduction choice %s" % choice)


# Convolution smoothing
def csmooth(choice, ndata):
    ndata[ndata.mask] = np.nan
    if choice[:3] == "sub":  # Want residual from smoothing
        n2 = csmooth(choice[4:], ndata)
        return ndata - n2 + 0.5
    if choice == "none":
        return ndata
    if choice == "annual":
        filter = np.full((1, 13), 1 / 13)
        op = convolve(ndata, filter, boundary="extend")
        op = np.ma.masked_array(op, np.isnan(op))
        return op
    result = re.search(r"(\d+)x(\d+)", choice)
    if result is not None:
        hv = int(result.groups()[0])
        vv = int(result.groups()[1])
        filter = np.full((vv, hv), 1 / (vv * hv))
        op = convolve(ndata, filter, boundary="extend")
        op = np.ma.masked_array(op, np.isnan(op))
        return op
    raise Exception("Unsupported convolution choice %s" % choice)