Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge changes from main into ncas-radar branch #47

Merged
merged 12 commits into from
Apr 10, 2024
498 changes: 330 additions & 168 deletions checksit/check.py

Large diffs are not rendered by default.

88 changes: 66 additions & 22 deletions checksit/cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
"""Console script for checksit."""

__author__ = """Ag Stephens"""
__contact__ = '[email protected]'
__contact__ = "[email protected]"
__copyright__ = "Copyright 2022 United Kingdom Research and Innovation"
__license__ = "BSD - see LICENSE file in top-level package directory"

import click
import os

from .utils import string_to_dict, string_to_list
from .check import check_file
from .summary import summarise
from . import describer
from . import specs


@click.group()
def main():
"""Console script for checker."""
Expand All @@ -35,12 +37,30 @@ def main():
@click.option("-t", "--template", default="auto")
@click.option("-w", "--ignore-warnings", is_flag=True)
@click.option("-p", "--skip-spellcheck", is_flag=True)
def check(file_path, mappings=None, rules=None, specs=None, ignore_attrs=None, ignore_all_globals=False,
ignore_all_dimensions=False, ignore_all_variables=False, ignore_all_variable_attrs=False,
auto_cache=False, log_mode="standard", verbose=False, template="auto", ignore_warnings=False,
skip_spellcheck=False):

if ignore_all_globals or ignore_all_dimensions or ignore_all_variables or ignore_all_variable_attrs:
def check(
file_path,
mappings=None,
rules=None,
specs=None,
ignore_attrs=None,
ignore_all_globals=False,
ignore_all_dimensions=False,
ignore_all_variables=False,
ignore_all_variable_attrs=False,
auto_cache=False,
log_mode="standard",
verbose=False,
template="auto",
ignore_warnings=False,
skip_spellcheck=False,
):

if (
ignore_all_globals
or ignore_all_dimensions
or ignore_all_variables
or ignore_all_variable_attrs
):
raise Exception("Options not implemented yet!!!!!")

