Skip to content

Commit

Permalink
Merge pull request #14 from EESSI/allow_s1s_on_s3
Browse files Browse the repository at this point in the history
Initial commit of S3 support for Stratum1.
  • Loading branch information
terjekv authored Jun 14, 2024
2 parents 39c79db + 2faa891 commit beb283d
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 43 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ for repo in servers[0].repositories:
print("Last snapshot: " + str(repo.last_snapshot))
````

Note that if you are using a Stratum1 server with S3 as its backend, you need to set repos explicitly.
This is because the S3 backend does not have a `cvmfs/info/v1/repositories.json` file. Also, the GeoAPI
status will be `NOT_FOUND` for these servers.

````python

# Data structure

## Server
Expand Down
7 changes: 7 additions & 0 deletions cvmfsscraper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.CallsiteParameterAdder(
[
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
],
),
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
structlog.processors.JSONRenderer(),
],
Expand Down
6 changes: 4 additions & 2 deletions cvmfsscraper/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Exceptions for cvmfsscraper."""

from typing import Any
from typing import Any, Union

import structlog

Expand All @@ -10,7 +10,9 @@
class CVMFSScraperBaseException(Exception):
"""Base exception for cvmfsscraper."""

def __init__(self, message: str, original_excption: Exception = None, *args: Any) -> None:
def __init__(
self, message: str, original_excption: Union[Exception, None] = None, *args: Any
) -> None:
"""Initialize the exception."""
self.message = message
self.original_exception = original_excption
Expand Down
5 changes: 4 additions & 1 deletion cvmfsscraper/http_get_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,8 @@ def __init__(self, path: str, model_class: Type[BaseModel]) -> None:
# Dynamically creating this list based on the Endpoints enum values
# is not supported by mypy et al, so we have to do it manually.
EndpointClassesType = Union[
GetCVMFSPublished, GetCVMFSRepositoriesJSON, GetCVMFSStatusJSON, GetGeoAPI
GetCVMFSPublished,
GetCVMFSRepositoriesJSON,
GetCVMFSStatusJSON,
GetGeoAPI,
]
30 changes: 21 additions & 9 deletions cvmfsscraper/repository.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""A CVMFS repository."""
from typing import Dict
from typing import TYPE_CHECKING, Dict, cast

import structlog

from cvmfsscraper.http_get_models import (
Endpoints,
GetCVMFSPublished,
GetCVMFSStatusJSON,
)
from cvmfsscraper.exceptions import CVMFSFetchError
from cvmfsscraper.http_get_models import Endpoints, GetCVMFSPublished, GetCVMFSStatusJSON

log = structlog.getLogger(__name__)

if TYPE_CHECKING: # pragma: no cover
from cvmfsscraper.server import CVMFSServer


class Repository:
"""A CVMFS repository.
Expand All @@ -37,7 +37,7 @@ class Repository:
"L": "micro_catalogues",
}

def __init__(self, server: object, name: str, url: str):
def __init__(self, server: "CVMFSServer", name: str, url: str):
"""Initialize the repository.
:param server: The server object this repository belongs to.
Expand All @@ -51,6 +51,10 @@ def __init__(self, server: object, name: str, url: str):
self.last_gc = None
self.last_snapshot = None

self.root_size = 0
self.revision = 0
self.revision_timestamp = 0

self._repo_status_loaded = 0
self._cvmfspublished_loaded = 0

Expand Down Expand Up @@ -152,7 +156,11 @@ def fetch_cvmfspublished(self) -> GetCVMFSPublished:
:returns: A GetCVMFSPublished object.
"""
return self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name)
cvmfspublished = self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name)
if not cvmfspublished:
raise CVMFSFetchError("Failed to fetch .cvmfspublished")

return cast(GetCVMFSPublished, cvmfspublished)

def fetch_repository(self) -> GetCVMFSStatusJSON:
"""Fetch a repository by name.
Expand All @@ -162,4 +170,8 @@ def fetch_repository(self) -> GetCVMFSStatusJSON:
:returns: GetCVMFSStatusJSON object.
"""
return self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name)
cvmfsstatus = self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name)
if not cvmfsstatus:
raise CVMFSFetchError("Failed to fetch .cvmfs_status.json")

return cast(GetCVMFSStatusJSON, cvmfsstatus)
84 changes: 69 additions & 15 deletions cvmfsscraper/server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""Server class for cvmfs-server-metadata."""

