# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License in the project root for
# license information.
# --------------------------------------------------------------------------
from functools import cached_property
from logging import getLogger
from typing import Dict, List, cast
from opentelemetry._events import _set_event_logger_provider
from opentelemetry._logs import set_logger_provider
from opentelemetry.instrumentation.dependencies import (
get_dist_dependency_conflicts,
)
from opentelemetry.instrumentation.instrumentor import ( # type: ignore
BaseInstrumentor,
)
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider
from opentelemetry.util._importlib_metadata import (
EntryPoint,
distributions,
entry_points,
)
from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
from azure.monitor.opentelemetry._constants import (
_ALL_SUPPORTED_INSTRUMENTED_LIBRARIES,
_AZURE_SDK_INSTRUMENTATION_NAME,
DISABLE_LOGGING_ARG,
DISABLE_METRICS_ARG,
DISABLE_TRACING_ARG,
ENABLE_LIVE_METRICS_ARG,
LOGGER_NAME_ARG,
RESOURCE_ARG,
SAMPLING_RATIO_ARG,
SPAN_PROCESSORS_ARG,
VIEWS_ARG,
)
from azure.monitor.opentelemetry._types import ConfigurationValue
from azure.monitor.opentelemetry.exporter._quickpulse import ( # pylint: disable=import-error,no-name-in-module
enable_live_metrics,
)
from azure.monitor.opentelemetry.exporter._quickpulse._processor import ( # pylint: disable=import-error,no-name-in-module
_QuickpulseLogRecordProcessor,
_QuickpulseSpanProcessor,
)
from azure.monitor.opentelemetry.exporter import ( # pylint: disable=import-error,no-name-in-module
ApplicationInsightsSampler,
AzureMonitorLogExporter,
AzureMonitorMetricExporter,
AzureMonitorTraceExporter,
)
from azure.monitor.opentelemetry.exporter._utils import ( # pylint: disable=import-error,no-name-in-module
_is_attach_enabled,
_is_on_functions,
)
from azure.monitor.opentelemetry._diagnostics.diagnostic_logging import (
_DISTRO_DETECTS_ATTACH,
AzureDiagnosticLogging,
)
from azure.monitor.opentelemetry._utils.configurations import (
_get_configurations,
_is_instrumentation_enabled,
)
# Module-level logger named after this module; the instrumentation setup
# helpers below use it to report skipped or failed instrumentations.
_logger = getLogger(__name__)
def _setup_tracing(configurations: Dict[str, ConfigurationValue]):
    """Build a ``TracerProvider`` from ``configurations`` and register it globally.

    The provider is created with an ``ApplicationInsightsSampler`` using the
    configured sampling ratio and the configured resource. Span processors are
    attached in this order: the processors supplied under
    ``SPAN_PROCESSORS_ARG``, a ``_QuickpulseSpanProcessor`` when live metrics
    are enabled, and finally a ``BatchSpanProcessor`` wrapping an
    ``AzureMonitorTraceExporter``.

    :param configurations: Distro configuration mapping; also forwarded as
        keyword arguments to ``AzureMonitorTraceExporter``.
    """
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    ratio = cast(float, configurations[SAMPLING_RATIO_ARG])
    sampler = ApplicationInsightsSampler(sampling_ratio=ratio)
    provider = TracerProvider(sampler=sampler, resource=resource)

    # Caller-supplied processors are attached before the built-in ones.
    for processor in configurations[SPAN_PROCESSORS_ARG]:  # type: ignore
        provider.add_span_processor(processor)  # type: ignore

    if configurations.get(ENABLE_LIVE_METRICS_ARG):
        provider.add_span_processor(_QuickpulseSpanProcessor())

    provider.add_span_processor(
        BatchSpanProcessor(AzureMonitorTraceExporter(**configurations))
    )
    set_tracer_provider(provider)

    # Route azure-core tracing through OpenTelemetry only when the Azure SDK
    # instrumentation is enabled in the configuration.
    if _is_instrumentation_enabled(configurations, _AZURE_SDK_INSTRUMENTATION_NAME):
        settings.tracing_implementation = OpenTelemetrySpan
