# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import base64
from json import JSONEncoder
from typing import Dict, List, Optional, Union, cast, Any
from datetime import datetime, date, time, timedelta
from datetime import timezone
__all__ = ["NULL", "AzureJSONEncoder", "is_generated_model", "as_attribute_dict", "attribute_list"]
TZ_UTC = timezone.utc
class _Null:
"""To create a Falsy object"""
def __bool__(self) -> bool:
return False
NULL = _Null()
"""
A falsy sentinel object which is supposed to be used to specify attributes
with no data. This gets serialized to `null` on the wire.
"""
def _timedelta_as_isostr(td: timedelta) -> str:
"""Converts a datetime.timedelta object into an ISO 8601 formatted string, e.g. 'P4DT12H30M05S'
Function adapted from the Tin Can Python project: https://github.com/RusticiSoftware/TinCanPython
:param td: The timedelta object to convert
:type td: datetime.timedelta
:return: An ISO 8601 formatted string representing the timedelta object
:rtype: str
"""
# Split seconds to larger units
seconds = td.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
days, hours, minutes = list(map(int, (days, hours, minutes)))
seconds = round(seconds, 6)
# Build date
date_str = ""
if days:
date_str = "%sD" % days
# Build time
time_str = "T"
# Hours
bigger_exists = date_str or hours
if bigger_exists:
time_str += "{:02}H".format(hours)
# Minutes
bigger_exists = bigger_exists or minutes
if bigger_exists:
time_str += "{:02}M".format(minutes)
# Seconds
try:
if seconds.is_integer():
seconds_string = "{:02}".format(int(seconds))
else:
# 9 chars long w/ leading 0, 6 digits after decimal
seconds_string = "%09.6f" % seconds
# Remove trailing zeros
seconds_string = seconds_string.rstrip("0")
except AttributeError: # int.is_integer() raises
seconds_string = "{:02}".format(seconds)
time_str += "{}S".format(seconds_string)
return "P" + date_str + time_str
def _datetime_as_isostr(dt: Union[datetime, date, time, timedelta]) -> str:
"""Converts a datetime.(datetime|date|time|timedelta) object into an ISO 8601 formatted string.
:param dt: The datetime object to convert
:type dt: datetime.datetime or datetime.date or datetime.time or datetime.timedelta
:return: An ISO 8601 formatted string representing the datetime object
:rtype: str
"""
# First try datetime.datetime
if hasattr(dt, "year") and hasattr(dt, "hour"):
dt = cast(datetime, dt)
# astimezone() fails for naive times in Python 2.7, so make make sure dt is aware (tzinfo is set)
if not dt.tzinfo:
iso_formatted = dt.replace(tzinfo=TZ_UTC).isoformat()
else:
iso_formatted = dt.astimezone(TZ_UTC).isoformat()
# Replace the trailing "+00:00" UTC offset with "Z" (RFC 3339: https://www.ietf.org/rfc/rfc3339.txt)
return iso_formatted.replace("+00:00", "Z")
# Next try datetime.date or datetime.time
try:
dt = cast(Union[date, time], dt)
return dt.isoformat()
# Last, try datetime.timedelta
except AttributeError:
dt = cast(timedelta, dt)
return _timedelta_as_isostr(dt)
[docs]
class AzureJSONEncoder(JSONEncoder):
"""A JSON encoder that's capable of serializing datetime objects and bytes."""
[docs]
def default(self, o: Any) -> Any:
"""Override the default method to handle datetime and bytes serialization.
:param o: The object to serialize.
:type o: any
:return: A JSON-serializable representation of the object.
:rtype: any
"""
if isinstance(o, (bytes, bytearray)):
return base64.b64encode(o).decode()
try:
return _datetime_as_isostr(o)
except AttributeError:
pass
return super(AzureJSONEncoder, self).default(o)
[docs]
def is_generated_model(obj: Any) -> bool:
"""Check if the object is a generated SDK model.
:param obj: The object to check.
:type obj: any
:return: True if the object is a generated SDK model, False otherwise.
:rtype: bool
"""
return bool(getattr(obj, "_is_model", False) or hasattr(obj, "_attribute_map"))
def _is_readonly(p: Any) -> bool:
"""Check if an attribute is readonly.
:param any p: The property to check.
:return: True if the property is readonly, False otherwise.
:rtype: bool
"""
try:
return p._visibility == ["read"] # pylint: disable=protected-access
except AttributeError:
return False
def _as_attribute_dict_value(v: Any, *, exclude_readonly: bool = False) -> Any:
if v is None or isinstance(v, _Null):
return None
if isinstance(v, (list, tuple, set)):
return type(v)(_as_attribute_dict_value(x, exclude_readonly=exclude_readonly) for x in v)
if isinstance(v, dict):
return {dk: _as_attribute_dict_value(dv, exclude_readonly=exclude_readonly) for dk, dv in v.items()}
return as_attribute_dict(v, exclude_readonly=exclude_readonly) if is_generated_model(v) else v
def _get_flattened_attribute(obj: Any) -> Optional[str]:
"""Get the name of the flattened attribute in a generated TypeSpec model if one exists.
:param any obj: The object to check.
:return: The name of the flattened attribute if it exists, otherwise None.
:rtype: Optional[str]
"""
flattened_items = None
try:
flattened_items = getattr(obj, next(a for a in dir(obj) if "__flattened_items" in a), None)
except StopIteration:
return None
if flattened_items is None:
return None
for k, v in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
try:
if set(v._class_type._attr_to_rest_field.keys()).intersection( # pylint: disable=protected-access
set(flattened_items)
):
return k
except AttributeError:
# if the attribute does not have _class_type, it is not a typespec generated model
continue
return None
[docs]
def attribute_list(obj: Any) -> List[str]:
"""Get a list of attribute names for a generated SDK model.
:param obj: The object to get attributes from.
:type obj: any
:return: A list of attribute names.
:rtype: List[str]
"""
if not is_generated_model(obj):
raise TypeError("Object is not a generated SDK model.")
if hasattr(obj, "_attribute_map"):
# msrest model
return list(obj._attribute_map.keys()) # pylint: disable=protected-access
flattened_attribute = _get_flattened_attribute(obj)
retval: List[str] = []
for attr_name, rest_field in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
if flattened_attribute == attr_name:
retval.extend(attribute_list(rest_field._class_type)) # pylint: disable=protected-access
else:
retval.append(attr_name)
return retval
[docs]
def as_attribute_dict(obj: Any, *, exclude_readonly: bool = False) -> Dict[str, Any]:
"""Convert an object to a dictionary of its attributes.
Made solely for backcompatibility with the legacy `.as_dict()` on msrest models.
.. deprecated::1.35.0
This function is added for backcompat purposes only.
:param any obj: The object to convert to a dictionary
:keyword bool exclude_readonly: Whether to exclude readonly properties
:return: A dictionary containing the object's attributes
:rtype: dict[str, any]
:raises TypeError: If the object is not a generated model instance
"""
if not is_generated_model(obj):
raise TypeError("Object must be a generated model instance.")
if hasattr(obj, "_attribute_map"):
# msrest generated model
return obj.as_dict(keep_readonly=not exclude_readonly)
try:
# now we're a typespec generated model
result = {}
readonly_props = set()
# create a reverse mapping from rest field name to attribute name
rest_to_attr = {}
flattened_attribute = _get_flattened_attribute(obj)
for attr_name, rest_field in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
if exclude_readonly and _is_readonly(rest_field):
# if we're excluding readonly properties, we need to track them
readonly_props.add(rest_field._rest_name) # pylint: disable=protected-access
if flattened_attribute == attr_name:
for fk, fv in rest_field._class_type._attr_to_rest_field.items(): # pylint: disable=protected-access
rest_to_attr[fv._rest_name] = fk # pylint: disable=protected-access
else:
rest_to_attr[rest_field._rest_name] = attr_name # pylint: disable=protected-access
for k, v in obj.items():
if exclude_readonly and k in readonly_props: # pyright: ignore
continue
if k == flattened_attribute:
for fk, fv in v.items():
result[rest_to_attr.get(fk, fk)] = _as_attribute_dict_value(fv, exclude_readonly=exclude_readonly)
else:
is_multipart_file_input = False
try:
is_multipart_file_input = next( # pylint: disable=protected-access
rf
for rf in obj._attr_to_rest_field.values() # pylint: disable=protected-access
if rf._rest_name == k # pylint: disable=protected-access
)._is_multipart_file_input
except StopIteration:
pass
result[rest_to_attr.get(k, k)] = (
v if is_multipart_file_input else _as_attribute_dict_value(v, exclude_readonly=exclude_readonly)
)
return result
except AttributeError as exc:
# not a typespec generated model
raise TypeError("Object must be a generated model instance.") from exc