import json
from typing import Dict, List
from typing import TYPE_CHECKING, Dict, List, Union, cast
from urllib import error, request

import structlog

from cvmfsscraper.constants import GeoAPIStatus
from cvmfsscraper.http_get_models import (
EndpointClassesType,
Endpoints,
GetCVMFSPublished,
GetCVMFSRepositoriesJSON,
Expand All @@ -20,6 +19,9 @@

log = structlog.getLogger(__name__)

if TYPE_CHECKING: # pragma: no cover
from cvmfsscraper.http_get_models import BaseModel


class CVMFSServer:
"""Base class for CVMFS servers."""
Expand Down Expand Up @@ -125,11 +127,12 @@ def populate_repositories(self) -> None:
repodata = self.fetch_repositories_json()

if repodata:
# This should populate self.repositories.
self.process_repositories_json(repodata)

if self.fetch_errors: # pragma: no cover
self._is_down = True
return []
return None

self._is_down = False
except Exception as e: # pragma: no cover
Expand All @@ -140,7 +143,12 @@ def populate_repositories(self) -> None:
)
self.fetch_errors.append({"path": self.name, "error": e})

def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[Repository]:
if self.is_stratum1():
for repo in self.forced_repositories:
if repo not in [r.name for r in self.repositories]:
self.repositories.append(Repository(self, repo, "/cvmfs/" + repo))

