# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Create, read, update and delete and execute scripts in the Azure Cosmos DB SQL API service.
"""
from typing import Any, Mapping, Union, Optional
from azure.core.paging import ItemPaged
from azure.core.tracing.decorator import distributed_trace
from azure.cosmos import CosmosDict
from ._cosmos_client_connection import CosmosClientConnection
from ._base import build_options
from .partition_key import NonePartitionKeyValue, _return_undefined_or_empty_partition_key, PartitionKeyType
# pylint: disable=protected-access
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
class ScriptType:
StoredProcedure = "sprocs"
Trigger = "triggers"
UserDefinedFunction = "udfs"
[docs]
class ScriptsProxy:
"""An interface to interact with stored procedures.
This class should not be instantiated directly. Instead, use the
:func:`ContainerProxy.scripts` attribute.
"""
def __init__(
self,
client_connection: CosmosClientConnection,
container_link: str,
is_system_key: bool
) -> None:
self.client_connection = client_connection
self.container_link = container_link
self.is_system_key = is_system_key
def _get_resource_link(self, script_or_id: Union[str, Mapping[str, Any]], typ: str) -> str:
if isinstance(script_or_id, str):
return "{}/{}/{}".format(self.container_link, typ, script_or_id)
return script_or_id["_self"]
[docs]
@distributed_trace
def list_stored_procedures(
self,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""List all stored procedures in the container.
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of stored procedures (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.ReadStoredProcedures(
collection_link=self.container_link, options=feed_options, **kwargs
)
[docs]
@distributed_trace
def query_stored_procedures(
self,
query: str,
parameters: Optional[list[dict[str, Any]]] = None,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""Return all stored procedures matching the given `query`.
:param str query: The Azure Cosmos DB SQL query to execute.
:param parameters: Optional array of parameters to the query. Ignored if no query is provided.
:type parameters: list[dict[str, Any]]
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of stored procedures (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.QueryStoredProcedures(
collection_link=self.container_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
[docs]
@distributed_trace
def get_stored_procedure(
self,
sproc: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> CosmosDict:
"""Get the stored procedure identified by `id`.
:param sproc: The ID (name) or dict representing stored procedure to retrieve.
:type sproc: Union[str, dict[str, Any]]
:returns: A dict representing the retrieved stored procedure.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given stored procedure couldn't be retrieved.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReadStoredProcedure(
sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), options=request_options, **kwargs
)
[docs]
@distributed_trace
def create_stored_procedure(
self,
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Create a new stored procedure in the container.
To replace an existing sproc, use the :func:`Container.scripts.replace_stored_procedure` method.
:param dict[str, Any] body: A dict-like object representing the sproc to create.
:returns: A dict representing the new stored procedure.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given stored procedure couldn't be created.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.CreateStoredProcedure(
collection_link=self.container_link, sproc=body, options=request_options, **kwargs
)
[docs]
@distributed_trace
def replace_stored_procedure(
self,
sproc: Union[str, Mapping[str, Any]],
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Replace a specified stored procedure in the container.
If the stored procedure does not already exist in the container, an exception is raised.
:param sproc: The ID (name) or dict representing stored procedure to be replaced.
:type sproc: Union[str, dict[str, Any]]
:param dict[str, Any] body: A dict-like object representing the sproc to replace.
:returns: A dict representing the stored procedure after replace went through.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the stored
procedure with given id does not exist.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReplaceStoredProcedure(
sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure),
sproc=body,
options=request_options,
**kwargs
)
[docs]
@distributed_trace
def delete_stored_procedure(
self,
sproc: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> None:
"""Delete a specified stored procedure from the container.
If the stored procedure does not already exist in the container, an exception is raised.
:param sproc: The ID (name) or dict representing stored procedure to be deleted.
:type sproc: Union[str, dict[str, Any]]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The sproc wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The sproc does not exist in the container.
:rtype: None
"""
request_options = build_options(kwargs)
self.client_connection.DeleteStoredProcedure(
sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), options=request_options, **kwargs
)
[docs]
@distributed_trace
def execute_stored_procedure(
self,
sproc: Union[str, Mapping[str, Any]],
partition_key: Optional[PartitionKeyType] = None,
params: Optional[list[dict[str, Any]]] = None,
enable_script_logging: Optional[bool] = None,
**kwargs: Any
) -> Any:
"""Execute a specified stored procedure.
If the stored procedure does not already exist in the container, an exception is raised.
:param sproc: The ID (name) or dict representing stored procedure to be executed.
:type sproc: Union[str, dict[str, Any]]
:param partition_key: Specifies the partition key to indicate which partition the sproc should execute on.
:type partition_key: Union[str, int, float, bool]
:param params: List of parameters to be passed to the stored procedure to be executed.
:type params: list[dict[str, Any]]
:param bool enable_script_logging: Enables or disables script logging for the current request.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the stored procedure execution failed
or if the stored procedure with given id does not exist in the container.
:returns: Result of the executed stored procedure for the given parameters.
:rtype: Any
"""
request_options = build_options(kwargs)
if partition_key is not None:
request_options["partitionKey"] = (
_return_undefined_or_empty_partition_key(self.is_system_key)
if partition_key == NonePartitionKeyValue
else partition_key
)
if enable_script_logging is not None:
request_options["enableScriptLogging"] = enable_script_logging
return self.client_connection.ExecuteStoredProcedure(
sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure),
params=params,
options=request_options,
**kwargs
)
[docs]
@distributed_trace
def list_triggers(
self,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""List all triggers in the container.
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of triggers (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.ReadTriggers(
collection_link=self.container_link, options=feed_options, **kwargs
)
[docs]
@distributed_trace
def query_triggers(
self,
query: str,
parameters: Optional[list[dict[str, Any]]] = None,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""Return all triggers matching the given `query`.
:param str query: The Azure Cosmos DB SQL query to execute.
:param parameters: Optional array of parameters to the query. Ignored if no query is provided.
:type parameters: list[dict[str, Any]]
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of triggers (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.QueryTriggers(
collection_link=self.container_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
[docs]
@distributed_trace
def get_trigger(
self,
trigger: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> CosmosDict:
"""Get a trigger identified by `id`.
:param trigger: The ID (name) or dict representing trigger to retrieve.
:type trigger: Union[str, dict[str, Any]]
:returns: A dict representing the retrieved trigger.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given trigger couldn't be retrieved.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReadTrigger(
trigger_link=self._get_resource_link(trigger, ScriptType.Trigger), options=request_options, **kwargs
)
[docs]
@distributed_trace
def create_trigger(
self,
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Create a trigger in the container.
To replace an existing trigger, use the :func:`ContainerProxy.scripts.replace_trigger` method.
:param dict[str, Any] body: A dict-like object representing the trigger to create.
:returns: A dict representing the new trigger.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given trigger couldn't be created.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.CreateTrigger(
collection_link=self.container_link, trigger=body, options=request_options, **kwargs
)
[docs]
@distributed_trace
def replace_trigger(
self,
trigger: Union[str, Mapping[str, Any]],
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Replace a specified trigger in the container.
If the trigger does not already exist in the container, an exception is raised.
:param trigger: The ID (name) or dict representing trigger to be replaced.
:type trigger: Union[str, dict[str, Any]]
:param dict[str, Any] body: A dict-like object representing the trigger to replace.
:returns: A dict representing the trigger after replace went through.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the trigger
with given id does not exist.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReplaceTrigger(
trigger_link=self._get_resource_link(trigger, ScriptType.Trigger),
trigger=body,
options=request_options,
**kwargs
)
[docs]
@distributed_trace
def delete_trigger(
self,
trigger: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> None:
"""Delete a specified trigger from the container.
If the trigger does not already exist in the container, an exception is raised.
:param trigger: The ID (name) or dict representing trigger to be deleted.
:type trigger: Union[str, dict[str, Any]]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The trigger wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The trigger does not exist in the container.
:rtype: None
"""
request_options = build_options(kwargs)
self.client_connection.DeleteTrigger(
trigger_link=self._get_resource_link(trigger, ScriptType.Trigger), options=request_options, **kwargs
)
[docs]
@distributed_trace
def list_user_defined_functions(
self,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""List all the user-defined functions in the container.
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of user-defined functions (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.ReadUserDefinedFunctions(
collection_link=self.container_link, options=feed_options, **kwargs
)
[docs]
@distributed_trace
def query_user_defined_functions(
self,
query: str,
parameters: Optional[list[dict[str, Any]]] = None,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> ItemPaged[dict[str, Any]]:
"""Return user-defined functions matching a given `query`.
:param str query: The Azure Cosmos DB SQL query to execute.
:param parameters: Optional array of parameters to the query. Ignored if no query is provided.
:type parameters: list[dict[str, Any]]
:param int max_item_count: Max number of items to be returned in the enumeration operation.
:returns: An Iterable of user-defined functions (dicts).
:rtype: Iterable[dict[str, Any]]
"""
feed_options = build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
return self.client_connection.QueryUserDefinedFunctions(
collection_link=self.container_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
[docs]
@distributed_trace
def get_user_defined_function(
self,
udf: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> CosmosDict:
"""Get a user-defined functions identified by `id`.
:param udf: The ID (name) or dict representing udf to retrieve.
:type udf: Union[str, dict[str, Any]]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the user-defined function couldn't be retrieved.
:returns: A dict representing the retrieved user-defined function.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReadUserDefinedFunction(
udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction), options=request_options, **kwargs
)
[docs]
@distributed_trace
def create_user_defined_function(
self,
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Create a user-defined function in the container.
To replace an existing UDF, use the :func:`ContainerProxy.scripts.replace_user_defined_function` method.
:param dict[str, Any] body: A dict-like object representing the udf to create.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the user-defined function couldn't be created.
:returns: A dict representing the new user-defined function.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.CreateUserDefinedFunction(
collection_link=self.container_link, udf=body, options=request_options, **kwargs
)
[docs]
@distributed_trace
def replace_user_defined_function(
self,
udf: Union[str, Mapping[str, Any]],
body: dict[str, Any],
**kwargs: Any
) -> CosmosDict:
"""Replace a specified user-defined function in the container.
If the UDF does not already exist in the container, an exception is raised.
:param udf: The ID (name) or dict representing udf to be replaced.
:type udf: Union[str, dict[str, Any]]
:param Dict[str, Any] body: A dict-like object representing the udf to replace.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the user-defined
function with the given id does not exist.
:returns: A dict representing the user-defined function after replace went through.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
request_options = build_options(kwargs)
return self.client_connection.ReplaceUserDefinedFunction(
udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction),
udf=body,
options=request_options,
**kwargs
)
[docs]
@distributed_trace
def delete_user_defined_function(
self,
udf: Union[str, Mapping[str, Any]],
**kwargs: Any
) -> None:
"""Delete a specified user-defined function from the container.
If the UDF does not already exist in the container, an exception is raised.
:param udf: The ID (name) or dict representing udf to be deleted.
:type udf: Union[str, dict[str, Any]]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The udf wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The UDF does not exist in the container.
:rtype: None
"""
request_options = build_options(kwargs)
self.client_connection.DeleteUserDefinedFunction(
udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction), options=request_options, **kwargs
)