dwh_auditor.analyzer.cost のソースコード

"""高コストクエリの分析ロジック.

注意: このモジュールは google.cloud.bigquery を一切インポートしてはなりません。
純粋な Python ロジックのみで構成し、単体テストがミリ秒単位で完了するようにします。
"""

from dwh_auditor.config import AppConfig
from dwh_auditor.models.job import QueryJob
from dwh_auditor.models.result import CostInsight

# 1 TB をバイトに変換する定数
_BYTES_PER_TB: float = 1024**4


def _bytes_to_tb(bytes_value: int) -> float:
    """バイト数を TB に変換する."""
    return bytes_value / _BYTES_PER_TB


def _estimate_cost_usd(total_bytes_billed: int, tb_price_usd: float) -> float:
    """スキャンバイト数からコスト (USD) を推定する.

    Args:
        total_bytes_billed: 課金対象バイト数
        tb_price_usd: 1TB あたりのオンデマンド料金 (USD)

    Returns:
        推定コスト (USD)
    """
    return _bytes_to_tb(total_bytes_billed) * tb_price_usd


[ドキュメント] def analyze_cost(jobs: list[QueryJob], config: AppConfig) -> list[CostInsight]: """クエリジョブを解析し、高コストクエリのランキングを返す. キャッシュヒットしたジョブは課金ゼロなので除外します。 Args: jobs: QueryJob のリスト (Extractor から受け取る) config: AppConfig オブジェクト Returns: CostInsight のリスト (コスト降順、上位 N 件) """ tb_price = config.pricing.tb_scan_usd limit = config.thresholds.top_expensive_queries_limit insights: list[CostInsight] = [] for job in jobs: if job.total_bytes_billed <= 0: continue cost_usd = _estimate_cost_usd(job.total_bytes_billed, tb_price) scanned_tb = _bytes_to_tb(job.total_bytes_billed) insights.append( CostInsight( job=job, estimated_cost_usd=cost_usd, scanned_tb=scanned_tb, ) ) insights.sort(key=lambda x: x.estimated_cost_usd, reverse=True) return insights[:limit]