Assemble ERA5 normalized data into a set of tf.tensors

The data download scripts assemble selected ERA5 data in netCDF files. To use that data efficiently in analysis and modelling it is necessary both to normalize it, and to reformat it as a set of tf.tensors. These have consistent format and resolution and can be reassembled into a tf.data.Dataset` for ML model training.

Script to make the set of tensors. Takes argument –variable, and uses precalculated normalization parameters:

#!/usr/bin/env python

# Make normalized tensors

import os
import sys
import argparse
import tensorflow as tf
from normalize.ERA5.makeDataset import getDataset
from normalize.ERA5.normalize import match_normal, load_fitted

sDir = os.path.dirname(os.path.realpath(__file__))

parser = argparse.ArgumentParser()
parser.add_argument(
    "--variable",
    help="Variable name",
    type=str,
    required=True,
)
args = parser.parse_args()

opdir = "%s/DCVAE-Climate/normalized_datasets/ERA5/%s/" % (
    os.getenv("SCRATCH"),
    args.variable,
)
if not os.path.isdir(opdir):
    os.makedirs(opdir)


def is_done(year, month):
    fn = "%s/%04d-%02d.tfd" % (
        opdir,
        year,
        month,
    )
    if os.path.exists(fn):
        return True
    return False


# Load the pre-calculated normalisation parameters
fitted = []
for month in range(1, 13):
    cubes = load_fitted(month, variable=args.variable)
    fitted.append([cubes[0].data, cubes[1].data, cubes[2].data])


# Go through raw dataset  and make normalized tensors
trainingData = getDataset(
    args.variable,
    cache=False,
    blur=1.0e-9,
).batch(1)


for batch in trainingData:
    year = int(batch[1].numpy()[0][0:4])
    month = int(batch[1].numpy()[0][5:7])
    if is_done(year, month):
        continue

    # normalize
    raw = batch[0].numpy().squeeze()
    normalized = match_normal(raw, fitted[month - 1])
    ict = tf.convert_to_tensor(normalized, tf.float32)
    tf.debugging.check_numerics(ict, "Bad data %04d-%02d" % (year, month))

    # Write to file
    opfile = ("%s/%04d-%02d.tfd") % (
        opdir,
        year,
        month,
    )
    sict = tf.io.serialize_tensor(ict)
    tf.io.write_file(opfile, sict)

Calls another script to make a single tensor:

#!/usr/bin/env python

# Read in monthly data from ERA5
# Normalise: convert to normally distributed values on the range ~0-1
# Convert into a TensorFlow tensor.
# Serialise and store on $SCRATCH.

import os
import tensorflow as tf
import dask

# Going to do external parallelism - run this on one core
tf.config.threading.set_inter_op_parallelism_threads(1)
dask.config.set(scheduler="single-threaded")

from tensor_utils import load_raw, raw_to_tensor

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--year", help="Year", type=int, required=True)
parser.add_argument("--month", help="Integer month", type=int, required=True)
parser.add_argument("--variable", help="Variable name", type=str, required=True)
parser.add_argument(
    "--opfile", help="tf data file name", default=None, type=str, required=False
)
args = parser.parse_args()
if args.opfile is None:
    args.opfile = ("%s/DCVAE-Climate/normalized_datasets/ERA5/%s/%04d-%02d.tfd") % (
        os.getenv("SCRATCH"),
        args.variable,
        args.year,
        args.month,
    )

if not os.path.isdir(os.path.dirname(args.opfile)):
    os.makedirs(os.path.dirname(args.opfile))

# Load and standardise data
qd = load_raw(args.year, args.month, args.variable)
ict = raw_to_tensor(qd, args.variable, args.month)
tf.debugging.check_numerics(ict, "Bad data %04d-%02d" % (args.year, args.month))

# Write to file
sict = tf.io.serialize_tensor(ict)
tf.io.write_file(args.opfile, sict)

Library functions to do the normalization:

# Functions to normalize a data distribution based on SPI
# The aim is to make a normalized distribution that is normally distributed
#  with mean=0.5 and sd=0.2 (so almost all the data is in 0-1)
from scipy.stats import gamma, norm
import numpy as np

import os
import sys
import iris


# Load the pre-calculated fitted values
def load_fitted(month, variable="total_precipitation"):
    shape = iris.load_cube(
        "%s/DCVAE-Climate/normalization/ERA5/%s/shape_m%02d.nc"
        % (os.getenv("SCRATCH"), variable, month),
    )
    location = iris.load_cube(
        "%s/DCVAE-Climate/normalization/ERA5/%s/location_m%02d.nc"
        % (os.getenv("SCRATCH"), variable, month),
    )
    scale = iris.load_cube(
        "%s/DCVAE-Climate/normalization/ERA5/%s/scale_m%02d.nc"
        % (os.getenv("SCRATCH"), variable, month),
    )
    return (shape, location, scale)


# Find the normal variate that matches the gamma cdf
def match_normal(raw, gamma_p, norm_mean=0.5, norm_sd=0.2):
    cdf = gamma.cdf(raw, gamma_p[0], gamma_p[1], gamma_p[2])
    cdf[cdf > 0.99999] = 0.99999  # cdf=0 or 1 causes numerical failure
    cdf[cdf < 0.00001] = 0.00001  # Should fix the gamma fit so this never happens
    spi = norm.ppf(cdf, loc=norm_mean, scale=norm_sd)
    return spi


# Find the original value from the normalized one
def match_original(normalized, gamma_p, norm_mean=0.5, norm_sd=0.2):
    cdf = norm.cdf(normalized, loc=norm_mean, scale=norm_sd)
    original = gamma.ppf(cdf, gamma_p[0], gamma_p[1], gamma_p[2])
    return original


# Normalise a cube (same as match_normal but for cubes)
def normalize_cube(raw, shape, location, scale, norm_mean=0.5, norm_sd=0.2):
    cdf = gamma.cdf(raw.data, shape.data, loc=location.data, scale=scale.data)
    cdf[cdf > 0.99999] = 0.99999  # cdf=0 or 1 causes numerical failure
    cdf[cdf < 0.00001] = 0.00001  # Most of these will be missing data
    spi = norm.ppf(cdf, loc=norm_mean, scale=norm_sd)
    result = raw.copy()
    result.data = np.ma.MaskedArray(spi, np.logical_and(raw.data.mask, shape.data.mask))
    result.data.data[result.data.mask] = 0.0
    return result


# Convert a cube from normalized value to raw
#  (same as match_original but for cubes)
def unnormalize_cube(normalized, shape, location, scale, norm_mean=0.5, norm_sd=0.2):
    cdf = norm.cdf(normalized.data, loc=norm_mean, scale=norm_sd)
    raw = gamma.ppf(cdf, shape.data, location.data, scale.data)
    result = normalized.copy()
    result.data.data = raw
    return result

Library functions to convert between tf.tensor` and iris.cube.cube:

