Source code for firebird.uuid._spec

# SPDX-FileCopyrightText: 2022-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: firebird-uuid
# FILE:           firebird/uuid/spec.py
# DESCRIPTION:    Firebird OID registry specification
# CREATED:        14.11.2022
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2022 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________

"""Firebird OID registry specification.

The OID hierarchy is controlled by a set of YAML files, each file describing one level in
the tree hierarchy (that is, the root node of the child tree and all assigned nodes for
children). The `root.oid` file describes the OID of highest level (assigned by IANA).

Each file has the following format::

  # Description of root node for this sub-tree
  node:
      oid:           # Full OID, for example 1.3.6.1.4.1.53446
      name:          # Node name
      description:   # Node description
      contact:       # Name of the contact person
      email:         # E-mail of the contact person
      site:          # URL to website of node owner
      parent-spec:   # URL of parent YAML file, empty for top level (root) node
      type:          # enumeration: "root", "node", "leaf"

  # List of children nodes in order of numbers assigned to them
  # could be omitted for leaf node (see node.type)
  children:
      - number:      # Number assigned to this child node, oid = node.oid + '.' + number
        name:        # Node name
        description: # Node description, could be empty
        contact:     # Name of the contact person
        email:       # E-mail of the contact person
        site:        # URL to website of node owner
        node-spec:   # one of: keywords "leaf" or "private" or URL to YAML file describing this child node
"""

from __future__ import annotations
from typing import Tuple, Dict, Set
import os
import re
from urllib.request import url2pathname
import requests
import yaml

KeySet = Set[str]

class LocalFileAdapter(requests.adapters.BaseAdapter):
    """Protocol Adapter to allow Requests to GET file:// URLs
    """
    @staticmethod
    def _chkpath(method, path):
        """Return an HTTP status for the given filesystem path."""
        if method.lower() in ('put', 'delete'):
            return 501, "Not Implemented"  # TODO
        elif method.lower() not in ('get', 'head'):
            return 405, "Method Not Allowed"
        elif os.path.isdir(path):
            return 400, "Path Not A File"
        elif not os.path.isfile(path):
            return 404, "File Not Found"
        elif not os.access(path, os.R_OK):
            return 403, "Access Denied"
        else:
            return 200, "OK"

    def send(self, req, **kwargs):  # pylint: disable=unused-argument
        """Return the file specified by the given request
        """
        path = os.path.normcase(os.path.normpath(url2pathname(req.path_url)))
        response = requests.Response()

        response.status_code, response.reason = self._chkpath(req.method, path)
        if response.status_code == 200 and req.method.lower() != 'head':
            try:
                response.raw = open(path, 'rb')
            except (OSError, IOError) as err:
                response.status_code = 500
                response.reason = str(err)

        if isinstance(req.url, bytes):
            response.url = req.url.decode('utf-8')
        else:
            response.url = req.url

        response.request = req
        response.connection = self

        return response

    def close(self):
        pass

#: URL for ROOT specification
ROOT_SPEC = 'https://raw.githubusercontent.com/FirebirdSQL/firebird-uuid/master/root.oid'

ITEM_NODE = 'node'
ITEM_OID = 'oid'
ITEM_CHILDREN = 'children'
ITEM_NAME = 'name'
ITEM_DESCRIPTION = 'description'
ITEM_CONTACT = 'contact'
ITEM_EMAIL = 'email'
ITEM_SITE = 'site'
ITEM_PARENT_SPEC = 'parent-spec'
ITEM_TYPE = 'type'
ITEM_NODE_SPEC = 'node-spec'
ITEM_NUMBER = 'number'

#KEY_ITEMS = (ITEM_OID, ITEM_NAME, ITEM_DESCRIPTION, ITEM_CONTACT, ITEM_EMAIL, ITEM_SITE,
             #ITEM_TYPE, ITEM_NODE_SPEC)

