Skip to content

Commit

Permalink
✨ api for osparc credits usage aggregation (#6145)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Aug 8, 2024
1 parent da32561 commit ef4a7e8
Show file tree
Hide file tree
Showing 13 changed files with 746 additions and 11 deletions.
29 changes: 28 additions & 1 deletion api/specs/web-server/_resource_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
UpdatePricingUnitBodyParams,
)
from models_library.generics import Envelope
from models_library.resource_tracker import PricingPlanId, PricingUnitId
from models_library.resource_tracker import (
PricingPlanId,
PricingUnitId,
ServicesAggregatedUsagesTimePeriod,
ServicesAggregatedUsagesType,
)
from models_library.rest_pagination import DEFAULT_NUMBER_OF_ITEMS_PER_PAGE
from models_library.wallets import WalletID
from pydantic import Json, NonNegativeInt
Expand All @@ -40,6 +45,7 @@
)
from simcore_service_webserver.resource_usage._service_runs_handlers import (
ORDER_BY_DESCRIPTION,
_ListServicesAggregatedUsagesQueryParams,
_ListServicesResourceUsagesQueryParams,
_ListServicesResourceUsagesQueryParamsWithPagination,
)
Expand Down Expand Up @@ -85,6 +91,27 @@ async def list_resource_usage_services(
)


@router.get(
"/services/-/aggregated-usages",
response_model=Envelope[list[ServiceRunGet]],
summary="Used credits based on aggregate by type, currently supported `services`. (user and product are taken from context, optionally wallet_id parameter might be provided).",
tags=["usage"],
)
async def list_osparc_credits_aggregated_usages(
aggregated_by: ServicesAggregatedUsagesType,
time_period: ServicesAggregatedUsagesTimePeriod,
wallet_id: Annotated[WalletID, Query],
limit: int = DEFAULT_NUMBER_OF_ITEMS_PER_PAGE,
offset: NonNegativeInt = 0,
):
...


assert_handler_signature_against_model(
list_osparc_credits_aggregated_usages, _ListServicesAggregatedUsagesQueryParams
)


@router.get(
"/services/-/usage-report",
status_code=status.HTTP_302_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ class ServiceRunGet(BaseModel):
class ServiceRunPage(NamedTuple):
items: list[ServiceRunGet]
total: PositiveInt


class OsparcCreditsAggregatedByServiceGet(BaseModel):
osparc_credits: Decimal
service_key: ServiceKey


class OsparcCreditsAggregatedUsagesPage(NamedTuple):
items: list[OsparcCreditsAggregatedByServiceGet]
total: PositiveInt
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/resource_tracker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from datetime import datetime, timezone
from decimal import Decimal
from enum import auto
from enum import IntEnum, auto
from typing import Any, ClassVar, NamedTuple, TypeAlias

from pydantic import (
Expand Down Expand Up @@ -283,3 +283,13 @@ class Config:
},
]
}


class ServicesAggregatedUsagesType(StrAutoEnum):
services = "services"


