Converting training data for use in TensorFlow

To train models on the downloaded data, we need to get it out of netCDF and into a file format that TensorFlow supports. The conventional choice would be TFRecord files, but I was quite unable to work out how to use them, so I'm using individual files of serialised tensors instead.
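
A minimal sketch of the round trip, assuming TensorFlow 1.x with eager execution (as in the scripts below; the field shape here is arbitrary):

import tensorflow as tf
tf.enable_eager_execution()
import numpy

# Serialise a tensor and write it to a file
t = tf.convert_to_tensor(numpy.random.rand(256,512).astype(numpy.float32))
tf.write_file('example.tfd',tf.serialize_tensor(t))

# Read the file and deserialise it back into a tensor
t2 = tf.parse_tensor(tf.read_file('example.tfd'),tf.float32)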

The basic plan is:

  1. Each input file will contain the data for one variable, at one point in time, appropriately arranged and normalised for use by TensorFlow. (Making each file can be slow; using it should be fast, so do all conversions at this point.)
  2. We want the input files to be independent, so only make one every five days or so, but arrange them to sample the annual and diurnal cycles uniformly. I make one set of files every 126 hours (5 days + 6 hours) through the test and training periods; because 126 is not a multiple of 24, successive samples also step round the diurnal cycle.
  3. The structure of these files should be matched to the model that is using them. Here, this means regridding to a resolution that works well with strided convolutions, and rotation to put the important parts of the world (the UK) in the middle.
  4. This makes a large batch of serialised tensor files, which can be combined into a TensorFlow Dataset for model training (see the sketch after this list).
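
A minimal sketch of step 4, again assuming TensorFlow 1.x in eager mode (the path follows the convention used by the scripts below):

import tensorflow as tf
tf.enable_eager_execution()
import glob
import os

# All the serialised-tensor files for one variable
file_names=glob.glob("%s/ML_GCM/datasets/20CR2c/prmsl/training/*.tfd" %
                     os.getenv('SCRATCH'))

# Deserialise one file into a tensor
def load_tensor(file_name):
    sict=tf.read_file(file_name)
    return tf.parse_tensor(sict,tf.float32)

# Dataset which reads and deserialises the files on demand
dataset=tf.data.Dataset.from_tensor_slices(tf.constant(file_names))
dataset=dataset.map(load_tensor)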

Script to make a single tensor file:

#!/usr/bin/env python

# Read in a field from 20CR as an Iris cube.
# Rescale it and move UK to the centre of the field.
# Convert it into a TensorFlow tensor.
# Serialise it and store it on $SCRATCH.

import tensorflow as tf
tf.enable_eager_execution()
import numpy

import IRData.twcr as twcr
import iris
import datetime
import argparse
import os
import sys

sys.path.append('%s/../../lib/' % os.path.dirname(__file__))
from normalise import normalise_t2m
from normalise import normalise_wind
from normalise import normalise_prmsl
from geometry import to_analysis_grid

parser = argparse.ArgumentParser()
parser.add_argument("--year", help="Year",
                    type=int,required=True)
parser.add_argument("--month", help="Integer month",
                    type=int,required=True)
parser.add_argument("--day", help="Day of month",
                    type=int,required=True)
parser.add_argument("--hour", help="Hour of day (0 to 23)",
                    type=int,required=True)
parser.add_argument("--member", help="Ensemble member",
                    default=1,type=int,required=False)
parser.add_argument("--source", help="Data source",
                    default='20CR2c',type=str,required=False)
parser.add_argument("--variable", help="variable name",
                    default='prmsl',type=str,required=False)
parser.add_argument("--test", help="test data, not training",
                    action="store_true")
parser.add_argument("--opfile", help="tf data file name",
                    default=None,
                    type=str,required=False)
args = parser.parse_args()
if args.opfile is None:
    purpose='training'
    if args.test: purpose='test'
    args.opfile=(("%s/ML_GCM/datasets/"+
                  "%s/%s/%s/%04d-%02d-%02d:%02d.tfd") %
                       (os.getenv('SCRATCH'),args.source,
                        args.variable,purpose,
                        args.year,args.month,args.day,args.hour))

if not os.path.isdir(os.path.dirname(args.opfile)):
    os.makedirs(os.path.dirname(args.opfile))

# Load and standardise data
if args.source=='20CR2c':
    ic=twcr.load(args.variable,datetime.datetime(args.year,args.month,
                                                args.day,args.hour),
                 version='2c')
    ic=ic.extract(iris.Constraint(member=args.member))
    ic=to_analysis_grid(ic)
    if args.variable=='uwnd.10m' or args.variable=='vwnd.10m':
        ic.data=normalise_wind(ic.data)
    elif args.variable=='air.2m':
        ic.data=normalise_t2m(ic.data)
    elif args.variable=='prmsl':
        ic.data=normalise_prmsl(ic.data)
    else:
        raise ValueError('Variable %s is not supported' % args.variable)
    
else:
    raise ValueError('Source %s is not supported' % args.source)

# Convert to Tensor
ict=tf.convert_to_tensor(ic.data, numpy.float32)

# Write to file
sict=tf.serialize_tensor(ict)
tf.write_file(args.opfile,sict)
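
The normalise_* and to_analysis_grid functions come from the project library (see 'Library functions used', below). As a purely hypothetical illustration of what a function like normalise_prmsl might do - the real offset and scale constants live in the library:

import numpy

# Hypothetical sketch only - the offset and scale here are
#  illustrative, not the values the real library function uses.
def normalise_prmsl(p):
    # p is mean-sea-level pressure in Pa; map it into a small
    #  range centred near zero.
    res=(p-101325.0)/3000.0
    return res.astype(numpy.float32)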

To run this script for every required variable and timepoint:

#!/usr/bin/env python

# Make a few hundred tf data files
#  for training the GCM models.

# Get one data file every 5 days+6 hours over the selected years 
#  They should be far enough apart to be mostly independent.

# Partition off 1/10 of them to be test data

# This script does not run the commands - it makes a list of commands
#  (in the file 'run.txt') which can be run in parallel.

import os
import datetime

# Function to check if the job is already done for this timepoint
def is_done(variable,year,month,day,hour,group):
    op_file_name=("%s/ML_GCM/datasets/20CR2c/%s/" +
                  "%s/%04d-%02d-%02d:%02d.tfd") % (
                            os.getenv('SCRATCH'),
                            variable,group,
                            year,month,day,hour)
    if os.path.isfile(op_file_name):
        return True
    return False

f=open("run.txt","w+")

start_day=datetime.datetime(1969,  1,  1,  0)
end_day  =datetime.datetime(2009, 12, 31, 23)

for variable in ('air.2m','prmsl','uwnd.10m','vwnd.10m'):
    current_day=start_day
    count=1
    while current_day<=end_day:
        if count%10==0:
            if not is_done(variable,current_day.year,current_day.month,
                           current_day.day,current_day.hour,'test'):
                cmd=("./make_training_tensor.py --year=%d --month=%d" +
                     " --day=%d --hour=%d --variable=%s --test \n") % (
                       current_day.year,current_day.month,
                        current_day.day,current_day.hour,variable)
                f.write(cmd)
        else:
            if not is_done(variable,current_day.year,current_day.month,
                           current_day.day,current_day.hour,'training'):
                cmd=("./make_training_tensor.py --year=%d --month=%d" +
                     " --day=%d --hour=%d --variable=%s \n") % (
                       current_day.year,current_day.month,
                        current_day.day,current_day.hour,variable)
                f.write(cmd)
        current_day=current_day+datetime.timedelta(hours=126)
        count += 1
    
f.close()

This script produces a list of commands, which can be run in serial or parallel.
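
One convenient option is GNU parallel: 'parallel --jobs 8 < run.txt' runs the listed commands eight at a time (the job count is an arbitrary choice; pick one to suit your machine).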

The insolation data requires slightly different treatment (every 6 hours through a single year, rather than every 126 hours through the 41 years 1969-2009), but the fundamental process is the same:

#!/usr/bin/env python

# Read in an insolation field from 20CR as an Iris cube.
# Rescale it and move UK to the centre of the field.
# Convert it into a TensorFlow tensor.
# Serialise it and store it on $SCRATCH.

import tensorflow as tf
tf.enable_eager_execution()
import numpy

import IRData.twcr as twcr
import iris
import datetime
import argparse
import os
import sys

sys.path.append('%s/../../lib/' % os.path.dirname(__file__))
from normalise import normalise_insolation
from geometry import to_analysis_grid

parser = argparse.ArgumentParser()
parser.add_argument("--year", help="Year",
                    type=int,required=True)
parser.add_argument("--month", help="Integer month",
                    type=int,required=True)
parser.add_argument("--day", help="Day of month",
                    type=int,required=True)
parser.add_argument("--hour", help="Hour of day (0 to 23)",
                    type=int,required=True)
parser.add_argument("--opfile", help="tf data file name",
                    default=None,
                    type=str,required=False)
args = parser.parse_args()
if args.opfile is None:
    args.opfile=(("%s/ML_GCM/datasets/"+
                  "%s/%s/%s/%04d-%02d-%02d:%02d.tfd") %
                       (os.getenv('SCRATCH'),'20CR2c',
                        'insolation','training',
                        args.year,args.month,args.day,args.hour))

if not os.path.isdir(os.path.dirname(args.opfile)):
    os.makedirs(os.path.dirname(args.opfile))

# Don't distinguish between training and test for insolation.
#  Make a 'test' directory that's a copy of the 'training' directory'
tstdir = os.path.dirname(args.opfile).replace('training','test')
if not os.path.exists(tstdir):
   os.symlink(os.path.dirname(args.opfile),tstdir)

# Load the 20CR2c data as an iris cube
time_constraint=iris.Constraint(time=iris.time.PartialDateTime(
                                year=args.year,
                                month=args.month,
                                day=args.day,
                                hour=args.hour))
# Use one year (1969) of clear-sky flux as a proxy for insolation -
#  the requested date must therefore fall in 1969.
ic=iris.load_cube("%s/20CR/version_2c/ensmean/cduvb.1969.nc" % os.getenv('DATADIR'),
                  iris.Constraint(name='3-hourly Clear Sky UV-B Downward Solar Flux') &
                  time_constraint)
coord_s=iris.coord_systems.GeogCS(iris.fileformats.pp.EARTH_RADIUS)
ic.coord('latitude').coord_system=coord_s
ic.coord('longitude').coord_system=coord_s

# Standardise
ic=to_analysis_grid(ic)
ic.data=normalise_insolation(ic.data)    

# Convert to Tensor
ict=tf.convert_to_tensor(ic.data, numpy.float32)

# Write to file
sict=tf.serialize_tensor(ict)
tf.write_file(args.opfile,sict)

To run this script for every timepoint in the year:

#!/usr/bin/env python

# Make a list of the commands needed to make a few hundred tf data files
#  for training the autoencoder.

# Special case for insolation - all time points, one year.
import os
import datetime

# Function to check if the job is already done for this timepoint
def is_done(year,month,day,hour,group):
    op_file_name=("%s/ML_GCM/datasets/"+
                  "20CR2c/insolation/" +
                  "%s/%04d-%02d-%02d:%02d.tfd") % (
                            os.getenv('SCRATCH'),group,
                            year,month,day,hour)
    if os.path.isfile(op_file_name):
        return True
    return False

f=open("run.txt","w+")

start_day=datetime.datetime(1969,  1,  1,  0)
end_day  =datetime.datetime(1969, 12, 31, 23)

current_day=start_day
while current_day<=end_day:
    if not is_done(current_day.year,current_day.month,
                   current_day.day,current_day.hour,'training'):
        cmd=("./make_insolation_tensor.py --year=%d --month=%d" +
            " --day=%d --hour=%d \n") % (
               current_day.year,current_day.month,
               current_day.day,current_day.hour)
        f.write(cmd)
    current_day=current_day+datetime.timedelta(hours=6)
    
f.close()

This process provides data for the autoencoder (where the input is the same as the output). Other models will require additional (but similar) data - the +6-hour predictor, for example, requires the same fields but with a 6-hour offset applied (a sketch of such a pairing follows).
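
As a hypothetical sketch of how such a pairing might be assembled - assuming a second set of tensor files has been made 6 hours after each training timepoint, and using an illustrative helper (tensor_file) that is not part of the project library:

import os
import datetime

# Build a file name following the convention used by
#  make_training_tensor.py above.
def tensor_file(variable,date,purpose='training'):
    return ("%s/ML_GCM/datasets/20CR2c/%s/%s/%04d-%02d-%02d:%02d.tfd" %
            (os.getenv('SCRATCH'),variable,purpose,
             date.year,date.month,date.day,date.hour))

# Pair each source field with the same field 6 hours later
date=datetime.datetime(1969,1,1,0)
source=tensor_file('prmsl',date)
target=tensor_file('prmsl',date+datetime.timedelta(hours=6))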

Library functions used: