"""Rule functions for checks.
This module contains functions that are used to check values against rules. Each
function takes a value and returns a list of errors if the value does not meet the
rule.
"""
import os
import re
from datetime import datetime
import requests
from urllib.request import urlopen
import numpy as np
import sys
from typing import List, Dict, Optional, Any, Union
from . import processors
from ..config import get_config
# Configuration mapping returned by get_config(); loaded once when this module
# is imported.
conf = get_config()
# Separator placed between the options of a rule string. Falls back to "|"
# when the "settings" section has no "rule_splitter" entry.
rule_splitter = conf["settings"].get("rule_splitter", "|")
def _preprocess(value: str, preprocessors: Optional[List[str]]) -> str:
"""Run value through preprocessors.
Preprocess value by running it through preprocessor functions. Functions are
defined in the processors module. Hyphens in the preprocessors (e.g. from specs)
are replaced with underscores.
Args:
value: value to preprocess
preprocessors: list of preprocessor functions to run
Returns:
Preprocessed value as string.
"""
preprocessors = preprocessors or []
for processor in preprocessors:
value = getattr(processors, processor.replace("-", "_"))(value)
return value
def match_file_name(
    value: str,
    context: Dict[str, str],
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check that a value equals the name of the file being checked.

    The file name is the final path component of ``context["file_path"]``. The
    value is run through the preprocessors named in ``extras`` before it is
    compared.

    Args:
        value: value to check
        context: dictionary holding the file path under the key 'file_path'
        extras: names of preprocessors to run on the value
        label: label to prepend to error message returned

    Returns:
        Empty list on a match, otherwise a list with one error string.
    """
    expected = os.path.basename(context["file_path"])
    candidate = _preprocess(value, extras)

    if candidate == expected:
        return []
    return [f"{label} '{candidate}' does not match file name: '{expected}'"]
def match_one_of(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check that a value is one of a fixed set of options.

    The options come from the first entry of ``extras``: a single string that
    is split on the module-level ``rule_splitter`` (default '|'), with
    surrounding whitespace removed from each option.

    Args:
        value: value to check
        context: not used by this rule
        extras: list whose first item is the string of options, separated by
            the rule splitter
        label: label to prepend to error message returned

    Returns:
        Empty list if the value is one of the options, otherwise a list with
        one error string.
    """
    allowed = list(map(str.strip, extras[0].split(rule_splitter)))

    if value in allowed:
        return []
    return [f"{label} '{value}' must be one of: '{allowed}'"]
def match_one_or_more_of(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check that every comma-separated value is one of the allowed options.

    ``value`` is split on commas and the first entry of ``extras`` is split on
    the module-level ``rule_splitter`` (default '|'); whitespace around each
    piece is removed. The check passes only when every piece of ``value`` is
    found among the options.

    Args:
        value: one or more values to check, separated by commas
        context: not used by this rule
        extras: list whose first item is the string of options, separated by
            the rule splitter
        label: label to prepend to error message returned

    Returns:
        Empty list if all values are allowed, otherwise a list with one error
        string.
    """
    allowed = {option.strip() for option in extras[0].split(rule_splitter)}
    given = {item.strip() for item in value.split(",")}

    if given and given <= allowed:
        return []
    return [f"{label} '{value}' must be one or more of: '{sorted(allowed)}'"]
def string_of_length(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check that a string has the required length.

    The requirement is the first entry of ``extras``: a number, optionally
    followed by '+'. With the '+' the number is a minimum length; without it
    the string must be exactly that long.

    Args:
        value: value to check
        context: not used by this rule
        extras: list whose first item is the length requirement, e.g. '5' or
            '5+'
        label: label to prepend to error message returned

    Returns:
        Empty list if the length is acceptable, otherwise a list with one
        error string.
    """
    requirement = extras[0]
    required = int(re.match(r"^(\d+)\+?", requirement).group(1))
    actual = len(value)

    if requirement.endswith("+"):
        # Trailing '+' turns the number into a lower bound.
        if actual < required:
            return [f"{label} '{value}' must be at least {required} characters"]
        return []

    if actual != required:
        return [f"{label} '{value}' must be exactly {required} characters"]
    return []
def validate_image_date_time(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check value meets date and time format.

    Check if the value meets the date and time format that is expected for the
    NCAS-Image standard. The expected format is 'YYYY:MM:DD HH:MM:SS' or
    'YYYY:MM:DD HH:MM:SS.s', where every field is zero-padded and the optional
    fraction of a second has between one and six digits.

    Args:
        value: value to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if date and time format does not match.
    """
    # The textual shape is checked with a regular expression rather than by
    # comparing against strftime() output: strftime always renders "%f" as
    # six digits, so a round-trip comparison rejects shorter fractions such
    # as ".5" even though they are valid.
    shape = r"[0-9]{4}(:[0-9]{2}){2} [0-9]{2}(:[0-9]{2}){2}(\.[0-9]{1,6})?"

    valid = False
    if re.fullmatch(shape, value):
        try:
            # The first 19 characters are the date and whole seconds;
            # strptime rejects impossible values (month 13, 30 February, ...).
            datetime.strptime(value[:19], "%Y:%m:%d %H:%M:%S")
            valid = True
        except ValueError:
            pass

    if valid:
        return []
    return [
        f"{label} '{value}' needs to be of the format YYYY:MM:DD hh:mm:ss or YYYY:MM:DD hh:mm:ss.s"
    ]
def validate_orcid_ID(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check value meets ORCID URL format.

    Check if the value meets the ORCID URL format (i.e.
    https://orcid.org/XXXX-XXXX-XXXX-XXXX): the fixed prefix followed by four
    hyphen-separated groups of four ASCII digits, where the very last
    character may be 'X' instead of a digit. Only the format is checked.

    Args:
        value: value to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if ORCID ID format does not match.
    """
    # One anchored pattern checks the prefix, hyphen positions and digits
    # together, so a hyphen inside a digit group (e.g. "...-00-7") cannot pass
    # by being stripped before the digit test. [0-9] is used instead of \d so
    # that only ASCII digits are accepted.
    orcid_pattern = r"https://orcid\.org/[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{3}[0-9X]"

    if isinstance(value, str) and re.fullmatch(orcid_pattern, value):
        return []
    return [
        f"{label} '{value}' needs to be of the format https://orcid.org/XXXX-XXXX-XXXX-XXXX"
    ]
def list_of_names(
    value: Union[str, List[str]],
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check list of names matches expected pattern.

    Check if a given name or list of names matches the expected pattern. The
    pattern is <last name>, <first name> <middle initials(s)> or <last name>,
    <first name> <middle name(s)>. Designed for checks with the NCAS-Image
    standard. Each name is also checked against a set of permitted characters.

    A single string is treated as a list with one name. Values that are
    neither a string nor a list produce no warnings. Each warning quotes the
    whole of ``value``, not only the name that failed.

    Args:
        value: name(s) to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if name format does not match.
    """
    name_pattern = (
        r"(.)+, (.)+ ?((.)+|((.)\.))"  # The format names should be written in
    )
    character_name_pattern = r"[A-Za-z_À-ÿ\-\'\ \.\,]+"

    # isinstance (rather than an exact type comparison) so that subclasses of
    # str and list are checked instead of being silently skipped.
    if isinstance(value, list):
        names = value
    elif isinstance(value, str):
        names = [value]
    else:
        names = []

    warnings = []
    for name in names:
        if not re.fullmatch(name_pattern, name):
            warnings.append(
                f"{label} '{value}' should be of the format <last name>, <first name> <middle initials(s)> or <last name>, <first name> <middle name(s)> where appropriate"
            )
        if not re.fullmatch(character_name_pattern, name):
            warnings.append(
                f"{label} '{value}' - please use characters A-Z, a-z, À-ÿ where appropriate"
            )
    return warnings
def headline(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check value is valid for NCAS Image headline tag.

    Check if the value is valid for the NCAS Image headline tag. The headline
    should be a single sentence, starting with a capital letter, between 10
    and 150 characters long. Two or more '.' characters are taken to mean
    more than one sentence. An empty headline produces only the "should not
    be empty" warning.

    Args:
        value: value to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if headline format does not match.
    """
    if value == "":
        return [f"{label} '{value}' should not be empty"]

    warnings = []
    if len(value) > 150:
        # Report the length limit itself; this branch previously reused the
        # "one sentence" wording, which did not describe the actual problem.
        warnings.append(f"{label} '{value}' should not exceed 150 characters")
    if value.count(".") >= 2:
        warnings.append(
            f"{label} '{value}' should contain no more than one sentence"
        )
    if not value[0].isupper():
        warnings.append(f"{label} '{value}' should start with a capital letter")
    if len(value) < 10:
        warnings.append(f"{label} '{value}' should be at least 10 characters")
    return warnings
def title_check(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check that the title equals the name of the file.

    For the NCAS-Image standard: the value (from the title tag) is compared
    with the final path component of the file path passed as ``context``.

    Args:
        value: value to check
        context: file path
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        Empty list if the title matches the file name, otherwise a list with
        one error string.
    """
    if os.path.basename(context) == value:
        return []
    return [f"{label} '{value}' must match the name of the file"]
def url_checker(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check URL exists and is reachable.

    The URL is opened with ``urlopen``. It counts as reachable only if it can
    be opened and the response code is 200. Any error while opening it
    (malformed URL, network failure, HTTP error) is reported as unreachable.

    No timeout is passed to ``urlopen``.

    Args:
        value: URL to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if URL is not reachable.
    """
    unreachable = [f"{label} '{value}' is not a reachable url"]

    try:
        # The context manager closes the response once the code is read.
        with urlopen(value) as response:
            status = response.getcode()
    except Exception:
        # Deliberately broad: every failure to open the URL means "not
        # reachable". Exception (not a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return unreachable

    # 200 means it exists and is up and reachable
    return [] if status == 200 else unreachable
def relation_url_checker(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = ""
) -> List[str]:
    """Check relation field is in the correct format and that the url exists.

    Designed for checking the Relation tag matches the expected format in the
    NCAS-Image standard, and the URL is reachable using the `url_checker`
    function. Everything after the first space in the value is treated as the
    URL.

    Args:
        value: value to check
        context: passed through to `url_checker`
        extras: passed through to `url_checker`
        label: label to prepend to error message returned

    Returns:
        List with error string if Relation tag does not match expected format
        or URL is not reachable
    """
    if " " not in value:
        return [f"{label} '{value}' should contain a space before the url"]

    # extract only the url part of the relation string
    relation_url = value.partition(" ")[2]

    # Call url_checker once and reuse its result, so the URL is requested
    # over the network only a single time.
    return list(url_checker(relation_url, context, extras, label))
def latitude(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check if the value is within -90 and +90

    The first run of digits in the value is read as the whole-degree part and
    the second run, if there is one, as the decimal part. The sign is not
    inspected, so only the magnitude is compared with 90. A value without a
    decimal part (e.g. '45') is treated as having a decimal part of zero, and
    a value containing no digits at all is reported as out of range.

    Args:
        value: value to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if latitude is not within -90 and +90
    """
    out_of_range = [f"{label} '{value}' must be within -90 and +90 "]

    digit_runs = re.findall(r"[0-9]+", value)
    if not digit_runs:
        # Nothing numeric to compare, so the value cannot be in range.
        return out_of_range

    whole_part = int(digit_runs[0])
    # A missing decimal part is zero rather than an IndexError.
    decimal_part = int(digit_runs[1]) if len(digit_runs) > 1 else 0

    if whole_part > 90 or (whole_part == 90 and decimal_part > 0):
        return out_of_range
    return []
def longitude(
    value: str,
    context: Any,
    extras: Optional[List[str]] = None,
    label: str = "",
) -> List[str]:
    """Check if the value is within -180 and +180

    The first run of digits in the value is read as the whole-degree part and
    the second run, if there is one, as the decimal part. The sign is not
    inspected, so only the magnitude is compared with 180. A value without a
    decimal part (e.g. '120') is treated as having a decimal part of zero,
    and a value containing no digits at all is reported as out of range.

    Args:
        value: value to check
        context: not used by this rule
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if longitude is not within -180 and +180
    """
    out_of_range = [f"{label} '{value}' must be within -180 and +180 "]

    digit_runs = re.findall(r"[0-9]+", value)
    if not digit_runs:
        # Nothing numeric to compare, so the value cannot be in range.
        return out_of_range

    whole_part = int(digit_runs[0])
    # A missing decimal part is zero rather than an IndexError.
    decimal_part = int(digit_runs[1]) if len(digit_runs) > 1 else 0

    if whole_part > 180 or (whole_part == 180 and decimal_part > 0):
        return out_of_range
    return []
def check_qc_flags(
    value: Any,
    context: str,
    extras: Optional[List[str]] = None,
    label: str = ""
) -> List[str]:
    """Check QC flag values and meanings meet NCAS-General requirements

    Checks the QC flag values and meanings. The flag values must be an array
    or tuple of byte values, with at least two values, starting with 0 and 1.
    The flag meanings must be space separated and the first two must start
    with 'not_used' and 'good_data'. The number of flag values must equal the
    number of flag meanings.

    A value that has no length (for example a scalar or a zero-dimensional
    array) is reported as having fewer than two flag values instead of
    raising, and the count comparison is skipped for it.

    Args:
        value: flag values, as defined in the netCDF file
        context: flag meanings, as defined in the netCDF file
        extras: not used by this rule
        label: label to prepend to error message returned

    Returns:
        List with error string if QC flag values and meanings do not meet
        requirements
    """
    errors = []
    meanings = context.split(" ")

    # check flag_values are correctly formatted (should be array of bytes)
    if not isinstance(value, (np.ndarray, tuple)):
        errors.append(
            f"{label} QC flag_values must be an array or tuple of byte values, not '{type(value)}'."
        )

    # len() raises TypeError for unsized objects; None records "no length" so
    # the remaining checks can still run.
    try:
        value_count = len(value)
    except TypeError:
        value_count = None

    # check there are at least two values and they start with 0 and 1
    if value_count is None or value_count < 2:
        errors.append(f"{label} There must be at least two QC flag values.")
    elif not (np.all(value[:2] == [0, 1]) or np.all(value[:2] == (0, 1))):
        # Both comparisons are needed: an array compares element-wise with
        # the list, while a tuple only equals another tuple.
        errors.append(f"{label} First two QC flag_values must be '[0, 1]'.")

    # check there are at least two meanings and the first two are correct
    if len(meanings) < 2:
        errors.append(
            f"{label} There must be at least two QC flag meanings (space separated)."
        )
    elif meanings[:2] != ["not_used", "good_data"]:
        errors.append(
            f"{label} First two QC flag_meanings must be 'not_used' and 'good_data'."
        )

    # check number of values is same as number of meanings
    if value_count is not None and value_count != len(meanings):
        errors.append(
            f"{label} Number of flag_values must equal number of flag_meanings."
        )
    return errors
def allow_proposed(value, context, extras=None, label=""):
    """Accept either standard_name or proposed_standard_name as a match.

    Used in CFRadial and the NCAS-Radar standard. The value of the
    `standard_name` attribute and the value of the `proposed_standard_name`
    attribute are each compared with the expected value; the check passes if
    at least one of them equals it. When `extras` is a list, its first item
    is the expected value; otherwise `extras` itself is used.

    Args:
        value: value of the standard_name attribute
        context: value of the proposed_standard_name attribute
        extras: list of expected values
        label: label to prepend to error message returned

    Returns:
        Empty list if either value matches, otherwise a list with one error
        string.
    """
    expected = extras[0] if isinstance(extras, list) else extras

    if value != expected and context != expected:
        return [
            f"{label} does not contain standard_name or proposed_standard_name with value '{expected}'"
        ]
    return []