def _setup_logging(configurations: Dict[str, ConfigurationValue]):
    """Build a ``LoggerProvider`` from ``configurations`` and register it globally.

    A ``_QuickpulseLogRecordProcessor`` is attached when live metrics are
    enabled, followed by a ``BatchLogRecordProcessor`` wrapping an
    ``AzureMonitorLogExporter``. A ``LoggingHandler`` bound to the new provider
    is added to the logger named by ``LOGGER_NAME_ARG`` unless that logger
    already carries one. Finally an ``EventLoggerProvider`` built on the same
    logger provider is installed.

    :param configurations: Distro configuration mapping; also forwarded as
        keyword arguments to ``AzureMonitorLogExporter``.
    """
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    provider = LoggerProvider(resource=resource)

    if configurations.get(ENABLE_LIVE_METRICS_ARG):
        provider.add_log_record_processor(_QuickpulseLogRecordProcessor())

    provider.add_log_record_processor(
        BatchLogRecordProcessor(AzureMonitorLogExporter(**configurations))
    )
    set_logger_provider(provider)

    logger_name: str = configurations[LOGGER_NAME_ARG]  # type: ignore
    target_logger = getLogger(logger_name)

    # Attach the OpenTelemetry handler at most once per logger; a second
    # handler would emit most log telemetry twice.
    has_otel_handler = any(
        isinstance(existing, LoggingHandler) for existing in target_logger.handlers
    )
    if not has_otel_handler:
        target_logger.addHandler(LoggingHandler(logger_provider=provider))

    # Events share the logger provider created above.
    # NOTE(review): the second positional argument (False) is passed through
    # as-is; presumably it controls override logging - confirm against the
    # opentelemetry._events API.
    _set_event_logger_provider(EventLoggerProvider(provider), False)
def _setup_metrics(configurations: Dict[str, ConfigurationValue]):
    """Build a ``MeterProvider`` from ``configurations`` and register it globally.

    The provider uses a single ``PeriodicExportingMetricReader`` wrapping an
    ``AzureMonitorMetricExporter``, together with the configured resource and
    views.

    :param configurations: Distro configuration mapping; also forwarded as
        keyword arguments to ``AzureMonitorMetricExporter``.
    """
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    views: List[View] = configurations[VIEWS_ARG]  # type: ignore

    exporting_reader = PeriodicExportingMetricReader(
        AzureMonitorMetricExporter(**configurations)
    )
    set_meter_provider(
        MeterProvider(
            metric_readers=[exporting_reader],
            resource=resource,
            views=views,
        )
    )
def _setup_live_metrics(configurations):
    """Enable live metrics, forwarding every configuration entry as a keyword argument.

    :param configurations: Mapping unpacked into ``enable_live_metrics``.
    """
    enable_live_metrics(**configurations)
class _EntryPointDistFinder:
    """Resolve the distribution that provides a given entry point.

    Entry points that already expose a truthy ``dist`` attribute are answered
    directly; otherwise the distribution is looked up in a mapping built from
    every installed distribution's entry points.
    """

    @staticmethod
    def _key_for(entry_point: EntryPoint):
        """Return the ``group:name:value`` lookup key for ``entry_point``."""
        return "{}:{}:{}".format(entry_point.group, entry_point.name, entry_point.value)

    @cached_property
    def _mapping(self):
        """Map entry-point keys to their distributions (computed once per instance)."""
        lookup = {}
        for dist in distributions():
            for ep in dist.entry_points:
                # A later distribution with the same key replaces an earlier one.
                lookup[self._key_for(ep)] = dist
        return lookup

    def dist_for(self, entry_point: EntryPoint):
        """Return the distribution for ``entry_point``, or ``None`` if unknown.

        :param entry_point: Entry point whose providing distribution is wanted.
        """
        attached = getattr(entry_point, "dist", None)
        # Only fall back to the (lazily built) mapping when the entry point
        # carries no usable ``dist`` of its own.
        return attached if attached else self._mapping.get(self._key_for(entry_point))