SPEC_ITEMS: KeySet = set((ITEM_NODE, ITEM_CHILDREN))
NODE_ITEMS: KeySet = set((ITEM_OID, ITEM_NAME, ITEM_DESCRIPTION, ITEM_CONTACT, ITEM_EMAIL,
                          ITEM_SITE, ITEM_PARENT_SPEC, ITEM_TYPE))
CHILD_ITEMS: KeySet = set((ITEM_NUMBER, ITEM_NAME, ITEM_DESCRIPTION, ITEM_CONTACT, ITEM_EMAIL,
                           ITEM_SITE, ITEM_NODE_SPEC))

RE_EMAIL = re.compile(r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])""")
RE_OID = re.compile(r"^(\d+\.)+\d+$")
RE_NAME = re.compile(r"^[a-zA-Z0-9_\-]+$")
TYPE_VALUES = ('root', 'node', 'leaf')
NODE_KEYWORDS = ('private', 'leaf')

def _check_string(data: Dict, item: str) -> None:
    value = data[item]
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"String value required for '{item}', found '{value}'")

def _check_number(data: Dict, item: str) -> None:
    value = data[item]
    if not isinstance(value, int) or value < 0:
        raise ValueError(f"Non-negative number value required for '{item}', found '{value}'")


def _check_email(data: Dict, item: str) -> None:
    _check_string(data, item)
    value = data[item]
    if RE_EMAIL.fullmatch(value) is None:
        raise ValueError(f"E-mail value required for '{item}', found '{value}'")

def _check_oid(data: Dict, item: str) -> None:
    _check_string(data, item)
    value = data[item]
    if RE_OID.fullmatch(value) is None:
        raise ValueError(f"OID value required for '{item}'")

def _check_name(data: Dict, item: str) -> None:
    _check_string(data, item)
    value = data[item]
    if RE_NAME.fullmatch(value) is None or value.lower() != value:
        raise ValueError(f"Single lowercase word required for '{item}', found '{value}'")

def _check_parent_spec(data: Dict, item: str) -> None:
    if data['type'] != 'root' and (not isinstance(item, str) or not item.strip()):
        raise ValueError(f"String value required for '{item}'")

def _check_node_spec(data: Dict, item: str) -> None:
    _check_string(data, item)
    value = data[item]
    if RE_NAME.fullmatch(value) is not None and value not in NODE_KEYWORDS:
        raise ValueError(f"Either URL or keyword required for '{item}'")

def _check_type(data: Dict, item: str) -> None:
    _check_string(data, item)
    value = data[item]
    if value not in TYPE_VALUES:
        raise ValueError(f"Invalid node type '{value}'")

_VALIDATE_MAP = {ITEM_OID: _check_oid,
                 ITEM_NAME: _check_name,
                 ITEM_DESCRIPTION: _check_string,
                 ITEM_CONTACT: _check_string,
                 ITEM_EMAIL: _check_email,
                 ITEM_SITE: _check_string,
                 ITEM_PARENT_SPEC: _check_parent_spec,
                 ITEM_TYPE: _check_type,
                 ITEM_NUMBER: _check_number,
                 }

def validate_dict(expected: KeySet, data: Dict[str, str]) -> None:
    """Validates dictionary.

    Arguments:
      expected: Set of expected keys.
      data:     Validated dictionary.

    Raises:
      ValueError: When any keys are missing or additional keys are present, or when
        values do not conform to specification.
    """
    given = set(data.keys())
    if expected != given:
        missing = expected.difference(given)
        additional = given.difference(expected)
        if missing and not additional:
            raise ValueError(f'Missing keys: {", ".join(missing)}')
        elif additional and not missing:
            raise ValueError(f'Found unexpected keys: {", ".join(additional)}')
        else:
            raise ValueError(f'Missing and unexpected keys found')
    # Validate items
    for item, checker in _VALIDATE_MAP.items():
        if item in data:
            checker(data, item)

def validate_spec(data: Dict) -> None:
    """Validates OID specification dictionary.

    Arguments:
      data:     Dictiorary with parsed OID YAML specification.

    Raises:
      ValueError: When any keys are missing or additional keys are present, or when
        values do not conform to specification.
    """
    validate_dict(SPEC_ITEMS, data)
    validate_dict(NODE_ITEMS, data['node'])
    for child in data['children']:
        validate_dict(CHILD_ITEMS, child)

def pythonize(data: Dict) -> Dict:
    """Returns dictionary with normalized key names for use as keyword parameters to
    `.Node` __init__ method.

    Arguments:
      data: Dictionary for normalization.
    """
    result = {key.replace('-', '_'): value for key, value in data.items()}
    if 'type' in result:
        result['node_type'] = result['type']
        del result['type']
    return result

def pythonize_spec(data: Dict) -> Dict:
    """Returns dictionary of parsed OID specification with normalized key names for use as
    keyword parameters to `.Node` __init__ method.

    Arguments:
      data: Dictionary for normalization.
    """
    result = {}
    result['node'] = pythonize(data['node'])
    for i, child in enumerate(data['children']):
        result['children'][i] = pythonize(child)
    return result

[docs] def get_specification(url: str) -> str: """Returns YAML text of OID specification from URL. Arguments: url: URL of OID specification. Raises: requests.HTTPError: If one occurred. """ requests_session = requests.session() requests_session.mount('file://', LocalFileAdapter()) spec_req: requests.Response = requests_session.get(url, allow_redirects=True) if not spec_req.ok: spec_req.raise_for_status() return spec_req.text
#def parse_spec(spec: str) -> Dict: #"""Returns dictionary with parsed OID specification. #Dictionary keys are normalized to Python identifiers (dash replaced with undersore), #and `type` is renamed to `node_type`. #Arguments: #spec: OID specification in YAML format. #Raises: #YAMLError: If specification is not valid YAML. #ValueError: If specification does not conform to specification format. #""" #data = yaml.safe_load(spec) #validate_spec(data) #pythonize_spec(data) #return data
[docs] def get_specifications(root: str=ROOT_SPEC) -> Tuple[Dict[str, str], Dict[str, Exception]]: """Function traverses the tree of OID YAML specifications, and returns accessible YAML specifications and errors encountered during tree traversal. This function does not perform any validation of loaded specifications beoynd checks and transformations needed to parse the YAML to get links to child specifications. Returns tuple with two dictionaries: - First dictionary contains `url: spec_yaml` with all YAML specifications that were successfuly fetched. - Second dictionary contains `url: Exception` with all errors encountered during tree traversal. Arguments: root: URL to root specification where tree traversal should begin. """ def load_tree(node: str) -> None: try: spec = get_specification(node) spec_map[node] = spec data = yaml.safe_load(spec) except Exception as exc: err_map[node] = exc return if ITEM_CHILDREN not in data: err_map[node] = Exception("Missing children specification") return for i, child in enumerate(data[ITEM_CHILDREN]): if ITEM_NODE_SPEC not in child: err_map[node] = Exception(f"Children {i} does not contain node-spec") return else: if child[ITEM_NODE_SPEC].lower() not in ('leaf', 'private'): load_tree(child[ITEM_NODE_SPEC]) spec_map: Dict[str, str] = {} err_map: Dict[str, Exception] = {} load_tree(ROOT_SPEC) return (spec_map, err_map)
[docs] def parse_specifications(specifications: Dict) -> Tuple[Dict[str, Dict], Dict[str, Exception]]: """Function that parses OID YAML specifications. Returns tuple with two dictionaries: - First dictionary contains `url: spec_dict`, where dictionaries contain data from successfuly parsed and validated OID YAML specifications. - Second dictionary contains `url: Exception` with errors encountered during parsing and validation. Arguments: specifications: Dictionary with YAML specifications returned by `.get_all_specifications()` function. """ data_map: Dict[str, Dict] = {} err_map: Dict[str, Exception] = {} for url, spec in specifications.items(): try: data: Dict = yaml.safe_load(spec) validate_spec(data) data = pythonize_spec(data) data_map[url] = data except Exception as exc: err_map[url] = exc return (data_map, err_map)