started work on backend

This commit is contained in:
d3m1g0d
2019-01-21 17:36:00 +01:00
parent a1a8bca34b
commit 9f9a7e4974
4032 changed files with 745079 additions and 0 deletions
@@ -0,0 +1,10 @@
"""
`serial` is an object serialization/deserialization library intended to facilitate authoring of API models which are
readable and introspective, and to expedite code and data validation and testing. `serial` supports JSON, YAML, and
XML.
"""
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from . import utilities, abc, model, marshal, errors, properties, meta, hooks, test, request
@@ -0,0 +1,9 @@
# region Backwards Compatibility
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from ..utilities.compatibility import backport
backport()
from . import model, properties
@@ -0,0 +1,250 @@
# Tell the linters what's up:
# pylint:disable=wrong-import-position,consider-using-enumerate,useless-object-inheritance
# mccabe:options:max-complexity=999
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from serial.utilities.compatibility import backport
backport()
from abc import ABCMeta, abstractmethod
# We need ABCs to inherit from `ABC` in python 3x, but in python 2x ABC is absent and we need classes to inherit from
# `object` in order to be new-style classes
try:
from abc import ABC
except ImportError:
ABC = object
from ..utilities import collections
class Model(ABC):
__metaclass__ = ABCMeta
_format = None # type: Optional[str]
_meta = None # type: Optional[meta.Object]
_hooks = None # type: Optional[hooks.Object]
@abstractmethod
def __init__(self):
self._format = None # type: Optional[str]
self._meta = None # type: Optional[meta.Meta]
self._hooks = None # type: Optional[hooks.Hooks]
self._url = None # type: Optional[str]
self._xpath = None # type: Optional[str]
self._pointer = None # type: Optional[str]
@classmethod
def __subclasscheck__(cls, subclass):
# type: (object) -> bool
"""
Check a subclass to ensure it has required properties
"""
if cls is subclass or type.__subclasscheck__(cls, subclass):
return True
for attribute in (
'_format',
'_meta',
'_hooks'
):
if not hasattr(subclass, attribute):
return False
return True
def __instancecheck__(self, instance):
# type: (object) -> bool
"""
Check an instance of a subclass to ensure it has required properties
"""
for attribute in (
'_format',
'_meta',
'_hooks',
'_url',
'_xpath',
'_pointer'
):
if not hasattr(self, attribute):
return False
# Perform any instance checks needed for our superclass(es)
try:
return super().__instancecheck__(instance)
except AttributeError: # There is no further instance-checking to perform
return True
@abstractmethod
def _marshal(self):
# type: (...) -> Union[collections.OrderedDict, list]
pass
@abstractmethod
def _validate(self, raise_errors=True):
# type: (bool) -> None
pass
@abstractmethod
def __str__(self):
# type: (...) -> str
pass
@abstractmethod
def __repr__(self):
# type: (...) -> str
pass
@abstractmethod
def __copy__(self):
# type: (...) -> Object
pass
@abstractmethod
def __deepcopy__(self, memo):
# type: (Optional[dict]) -> Object
pass
@abstractmethod
def __eq__(self, other):
# type: (Any) -> bool
pass
@abstractmethod
def __ne__(self, other):
# type: (Any) -> bool
pass
class Object(Model):
@abstractmethod
def _marshal(self):
# type: (...) -> collections.OrderedDict
pass
@abstractmethod
def __setitem__(self, key, value):
# type: (str, Any) -> None
pass
@abstractmethod
def __getitem__(self, key):
# type: (str, Any) -> None
pass
@abstractmethod
def __setattr__(self, property_name, value):
# type: (Object, str, Any) -> None
pass
@abstractmethod
def __getattr__(self, key):
# type: (str) -> Any
pass
@abstractmethod
def __delattr__(self, key):
# type: (str) -> None
pass
class Dictionary(Model):
@classmethod
def __subclasscheck__(cls, subclass):
# type: (object) -> bool
"""
Verify inheritance
"""
if cls is subclass or type.__subclasscheck__(cls, subclass):
return True
if not issubclass(subclass, collections.OrderedDict):
return False
# Perform any subclass checks needed for our superclass(es)
return super().__subclasscheck__(subclass)
def __instancecheck__(self, instance):
# type: (object) -> bool
"""
Check an instance of a subclass to ensure it has required properties
"""
if not isinstance(self, collections.OrderedDict):
return False
# Perform any instance checks needed for our superclass(es)
return super().__instancecheck__(instance)
@abstractmethod
def _marshal(self):
# type: (...) -> collections.OrderedDict
pass
@abstractmethod
def __setitem__(self, key, value):
# type: (str, Any) -> None
pass
def keys(self):
# type: (...) -> Iterable[str]
pass
def values(self):
# type: (...) -> Iterable[Any]
pass
class Array(Model):
@classmethod
def __subclasscheck__(cls, subclass):
# type: (object) -> bool
"""
Verify inheritance
"""
if cls is subclass or type.__subclasscheck__(cls, subclass):
return True
if not issubclass(subclass, list):
return False
# Perform any subclass checks needed for our superclass(es)
return super().__subclasscheck__(subclass)
def __instancecheck__(self, instance):
# type: (object) -> bool
"""
Check an instance of a subclass to ensure it has required properties
"""
if not isinstance(self, list):
return False
# Perform any instance checks needed for our superclass(es)
return super().__instancecheck__(instance)
@abstractmethod
def _marshal(self):
# type: (...) -> list
pass
@abstractmethod
def __setitem__(self, key, value):
# type: (str, Any) -> None
pass
@abstractmethod
def append(self, value):
# type: (Any) -> None
pass
@@ -0,0 +1,108 @@
# Tell the linters what's up:
# pylint:disable=wrong-import-position,consider-using-enumerate,useless-object-inheritance
# mccabe:options:max-complexity=999
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from ..utilities.compatibility import backport
backport()
from future.utils import native_str # noqa
from abc import ABCMeta, abstractmethod
# We need to inherit from `ABC` in python 3x, but in python 2x ABC is absent
try:
from abc import ABC
except ImportError:
ABC = object
class Property(ABC):
__metaclass__ = ABCMeta
@abstractmethod
def __init__(
self,
types=None, # type: Sequence[Union[type, Property]]
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Sequence[Union[str, serial.meta.Version]]]
):
self._types = None # type: Optional[Sequence[Union[type, Property]]]
self.types = types
self.name = name
self.required = required
self._versions = None # type: Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
self.versions = versions # type: Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
def __instancecheck__(self, instance):
# type: (object) -> bool
"""
Check an instance of a subclass to ensure it has required properties
"""
for attribute in (
'_types',
'_versions',
'name',
'required'
):
if not hasattr(self, attribute):
return False
# Perform any instance checks needed for our superclass(es)
return super().__instancecheck__(instance)
@property
@abstractmethod
def types(self):
# type: (...) -> Optional[Sequence[Union[type, Property, Model]]]
pass
@types.setter
@abstractmethod
def types(self, types_or_properties):
# type: (Optional[Sequence[Union[type, Property, Model]]]) -> None
pass
@property
@abstractmethod
def versions(self):
# type: () -> Optional[Sequence[meta.Version]]
pass
@versions.setter
@abstractmethod
def versions(
self,
versions # type: Optional[Sequence[Union[str, collections_abc.Iterable, meta.Version]]]
):
# type: (...) -> Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
pass
@abstractmethod
def unmarshal(self, data):
# type: (Any) -> Any
pass
@abstractmethod
def marshal(self, data):
# type: (Any) -> Any
pass
@abstractmethod
def __repr__(self):
# type: (...) -> str
pass
@abstractmethod
def __copy__(self):
# type: (...) -> Property
pass
@abstractmethod
def __deepcopy__(self, memo):
# type: (dict) -> Property
pass
@@ -0,0 +1,214 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport()
from future.utils import native_str
from datetime import date, datetime
from decimal import Decimal
from numbers import Number
try:
import typing
except ImportError as e:
typing = None
from .abc.model import Model
from .utilities import qualified_name, collections, collections_abc, Generator
class ValidationError(Exception):
pass
class VersionError(AttributeError):
pass
class DefinitionExistsError(Exception):
pass
class UnmarshalValueError(ValueError):
pass
UNMARSHALLABLE_TYPES = tuple({
str, bytes, native_str, Number, Decimal, date, datetime, bool,
dict, collections.OrderedDict,
collections_abc.Set, collections_abc.Sequence, Generator,
Model
})
class UnmarshalError(Exception):
def __init__(
self,
data, # type: Any
types=None, # type: Optional[Sequence[Model, Property, type]]
item_types=None, # type: Optional[Sequence[Model, Property, type]]
value_types=None # type: Optional[Sequence[Model, Property, type]]
):
# type: (...) -> None
"""
Generate a comprehensible error message for data which could not be un-marshalled according to spec, and raise
the appropriate exception
"""
self._message = None # type: Optional[str]
self._parameter = None # type: Optional[str]
self._index = None # type: Optional[int]
self._key = None # type: Optional[str]
error_message_lines = ['']
# Identify which parameter is being used for type validation
types_label = None
if types:
types_label = 'types'
elif item_types:
types_label = 'item_types'
types = item_types
elif value_types:
types_label = 'value_types'
types = value_types
# Assemble the error message
# Assemble a text representation of the `data`
data_lines = []
lines = repr(data).strip().split('\n')
if len(lines) == 1:
data_lines.append(lines[0])
else:
data_lines.append('')
for line in lines:
data_lines.append(
' ' + line
)
# Assemble a text representation of the `types`, `item_types`, or `value_types`.
if types is None:
error_message_lines.append('The data provided is not an instance of an un-marshallable type:')
else:
error_message_lines.append(
'The data provided does not match any of the expected types and/or property definitions:'
)
error_message_lines.append(
' - data: %s' % '\n'.join(data_lines)
)
if types is None:
types = UNMARSHALLABLE_TYPES
types_label = 'un-marshallable types'
types_lines = ['(']
for type_ in types:
if isinstance(type_, type):
lines = (qualified_name(type_),)
else:
lines = repr(type_).split('\n')
for line in lines:
types_lines.append(
' ' + line
)
types_lines[-1] += ','
types_lines.append(' )')
error_message_lines.append(
' - %s: %s' % (types_label, '\n'.join(types_lines))
)
self.message = '\n'.join(error_message_lines)
@property
def paramater(self):
# type: (...) -> Optional[str]
return self._parameter
@paramater.setter
def paramater(self, paramater_name):
# type: (str) -> None
if paramater_name != self.paramater:
self._parameter = paramater_name
self.assemble_message()
@property
def message(self):
# type: (...) -> Optional[str]
return self._message
@message.setter
def message(self, message_text):
# type: (str) -> None
if message_text != self.message:
self._message = message_text
self.assemble_message()
@property
def index(self):
# type: (...) -> Optional[int]
return self._message
@index.setter
def index(self, index_or_key):
# type: (Union[str, int]) -> None
if index_or_key != self.index:
self._index = index_or_key
self.assemble_message()
def assemble_message(self):
messages = []
if self.paramater:
messages.append(
'Errors encountered in attempting to un-marshal %s:' % self.paramater
)
if self.index is not None:
messages.append(
'Errors encountered in attempting to un-marshal the value at index %s:' % repr(self.index)
)
if self.message:
messages.append(self.message)
super().__init__('\n'.join(messages))
class UnmarshalTypeError(UnmarshalError, TypeError):
pass
class UnmarshalValueError(UnmarshalError, ValueError):
pass
@@ -0,0 +1,258 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport()
import serial.abc
from copy import deepcopy
import serial
from .utilities import qualified_name
from .abc.model import Model
try:
import typing
except ImportError as e:
typing = None
class Hooks(object):
def __init__(
self,
before_marshal=None, # Optional[Callable]
after_marshal=None, # Optional[Callable]
before_unmarshal=None, # Optional[Callable]
after_unmarshal=None, # Optional[Callable]
before_serialize=None, # Optional[Callable]
after_serialize=None, # Optional[Callable]
before_deserialize=None, # Optional[Callable]
after_deserialize=None, # Optional[Callable]
before_validate=None, # Optional[Callable]
after_validate=None, # Optional[Callable]
):
self.before_marshal = before_marshal
self.after_marshal = after_marshal
self.before_unmarshal = before_unmarshal
self.after_unmarshal = after_unmarshal
self.before_serialize = before_serialize
self.after_serialize = after_serialize
self.before_deserialize = before_deserialize
self.after_deserialize = after_deserialize
self.before_validate = before_validate
self.after_validate = after_validate
def __copy__(self):
return self.__class__(**vars(self))
def __deepcopy__(self, memo=None):
# type: (dict) -> Memo
return self.__class__(**{
k: deepcopy(v, memo=memo)
for k, v in vars(self).items()
})
def __bool__(self):
return True
class Object(Hooks):
def __init__(
self,
before_marshal=None, # Optional[Callable]
after_marshal=None, # Optional[Callable]
before_unmarshal=None, # Optional[Callable]
after_unmarshal=None, # Optional[Callable]
before_serialize=None, # Optional[Callable]
after_serialize=None, # Optional[Callable]
before_deserialize=None, # Optional[Callable]
after_deserialize=None, # Optional[Callable]
before_validate=None, # Optional[Callable]
after_validate=None, # Optional[Callable]
before_setattr=None, # Optional[Callable]
after_setattr=None, # Optional[Callable]
before_setitem=None, # Optional[Callable]
after_setitem=None, # Optional[Callable]
):
self.before_marshal = before_marshal
self.after_marshal = after_marshal
self.before_unmarshal = before_unmarshal
self.after_unmarshal = after_unmarshal
self.before_serialize = before_serialize
self.after_serialize = after_serialize
self.before_deserialize = before_deserialize
self.after_deserialize = after_deserialize
self.before_validate = before_validate
self.after_validate = after_validate
self.before_setattr = before_setattr
self.after_setattr = after_setattr
self.before_setitem = before_setitem
self.after_setitem = after_setitem
class Array(Hooks):
def __init__(
self,
before_marshal=None, # Optional[Callable]
after_marshal=None, # Optional[Callable]
before_unmarshal=None, # Optional[Callable]
after_unmarshal=None, # Optional[Callable]
before_serialize=None, # Optional[Callable]
after_serialize=None, # Optional[Callable]
before_deserialize=None, # Optional[Callable]
after_deserialize=None, # Optional[Callable]
before_validate=None, # Optional[Callable]
after_validate=None, # Optional[Callable]
before_setitem=None, # Optional[Callable]
after_setitem=None, # Optional[Callable]
before_append=None, # Optional[Callable]
after_append=None, # Optional[Callable]
):
self.before_marshal = before_marshal
self.after_marshal = after_marshal
self.before_unmarshal = before_unmarshal
self.after_unmarshal = after_unmarshal
self.before_serialize = before_serialize
self.after_serialize = after_serialize
self.before_deserialize = before_deserialize
self.after_deserialize = after_deserialize
self.before_validate = before_validate
self.after_validate = after_validate
self.before_setitem = before_setitem
self.after_setitem = after_setitem
self.before_append = before_append
self.after_append = after_append
class Dictionary(Hooks):
def __init__(
self,
before_marshal=None, # Optional[Callable]
after_marshal=None, # Optional[Callable]
before_unmarshal=None, # Optional[Callable]
after_unmarshal=None, # Optional[Callable]
before_serialize=None, # Optional[Callable]
after_serialize=None, # Optional[Callable]
before_deserialize=None, # Optional[Callable]
after_deserialize=None, # Optional[Callable]
before_validate=None, # Optional[Callable]
after_validate=None, # Optional[Callable]
before_setitem=None, # Optional[Callable]
after_setitem=None, # Optional[Callable]
):
self.before_marshal = before_marshal
self.after_marshal = after_marshal
self.before_unmarshal = before_unmarshal
self.after_unmarshal = after_unmarshal
self.before_serialize = before_serialize
self.after_serialize = after_serialize
self.before_deserialize = before_deserialize
self.after_deserialize = after_deserialize
self.before_validate = before_validate
self.after_validate = after_validate
self.before_setitem = before_setitem
self.after_setitem = after_setitem
def read(
model_instance # type: Union[type, serial.abc.model.Model]
):
# type: (...) -> Hooks
"""
Read metadata from a model instance (the returned metadata may be inherited, and therefore should not be written to)
"""
if isinstance(model_instance, type):
return model_instance._hooks
elif isinstance(model_instance, Model):
return model_instance._hooks or read(type(model_instance))
def writable(
model_instance # type: Union[type, serial.abc.model.Model]
):
# type: (...) -> Hooks
"""
Retrieve a metadata instance. If the instance currently inherits its metadata from a class or superclass,
this funtion will copy that metadata and assign it directly to the model instance.
"""
if isinstance(model_instance, type):
if model_instance._hooks is None:
model_instance._hooks = (
Object()
if issubclass(model_instance, serial.model.Object) else
Array()
if issubclass(model_instance, serial.model.Array) else
Dictionary()
if issubclass(model_instance, serial.model.Dictionary)
else None
)
else:
for b in model_instance.__bases__:
if hasattr(b, '_hooks') and (model_instance._hooks is b._hooks):
model_instance._hooks = deepcopy(model_instance._hooks)
break
elif isinstance(model_instance, Model):
if model_instance._hooks is None:
model_instance._hooks = deepcopy(writable(type(model_instance)))
return model_instance._hooks
def write(
model_instance, # type: Union[type, serial.abc.model.Model]
meta # type: Hooks
):
# type: (...) -> None
"""
Write metadata to a class or instance
"""
if isinstance(model_instance, type):
t = model_instance
mt = (
Object
if issubclass(model_instance, serial.model.Object) else
Array
if issubclass(model_instance, serial.model.Array) else
Dictionary
if issubclass(model_instance, serial.model.Dictionary)
else None
)
elif isinstance(model_instance, Model):
t = type(model_instance)
mt = (
Object
if isinstance(model_instance, serial.model.Object) else
Array
if isinstance(model_instance, serial.model.Array) else
Dictionary
if isinstance(model_instance, serial.model.Dictionary)
else None
)
if not isinstance(meta, mt):
raise ValueError(
'Hooks assigned to `%s` must be of type `%s`' % (
qualified_name(t),
qualified_name(mt)
)
)
model_instance._hooks = meta
@@ -0,0 +1,704 @@
# Tell the linters what's up:
# pylint:disable=wrong-import-position,consider-using-enumerate,useless-object-inheritance
# mccabe:options:max-complexity=999
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport() # noqa
from future.utils import native_str
from copy import deepcopy
import json
from decimal import Decimal
from base64 import b64encode
from numbers import Number
import yaml
from datetime import date, datetime
from itertools import chain
import serial
from . import utilities, abc, properties, errors, meta, hooks
from .utilities import Generator, qualified_name, read, collections, collections_abc
try:
from typing import Union, Optional, Sequence, Any, Callable
except ImportError:
Union = Optional = Sequence = Any = Callable = None
try:
from abc import ABC
except ImportError:
ABC = None
UNMARSHALLABLE_TYPES = tuple({
str, bytes, native_str, Number, Decimal, date, datetime, bool,
dict, collections.OrderedDict,
collections_abc.Set, collections_abc.Sequence, Generator,
abc.model.Model, properties.Null, properties.NoneType
})
def ab2c(abc_or_property):
# type: (ABC) -> Union[model.Object, model.Dict, model.Array, properties.Property]
"""
Get the corresponding model class from an abstract base class, or if none exists--return the original class or type
"""
class_or_property = abc_or_property # type: (type)
if isinstance(abc_or_property, type):
if abc_or_property in {abc.model.Dictionary, abc.model.Array, abc.model.Object}:
class_or_property = getattr(
serial.model,
abc_or_property.__name__.split('.')[-1]
)
elif isinstance(abc_or_property, properties.Property):
class_or_property = deepcopy(abc_or_property)
for types_attribute in ('types', 'value_types', 'item_types'):
if hasattr(class_or_property, types_attribute):
setattr(
class_or_property,
types_attribute,
tuple(
ab2c(type_) for type_ in getattr(class_or_property, types_attribute)
)
)
return class_or_property
def ab2cs(abcs_or_properties):
# type: (Sequence[Union[ABC, properties.Property]]) -> Sequence[model.Model, properties.Property]
for abc_or_property in abcs_or_properties:
yield ab2c(abc_or_property)
class _Marshal(object):
pass
def marshal(
data, # type: Any
types=None, # type: Optional[Sequence[Union[type, properties.Property, Callable]]]
value_types=None, # type: Optional[Sequence[Union[type, properties.Property]]]
item_types=None, # type: Optional[Sequence[Union[type, properties.Property]]]
):
# type: (...) -> Any
"""
Recursively converts instances of `serial.abc.model.Model` into JSON/YAML/XML serializable objects.
"""
if hasattr(data, '_marshal'):
return data._marshal() # noqa - this is *our* protected member, so linters can buzz off
if data is None:
return data
if callable(types):
types = types(data)
# If data types have been provided, validate the un-marshalled data by attempting to initialize the provided type(s)
# with `data`
if types is not None:
if (str in types) and (native_str is not str) and (native_str not in types):
types = tuple(chain(*(
((type_, native_str) if (type_ is str) else (type_,))
for type_ in types
)))
matched = False
for type_ in types:
if isinstance(type_, properties.Property):
try:
data = type_.marshal(data)
matched = True
break
except TypeError:
pass
elif isinstance(type_, type) and isinstance(data, type_):
matched = True
break
if not matched:
raise TypeError(
'%s cannot be interpreted as any of the designated types: %s' % (
repr(data),
repr(types)
)
)
if value_types is not None:
for k, v in data.items():
data[k] = marshal(v, types=value_types)
if item_types is not None:
for i in range(len(data)):
data[i] = marshal(data[i], types=item_types)
if isinstance(data, Decimal):
return float(data)
if isinstance(data, (date, datetime)):
return data.isoformat()
if isinstance(data, native_str):
return data
if isinstance(data, (bytes, bytearray)):
return str(b64encode(data), 'ascii')
if hasattr(data, '__bytes__'):
return str(b64encode(bytes(data)), 'ascii')
return data
class _Unmarshal(object):
"""
This class should be used exclusively by `serial.marshal.unmarshal`.
"""
def __init__(
self,
data, # type: Any
types=None, # type: Optional[Sequence[Union[type, properties.Property]]]
value_types=None, # type: Optional[Sequence[Union[type, properties.Property]]]
item_types=None # type: Optional[Sequence[Union[type, properties.Property]]]
):
# type: (...) -> None
if not isinstance(
data,
UNMARSHALLABLE_TYPES
):
# Verify that the data can be parsed before attempting to un-marshall it
raise errors.UnmarshalTypeError(
'%s, an instance of `%s`, cannot be un-marshalled. ' % (repr(data), type(data).__name__) +
'Acceptable types are: ' + ', '.join((
qualified_name(data_type)
for data_type in UNMARSHALLABLE_TYPES
))
)
# If only one type was passed for any of the following parameters--we convert it to a tuple
# If any parameters are abstract base classes--we convert them to the corresponding models
if types is not None:
if not isinstance(types, collections_abc.Sequence):
types = (types,)
if value_types is not None:
if not isinstance(value_types, collections_abc.Sequence):
value_types = (value_types,)
if item_types is not None:
if not isinstance(item_types, collections_abc.Sequence):
item_types = (item_types,)
# Member data
self.data = data # type: Any
self.types = types # type: Optional[Sequence[Union[type, properties.Property]]]
self.value_types = value_types # type: Optional[Sequence[Union[type, properties.Property]]]
self.item_types = item_types # type: Optional[Sequence[Union[type, properties.Property]]]
self.meta = None # type: Optional[meta.Meta]
def __call__(self):
# type: (...) -> Any
"""
Return `self.data` unmarshalled
"""
unmarshalled_data = self.data
if (
(self.data is not None) and
(self.data is not properties.NULL)
):
# If the data is a serial `Model`, get it's metadata
if isinstance(self.data, abc.model.Model):
self.meta = meta.read(self.data)
if self.meta is None: # Only un-marshall models if they have no metadata yet (are generic)
# If `types` is a function, it should be one which expects to receive marshalled data and returns a list
# of types which are applicable
if callable(self.types):
self.types = self.types(self.data)
# If the data provided is a `Generator`, make it static by casting the data into a tuple
if isinstance(self.data, Generator):
self.data = tuple(self.data)
if self.types is None:
# If no types are provided, we unmarshal the data into one of serial's generic container types
unmarshalled_data = self.as_container_or_simple_type
else:
self.backport_types()
unmarshalled_data = None
successfully_unmarshalled = False
first_error = None # type: Optional[Exception]
# Attempt to un-marshal the data as each type, in the order provided
for type_ in self.types:
error = None # type: Optional[Union[AttributeError, KeyError, TypeError, ValueError]]
try:
unmarshalled_data = self.as_type(type_)
# if (self.data is not None) and (unmarshalled_data is None):
# raise RuntimeError(self.data)
# If the data is un-marshalled successfully, we do not need to try any further types
successfully_unmarshalled = True
break
except (AttributeError, KeyError, TypeError, ValueError) as e:
error = e
if (first_error is None) and (error is not None):
first_error = error
if not successfully_unmarshalled:
if (first_error is None) or isinstance(first_error, TypeError):
raise errors.UnmarshalTypeError(
self.data,
types=self.types,
value_types=self.value_types,
item_types=self.item_types
)
elif isinstance(first_error, ValueError):
raise errors.UnmarshalValueError(
self.data,
types=self.types,
value_types=self.value_types,
item_types=self.item_types
)
else:
raise first_error # noqa - pylint erroneously identifies this as raising `None`
return unmarshalled_data
@property
def as_container_or_simple_type(self):
# type: (...) -> Any
"""
This function unmarshalls and returns the data into one of serial's container types, or if the data is of a
simple data type--it returns that data unmodified
"""
unmarshalled_data = self.data
if isinstance(self.data, abc.model.Dictionary):
type_ = type(self.data())
if self.value_types is not None:
unmarshalled_data = type_(self.data, value_types=self.value_types)
elif isinstance(self.data, abc.model.Array):
if self.item_types is not None:
unmarshalled_data = serial.model.Array(self.data, item_types=self.item_types)
elif isinstance(self.data, (dict, collections.OrderedDict)):
unmarshalled_data = serial.model.Dictionary(self.data, value_types=self.value_types)
elif (
isinstance(self.data, (collections_abc.Set, collections_abc.Sequence))
) and (
not isinstance(self.data, (str, bytes, native_str))
):
unmarshalled_data = serial.model.Array(self.data, item_types=self.item_types)
elif not isinstance(self.data, (str, bytes, native_str, Number, Decimal, date, datetime, bool, abc.model.Model)):
raise errors.UnmarshalValueError(
'%s cannot be un-marshalled' % repr(self.data)
)
return unmarshalled_data
def backport_types(self):
# type: (...) -> None
"""
This examines a set of types passed to `unmarshal`, and resolves any compatibility issues with the python
version being utilized
"""
if (str in self.types) and (native_str is not str) and (native_str not in self.types):
self.types = tuple(chain(*(
((type_, native_str) if (type_ is str) else (type_,))
for type_ in self.types
))) # type: Tuple[Union[type, properties.Property], ...]
def as_type(
self,
type_, # type: Union[type, properties.Property]
):
# type: (...) -> bool
unmarshalled_data = None # type: Union[abc.model.Model, Number, str, bytes, date, datetime]
if isinstance(
type_,
properties.Property
):
unmarshalled_data = type_.unmarshal(self.data)
elif isinstance(type_, type):
if isinstance(
self.data,
(dict, collections.OrderedDict, abc.model.Model)
):
if issubclass(type_, abc.model.Object):
unmarshalled_data = type_(self.data)
elif issubclass(
type_,
abc.model.Dictionary
):
unmarshalled_data = type_(self.data, value_types=self.value_types)
elif issubclass(
type_,
(dict, collections.OrderedDict)
):
unmarshalled_data = serial.model.Dictionary(self.data, value_types=self.value_types)
else:
raise TypeError(self.data)
elif (
isinstance(self.data, (collections_abc.Set, collections_abc.Sequence, abc.model.Array)) and
(not isinstance(self.data, (str, bytes, native_str)))
):
if issubclass(type_, abc.model.Array):
unmarshalled_data = type_(self.data, item_types=self.item_types)
elif issubclass(
type_,
(collections_abc.Set, collections_abc.Sequence)
) and not issubclass(
type_,
(str, bytes, native_str)
):
unmarshalled_data = serial.model.Array(self.data, item_types=self.item_types)
else:
raise TypeError('%s is not of type `%s`' % (repr(self.data), repr(type_)))
elif isinstance(self.data, type_):
if isinstance(self.data, Decimal):
unmarshalled_data = float(self.data)
else:
unmarshalled_data = self.data
else:
raise TypeError(self.data)
return unmarshalled_data
def unmarshal(
data, # type: Any
types=None, # type: Optional[Union[Sequence[Union[type, properties.Property]], type, properties.Property]]
value_types=None, # type: Optional[Union[Sequence[Union[type, properties.Property]], type, properties.Property]]
item_types=None, # type: Optional[Union[Sequence[Union[type, properties.Property]], type, properties.Property]]
):
# type: (...) -> Optional[Union[abc.model., str, Number, date, datetime]]
"""
Converts `data` into an instance of a serial model, and recursively does the same for all member data.
Parameters:
- data ([type|serial.properties.Property]): One or more data types. Each type
This is done by attempting to cast that data into a series of `types`.
to "un-marshal" data which has been deserialized from bytes or text, but is still represented
by generic containers
"""
unmarshalled_data = _Unmarshal(
data,
types=types,
value_types=value_types,
item_types=item_types
)()
return unmarshalled_data
def serialize(data, format_='json'):
# type: (Union[abc.model.Model, str, Number], Optional[str]) -> str
"""
Serializes instances of `serial.model.Object` as JSON or YAML.
"""
instance_hooks = None
if isinstance(data, abc.model.Model):
instance_hooks = hooks.read(data)
if (instance_hooks is not None) and (instance_hooks.before_serialize is not None):
data = instance_hooks.before_serialize(data)
if format_ not in ('json', 'yaml'): # , 'xml'
format_ = format_.lower()
if format_ not in ('json', 'yaml'):
raise ValueError(
'Supported `serial.model.serialize()` `format_` values include "json" and "yaml" (not "%s").' %
format_
)
if format_ == 'json':
data = json.dumps(marshal(data))
elif format_ == 'yaml':
data = yaml.dump(marshal(data))
if (instance_hooks is not None) and (instance_hooks.after_serialize is not None):
data = instance_hooks.after_serialize(data)
if not isinstance(data, str):
if isinstance(data, native_str):
data = str(data)
return data
def deserialize(data, format_):
# type: (Optional[Union[str, IOBase, addbase]], str) -> Any
"""
Parameters:
- data (str|io.IOBase|io.addbase):
This can be a string or file-like object containing JSON, YAML, or XML serialized inforation.
- format_ (str):
This can be "json", "yaml" or "xml".
Returns:
A deserialized representation of the information you provided.
"""
if format_ not in ('json', 'yaml'): # , 'xml'
raise NotImplementedError(
'Deserialization of data in the format %s is not currently supported.' % repr(format_)
)
if not isinstance(data, (str, bytes)):
data = read(data)
if isinstance(data, bytes):
data = str(data, encoding='utf-8')
if isinstance(data, str):
if format_ == 'json':
data = json.loads(
data,
object_hook=collections.OrderedDict,
object_pairs_hook=collections.OrderedDict
)
elif format_ == 'yaml':
data = yaml.load(data)
return data
def detect_format(data):
# type: (Optional[Union[str, IOBase, addbase]]) -> Tuple[Any, str]
"""
Parameters:
- data (str|io.IOBase|io.addbase):
This can be a string or file-like object containing JSON, YAML, or XML serialized inforation.
Returns:
A tuple containing the deserialized information and a string indicating the format of that information.
"""
if not isinstance(data, str):
try:
data = utilities.read(data)
except TypeError:
return data, None
formats = ('json', 'yaml') # , 'xml'
format_ = None
for potential_format in formats:
try:
data = deserialize(data, potential_format)
format_ = potential_format
break
except (ValueError, yaml.YAMLError):
pass
if format is None:
raise ValueError(
'The data provided could not be parsed:\n' + repr(data)
)
return data, format_
def validate(
data, # type: Optional[abc.model.Model]
types=None, # type: Optional[Union[type, properties.Property, model.Object, Callable]]
raise_errors=True # type: bool
):
# type: (...) -> Sequence[str]
"""
This function verifies that all properties/items/values in an instance of `serial.abc.model.Model` are of the
correct data type(s), and that all required attributes are present (if applicable). If `raise_errors` is `True`
(this is the default)--violations will result in a validation error. If `raise_errors` is `False`--a list of error
messages will be returned if invalid/missing information is found, or an empty list otherwise.
"""
if isinstance(data, Generator):
data = tuple(data)
error_messages = []
error_message = None
if types is not None:
if callable(types):
types = types(data)
if (str in types) and (native_str is not str) and (native_str not in types):
types = tuple(chain(*(
((type_, native_str) if (type_ is str) else (type_,))
for type_ in types
)))
valid = False
for type_ in types:
if isinstance(type_, type) and isinstance(data, type_):
valid = True
break
elif isinstance(type_, properties.Property):
if type_.types is None:
valid = True
break
try:
validate(data, type_.types, raise_errors=True)
valid = True
break
except errors.ValidationError:
pass
if not valid:
error_message = (
'Invalid data:\n\n%s\n\nThe data must be one of the following types:\n\n%s' % (
'\n'.join(
' ' + line
for line in repr(data).split('\n')
),
'\n'.join(chain(
(' (',),
(
' %s,' % '\n'.join(
' ' + line
for line in repr(type_).split('\n')
).strip()
for type_ in types
),
(' )',)
))
)
)
if error_message is not None:
if (not error_messages) or (error_message not in error_messages):
error_messages.append(error_message)
if ('_validate' in dir(data)) and callable(data._validate):
error_messages.extend(
error_message for error_message in
data._validate(raise_errors=False)
if error_message not in error_messages
)
if raise_errors and error_messages:
raise errors.ValidationError('\n' + '\n\n'.join(error_messages))
return error_messages
@@ -0,0 +1,812 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport() # noqa
from future.utils import native_str
import numbers
import operator
import re
import collections
from collections import OrderedDict
from copy import copy, deepcopy
from itertools import chain
from numbers import Number
try:
from typing import Optional, Dict, Sequence, Tuple, Mapping, Union, Any, List
except ImportError:
Optional = Sequence = Dict = Tuple = Mapping = Union = Any = List = None
import serial
from serial.utilities import qualified_name, properties_values, collections_abc
from serial.abc.model import Model
from serial.abc.properties import Property
_DOT_SYNTAX_RE = re.compile(
r'^\d+(\.\d+)*$'
)
class Meta(object):
def __copy__(self):
new_instance = self.__class__()
for a in dir(self):
if a[0] != '_':
v = getattr(self, a)
if not isinstance(v, collections.Callable):
setattr(new_instance, a, v)
return new_instance
def __deepcopy__(self, memo=None):
# type: (Optional[dict]) -> Meta
new_instance = self.__class__()
for a, v in properties_values(self):
setattr(new_instance, a, deepcopy(v, memo=memo))
return new_instance
def __bool__(self):
return True
def __repr__(self):
return ('\n'.join(
['%s(' % qualified_name(type(self))] +
[
' %s=%s,' % (p, repr(v))
for p, v in properties_values(self)
] +
[')']
))
class Version(Meta):
def __init__(
self,
version_number=None, # type: Optional[str]
specification=None, # type: Optional[Sequence[str]]
equals=None, # type: Optional[Sequence[Union[str, Number]]]
not_equals=None, # type: Optional[Sequence[Union[str, Number]]]
less_than=None, # type: Optional[Sequence[Union[str, Number]]]
less_than_or_equal_to=None, # type: Optional[Sequence[Union[str, Number]]]
greater_than=None, # type: Optional[Sequence[Union[str, Number]]]
greater_than_or_equal_to=None, # type: Optional[Sequence[Union[str, Number]]]
):
if isinstance(version_number, str) and (
(specification is None) and
(equals is None) and
(not_equals is None) and
(less_than is None) and
(less_than_or_equal_to is None) and
(greater_than is None) and
(greater_than_or_equal_to is None)
):
specification = None
for s in version_number.split('&'):
if '==' in s:
s, equals = s.split('==')
elif '<=' in s:
s, less_than_or_equal_to = s.split('<=')
elif '>=' in s:
s, greater_than_or_equal_to = s.split('>=')
elif '<' in s:
s, less_than = s.split('<')
elif '>' in s:
s, greater_than = s.split('>')
elif '!=' in s:
s, not_equals = s.split('!=')
elif '=' in s:
s, equals = s.split('=')
if specification:
if s != specification:
raise ValueError(
'Multiple specifications cannot be associated with an instance of ' +
'`serial.meta.Version`: ' + repr(version_number)
)
elif s:
specification = s
self.specification = specification
self.equals = equals
self.not_equals = not_equals
self.less_than = less_than
self.less_than_or_equal_to = less_than_or_equal_to
self.greater_than = greater_than
self.greater_than_or_equal_to = greater_than_or_equal_to
def __eq__(self, other):
# type: (Any) -> bool
compare_properties_functions = (
('equals', operator.eq),
('not_equals', operator.ne),
('less_than', operator.lt),
('less_than_or_equal_to', operator.le),
('greater_than', operator.gt),
('greater_than_or_equal_to', operator.ge),
)
if (
(isinstance(other, str) and _DOT_SYNTAX_RE.match(other)) or
isinstance(other, (collections_abc.Sequence, int))
):
if isinstance(other, (native_str, bytes, numbers.Number)):
other = str(other)
if isinstance(other, str):
other = other.rstrip('.0')
if other == '':
other_components = (0,)
else:
other_components = tuple(int(other_component) for other_component in other.split('.'))
else:
other_components = tuple(other)
for compare_property, compare_function in compare_properties_functions:
compare_value = getattr(self, compare_property)
if compare_value is not None:
compare_values = tuple(int(n) for n in compare_value.split('.'))
other_values = copy(other_components)
ld = len(other_values) - len(compare_values)
if ld < 0:
other_values = tuple(chain(other_values, [0] * (-ld)))
elif ld > 0:
compare_values = tuple(chain(compare_values, [0] * ld))
if not compare_function(other_values, compare_values):
return False
else:
for compare_property, compare_function in compare_properties_functions:
compare_value = getattr(self, compare_property)
if (compare_value is not None) and not compare_function(other, compare_value):
return False
return True
def __str__(self):
representation = []
for property, operator in (
('equals', '=='),
('not_equals', '!='),
('greater_than', '>'),
('greater_than_or_equal_to', '>='),
('less_than', '<'),
('less_than_or_equal_to', '<='),
):
v = getattr(self, property)
if v is not None:
representation.append(
self.specification + operator + v
)
return '&'.join(representation)
class Object(Meta):
def __init__(
self,
properties=None, # type: Optional[Properties]
):
self._properties = None # type: Optional[Properties]
self.properties = properties
@property
def properties(self):
# type: () -> Optional[Properties]
return self._properties
@properties.setter
def properties(
self,
properties_
# type: Optional[Union[Dict[str, Property], Sequence[Tuple[str, Property]]]]
):
# type: (...) -> None
self._properties = Properties(properties_)
class Dictionary(Meta):
def __init__(
self,
value_types=None, # type: Optional[Sequence[Property, type]]
):
self._value_types = None # type: Optional[Tuple]
self.value_types = value_types
@property
def value_types(self):
# type: () -> Optional[Dict[str, Union[type, Property, serial.model.Object]]]
return self._value_types
@value_types.setter
def value_types(self, value_types):
# type: (Optional[Sequence[Union[type, Property, serial.model.Object]]]) -> None
if value_types is not None:
if isinstance(value_types, (type, Property)):
value_types = (value_types,)
if native_str is not str:
if isinstance(value_types, collections.Callable):
_types = value_types
def value_types(d):
# type: (Any) -> Any
ts = _types(d)
if (ts is not None) and (str in ts) and (native_str not in ts):
ts = tuple(chain(*(
((t, native_str) if (t is str) else (t,))
for t in ts
)))
return ts
elif (str in value_types) and (native_str is not str) and (native_str not in value_types):
value_types = chain(*(
((t, native_str) if (t is str) else (t,))
for t in value_types
))
if not isinstance(value_types, collections_abc.Callable):
value_types = tuple(value_types)
self._value_types = value_types
class Array(Meta):
def __init__(
self,
item_types=None, # type: Optional[Sequence[Property, type]]
):
self._item_types = None # type: Optional[Tuple]
self.item_types = item_types
@property
def item_types(self):
return self._item_types
@item_types.setter
def item_types(self, item_types):
# type: (Optional[Sequence[Union[type, Property, serial.model.Object]]]) -> None
if item_types is not None:
if isinstance(item_types, (type, Property)):
item_types = (item_types,)
if native_str is not str:
if isinstance(item_types, collections.Callable):
_types = item_types
def item_types(d):
# type: (Any) -> Any
ts = _types(d)
if (ts is not None) and (str in ts) and (native_str not in ts):
ts = tuple(chain(*(
((t, native_str) if (t is str) else (t,))
for t in ts
)))
return ts
elif (str in item_types) and (native_str is not str) and (native_str not in item_types):
item_types = chain(*(
((t, native_str) if (t is str) else (t,))
for t in item_types
))
if not callable(item_types):
item_types = tuple(item_types)
self._item_types = item_types
class Properties(OrderedDict):
def __init__(
self,
items=(
None
) # type: Optional[Union[Dict[str, Property], List[Tuple[str, Property]]]]
):
if items is None:
super().__init__()
else:
if isinstance(items, OrderedDict):
items = items.items()
elif isinstance(items, dict):
items = sorted(items.items())
super().__init__(items)
def __setitem__(self, key, value):
# type: (str, Property) -> None
if not isinstance(value, Property):
raise ValueError(value)
super().__setitem__(key, value)
def __copy__(self):
# type: () -> Properties
return self.__class__(self)
def __deepcopy__(self, memo=None):
# type: (dict) -> Properties
return self.__class__(
tuple(
(k, deepcopy(v, memo=memo))
for k, v in self.items()
)
)
def __repr__(self):
representation = [
qualified_name(type(self)) + '('
]
items = tuple(self.items())
if len(items) > 0:
representation[0] += '['
for k, v in items:
rv = (
qualified_name(v) if isinstance(v, type) else
repr(v)
)
rvls = rv.split('\n')
if len(rvls) > 1:
rvs = [rvls[0]]
for rvl in rvls[1:]:
rvs.append(' ' + rvl)
rv = '\n'.join(rvs)
representation += [
' (',
' %s,' % repr(k),
' %s' % rv,
' ),'
]
else:
representation.append(
' (%s, %s),' % (repr(k), rv)
)
representation[-1] = representation[-1][:-1]
representation.append(
']'
)
representation[-1] += ')'
if len(representation) > 2:
return '\n'.join(representation)
else:
return ''.join(representation)
def read(
model # type: Union[type, serial.abc.model.Model]
):
# type: (...) -> Optional[Meta]
if isinstance(
model,
Model
):
return model._meta or read(type(model))
elif isinstance(model, type) and issubclass(model, Model):
return model._meta
else:
try:
repr_model = repr(model)
except:
repr_model = object.__repr__(model)
raise TypeError(
'%s requires a parameter which is an instance or sub-class of `%s`, not%s' % (
serial.utilities.calling_function_qualified_name(),
qualified_name(Model),
(
':\n' + repr_model
if '\n' in repr_model else
' `%s`' % repr_model
)
)
)
def writable(
model # type: Union[type, serial.abc.model.Model]
):
# type: (...) -> Optional[Meta]
if isinstance(model, Model):
if model._meta is None:
model._meta = deepcopy(writable(type(model)))
elif isinstance(model, type) and issubclass(model, Model):
if model._meta is None:
model._meta = (
Object()
if issubclass(model, serial.model.Object) else
Array()
if issubclass(model, serial.model.Array) else
Dictionary()
if issubclass(model, serial.model.Dictionary)
else None
)
else:
for b in model.__bases__:
if hasattr(b, '_meta') and (model._meta is b._meta):
model._meta = deepcopy(model._meta)
break
else:
repr_model = repr(model)
raise TypeError(
'%s requires a parameter which is an instance or sub-class of `%s`, not%s' % (
serial.utilities.calling_function_qualified_name(),
qualified_name(Model),
(
':\n' + repr_model
if '\n' in repr_model else
' `%s`' % repr_model
)
)
)
return model._meta
def write(
model, # type: Union[type, serial.model.Object]
meta # type: Meta
):
# type: (...) -> None
if isinstance(model, Model):
model_type = type(model)
elif isinstance(model, type) and issubclass(model, Model):
model_type = model
else:
repr_model = repr(model)
raise TypeError(
'%s requires a value for the parameter `model` which is an instance or sub-class of `%s`, not%s' % (
serial.utilities.calling_function_qualified_name(),
qualified_name(Model),
(
':\n' + repr_model
if '\n' in repr_model else
' `%s`' % repr_model
)
)
)
metadata_type = (
Object
if issubclass(model_type, serial.model.Object) else
Array
if issubclass(model_type, serial.model.Array) else
Dictionary
if issubclass(model_type, serial.model.Dictionary)
else None
)
if not isinstance(meta, metadata_type):
raise ValueError(
'Metadata assigned to `%s` must be of type `%s`' % (
qualified_name(model_type),
qualified_name(metadata_type)
)
)
model._meta = meta
_UNIDENTIFIED = None
def xpath(model, xpath_=_UNIDENTIFIED):
# type: (serial.abc.model.Model, Optional[str]) -> Optional[str]
"""
Return the xpath at which the element represented by this object was found, relative to the root document. If
the parameter `xpath_` is provided--set the value
"""
if not isinstance(model, Model):
raise TypeError(
'`model` must be an instance of `%s`, not %s.' % (qualified_name(Model), repr(model))
)
if xpath_ is not _UNIDENTIFIED:
if not isinstance(xpath_, str):
if isinstance(xpath_, native_str):
xpath_ = str(xpath_)
else:
raise TypeError(
'`xpath_` must be a `str`, not %s.' % repr(xpath_)
)
model._xpath = xpath_
if isinstance(model, serial.model.Dictionary):
for k, v in model.items():
if isinstance(v, Model):
xpath(v, '%s/%s' % (xpath_, k))
elif isinstance(model, serial.model.Object):
for pn, p in read(model).properties.items():
k = p.name or pn
v = getattr(model, pn)
if isinstance(v, Model):
xpath(v, '%s/%s' % (xpath_, k))
elif isinstance(model, serial.model.Array):
for i in range(len(model)):
v = model[i]
if isinstance(v, Model):
xpath(v, '%s[%s]' % (xpath_, str(i)))
return model._xpath
def pointer(model, pointer_=_UNIDENTIFIED):
# type: (serial.abc.model.Model, Optional[str]) -> Optional[str]
if not isinstance(model, Model):
raise TypeError(
'`model` must be an instance of `%s`, not %s.' % (qualified_name(Model), repr(model))
)
if pointer_ is not _UNIDENTIFIED:
if not isinstance(pointer_, (str, native_str)):
raise TypeError(
'`pointer_` must be a `str`, not %s (of type `%s`).' % (repr(pointer_), type(pointer_).__name__)
)
model._pointer = pointer_
if isinstance(model, serial.model.Dictionary):
for k, v in model.items():
if isinstance(v, Model):
pointer(v, '%s/%s' % (pointer_, k.replace('~', '~0').replace('/', '~1')))
elif isinstance(model, serial.model.Object):
for pn, property in read(model).properties.items():
k = property.name or pn
v = getattr(model, pn)
if isinstance(v, Model):
pointer(v, '%s/%s' % (pointer_, k.replace('~', '~0').replace('/', '~1')))
elif isinstance(model, serial.model.Array):
for i in range(len(model)):
v = model[i]
if isinstance(v, Model):
pointer(v, '%s[%s]' % (pointer_, str(i)))
return model._pointer
def url(model, url_=_UNIDENTIFIED):
# type: (serial.abc.model.Model, Optional[str]) -> Optional[str]
if not isinstance(model, serial.abc.model.Model):
raise TypeError(
'`model` must be an instance of `%s`, not %s.' % (qualified_name(Model), repr(model))
)
if url_ is not _UNIDENTIFIED:
if not isinstance(url_, str):
raise TypeError(
'`url_` must be a `str`, not %s.' % repr(url_)
)
model._url = url_
if isinstance(model, serial.model.Dictionary):
for v in model.values():
if isinstance(v, Model):
url(v, url_)
elif isinstance(model, serial.model.Object):
for pn in read(model).properties.keys():
v = getattr(model, pn)
if isinstance(v, Model):
url(v, url_)
elif isinstance(model, serial.model.Array):
for v in model:
if isinstance(v, Model):
url(v, url_)
return model._url
def format_(model, serialization_format=_UNIDENTIFIED):
# type: (serial.abc.model.Model, Optional[str]) -> Optional[str]
if not isinstance(model, Model):
raise TypeError(
'`model` must be an instance of `%s`, not %s.' % (qualified_name(Model), repr(model))
)
if serialization_format is not _UNIDENTIFIED:
if not isinstance(serialization_format, str):
if isinstance(serialization_format, native_str):
serialization_format = str(serialization_format)
else:
raise TypeError(
'`serialization_format` must be a `str`, not %s.' % repr(serialization_format)
)
model._format = serialization_format
if isinstance(model, serial.model.Dictionary):
for v in model.values():
if isinstance(v, Model):
format_(v, serialization_format)
elif isinstance(model, serial.model.Object):
for pn in read(model).properties.keys():
v = getattr(model, pn)
if isinstance(v, Model):
format_(v, serialization_format)
elif isinstance(model, serial.model.Array):
for v in model:
if isinstance(v, Model):
format_(v, serialization_format)
return model._format
def version(data, specification, version_number):
# type: (serial.abc.model.Model, str, Union[str, int, Sequence[int]]) -> None
"""
Recursively alters model class or instance metadata based on version number metadata associated with an
object's properties. This allows one data model to represent multiple versions of a specification and dynamically
change based on the version of a specification represented.
Arguments:
- data (serial.abc.model.Model)
- specification (str):
The specification to which the `version_number` argument applies.
- version_number (str|int|[int]):
A version number represented as text (in the form of integers separated by periods), an integer, or a
sequence of integers.
"""
if not isinstance(data, serial.abc.model.Model):
raise TypeError(
'The data provided is not an instance of serial.abc.model.Model: ' + repr(data)
)
def version_match(property_):
# type: (serial.properties.Property) -> bool
if property_.versions is not None:
version_matched = False
specification_matched = False
for applicable_version in property_.versions:
if applicable_version.specification == specification:
specification_matched = True
if applicable_version == version_number:
version_matched = True
break
if specification_matched and (not version_matched):
return False
return True
def version_properties(properties_):
# type: (Sequence[serial.properties.Property]) -> Optional[Sequence[Meta]]
changed = False
nps = []
for property in properties_:
if isinstance(property, serial.properties.Property):
if version_match(property):
np = version_property(property)
if np is not property:
changed = True
nps.append(np)
else:
changed = True
else:
nps.append(property)
if changed:
return tuple(nps)
else:
return None
def version_property(property):
# type: (serial.properties.Property) -> Meta
changed = False
if isinstance(property, serial.properties.Array) and (property.item_types is not None):
item_types = version_properties(property.item_types)
if item_types is not None:
if not changed:
property = deepcopy(property)
property.item_types = item_types
changed = True
elif isinstance(property, serial.properties.Dictionary) and (property.value_types is not None):
value_types = version_properties(property.value_types)
if value_types is not None:
if not changed:
property = deepcopy(property)
property.value_types = value_types
changed = True
if property.types is not None:
types = version_properties(property.types)
if types is not None:
if not changed:
property = deepcopy(property)
property.types = types
return property
instance_meta = read(data)
class_meta = read(type(data))
if isinstance(data, serial.abc.model.Object):
for property_name in tuple(instance_meta.properties.keys()):
property = instance_meta.properties[property_name]
if version_match(property):
np = version_property(property)
if np is not property:
if instance_meta is class_meta:
instance_meta = writable(data)
instance_meta.properties[property_name] = np
else:
if instance_meta is class_meta:
instance_meta = writable(data)
del instance_meta.properties[property_name]
version_ = getattr(data, property_name)
if version_ is not None:
raise serial.errors.VersionError(
'%s - the property `%s` is not applicable in %s version %s:\n%s' % (
qualified_name(type(data)),
property_name,
specification,
version_number,
str(data)
)
)
value = getattr(data, property_name)
if isinstance(value, serial.abc.model.Model):
version(value, specification, version_number)
elif isinstance(data, serial.abc.model.Dictionary):
if instance_meta and instance_meta.value_types:
new_value_types = version_properties(instance_meta.value_types)
if new_value_types:
if instance_meta is class_meta:
instance_meta = writable(data)
instance_meta.value_types = new_value_types
for value in data.values():
if isinstance(value, serial.abc.model.Model):
version(value, specification, version_number)
elif isinstance(data, serial.abc.model.Array):
if instance_meta and instance_meta.item_types:
new_item_types = version_properties(instance_meta.item_types)
if new_item_types:
if instance_meta is class_meta:
instance_meta = writable(data)
instance_meta.item_types = new_item_types
for item in data:
if isinstance(item, serial.abc.model.Model):
version(item, specification, version_number)
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,941 @@
# Tell the linters what's up:
# pylint:disable=wrong-import-position,consider-using-enumerate,useless-object-inheritance
# mccabe:options:max-complexity=999
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport() # noqa
from future.utils import native_str
import numbers # noqa
from base64 import b64decode, b64encode # noqa
from copy import deepcopy # noqa
from datetime import date, datetime # noqa
try:
from typing import Union, Optional, Sequence, Mapping, Set, Sequence, Callable, Dict, Any, Hashable, Collection,\
Tuple
except ImportError:
Union = Optional = Sequence = Mapping = Set = Sequence = Callable = Dict = Any = Hashable = Collection = Tuple =\
Iterable = None
import iso8601 # noqa
from .utilities import collections, collections_abc, qualified_name, properties_values, parameters_defaults,\
calling_function_qualified_name
from serial import abc, errors, meta
import serial
NoneType = type(None)
NULL = None
class Null(object): # noqa - inheriting from object is intentional, as this is needed for python 2x compatibility
"""
Instances of this class represent an *explicit* null value, rather than the absence of a
property/attribute/element, as would be inferred from a value of `None`.
"""
def __init__(self):
if NULL is not None:
raise errors.DefinitionExistsError(
'%s may only be defined once.' % repr(self)
)
def __bool__(self):
# type: (...) -> bool
return False
def __eq__(self, other):
# type: (Any) -> bool
return id(other) == id(self)
def __hash__(self):
# type: (...) -> int
return 0
def __str__(self):
# type: (...) -> str
return 'null'
def _marshal(self):
# type: (...) -> None
return None
def __repr__(self):
# type: (...) -> str
return (
'NULL'
if self.__module__ in ('__main__', 'builtins', '__builtin__') else
'%s.NULL' % self.__module__
)
def __copy__(self):
# type: (...) -> Null
return self
def __deepcopy__(self, memo):
# type: (Dict[Hashable, Any]) -> Null
return self
NULL = Null()
def _validate_type_or_property(type_or_property):
# type: (Union[type, Property]) -> (Union[type, Property])
if not isinstance(type_or_property, (type, Property)):
raise TypeError(type_or_property)
if not (
(type_or_property is Null) or
(
isinstance(type_or_property, type) and
issubclass(
type_or_property,
(
abc.model.Model,
str,
native_str,
bytes,
numbers.Number,
date,
datetime,
Null,
collections_abc.Iterable,
dict,
collections.OrderedDict,
bool
)
)
) or
isinstance(type_or_property, Property)
):
raise TypeError(type_or_property)
return type_or_property
class Types(list):
"""
Instances of this class are lists which will only take values which are valid types for describing a property
definition.
"""
def __init__(
self,
property_, # type: Property
items=None # type: Optional[Union[Sequence[Union[type, Property], Types], type, Property]]
):
# (...) -> None
if not isinstance(property_, Property):
raise TypeError(
'The parameter `property` must be a `type`, or an instance of `%s`.' % qualified_name(Property)
)
self.property_ = property_
if isinstance(items, (type, Property)):
items = (items,)
if items is None:
super().__init__()
else:
super().__init__(items)
def __setitem__(self, index, value):
# type: (int, Union[type, Property]) -> None
super().__setitem__(index, _validate_type_or_property(value))
if value is str and (native_str is not str) and (native_str not in self):
super().append(native_str)
def append(self, value):
# type: (Union[type, Property]) -> None
super().append(_validate_type_or_property(value))
if value is str and (native_str is not str) and (native_str not in self):
super().append(native_str)
def __delitem__(self, index):
# type: (int) -> None
value = self[index]
super().__delitem__(index)
if (value is str) and (native_str in self):
self.remove(native_str)
def pop(self, index=-1):
# type: (int) -> Union[type, Property]
value = super().pop(index)
if (value is str) and (native_str in self):
self.remove(native_str)
return value
def __copy__(self):
# type: () -> Types
return self.__class__(self.property_, self)
def __deepcopy__(self, memo=None):
# type: (dict) -> Types
return self.__class__(
self.property_,
tuple(
deepcopy(v, memo=memo)
for v in self
)
)
def __repr__(self):
representation = [
qualified_name(type(self)) + '('
]
if self:
representation[0] += '['
for v in self:
rv = (
qualified_name(v) if isinstance(v, type) else
repr(v)
)
rvls = rv.split('\n')
if len(rvls) > 1:
rvs = [rvls[0]]
for rvl in rvls[1:]:
rvs.append(' ' + rvl)
rv = '\n'.join(rvs)
representation += [
' %s' % rv,
]
else:
representation.append(
' %s,' % rv
)
representation[-1] = representation[-1][:-1]
representation.append(
']'
)
representation[-1] += ')'
return '\n'.join(representation) if len(representation) > 2 else ''.join(representation)
class Property(object):
"""
This is the base class for defining a property.
Properties
- value_types ([type|Property]): One or more expected value_types or `Property` instances. Values are checked,
sequentially, against each type or `Property` instance, and the first appropriate match is used.
- required (bool|collections.Callable): If `True`--dumping the json_object will throw an error if this value
is `None`.
- versions ([str]|{str:Property}): The property should be one of the following:
- A set/tuple/list of version numbers to which this property applies.
- A mapping of version numbers to an instance of `Property` applicable to that version.
Version numbers prefixed by "<" indicate any version less than the one specified, so "<3.0" indicates that
this property is available in versions prior to 3.0. The inverse is true for version numbers prefixed by ">".
">=" and "<=" have similar meanings, but are inclusive.
Versioning can be applied to an json_object by calling `serial.meta.set_version` in the `__init__` method of
an `serial.model.Object` sub-class. For an example, see `oapi.model.OpenAPI.__init__`.
- name (str): The name of the property when loaded from or dumped into a JSON/YAML/XML json_object. Specifying a
`name` facilitates mapping of PEP8 compliant property to JSON or YAML attribute names, or XML element names,
which are either camelCased, are python keywords, or otherwise not appropriate for usage in python code.
"""
def __init__(
self,
types=None, # type: Sequence[Union[type, Property]]
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Sequence[Union[str, meta.Version]]]
):
self._types = None # type: Optional[Sequence[Union[type, Property]]]
self.types = types
self.name = name
self.required = required
self._versions = None # type: Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
self.versions = versions # type: Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
@property
def types(self):
return self._types
@types.setter
def types(self, types_or_properties):
# type: (Optional[Sequence[Union[type, Property, abc.model.Model]]]) -> None
if types_or_properties is not None:
if callable(types_or_properties):
if native_str is not str:
_types_or_properties = types_or_properties
def types_or_properties(d):
# type: (Sequence[Union[type, Property, abc.model.Model]]) -> Types
return Types(self, _types_or_properties(d))
else:
types_or_properties = Types(self, types_or_properties)
self._types = types_or_properties
@property
def versions(self):
# type: () -> Optional[Sequence[meta.Version]]
return self._versions
@versions.setter
def versions(
self,
versions # type: Optional[Sequence[Union[str, collections_abc.Iterable, meta.Version]]]
):
# type: (...) -> Optional[Union[Mapping[str, Optional[Property]], Set[Union[str, Number]]]]
if versions is not None:
if isinstance(versions, (str, Number, meta.Version)):
versions = (versions,)
if isinstance(versions, collections_abc.Iterable):
versions = tuple(
(v if isinstance(v, meta.Version) else meta.Version(v))
for v in versions
)
else:
repr_versions = repr(versions)
raise TypeError(
(
'`%s` requires a sequence of version strings or ' %
calling_function_qualified_name()
) + (
'`%s` instances, not' % qualified_name(meta.Version)
) + (
':\n' + repr_versions
if '\n' in repr_versions else
' `%s`.' % repr_versions
)
)
self._versions = versions
def unmarshal(self, data):
# type: (Any) -> Any
# return data if self.types is None else unmarshal(data, types=self.types)
if isinstance(
data,
collections_abc.Iterable
) and not isinstance(
data,
(str, bytes, bytearray, native_str)
) and not isinstance(
data,
abc.model.Model
):
if isinstance(data, (dict, collections.OrderedDict)):
for k, v in data.items():
if v is None:
data[k] = NULL
else:
data = tuple((NULL if i is None else i) for i in data)
return serial.marshal.unmarshal(data, types=self.types)
def marshal(self, data):
# type: (Any) -> Any
return serial.marshal.marshal(data, types=self.types) #, types=self.types, value_types=self.value_types)
def __repr__(self):
representation = [qualified_name(type(self)) + '(']
pd = parameters_defaults(self.__init__)
for p, v in properties_values(self):
if (p not in pd) or pd[p] == v:
continue
if (v is not None) and (v is not NULL):
if isinstance(v, collections_abc.Sequence) and not isinstance(v, (str, bytes)):
rvs = ['(']
for i in v:
ri = (
qualified_name(i)
if isinstance(i, type) else
"'%s'" % str(i)
if isinstance(i, meta.Version) else
repr(i)
)
rils = ri.split('\n')
if len(rils) > 1:
ris = [rils[0]]
for ril in rils[1:]:
ris.append(' ' + ril)
ri = '\n'.join(ris)
rvs.append(' %s,' % ri)
if len(v) > 1:
rvs[-1] = rvs[-1][:-1]
rvs.append(' )')
rv = '\n'.join(rvs)
else:
rv = (
qualified_name(v)
if isinstance(v, type) else
"'%s'" % str(v)
if isinstance(v, meta.Version) else
repr(v)
)
rvls = rv.split('\n')
if len(rvls) > 2:
rvs = [rvls[0]]
for rvl in rvls[1:]:
rvs.append(' ' + rvl)
rv = '\n'.join(rvs)
representation.append(
' %s=%s,' % (p, rv)
)
representation.append(')')
if len(representation) > 2:
return '\n'.join(representation)
else:
return ''.join(representation)
def __copy__(self):
new_instance = self.__class__()
for a in dir(self):
if a[0] != '_' and a != 'data':
v = getattr(self, a)
if not callable(v):
setattr(new_instance, a, v)
return new_instance
def __deepcopy__(self, memo):
# type: (dict) -> Property
new_instance = self.__class__()
for a, v in properties_values(self):
setattr(new_instance, a, deepcopy(v, memo))
return new_instance
abc.properties.Property.register(Property)
class String(Property):
"""
See `serial.properties.Property`
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
super().__init__(
types=(str,),
name=name,
required=required,
versions=versions,
)
class Date(Property):
"""
See `serial.properties.Property`
Additional Properties:
- marshal (collections.Callable): A function, taking one argument (a python `date` json_object), and returning
a date string in the desired format. The default is `date.isoformat`--returning an iso8601 compliant date
string.
- unmarshal (collections.Callable): A function, taking one argument (a date string), and returning a python
`date` json_object. By default, this is `iso8601.parse_date`.
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
date2str=date.isoformat, # type: Optional[Callable]
str2date=iso8601.parse_date # type: Callable
):
super().__init__(
types=(date,),
name=name,
required=required,
versions=versions,
)
self.date2str = date2str
self.str2date = str2date
def unmarshal(self, data):
# type: (Optional[str]) -> Union[date, NoneType]
if data is None:
return data
else:
if isinstance(data, date):
date_ = data
elif isinstance(data, str):
date_ = self.str2date(data)
else:
raise TypeError(
'%s is not a `str`.' % repr(data)
)
if isinstance(date_, date):
return date_
else:
raise TypeError(
'"%s" is not a properly formatted date string.' % data
)
def marshal(self, data):
# type: (Optional[date]) -> Optional[str]
if data is None:
return data
else:
ds = self.date2str(data)
if not isinstance(ds, str):
if isinstance(ds, native_str):
ds = str(ds)
else:
raise TypeError(
'The date2str function should return a `str`, not a `%s`: %s' % (
type(ds).__name__,
repr(ds)
)
)
return ds
class DateTime(Property):
"""
See `serial.properties.Property`
Additional Properties:
- marshal (collections.Callable): A function, taking one argument (a python `datetime` json_object), and
returning a date-time string in the desired format. The default is `datetime.isoformat`--returning an
iso8601 compliant date-time string.
- unmarshal (collections.Callable): A function, taking one argument (a datetime string), and returning a python
`datetime` json_object. By default, this is `iso8601.parse_date`.
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
datetime2str=datetime.isoformat, # type: Optional[Callable]
str2datetime=iso8601.parse_date # type: Callable
):
self.datetime2str = datetime2str
self.str2datetime = str2datetime
super().__init__(
types=(datetime,),
name=name,
required=required,
versions=versions,
)
def unmarshal(self, data):
# type: (Optional[str]) -> Union[datetime, NoneType]
if data is None:
return data
else:
if isinstance(data, datetime):
datetime_ = data
elif isinstance(data, str):
datetime_ = self.str2datetime(data)
else:
raise TypeError(
'%s is not a `str`.' % repr(data)
)
if isinstance(datetime_, datetime):
return datetime_
else:
raise TypeError(
'"%s" is not a properly formatted date-time string.' % data
)
def marshal(self, data):
# type: (Optional[datetime]) -> Optional[str]
if data is None:
return data
else:
datetime_string = self.datetime2str(data)
if not isinstance(datetime_string, str):
if isinstance(datetime_string, native_str):
datetime_string = str(datetime_string)
else:
repr_datetime_string = repr(datetime_string).strip()
raise TypeError(
'The datetime2str function should return a `str`, not:' + (
'\n'
if '\n' in repr_datetime_string else
' '
) + repr_datetime_string
)
return datetime_string
class Bytes(Property):
"""
See `serial.properties.Property`
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: bool
versions=None, # type: Optional[Collection]
):
super().__init__(
types=(bytes, bytearray),
name=name,
required=required,
versions=versions,
)
def unmarshal(self, data):
# type: (str) -> Optional[bytes]
"""
Un-marshal a base-64 encoded string into bytes
"""
if data is None:
return data
elif isinstance(data, str):
return b64decode(data)
elif isinstance(data, bytes):
return data
else:
raise TypeError(
'`data` must be a base64 encoded `str` or `bytes`--not `%s`' % qualified_name(type(data))
)
def marshal(self, data):
# type: (bytes) -> str
"""
Marshal bytes into a base-64 encoded string
"""
if (data is None) or isinstance(data, str):
return data
elif isinstance(data, bytes):
return str(b64encode(data), 'ascii')
else:
raise TypeError(
'`data` must be a base64 encoded `str` or `bytes`--not `%s`' % qualified_name(type(data))
)
class Enumerated(Property):
"""
See `serial.properties.Property`...
+ Properties:
- values ([Any]): A list of possible values. If the parameter `types` is specified--typing is
applied prior to validation.
"""
def __init__(
self,
types=None, # type: Optional[Sequence[Union[type, Property]]]
values=None, # type: Optional[Union[Sequence, Set]]
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
self._values = None
super().__init__(
types=types,
name=name,
required=required,
versions=versions
)
self.values = values # type: Optional[Sequence]
@property
def values(self):
# type: () -> Optional[Union[Tuple, Callable]]
return self._values
@values.setter
def values(self, values):
# type: (Iterable) -> None
if not ((values is None) or callable(values)):
if (values is not None) and (not isinstance(values, (collections_abc.Sequence, collections_abc.Set))):
raise TypeError(
'`values` must be a finite set or sequence, not `%s`.' % qualified_name(type(values))
)
if values is not None:
values = [
serial.marshal.unmarshal(v, types=self.types)
for v in values
]
self._values = values
def unmarshal(self, data):
# type: (Any) -> Any
if self.types is not None:
data = serial.marshal.unmarshal(data, types=self.types)
if (
(data is not None) and
(self.values is not None) and
(data not in self.values)
):
raise ValueError(
'The value provided is not a valid option:\n%s\n\n' % repr(data) +
'Valid options include:\n%s' % (
', '.join(repr(t) for t in self.values)
)
)
return data
class Number(Property):
"""
See `serial.properties.Property`
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
# type: (...) -> None
super().__init__(
types=(numbers.Number,),
name=name,
required=required,
versions=versions,
)
class Integer(Property):
"""
See `serial.properties.Property`
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
super().__init__(
types=(int,),
name=name,
required=required,
versions=versions,
)
# def unmarshal(self, data):
# # type: (Any) -> Any
# if data is None:
# return data
# else:
# return int(data)
#
# def marshal(self, data):
# # type: (Any) -> Any
# if data is None:
# return data
# else:
# return int(data)
class Boolean(Property):
"""
See `serial.properties.Property`
"""
def __init__(
self,
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
# type: (...) -> None
super().__init__(
types=(bool,),
name=name,
required=required,
versions=versions,
)
class Array(Property):
"""
See `serial.properties.Property`...
+ Properties:
- item_types (type|Property|[type|Property]): The type(s) of values/objects contained in the array. Similar to
`serial.properties.Property().value_types`, but applied to items in the array, not the array itself.
"""
def __init__(
self,
item_types=None, # type: Optional[Union[type, Sequence[Union[type, Property]]]]
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
self._item_types = None
self.item_types = item_types
super().__init__(
types=(serial.model.Array,),
name=name,
required=required,
versions=versions,
)
def unmarshal(self, data):
# type: (Any) -> Any
return serial.marshal.unmarshal(data, types=self.types, item_types=self.item_types)
def marshal(self, data):
# type: (Any) -> Any
return serial.marshal.marshal(data, types=self.types, item_types=self.item_types)
@property
def item_types(self):
return self._item_types
@item_types.setter
def item_types(self, item_types):
# type: (Optional[Sequence[Union[type, Property, abc.model.Object]]]) -> None
if item_types is not None:
if callable(item_types):
if native_str is not str:
_item_types = item_types
def item_types(d):
# type: (Sequence[Union[type, Property, abc.model.Object]]) -> Types
return Types(self, _item_types(d))
else:
item_types = Types(self, item_types)
self._item_types = item_types
class Dictionary(Property):
"""
See `serial.properties.Property`...
+ Properties:
- value_types (type|Property|[type|Property]): The type(s) of values/objects comprising the mapped
values. Similar to `serial.properties.Property.types`, but applies to *values* in the dictionary
object, not the dictionary itself.
"""
def __init__(
self,
value_types=None, # type: Optional[Union[type, Sequence[Union[type, Property]]]]
name=None, # type: Optional[str]
required=False, # type: Union[bool, collections.Callable]
versions=None, # type: Optional[Collection]
):
self._value_types = None
self.value_types = value_types
super().__init__(
types=(serial.model.Dictionary,),
name=name,
required=required,
versions=versions,
)
def unmarshal(self, data):
# type: (Any) -> Any
return serial.marshal.unmarshal(data, types=self.types, value_types=self.value_types)
@property
def value_types(self):
return self._value_types
@value_types.setter
def value_types(self, value_types_):
# type: (Optional[Sequence[Union[type, Property, abc.model.Object]]]) -> None
"""
The `types` can be either:
- A sequence of types and/or `serial.properties.Property` instances.
- A function which accepts exactly one argument (a dictionary), and which returns a sequence of types and/or
`serial.properties.Property` instances.
If more than one type or property definition is provided, un-marshalling is attempted using each `value_type`,
in sequential order. If a value could be cast into more than one of the `types` without throwing a
`ValueError`, `TypeError`, or `serial.errors.ValidationError`, the value type occuring *first* in the sequence
will be used.
"""
if value_types_ is not None:
if callable(value_types_):
if native_str is not str:
original_value_types_ = value_types_
def value_types_(data):
# type: (Sequence[Union[type, Property, abc.model.Object]]) -> Types
return Types(self, original_value_types_(data))
else:
value_types_ = Types(self, value_types_)
self._value_types = value_types_
@@ -0,0 +1,451 @@
"""
This module extends the functionality of `urllib.request.Request` to support multipart requests, to support passing
instances of serial models to the `data` parameter/property for `urllib.request.Request`, and to
support casting requests as `str` or `bytes` (typically for debugging purposes and/or to aid in producing
non-language-specific API documentation).
"""
# region Backwards Compatibility
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport() # noqa
from future.utils import native_str
import random
import re
import string
import urllib.request
try:
from typing import Dict, Sequence, Set, Iterable
except ImportError:
Dict = Sequence = Set = None
from serial.marshal import serialize
from .abc.model import Model
from .utilities import collections_abc
class Headers(object):
"""
A dictionary of headers for a `Request`, `Part`, or `MultipartRequest` instance.
"""
def __init__(self, items, request):
# type: (Dict[str, str], Union[Part, Request]) -> None
self._dict = {}
self.request = request # type: Data
self.update(items)
def pop(self, key, default=None):
# type: (str, Optional[str]) -> str
key = key.capitalize()
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
return self._dict.pop(key, default=default)
def popitem(self):
# type: (str, Optional[str]) -> str
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
return self._dict.popitem()
def setdefault(self, key, default=None):
# type: (str, Optional[str]) -> str
key = key.capitalize()
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
return self._dict.setdefault(key, default=default)
def update(self, iterable=None, **kwargs):
# type: (Union[Dict[str, str], Sequence[Tuple[str, str]]], Union[Dict[str, str]]) -> None
cd = {}
if iterable is None:
d = kwargs
else:
d = dict(iterable, **kwargs)
for k, v in d.items():
cd[k.capitalize()] = v
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
return self._dict.update(cd)
def __delitem__(self, key):
# type: (str) -> None
key = key.capitalize()
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
del self._dict[key]
def __setitem__(self, key, value):
# type: (str, str) -> None
key = key.capitalize()
if key != 'Content-length':
if hasattr(self.request, '_boundary'):
self.request._boundary = None
if hasattr(self.request, '_bytes'):
self.request._bytes = None
return self._dict.__setitem__(key, value)
def __getitem__(self, key):
# type: (str) -> None
key = key.capitalize()
if key == 'Content-length':
data = self.request.data
if data is None:
content_length = 0
else:
content_length = len(data)
value = str(content_length)
else:
try:
value = self._dict.__getitem__(key)
except KeyError as e:
if key == 'Content-type':
if hasattr(self.request, 'parts') and self.request.parts:
value = 'multipart/form-data'
if (
(value is not None) and
value.strip().lower()[:9] == 'multipart' and
hasattr(self.request, 'boundary')
):
value += '; boundary=' + str(self.request.boundary, encoding='utf-8')
return value
def keys(self):
# type: (...) -> Iterable[str]
return (k for k in self)
def values(self):
return (self[k] for k in self)
def __len__(self):
return len(tuple(self))
def __iter__(self):
# type: (...) -> Iterable[str]
keys = set()
for k in self._dict.keys():
keys.add(k)
yield k
if type(self.request) is not Part:
# *Always* include "Content-length"
if 'Content-length' not in keys:
yield 'Content-length'
if (
hasattr(self.request, 'parts') and
self.request.parts and
('Content-type' not in keys)
):
yield 'Content-type'
def __contains__(self, key):
# type: (str) -> bool
return True if key in self.keys() else False
def items(self):
# type: (...) -> Iterable[Tuple[str, str]]
for k in self:
yield k, self[k]
def copy(self):
# type: (...) -> Headers
return self.__class__(
self._dict,
request=self.request
)
def __copy__(self):
# type: (...) -> Headers
return self.copy()
class Data(object):
"""
One of a multipart request's parts.
"""
def __init__(
self,
data=None, # type: Optional[Union[bytes, str, Sequence, Set, dict, Model]]
headers=None # type: Optional[Dict[str, str]]
):
"""
Parameters:
- data (bytes|str|collections.Sequence|collections.Set|dict|serial.abc.Model): The payload.
- headers ({str: str}): A dictionary of headers (for this part of the request body, not the main request).
This should (almost) always include values for "Content-Disposition" and "Content-Type".
"""
self._bytes = None # type: Optional[bytes]
self._headers = None
self._data = None
self.headers = headers # type: Dict[str, str]
self.data = data # type: Optional[bytes]
@property
def headers(self):
return self._headers
@headers.setter
def headers(self, headers):
self._bytes = None
if headers is None:
headers = Headers({}, self)
elif isinstance(headers, Headers):
headers.request = self
else:
headers = Headers(headers, self)
self._headers = headers
@property
def data(self):
return self._data
@data.setter
def data(self, data):
# type: (Optional[Union[bytes, str, Sequence, Set, dict, Model]]) -> None
self._bytes = None
if data is not None:
serialize_type = None
if 'Content-type' in self.headers:
ct = self.headers['Content-type']
if re.search(r'/json\b', ct) is not None:
serialize_type = 'json'
if re.search(r'/xml\b', ct) is not None:
serialize_type = 'xml'
if re.search(r'/yaml\b', ct) is not None:
serialize_type = 'yaml'
if isinstance(data, (Model, dict)) or (
isinstance(data, (collections_abc.Sequence, collections_abc.Set)) and not
isinstance(data, (str, bytes))
):
data = serialize(data, serialize_type or 'json')
if isinstance(data, str):
data = bytes(data, encoding='utf-8')
self._data = data
def __bytes__(self):
if self._bytes is None:
lines = []
for k, v in self.headers.items():
lines.append(bytes(
'%s: %s' % (k, v),
encoding='utf-8'
))
lines.append(b'')
data = self.data
if data:
lines.append(self.data)
self._bytes = b'\r\n'.join(lines) + b'\r\n'
return self._bytes
def __str__(self):
b = self.__bytes__()
if not isinstance(b, native_str):
b = repr(b)[2:-1].replace('\\r\\n', '\r\n').replace('\\n', '\n')
return b
class Part(Data):
def __init__(
self,
data=None, # type: Optional[Union[bytes, str, Sequence, Set, dict, Model]]
headers=None, # type: Optional[Dict[str, str]]
parts=None # type: Optional[Sequence[Part]]
):
"""
Parameters:
- data (bytes|str|collections.Sequence|collections.Set|dict|serial.abc.Model): The payload.
- headers ({str: str}): A dictionary of headers (for this part of the request body, not the main request).
This should (almost) always include values for "Content-Disposition" and "Content-Type".
"""
self._boundary = None # type: Optional[bytes]
self._parts = None # type: Optional[Parts]
self.parts = parts
Data.__init__(self, data=data, headers=headers)
@property
def boundary(self):
"""
Calculates a boundary which is not contained in any of the request parts.
"""
if self._boundary is None:
data = b'\r\n'.join(
[self._data or b''] +
[bytes(p) for p in self.parts]
)
boundary = b''.join(
bytes(
random.choice(string.digits + string.ascii_letters),
encoding='utf-8'
)
for i in range(16)
)
while boundary in data:
boundary += bytes(
random.choice(string.digits + string.ascii_letters),
encoding='utf-8'
)
self._boundary = boundary
return self._boundary
@property
def data(self):
# type: (bytes) -> None
if self.parts:
data = (b'\r\n--' + self.boundary + b'\r\n').join(
[self._data or b''] +
[bytes(p).rstrip() for p in self.parts]
) + (b'\r\n--' + self.boundary + b'--')
else:
data = self._data
return data
@data.setter
def data(self, data):
return Data.data.__set__(self, data)
@property
def parts(self):
# type: (...) -> Parts
return self._parts
@parts.setter
def parts(self, parts):
# type: (Optional[Sequence[Part]]) -> None
if parts is None:
parts = Parts([], request=self)
elif isinstance(parts, Parts):
parts.request = self
else:
parts = Parts(parts, request=self)
self._boundary = None
self._parts = parts
class Parts(list):
def __init__(self, items, request):
# type: (typing.Sequence[Part], MultipartRequest) -> None
self.request = request
super().__init__(items)
def append(self, item):
# type: (Part) -> None
self.request._boundary = None
self.request._bytes = None
super().append(item)
def clear(self):
# type: (...) -> None
self.request._boundary = None
self.request._bytes = None
super().clear()
def extend(self, items):
# type: (Iterable[Part]) -> None
self.request._boundary = None
self.request._bytes = None
super().extend(items)
def reverse(self):
# type: (...) -> None
self.request._boundary = None
self.request._bytes = None
super().reverse()
def __delitem__(self, key):
# type: (str) -> None
self.request._boundary = None
self.request._bytes = None
super().__delitem__(key)
def __setitem__(self, key, value):
# type: (str) -> None
self.request._boundary = None
self.request._bytes = None
super().__setitem__(key, value)
class Request(Data, urllib.request.Request):
"""
A sub-class of `urllib.request.Request` which accommodates additional data types, and serializes `data` in
accordance with what is indicated by the request's "Content-Type" header.
"""
def __init__(
self,
url,
data=None, # type: Optional[Union[bytes, str, Sequence, Set, dict, Model]]
headers=None, # type: Optional[Dict[str, str]]
origin_req_host=None, # type: Optional[str]
unverifiable=False, # type: bool
method=None # type: Optional[str]
):
# type: (...) -> None
self._bytes = None # type: Optional[bytes]
self._headers = None
self._data = None
self.headers = headers
urllib.request.Request.__init__(
self,
url,
data=data,
headers=headers,
origin_req_host=origin_req_host,
unverifiable=unverifiable,
method=method
)
class MultipartRequest(Part, Request):
"""
A sub-class of `Request` which adds a property (and initialization parameter) to hold the `parts` of a
multipart request.
https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
"""
def __init__(
self,
url,
data=None, # type: Optional[Union[bytes, str, Sequence, Set, dict, Model]]
headers=None, # type: Optional[Dict[str, str]]
origin_req_host=None, # type: Optional[str]
unverifiable=False, # type: bool
method=None, # type: Optional[str]
parts=None # type: Optional[Sequence[Part]]
):
# type: (...) -> None
Part.__init__(
self,
data=data,
headers=headers,
parts=parts
)
Request.__init__(
self,
url,
data=data,
headers=headers,
origin_req_host=origin_req_host,
unverifiable=unverifiable,
method=method
)
@@ -0,0 +1,304 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from .utilities.compatibility import backport
backport() # noqa
from future.utils import native_str
from serial.utilities import qualified_name, calling_function_qualified_name
from serial.abc.model import Model
from warnings import warn
import collections
import json as _json
from itertools import chain
import yaml as _yaml
import serial
try:
import typing
except ImportError as e:
typing = None
def _object_discrepancies(a, b):
# type: (serial.model.Object, serial.model.Object) -> dict
discrepancies = {}
a_properties = set(serial.meta.read(a).properties.keys())
b_properties = set(serial.meta.read(b).properties.keys())
for property_ in a_properties | b_properties:
try:
a_value = getattr(a, property_)
except AttributeError:
a_value = None
try:
b_value = getattr(b, property_)
except AttributeError:
b_value = None
if a_value != b_value:
discrepancies[property_] = (a_value, b_value)
return discrepancies
def json(
model_instance, # type: serial.model.Model
raise_validation_errors=True, # type: bool
):
# type: (...) -> None
model(
model_instance=model_instance,
format_='json',
raise_validation_errors=raise_validation_errors
)
def yaml(
model_instance, # type: serial.model.Model
raise_validation_errors=True, # type: bool
):
# type: (...) -> None
model(
model_instance=model_instance,
format_='yaml',
raise_validation_errors=raise_validation_errors
)
def xml(
model_instance, # type: serial.model.Model
raise_validation_errors=True, # type: bool
):
# type: (...) -> None
raise NotImplementedError()
# model(
# model_instance=model_instance,
# format_='xml',
# raise_validation_errors=raise_validation_errors
# )
def model(
model_instance, # type: serial.model.Model
format_, # type: str
raise_validation_errors=True, # type: bool
):
# type: (...) -> None
"""
Tests an instance of a `serial.model.Model` sub-class.
Parameters:
- model_instance (serial.model.Model):
An instance of a `serial.model.Model` sub-class.
- format_ (str):
The serialization format being tested: 'json', 'yaml' or 'xml'.
- raise_validation_errors (bool):
The function `serial.model.validate` verifies that all required attributes are present, as well as any
additional validations implemented using the model's validation hooks `after_validate` and/or
`before_validate`.
- If `True`, errors resulting from `serial.model.validate` are raised.
- If `False`, errors resulting from `serial.model.validate` are expressed only as warnings.
"""
if not isinstance(model_instance, Model):
value_representation = repr(model_instance)
raise TypeError(
'`%s` requires an instance of `%s` for the parameter `model_instance`, not%s' % (
calling_function_qualified_name(),
qualified_name(Model),
(
(':\n%s' if '\n' in value_representation else ' `%s`') %
value_representation
)
)
)
serial.meta.format_(model_instance, format_)
if isinstance(model_instance, serial.model.Object):
errors = serial.marshal.validate(model_instance, raise_errors=raise_validation_errors)
if errors:
warn('\n' + '\n'.join(errors))
model_type = type(model_instance)
string = str(model_instance)
assert string != ''
reloaded_model_instance = model_type(string)
qualified_model_name = qualified_name(type(model_instance))
try:
assert model_instance == reloaded_model_instance
except AssertionError as e:
sa = serial.marshal.serialize(model_instance)
sb = serial.marshal.serialize(reloaded_model_instance)
ra = ''.join(l.strip() for l in repr(model_instance).split('\n'))
rb = ''.join(l.strip() for l in repr(reloaded_model_instance).split('\n'))
message = [
'Discrepancies were found between the instance of `%s` provided and ' % qualified_model_name +
'a serialized/deserialized clone:'
]
if sa != sb:
message.append(
'\n :\n\n %s\n !=\n %s' % (
sa,
sb
)
)
if ra != rb:
message.append(
'\n %s\n !=\n %s\n' % (
ra,
rb
)
)
for k, a_b in _object_discrepancies(model_instance, reloaded_model_instance).items():
a, b = a_b
assert a != b
sa = serial.marshal.serialize(a)
sb = serial.marshal.serialize(b)
if sa != sb:
message.append(
'\n %s().%s:\n\n %s\n %s\n %s' % (
qualified_model_name,
k,
sa,
'==' if sa == sb else '!=',
sb
)
)
ra = ''.join(l.strip() for l in repr(a).split('\n'))
rb = ''.join(l.strip() for l in repr(b).split('\n'))
if ra != rb:
message.append(
'\n %s\n %s\n %s' % (
ra,
'==' if ra == rb else '!=',
rb
)
)
e.args = tuple(
chain(
(e.args[0] + '\n' + '\n'.join(message) if e.args else '\n'.join(message),),
e.args[1:] if e.args else tuple()
)
)
raise e
reloaded_string = str(reloaded_model_instance)
try:
assert string == reloaded_string
except AssertionError as e:
m = '\n%s\n!=\n%s' % (string, reloaded_string)
if e.args:
e.args = tuple(chain(
(e.args[0] + '\n' + m,),
e.args[1:]
))
else:
e.args = (m,)
raise e
if format_ == 'json':
reloaded_marshalled_data = _json.loads(
string,
object_hook=collections.OrderedDict,
object_pairs_hook=collections.OrderedDict
)
elif format_ == 'yaml':
reloaded_marshalled_data = _yaml.load(string)
elif format_ == 'xml':
raise NotImplementedError()
else:
format_representation = repr(format_)
raise ValueError(
'Valid serialization types for parameter `format_` are "json", "yaml", or "xml'", not" + (
(
':\n%s' if '\n' in format_representation else ' %s.'
) % format_representation
)
)
keys = set()
for property_name, property in serial.meta.read(model_instance).properties.items():
keys.add(property.name or property_name)
property_value = getattr(model_instance, property_name)
if isinstance(property_value, Model) or (
hasattr(property_value, '__iter__') and
(not isinstance(property_value, (str, native_str, bytes)))
):
model(property_value, format_=format_, raise_validation_errors=raise_validation_errors)
for k in reloaded_marshalled_data.keys():
if k not in keys:
raise KeyError(
'"%s" not found in serialized/re-deserialized data: %s' % (
k,
string
)
)
elif isinstance(model_instance, serial.model.Array):
serial.marshal.validate(model_instance)
for item in model_instance:
if isinstance(item, Model) or (
hasattr(item, '__iter__') and
(not isinstance(item, (str, native_str, bytes)))
):
model(item, format_=format_, raise_validation_errors=raise_validation_errors)
elif isinstance(model_instance, serial.model.Dictionary):
serial.marshal.validate(model_instance)
for key, value in model_instance.items():
if isinstance(value, Model) or (
hasattr(value, '__iter__') and
(not isinstance(value, (str, native_str, bytes)))
):
model(value, format_=format_, raise_validation_errors=raise_validation_errors)
@@ -0,0 +1,494 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
from . import compatibility
compatibility.backport() # noqa
import builtins
import os # noqa
import sys # noqa
from io import UnsupportedOperation # noqa
from collections import OrderedDict # noqa
from unicodedata import normalize # noqa
import re # noqa
import inspect # noqa
from keyword import iskeyword # noqa
# region Compatibility Conditionals
# The following detects the presence of the typing library
try:
from typing import Union, Optional, Iterable, Tuple, Any, Callable, AnyStr # noqa
except ImportError:
Union = Optional = Iterable = Tuple = Any = Callable = AnyStr = None
# Before `collections.abc` existed, the definitions we use from this module were in `collections`
try:
import collections.abc as collections_abc
import collections
except ImportError:
import collections
collections_abc = collections
# Earlier versions of the `collections` library do not include the `Generator` class, so when this class is missing--
# we employ a workaround.
if hasattr(collections_abc, 'Generator'):
Generator = collections_abc.Generator
else:
Generator = type(n for n in (1, 2, 3))
# endregion
try:
from inspect import signature
getargspec = None
except ImportError:
signature = None
try:
from inspect import getfullargspec
except ImportError:
from inspect import getargspec as getfullargspec
_Module = type(re)
def qualified_name(type_):
# type: (Union[type, _Module]) -> str
"""
>>> print(qualified_name(qualified_name))
qualified_name
>>> from serial import model
>>> print(qualified_name(model.marshal))
serial.model.marshal
"""
if hasattr(type_, '__qualname__'):
type_name = '.'.join(name_part for name_part in type_.__qualname__.split('.') if name_part[0] != '<')
else:
type_name = type_.__name__
if isinstance(type_, _Module):
if type_name in (
'builtins', '__builtin__', '__main__', '__init__'
):
type_name = None
else:
if type_.__module__ not in (
'builtins', '__builtin__', '__main__', '__init__'
):
type_name = type_.__module__ + '.' + type_name
return type_name
def calling_function_qualified_name(depth=1):
# type: (int) -> Optional[str]
"""
>>> def my_function(): return calling_function_qualified_name()
>>> print(my_function())
my_function
"""
if not isinstance(depth, int):
depth_representation = repr(depth)
raise TypeError(
'The parameter `depth` for `serial.utilities.calling_function_qualified_name` must be an `int`, not' +
(
(':\n%s' if '\n' in depth_representation else ' %s.') %
depth_representation
)
)
stack = inspect.stack()
if len(stack) < (depth + 1):
return None
else:
name_list = []
stack = inspect.stack()
frame_info = stack[depth] # type: inspect.FrameInfo
try:
frame_function = frame_info.function
except AttributeError:
frame_function = frame_info[3]
if frame_function != '<module>':
try:
frame = frame_info.frame
except AttributeError:
frame = frame_info[0]
name_list.append(frame_function)
arguments, _, _, frame_locals = inspect.getargvalues(frame)
if arguments:
argument = arguments[0]
argument_value = frame_locals[argument]
argument_value_type = type(argument_value)
if (
hasattr(argument_value_type, '__name__') and
hasattr(argument_value_type, '__module__') and
(
(argument_value_type.__name__ not in dir(builtins)) or
(getattr(builtins, argument_value_type.__name__) is not argument_value_type)
)
):
name_list.append(qualified_name(argument_value_type))
if len(name_list) < 2:
try:
file_name = frame_info.filename
except AttributeError:
file_name = frame_info[1]
module_name = inspect.getmodulename(file_name)
if module_name not in sys.modules:
path_parts = list(os.path.split(file_name))
path_parts.pop()
while path_parts:
parent = path_parts.pop()
module_name = parent + '.' + module_name
if module_name in sys.modules:
break
if module_name is None:
raise ValueError('The path "%s" is not a python module' % file_name)
else:
if module_name in sys.modules:
qualified_module_name = qualified_name(sys.modules[module_name])
name_list.append(qualified_module_name)
return '.'.join(reversed(name_list))
def property_name(string):
# type: (str) -> str
"""
Converts a "camelCased" attribute/property name, or a name which conflicts with a python keyword, to a
pep8-compliant property name.
>>> print(property_name('theBirdsAndTheBees'))
the_birds_and_the_bees
>>> print(property_name('FYIThisIsAnAcronym'))
fyi_this_is_an_acronym
>>> print(property_name('in'))
in_
>>> print(property_name('id'))
id_
"""
pn = re.sub(
r'__+',
'_',
re.sub(
r'[^\w]+',
'',
re.sub(
r'([a-zA-Z])([0-9])',
r'\1_\2',
re.sub(
r'([0-9])([a-zA-Z])',
r'\1_\2',
re.sub(
r'([A-Z])([A-Z])([a-z])',
r'\1_\2\3',
re.sub(
r'([a-z])([A-Z])',
r'\1_\2',
re.sub(
r'([^\x20-\x7F]|\s)+',
'_',
normalize('NFKD', string)
)
)
)
)
)
)
).lower()
if iskeyword(pn) or (pn in dir(builtins)):
pn += '_'
return pn
def class_name(string):
"""
>>> print(class_name('the birds and the bees'))
TheBirdsAndTheBees
>>> print(class_name('the-birds-and-the-bees'))
TheBirdsAndTheBees
>>> print(class_name('**the - birds - and - the - bees**'))
TheBirdsAndTheBees
>>> print(class_name('FYI is an acronym'))
FYIIsAnAcronym
>>> print(class_name('in-you-go'))
InYouGo
>>> print(class_name('False'))
False_
>>> print(class_name('True'))
True_
>>> print(class_name('ABC Acronym'))
ABCAcronym
"""
return camel(string, capitalize=True)
def camel(string, capitalize=False):
# type: (str, bool) -> str
"""
>>> print(camel('the birds and the bees'))
theBirdsAndTheBees
>>> print(camel('the-birds-and-the-bees'))
theBirdsAndTheBees
>>> print(camel('**the - birds - and - the - bees**'))
theBirdsAndTheBees
>>> print(camel('FYI is an acronym'))
fyiIsAnAcronym
>>> print(camel('in-you-go'))
inYouGo
>>> print(camel('False'))
false
>>> print(camel('True'))
true
>>> print(camel('in'))
in_
"""
string = normalize('NFKD', string)
characters = []
if not capitalize:
string = string.lower()
capitalize_next = capitalize
for s in string:
if s in 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789':
if capitalize_next:
if capitalize or characters:
s = s.upper()
characters.append(s)
capitalize_next = False
else:
capitalize_next = True
cn = ''.join(characters)
if iskeyword(cn) or (cn in dir(builtins)):
cn += '_'
return cn
def get_source(o):
# type: (object) -> str
if hasattr(o, '_source') and isinstance(o._source, str):
result = o._source
else:
result = inspect.getsource(o)
return result
def camel_split(string):
# test: (str) -> str
"""
>>> print('(%s)' % ', '.join("'%s'" % s for s in camel_split('theBirdsAndTheBees')))
('the', 'Birds', 'And', 'The', 'Bees')
>>> print('(%s)' % ', '.join("'%s'" % s for s in camel_split('theBirdsAndTheBees123')))
('the', 'Birds', 'And', 'The', 'Bees', '123')
>>> print('(%s)' % ', '.join("'%s'" % s for s in camel_split('theBirdsAndTheBeesABC123')))
('the', 'Birds', 'And', 'The', 'Bees', 'ABC', '123')
>>> print('(%s)' % ', '.join("'%s'" % s for s in camel_split('the-Birds-And-The-Bees-ABC--123')))
('the', '-', 'Birds', '-', 'And', '-', 'The', '-', 'Bees', '-', 'ABC', '--', '123')
>>> print('(%s)' % ', '.join("'%s'" % s for s in camel_split('THEBirdsAndTheBees')))
('THE', 'Birds', 'And', 'The', 'Bees')
"""
words = []
character_type = None
acronym = False
for s in string:
if s in '0123456789':
if character_type == 0:
words[-1].append(s)
else:
words.append([s])
character_type = 0
acronym = False
elif s in 'abcdefghijklmnopqrstuvwxyz':
if character_type == 1:
words[-1].append(s)
elif character_type == 2:
if acronym:
words.append([words[-1].pop()] + [s])
else:
words[-1].append(s)
else:
words.append([s])
character_type = 1
acronym = False
elif s in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
if character_type == 2:
words[-1].append(s)
acronym = True
else:
words.append([s])
acronym = False
character_type = 2
else:
if character_type == 3:
words[-1].append(s)
else:
words.append([s])
character_type = 3
return tuple(
''.join(w) for w in words
)
def properties_values(o):
# type: (object) -> Sequence[Tuple[AnyStr, Any]]
for a in dir(o):
if a[0] != '_':
v = getattr(o, a)
if not callable(v):
yield a, v
UNDEFINED = None
class Undefined(object):
def __init__(self):
if UNDEFINED is not None:
raise RuntimeError(
'%s may only be defined once.' % repr(self)
)
def __repr__(self):
return (
'UNDEFINED'
if self.__module__ in ('__main__', 'builtins', '__builtin__', __name__) else
'%s.UNDEFINED' % self.__module__
)
def __bool__(self):
return False
def __hash__(self):
return 0
def __eq__(self, other):
# type: (Any) -> bool
return other is self
UNDEFINED = Undefined()
def parameters_defaults(function):
# type: (Callable) -> OrderedDict
"""
Returns an ordered dictionary mapping a function's argument names to default values, or `UNDEFINED` in the case of
positional arguments.
>>> class X(object):
...
... def __init__(self, a, b, c, d=1, e=2, f=3):
... pass
...
>>> print(list(parameters_defaults(X.__init__).items()))
[('self', UNDEFINED), ('a', UNDEFINED), ('b', UNDEFINED), ('c', UNDEFINED), ('d', 1), ('e', 2), ('f', 3)]
"""
pd = OrderedDict()
if signature is None:
spec = getfullargspec(function)
i = - 1
for a in spec.args:
pd[a] = UNDEFINED
for a in reversed(spec.args):
try:
pd[a] = spec.defaults[i]
except IndexError:
break
i -= 1
else:
for pn, p in signature(function).parameters.items():
if p.default is inspect.Parameter.empty:
pd[pn] = UNDEFINED
else:
pd[pn] = p.default
return pd
def read(data):
# type: (Union[str, IOBase, addbase]) -> Any
if (
(hasattr(data, 'readall') and callable(data.readall)) or
(hasattr(data, 'read') and callable(data.read))
):
if hasattr(data, 'seek') and callable(data.seek):
try:
data.seek(0)
except UnsupportedOperation:
pass
if hasattr(data, 'readall') and callable(data.readall):
try:
data = data.readall()
except UnsupportedOperation:
data = data.read()
else:
data = data.read()
return data
else:
raise TypeError(
'%s is not a file-like object' % repr(data)
)
if __name__ == '__main__':
import doctest
doctest.testmod()
@@ -0,0 +1,29 @@
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \
print_function, unicode_literals
import inspect
BACKWARDS_COMPATIBILITY_IMPORTS = '\n'.join(
(
'# region Backwards Compatibility',
'from __future__ import nested_scopes, generators, division, absolute_import, with_statement, \\',
' print_function, unicode_literals',
'from future import standard_library',
'standard_library.install_aliases()',
'from future.builtins import *',
'# endregion'
)
)
def backport():
# type: (...) -> None
frame_info = inspect.stack()[1] # type: inspect.FrameInfo
try:
frame = frame_info.frame
except AttributeError:
frame = frame_info[0]
exec(BACKWARDS_COMPATIBILITY_IMPORTS, frame.f_globals, frame.f_locals)