# Utility functions for creating and manipulating normalized tensors

import tensorflow as tf
import numpy as np

from get_data.ERA5 import ERA5_monthly
from utilities import grids
from normalize.ERA5.normalize import (
    normalize_cube,
    unnormalize_cube,
    load_fitted,
)


# Load the data for 1 month
def load_raw(year, month, variable="total_precipitation"):
    raw = ERA5_monthly.load(
        variable=variable,
        year=year,
        month=month,
        grid=grids.E5sCube,
    )
    raw.data.data[raw.data.mask == True] = 0.0
    return raw


# Convert raw cube to normalized tensor
def raw_to_tensor(raw, variable, month):
    (shape, location, scale) = load_fitted(month, variable=variable)
    norm = normalize_cube(raw, shape, location, scale)
    norm.data.data[raw.data.mask == True] = 0.0
    ict = tf.convert_to_tensor(norm.data, tf.float32)
    return ict


# Convert normalized tensor to cube
def tensor_to_cube(tensor):
    cube = grids.E5sCube.copy()
    cube.data = tensor.numpy()
    cube.data = np.ma.MaskedArray(cube.data, cube.data == 0.0)
    return cube


# Convert normalized tensor to raw values
def tensor_to_raw(tensor, variable, month):
    (shape, location, scale) = load_fitted(month, variable=variable)
    cube = tensor_to_cube(tensor)
    raw = unnormalize_cube(cube, shape, location, scale)
    raw.data.data[raw.data.mask == True] = 0.0
    return raw