# Source code for dwh_auditor.analyzer.cost

"""Analysis logic for high-cost queries.

Note: this module must never import google.cloud.bigquery.
It is built from pure Python logic only, so that unit tests finish within milliseconds.
"""

from dwh_auditor.config import AppConfig
from dwh_auditor.models.job import QueryJob
from dwh_auditor.models.result import CostInsight

# 1 TB ă‚’ăƒă‚€ăƒˆă«ć€‰æ›ă™ă‚‹ćźšæ•°
_BYTES_PER_TB: float = 1024**4


def _bytes_to_tb(bytes_value: int) -> float:
    """Convert a byte count to terabytes.

    Args:
        bytes_value: Number of bytes.

    Returns:
        ``bytes_value`` divided by the module constant ``_BYTES_PER_TB``.
    """
    terabytes = bytes_value / _BYTES_PER_TB
    return terabytes


def _estimate_cost_usd(total_bytes_billed: int, tb_price_usd: float) -> float:
    """Estimate the cost (USD) of a query from its billed byte count.

    Args:
        total_bytes_billed: Number of bytes billed.
        tb_price_usd: On-demand price per 1 TB (USD).

    Returns:
        Estimated cost (USD).
    """
    billed_tb = _bytes_to_tb(total_bytes_billed)
    return billed_tb * tb_price_usd


def analyze_cost(jobs: list[QueryJob], config: AppConfig) -> list[CostInsight]:
    """Analyze query jobs and return a ranking of the most expensive queries.

    Jobs whose ``total_bytes_billed`` is zero or negative are skipped; the
    original author's note gives the reason that cache-hit jobs are billed
    nothing.

    Args:
        jobs: List of QueryJob objects (received from the Extractor).
        config: AppConfig object. This function reads
            ``config.pricing.tb_scan_usd`` and
            ``config.thresholds.top_expensive_queries_limit``.

    Returns:
        List of CostInsight objects sorted by estimated cost in descending
        order and truncated to the configured limit (top N).
    """
    tb_price = config.pricing.tb_scan_usd
    limit = config.thresholds.top_expensive_queries_limit

    insights: list[CostInsight] = []
    for job in jobs:
        # Nothing was billed for this job, so it has no place in a cost ranking.
        if job.total_bytes_billed <= 0:
            continue

        cost_usd = _estimate_cost_usd(job.total_bytes_billed, tb_price)
        scanned_tb = _bytes_to_tb(job.total_bytes_billed)
        insights.append(
            CostInsight(
                job=job,
                estimated_cost_usd=cost_usd,
                scanned_tb=scanned_tb,
            )
        )

    # Most expensive first; list.sort is stable, so equal-cost jobs keep input order.
    insights.sort(key=lambda x: x.estimated_cost_usd, reverse=True)
    return insights[:limit]