def _setup_instrumentations(configurations: Dict[str, ConfigurationValue]):
    """Activate every supported and enabled OpenTelemetry instrumentor.

    Iterates the ``opentelemetry_instrumentor`` entry-point group. Entry points
    whose name is not in ``_ALL_SUPPORTED_INSTRUMENTED_LIBRARIES`` are ignored,
    and those disabled in ``configurations`` or reporting a dependency conflict
    are skipped with a debug log. Any exception raised while checking, loading
    or instrumenting one library is logged as a warning and does not stop the
    remaining libraries. The additional Azure SDK instrumentations are set up
    afterwards.

    :param configurations: Distro configuration mapping consulted for
        per-library enablement.
    """
    dist_finder = _EntryPointDistFinder()
    for entry_point in entry_points(group="opentelemetry_instrumentor"):
        lib_name = entry_point.name
        if lib_name not in _ALL_SUPPORTED_INSTRUMENTED_LIBRARIES:
            continue
        if not _is_instrumentation_enabled(configurations, lib_name):
            _logger.debug("Instrumentation skipped for library %s", lib_name)
            continue
        try:
            # Verify the instrumented library (and version) is installed
            # before loading the instrumentor.
            providing_dist = dist_finder.dist_for(entry_point)  # type: ignore
            conflict = get_dist_dependency_conflicts(providing_dist)  # type: ignore
            if conflict:
                _logger.debug(
                    "Skipping instrumentation %s: %s",
                    lib_name,
                    conflict,
                )
            else:
                # The entry point resolves to the instrumentor class itself.
                instrumentor_cls = entry_point.load()
                # Dependencies were already checked above, so tell the
                # instrumentor not to repeat the check.
                instrumentor_cls().instrument(skip_dep_check=True)
        except Exception as ex:  # pylint: disable=broad-except
            _logger.warning(
                "Exception occurred when instrumenting: %s.",
                lib_name,
                exc_info=ex,
            )
    _setup_additional_azure_sdk_instrumentations(configurations)
def _send_attach_warning():
    """Emit a diagnostic warning when attach is enabled and not running on Functions.

    Nothing is logged when ``_is_attach_enabled()`` is false or when
    ``_is_on_functions()`` is true.
    """
    # Short-circuits exactly like ``attach and not functions``: the Functions
    # check only runs when attach is reported as enabled.
    if not _is_attach_enabled() or _is_on_functions():
        return
    message = (
        "Distro detected that automatic attach may have occurred. Check your data to ensure "
        "that telemetry is not being duplicated. This may impact your cost."
    )
    AzureDiagnosticLogging.warning(message, _DISTRO_DETECTS_ATTACH)
def _setup_additional_azure_sdk_instrumentations(configurations: Dict[str, ConfigurationValue]):
    """Instrument ``azure-ai-inference`` when Azure SDK instrumentation is enabled.

    Does nothing when the Azure SDK instrumentation is unsupported or disabled
    in ``configurations``. A failed import of ``AIInferenceInstrumentor`` is
    logged at debug level; a failure while instrumenting is logged as a
    warning. Neither is propagated to the caller.

    :param configurations: Distro configuration mapping consulted for
        enablement of the Azure SDK instrumentation.
    """
    if _AZURE_SDK_INSTRUMENTATION_NAME not in _ALL_SUPPORTED_INSTRUMENTED_LIBRARIES:
        return

    if not _is_instrumentation_enabled(configurations, _AZURE_SDK_INSTRUMENTATION_NAME):
        _logger.debug("Instrumentation skipped for library azure_sdk")
        return

    try:
        # Imported lazily: the import is attempted only when needed and any
        # failure is handled below.
        from azure.ai.inference.tracing import AIInferenceInstrumentor  # pylint: disable=import-error,no-name-in-module
    except Exception as import_error:  # pylint: disable=broad-except
        _logger.debug(
            "Failed to import AIInferenceInstrumentor from azure-ai-inference",
            exc_info=import_error,
        )
    else:
        try:
            AIInferenceInstrumentor().instrument()
        except Exception as instrument_error:  # pylint: disable=broad-except
            _logger.warning(
                "Exception occurred when instrumenting: %s.",
                "azure-ai-inference",
                exc_info=instrument_error,
            )