if mappings:
Expand All @@ -55,10 +75,19 @@ def check(file_path, mappings=None, rules=None, specs=None, ignore_attrs=None, i
if ignore_attrs:
ignore_attrs = string_to_list(ignore_attrs)

return check_file(file_path, template=template, mappings=mappings, extra_rules=rules,
specs=specs, ignore_attrs=ignore_attrs,
auto_cache=auto_cache, verbose=verbose,
log_mode=log_mode, ignore_warnings=ignore_warnings, skip_spellcheck=skip_spellcheck)
return check_file(
file_path,
template=template,
mappings=mappings,
extra_rules=rules,
specs=specs,
ignore_attrs=ignore_attrs,
auto_cache=auto_cache,
verbose=verbose,
log_mode=log_mode,
ignore_warnings=ignore_warnings,
skip_spellcheck=skip_spellcheck,
)


@main.command()
Expand All @@ -68,28 +97,44 @@ def check(file_path, mappings=None, rules=None, specs=None, ignore_attrs=None, i
@click.option("-x", "--exclude", default=None)
@click.option("-e", "--exclude-file", default=None)
@click.option("--verbose/--no-verbose", default=False)
def summary(log_files=None, log_directory=None, show_files=False,
exclude=None, exclude_file=None,
verbose=False):
def summary(
log_files=None,
log_directory=None,
show_files=False,
exclude=None,
exclude_file=None,
verbose=False,
):

if exclude:
exclude = string_to_list(exclude)
exclude = string_to_list(exclude)
else:
exclude = []

if exclude_file:
if not os.path.isfile(exclude_file):
raise Exception(f"'--exclude-file' does not point to a valid file")

with open(exclude_file) as exfile:
exclude.extend([exclude_pattern for exclude_pattern in exfile if exclude_pattern.strip()])

return summarise(log_files, log_directory=log_directory, show_files=show_files,
exclude=exclude, verbose=verbose)
exclude.extend(
[
exclude_pattern
for exclude_pattern in exfile
if exclude_pattern.strip()
]
)

return summarise(
log_files,
log_directory=log_directory,
show_files=show_files,
exclude=exclude,
verbose=verbose,
)


@main.command()
@click.argument("check_ids", nargs=-1, default=None)
@click.argument("check_ids", nargs=-1, default=None)
@click.option("--verbose/--no-verbose", default=False)
def describe(check_ids=None, verbose=False):
return describer.describe(check_ids, verbose=verbose)
Expand All @@ -104,4 +149,3 @@ def show_specs(spec_ids=None, verbose=False):

if __name__ == "__main__":
main()

74 changes: 37 additions & 37 deletions checksit/cvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


from .config import get_config

conf = get_config()

vocabs_dir = conf["settings"]["vocabs_dir"]
Expand All @@ -26,13 +27,20 @@ def _load(self, vocab_id):

def _load_from_url(self, vocab_id):
# Loads a specific vocabulary from a URL
vocab_id_url = vocab_id.replace("__URL__","https://")
if vocab_id_url.startswith("https://raw.githubusercontent.com") and "/__latest__/" in vocab_id_url:
vocab_id_url = vocab_id.replace("__URL__", "https://")
if (
vocab_id_url.startswith("https://raw.githubusercontent.com")
and "/__latest__/" in vocab_id_url
):
vocab_id_url_base = vocab_id_url.split("/__latest__")[0]
vocab_id_url_base = vocab_id_url_base.replace("raw.githubusercontent.com","github.com")
latest_version = requests.get(f"{vocab_id_url_base}/releases/latest").url.split("/")[-1]
vocab_id_url_base = vocab_id_url_base.replace(
"raw.githubusercontent.com", "github.com"
)
latest_version = requests.get(
f"{vocab_id_url_base}/releases/latest"
).url.split("/")[-1]
vocab_id_url = vocab_id_url.replace("__latest__", latest_version)
res = requests.get(vocab_id_url.replace("__URL__","https://"))
res = requests.get(vocab_id_url.replace("__URL__", "https://"))
if res.status_code == 200:
self._vocabs[vocab_id] = res.json()
else:
Expand All @@ -46,74 +54,66 @@ def __getitem__(self, vocab_id):
else:
self._load(vocab_id)

return self._vocabs[vocab_id]
return self._vocabs[vocab_id]

def lookup(self, vocab_lookup):
# A nested dictionary-style look-up using a string: vocab_lookup
obj = self
vocab_lookup = re.sub(f"^{vocabs_prefix}:", "", vocab_lookup)

for i,key in enumerate(vocab_lookup.split(":")):
for i, key in enumerate(vocab_lookup.split(":")):
if isinstance(obj, dict) or i == 0:
if key in WILDCARD:
if i+1 != len(vocab_lookup.split(":")):
obj = [ obj[key] for key in obj.keys() ]
if i + 1 != len(vocab_lookup.split(":")):
obj = [obj[key] for key in obj.keys()]
else:
# WILDCARD used as last option, just get keys
obj = list(obj.keys())
else:
obj = obj[key]
else:
if not isinstance(obj,list):
if not isinstance(obj, list):
# sanity check
raise ValueError(f"Confused how we got here, obj = {obj}")
elif key in WILDCARD:
raise ValueError(f"Second WILDCARD ({WILDCARD}) in query {vocab_lookup} not allowed")
raise ValueError(
f"Second WILDCARD ({WILDCARD}) in query {vocab_lookup} not allowed"
)
else:
# obj should be list of dicts, creating list of values or dicts
obj = [ d[key] for d in obj ]
obj = [d[key] for d in obj]

return obj

# def OLD_lookup(self, lookup):
# Used to have a special "__key__" lookup. not needed now.
# # Parses a lookup string (from a template) and then looks up the vocabulary
# # to return an item or a list of items
# lookup = re.sub("^__vocabs__:", "", lookup)
# self._load_vocab(lookup)
# comps = deque(lookup.split(":"))
# item = self.vocabs

# while comps:
# comp = comps.popleft()
# if comp == "__key__":
# item = item.keys()
# elif isinstance(item, list):
# item = [i[comp] for i in item if i.get(comp)]
# else:
# item = item[comp]

# return item

def check(self, vocab_lookup, value, label="", lookup=True):
# Return a list of errors - empty list if no errors
errors = []
options = [ self.lookup(vocab_lookup) if lookup else vocab_lookup ][0]
options = [self.lookup(vocab_lookup) if lookup else vocab_lookup][0]

if isinstance(options, list):
if value not in options:
errors.append(f"{label} '{value}' not in vocab options: {options} (using: '{vocab_lookup}')")
errors.append(
f"{label} '{value}' not in vocab options: {options} (using: '{vocab_lookup}')"
)
elif isinstance(options, dict):
for key in options.keys():
if key in value.keys():
errors.extend(self.check(options[key], value[key], label = f"{label}:{key}", lookup=False))
errors.extend(
self.check(
options[key],
value[key],
label=f"{label}:{key}",
lookup=False,
)
)
else:
errors.append(f"{label} does not have attribute '{key}'")
elif value != options:
errors.append(f"{label} '{value}' does not equal required vocab value: '{options}' (using: '{vocab_lookup}')")
errors.append(
f"{label} '{value}' does not equal required vocab value: '{options}' (using: '{vocab_lookup}')"
)

return errors



vocabs = Vocabs()
3 changes: 1 addition & 2 deletions checksit/describer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def describe(check_ids=None, verbose=False):

print("Functional check descriptions:")
for check_id, check_func in check_funcs:

print(f"\n{check_id}:\n\tFunction: {check_func.__name__}\n\tDescription:")
print("\n\t".join([line for line in check_func.__doc__.split("\n")]))

Loading
Loading