class ServicesAggregatedUsagesTimePeriod(IntEnum):
ONE_DAY = 1
ONE_WEEK = 7
ONE_MONTH = 30
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
)
from models_library.api_schemas_resource_usage_tracker.service_runs import (
OsparcCreditsAggregatedUsagesPage,
ServiceRunPage,
)
from models_library.products import ProductName
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.resource_tracker import ServiceResourceUsagesFilters
from models_library.resource_tracker import (
ServiceResourceUsagesFilters,
ServicesAggregatedUsagesTimePeriod,
ServicesAggregatedUsagesType,
)
from models_library.rest_ordering import OrderBy
from models_library.users import UserID
from models_library.wallets import WalletID
Expand All @@ -35,7 +40,7 @@ async def get_service_run_page(
wallet_id: WalletID | None = None,
access_all_wallet_usage: bool = False,
order_by: OrderBy | None = None,
filters: ServiceResourceUsagesFilters | None = None
filters: ServiceResourceUsagesFilters | None = None,
) -> ServiceRunPage:
result = await rabbitmq_rpc_client.request(
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
Expand All @@ -54,6 +59,36 @@ async def get_service_run_page(
return result


@log_decorator(_logger, level=logging.DEBUG)
async def get_osparc_credits_aggregated_usages_page(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID,
product_name: ProductName,
aggregated_by: ServicesAggregatedUsagesType,
time_period: ServicesAggregatedUsagesTimePeriod,
limit: int = 20,
offset: int = 0,
wallet_id: WalletID,
access_all_wallet_usage: bool = False,
) -> OsparcCreditsAggregatedUsagesPage:
result = await rabbitmq_rpc_client.request(
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
parse_obj_as(RPCMethodName, "get_osparc_credits_aggregated_usages_page"),
user_id=user_id,
product_name=product_name,
limit=limit,
offset=offset,
wallet_id=wallet_id,
access_all_wallet_usage=access_all_wallet_usage,
aggregated_by=aggregated_by,
time_period=time_period,
timeout_s=60,
)
assert isinstance(result, OsparcCreditsAggregatedUsagesPage) # nosec
return result


@log_decorator(_logger, level=logging.DEBUG)
async def export_service_runs(
rabbitmq_rpc_client: RabbitMQRPCClient,
Expand All @@ -63,7 +98,7 @@ async def export_service_runs(
wallet_id: WalletID | None = None,
access_all_wallet_usage: bool = False,
order_by: OrderBy | None = None,
filters: ServiceResourceUsagesFilters | None = None
filters: ServiceResourceUsagesFilters | None = None,
) -> AnyUrl:
result: AnyUrl = await rabbitmq_rpc_client.request(
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
PricingUnitGet,
)
from models_library.api_schemas_resource_usage_tracker.service_runs import (
OsparcCreditsAggregatedUsagesPage,
ServiceRunPage,
)
from models_library.products import ProductName
Expand All @@ -16,6 +17,8 @@
PricingUnitWithCostCreate,
PricingUnitWithCostUpdate,
ServiceResourceUsagesFilters,
ServicesAggregatedUsagesTimePeriod,
ServicesAggregatedUsagesType,
)
from models_library.rest_ordering import OrderBy
from models_library.services import ServiceKey, ServiceVersion
Expand Down Expand Up @@ -95,6 +98,32 @@ async def export_service_runs(
)


@router.expose(reraise_if_error_type=(CustomResourceUsageTrackerError,))
async def get_osparc_credits_aggregated_usages_page(
app: FastAPI,
*,
user_id: UserID,
product_name: ProductName,
aggregated_by: ServicesAggregatedUsagesType,
time_period: ServicesAggregatedUsagesTimePeriod,
limit: int = 20,
offset: int = 0,
wallet_id: WalletID,
access_all_wallet_usage: bool = False,
) -> OsparcCreditsAggregatedUsagesPage:
return await service_runs.get_osparc_credits_aggregated_usages_page(
user_id=user_id,
product_name=product_name,
resource_tracker_repo=ResourceTrackerRepository(db_engine=app.state.engine),
aggregated_by=aggregated_by,
time_period=time_period,
limit=limit,
offset=offset,
wallet_id=wallet_id,
access_all_wallet_usage=access_all_wallet_usage,
)


## Pricing plans


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class Config:
orm_mode = True


class OsparcCreditsAggregatedByServiceKeyDB(BaseModel):
osparc_credits: Decimal
service_key: ServiceKey

class Config:
orm_mode = True


class ServiceRunForCheckDB(BaseModel):
service_run_id: ServiceRunId
last_heartbeat_at: datetime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from models_library.api_schemas_storage import S3BucketName
from models_library.products import ProductName
from models_library.resource_tracker import (
CreditClassification,
CreditTransactionId,
CreditTransactionStatus,
PricingPlanCreate,
Expand Down Expand Up @@ -63,6 +64,7 @@
from ....models.resource_tracker_pricing_unit_costs import PricingUnitCostsDB
from ....models.resource_tracker_pricing_units import PricingUnitsDB
from ....models.resource_tracker_service_runs import (
OsparcCreditsAggregatedByServiceKeyDB,
ServiceRunCreate,
ServiceRunDB,
ServiceRunForCheckDB,
Expand Down Expand Up @@ -309,6 +311,89 @@ async def list_service_runs_by_product_and_user_and_wallet(

return [ServiceRunWithCreditsDB.from_orm(row) for row in result.fetchall()]

async def get_osparc_credits_aggregated_by_service(
self,
product_name: ProductName,
*,
user_id: UserID | None,
wallet_id: WalletID,
offset: int,
limit: int,
started_from: datetime | None = None,
started_until: datetime | None = None,
) -> tuple[int, list[OsparcCreditsAggregatedByServiceKeyDB]]:
async with self.db_engine.begin() as conn:
base_query = (
sa.select(
resource_tracker_service_runs.c.service_key,
sa.func.SUM(
resource_tracker_credit_transactions.c.osparc_credits
).label("osparc_credits"),
)
.select_from(
resource_tracker_service_runs.join(
resource_tracker_credit_transactions,
(
resource_tracker_service_runs.c.product_name
== resource_tracker_credit_transactions.c.product_name
)
& (
resource_tracker_service_runs.c.service_run_id
== resource_tracker_credit_transactions.c.service_run_id
),
isouter=True,
)
)
.where(
(resource_tracker_service_runs.c.product_name == product_name)
& (
resource_tracker_credit_transactions.c.transaction_status
== CreditTransactionStatus.BILLED
)
& (
resource_tracker_credit_transactions.c.transaction_classification
== CreditClassification.DEDUCT_SERVICE_RUN
)
& (resource_tracker_credit_transactions.c.wallet_id == wallet_id)
)
.group_by(resource_tracker_service_runs.c.service_key)
)

if user_id:
base_query = base_query.where(
resource_tracker_service_runs.c.user_id == user_id
)
if started_from:
base_query = base_query.where(
sa.func.DATE(resource_tracker_service_runs.c.started_at)
>= started_from.date()
)
if started_until:
base_query = base_query.where(
sa.func.DATE(resource_tracker_service_runs.c.started_at)
<= started_until.date()
)

subquery = base_query.subquery()
count_query = sa.select(sa.func.count()).select_from(subquery)
count_result = await conn.execute(count_query)

# Default ordering and pagination
list_query = (
base_query.order_by(resource_tracker_service_runs.c.service_key.asc())
.offset(offset)
.limit(limit)
)
list_result = await conn.execute(list_query)

return (
cast(int, count_result.scalar()),
[
OsparcCreditsAggregatedByServiceKeyDB.from_orm(row)
for row in list_result.fetchall()
],
)

async def export_service_runs_table_to_s3(
self,
product_name: ProductName,
Expand Down
Loading

0 comments on commit ef4a7e8

Please sign in to comment.