def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> None:
"""Process the repositories.json file.
Sets self.repos and self.metadata.
Expand All @@ -153,7 +161,7 @@ def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[
if repodata.replicas:
self.server_type = 1
repos_on_server = repodata.replicas
else:
elif repodata.repositories:
self.server_type = 0
repos_on_server = repodata.repositories

Expand Down Expand Up @@ -197,6 +205,9 @@ def check_geoapi_status(self) -> GeoAPIStatus:

try:
geoapi_obj = self.fetch_geoapi(self.repositories[0])
if not geoapi_obj:
return GeoAPIStatus.NO_RESPONSE

if geoapi_obj.has_order(self.geoapi_order):
return GeoAPIStatus.OK
else:
Expand All @@ -209,32 +220,46 @@ def check_geoapi_status(self) -> GeoAPIStatus:
)
return GeoAPIStatus.NO_RESPONSE

def fetch_repositories_json(self) -> GetCVMFSRepositoriesJSON:
def fetch_repositories_json(self) -> Union[GetCVMFSRepositoriesJSON, None]:
"""Fetch the repositories JSON file.
Note: This function will return None if the server is a stratum1 and uses S3 as
its backend. In this case, the endpoint is not available.
raises: urlllib.error.URLError (or a subclass thereof) for URL errors.
pydantic.ValidationError if the object creation fails.
returns: A GetCVMFSRepositoriesJSON object.
returns: A GetCVMFSRepositoriesJSON object or None
"""
return self.fetch_endpoint(Endpoints.REPOSITORIES_JSON)
repos = self.fetch_endpoint(Endpoints.REPOSITORIES_JSON)
if not repos:
return None

return cast(GetCVMFSRepositoriesJSON, repos)

def fetch_geoapi(self, repo: Repository) -> GetGeoAPI:
def fetch_geoapi(self, repo: Repository) -> Union[GetGeoAPI, None]:
"""Fetch the GeoAPI host ordering.
Note: This function will return None if the server is a stratum1 and uses S3 as
its backend. In this case, the endpoint is not available.
raises: urlllib.error.URLError (or a subclass thereof) for URL errors.
pydantic.ValidationError if the object creation fails.
:returns: A GetGeoAPI object.
:returns: A GetGeoAPI object or None
"""
return self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name)
geoapi = self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name)
if not geoapi:
return None

return cast(GetGeoAPI, geoapi)

def fetch_endpoint(
self,
endpoint: Endpoints,
repo: str = "data",
geoapi_servers: str = GEOAPI_SERVERS,
) -> EndpointClassesType:
geoapi_servers: List[str] = GEOAPI_SERVERS,
) -> Union["BaseModel", None]:
"""Fetch and process a specified URL endpoint.
This function reads the content of a specified URL and ether returns a validated
Expand Down Expand Up @@ -270,11 +295,12 @@ def fetch_endpoint(
geoapi_str = ",".join(geoapi_servers)
formatted_path = endpoint.path.format(repo=repo, geoapi_str=geoapi_str)
url = f"{self.url()}/cvmfs/{formatted_path}"

timeout_seconds = 5
try:
log.info("Fetching url", url=url)
content = request.urlopen(url, timeout=timeout_seconds)
req = request.Request(url)
req.add_header("User-Agent", "Mozilla/5.0")
content = request.urlopen(req, timeout=timeout_seconds)

if endpoint in [Endpoints.REPOSITORIES_JSON, Endpoints.CVMFS_STATUS_JSON]:
log.debug(
Expand Down Expand Up @@ -307,6 +333,34 @@ def fetch_endpoint(

return endpoint.model_class(**content)

except error.HTTPError as e:
# If we get a 403 from a stratum1 on the repositories.json endpoint, we are
# probably dealing with a server that uses S3 as its backend. In this case
# this endpoint is not available, and we should just ignore it.
if (
e
and (endpoint == Endpoints.REPOSITORIES_JSON or endpoint == Endpoints.GEOAPI)
and self.server_type == 1
and e.code == 404
):
log.debug(
"Assuming S3 backend for stratum1",
server=self.name,
endpoint=endpoint.name,
repo=repo,
url=url,
)
return None
log.error(
"Fetch endpoint failure",
exc=e,
name=self.name,
endpoint=endpoint.name,
repo=repo,
url=url,
)
raise e from e

except error.URLError as e:
log.error(
"Fetch endpoint failure",
Expand Down
6 changes: 5 additions & 1 deletion cvmfsscraper/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def validate_and_load(data_dir: str) -> Dict[str, Union[str, bytes]]:
ENDPOINTS["http://example.com/timeout"] = urllib.error.URLError("timeout")


def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Mock, Exception]:
def mock_urlopen(
url: Union[str, urllib.request.Request], timeout: Union[int, float, None] = None
) -> Union[Mock, Exception]:
"""Mock urllib.request.urlopen based on a predefined URL mapping.
:param url: The URL to fetch.
Expand All @@ -56,6 +58,8 @@ def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Moc
:returns: Mocked HTTPResponse object with read() method.
"""
url = url.full_url if isinstance(url, urllib.request.Request) else url

if url not in ENDPOINTS:
raise HTTPError(url, 404, "Not Found", {}, None)

Expand Down
12 changes: 7 additions & 5 deletions cvmfsscraper/tests/test_002_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Test the pydantic models in cvmfsscraper/models.py."""

from __future__ import annotations

import json
import os
from copy import deepcopy
Expand Down Expand Up @@ -76,7 +78,7 @@ class BaseCVMFSModelTestCase(TestCase):
"""Base model for testing CVMFS models."""

def verify_date_field(
self, cls: CVMFSBaseModel, input_data: Dict[str, Any], field: str
self, cls: type[CVMFSBaseModel], input_data: Dict[str, Any], field: str
) -> None:
"""Verify that a given field in the dataset is validated as a CVMFS date."""
data = deepcopy(input_data)
Expand All @@ -102,11 +104,11 @@ def verify_date_field(

def verify_str_field(
self,
cls: CVMFSBaseModel,
cls: type[CVMFSBaseModel],
input_data: Dict[str, Any],
field: str,
min_length: int = None,
max_length: int = None,
min_length: int = 0,
max_length: int = 0,
is_hex: bool = False,
) -> None:
"""Verify that a given field in the dataset is validated as a string."""
Expand Down Expand Up @@ -142,7 +144,7 @@ def verify_str_field(

def verify_int_field(
self,
cls: CVMFSBaseModel,
cls: type[CVMFSBaseModel],
input_data: Dict[str, Any],
field: str,
require_positive: bool = False,
Expand Down
Loading

0 comments on commit beb283d

Please sign in to comment.