diff --git a/api/specs/web-server/_resource_usage.py b/api/specs/web-server/_resource_usage.py index dfc7c26a692..c60ab4f36b5 100644 --- a/api/specs/web-server/_resource_usage.py +++ b/api/specs/web-server/_resource_usage.py @@ -26,7 +26,12 @@ UpdatePricingUnitBodyParams, ) from models_library.generics import Envelope -from models_library.resource_tracker import PricingPlanId, PricingUnitId +from models_library.resource_tracker import ( + PricingPlanId, + PricingUnitId, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) from models_library.rest_pagination import DEFAULT_NUMBER_OF_ITEMS_PER_PAGE from models_library.wallets import WalletID from pydantic import Json, NonNegativeInt @@ -40,6 +45,7 @@ ) from simcore_service_webserver.resource_usage._service_runs_handlers import ( ORDER_BY_DESCRIPTION, + _ListServicesAggregatedUsagesQueryParams, _ListServicesResourceUsagesQueryParams, _ListServicesResourceUsagesQueryParamsWithPagination, ) @@ -85,6 +91,27 @@ async def list_resource_usage_services( ) +@router.get( + "/services/-/aggregated-usages", + response_model=Envelope[list[ServiceRunGet]], + summary="Used credits based on aggregate by type, currently supported `services`. (user and product are taken from context, optionally wallet_id parameter might be provided).", + tags=["usage"], +) +async def list_osparc_credits_aggregated_usages( + aggregated_by: ServicesAggregatedUsagesType, + time_period: ServicesAggregatedUsagesTimePeriod, + wallet_id: Annotated[WalletID, Query], + limit: int = DEFAULT_NUMBER_OF_ITEMS_PER_PAGE, + offset: NonNegativeInt = 0, +): + ... + + +assert_handler_signature_against_model( + list_osparc_credits_aggregated_usages, _ListServicesAggregatedUsagesQueryParams +) + + @router.get( "/services/-/usage-report", status_code=status.HTTP_302_FOUND, diff --git a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py index 8abb2da421e..0ec5a1dc2ff 100644 --- a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py +++ b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py @@ -38,3 +38,13 @@ class ServiceRunGet(BaseModel): class ServiceRunPage(NamedTuple): items: list[ServiceRunGet] total: PositiveInt + + +class OsparcCreditsAggregatedByServiceGet(BaseModel): + osparc_credits: Decimal + service_key: ServiceKey + + +class OsparcCreditsAggregatedUsagesPage(NamedTuple): + items: list[OsparcCreditsAggregatedByServiceGet] + total: PositiveInt diff --git a/packages/models-library/src/models_library/resource_tracker.py b/packages/models-library/src/models_library/resource_tracker.py index f0478d19e0d..13c92e161ed 100644 --- a/packages/models-library/src/models_library/resource_tracker.py +++ b/packages/models-library/src/models_library/resource_tracker.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timezone from decimal import Decimal -from enum import auto +from enum import IntEnum, auto from typing import Any, ClassVar, NamedTuple, TypeAlias from pydantic import ( @@ -283,3 +283,13 @@ class Config: }, ] } + + +class ServicesAggregatedUsagesType(StrAutoEnum): + services = "services" + + +class ServicesAggregatedUsagesTimePeriod(IntEnum): + ONE_DAY = 1 + ONE_WEEK = 7 + ONE_MONTH = 30 diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/service_runs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/service_runs.py index 20fd3fa4f33..e826363897a 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/service_runs.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/service_runs.py @@ -5,11 +5,16 @@ RESOURCE_USAGE_TRACKER_RPC_NAMESPACE, ) from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedUsagesPage, ServiceRunPage, ) from models_library.products import ProductName from models_library.rabbitmq_basic_types import RPCMethodName -from models_library.resource_tracker import ServiceResourceUsagesFilters +from models_library.resource_tracker import ( + ServiceResourceUsagesFilters, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) from models_library.rest_ordering import OrderBy from models_library.users import UserID from models_library.wallets import WalletID @@ -35,7 +40,7 @@ async def get_service_run_page( wallet_id: WalletID | None = None, access_all_wallet_usage: bool = False, order_by: OrderBy | None = None, - filters: ServiceResourceUsagesFilters | None = None + filters: ServiceResourceUsagesFilters | None = None, ) -> ServiceRunPage: result = await rabbitmq_rpc_client.request( RESOURCE_USAGE_TRACKER_RPC_NAMESPACE, @@ -54,6 +59,36 @@ async def get_service_run_page( return result +@log_decorator(_logger, level=logging.DEBUG) +async def get_osparc_credits_aggregated_usages_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + aggregated_by: ServicesAggregatedUsagesType, + time_period: ServicesAggregatedUsagesTimePeriod, + limit: int = 20, + offset: int = 0, + wallet_id: WalletID, + access_all_wallet_usage: bool = False, +) -> OsparcCreditsAggregatedUsagesPage: + result = await rabbitmq_rpc_client.request( + RESOURCE_USAGE_TRACKER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "get_osparc_credits_aggregated_usages_page"), + user_id=user_id, + product_name=product_name, + limit=limit, + offset=offset, + wallet_id=wallet_id, + access_all_wallet_usage=access_all_wallet_usage, + aggregated_by=aggregated_by, + time_period=time_period, + timeout_s=60, + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) # nosec + return result + + @log_decorator(_logger, level=logging.DEBUG) async def export_service_runs( rabbitmq_rpc_client: RabbitMQRPCClient, @@ -63,7 +98,7 @@ async def export_service_runs( wallet_id: WalletID | None = None, access_all_wallet_usage: bool = False, order_by: OrderBy | None = None, - filters: ServiceResourceUsagesFilters | None = None + filters: ServiceResourceUsagesFilters | None = None, ) -> AnyUrl: result: AnyUrl = await rabbitmq_rpc_client.request( RESOURCE_USAGE_TRACKER_RPC_NAMESPACE, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_resource_tracker.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_resource_tracker.py index 75b96cf8737..cae70b1152c 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_resource_tracker.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_resource_tracker.py @@ -5,6 +5,7 @@ PricingUnitGet, ) from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedUsagesPage, ServiceRunPage, ) from models_library.products import ProductName @@ -16,6 +17,8 @@ PricingUnitWithCostCreate, PricingUnitWithCostUpdate, ServiceResourceUsagesFilters, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, ) from models_library.rest_ordering import OrderBy from models_library.services import ServiceKey, ServiceVersion @@ -95,6 +98,32 @@ async def export_service_runs( ) +@router.expose(reraise_if_error_type=(CustomResourceUsageTrackerError,)) +async def get_osparc_credits_aggregated_usages_page( + app: FastAPI, + *, + user_id: UserID, + product_name: ProductName, + aggregated_by: ServicesAggregatedUsagesType, + time_period: ServicesAggregatedUsagesTimePeriod, + limit: int = 20, + offset: int = 0, + wallet_id: WalletID, + access_all_wallet_usage: bool = False, +) -> OsparcCreditsAggregatedUsagesPage: + return await service_runs.get_osparc_credits_aggregated_usages_page( + user_id=user_id, + product_name=product_name, + resource_tracker_repo=ResourceTrackerRepository(db_engine=app.state.engine), + aggregated_by=aggregated_by, + time_period=time_period, + limit=limit, + offset=offset, + wallet_id=wallet_id, + access_all_wallet_usage=access_all_wallet_usage, + ) + + ## Pricing plans diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/resource_tracker_service_runs.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/resource_tracker_service_runs.py index f68e9ff2c5a..45cddca3057 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/resource_tracker_service_runs.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/resource_tracker_service_runs.py @@ -106,6 +106,14 @@ class Config: orm_mode = True +class OsparcCreditsAggregatedByServiceKeyDB(BaseModel): + osparc_credits: Decimal + service_key: ServiceKey + + class Config: + orm_mode = True + + class ServiceRunForCheckDB(BaseModel): service_run_id: ServiceRunId last_heartbeat_at: datetime diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py index 8d16475e621..a341bf35e6a 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py @@ -10,6 +10,7 @@ from models_library.api_schemas_storage import S3BucketName from models_library.products import ProductName from models_library.resource_tracker import ( + CreditClassification, CreditTransactionId, CreditTransactionStatus, PricingPlanCreate, @@ -63,6 +64,7 @@ from ....models.resource_tracker_pricing_unit_costs import PricingUnitCostsDB from ....models.resource_tracker_pricing_units import PricingUnitsDB from ....models.resource_tracker_service_runs import ( + OsparcCreditsAggregatedByServiceKeyDB, ServiceRunCreate, ServiceRunDB, ServiceRunForCheckDB, @@ -309,6 +311,89 @@ async def list_service_runs_by_product_and_user_and_wallet( return [ServiceRunWithCreditsDB.from_orm(row) for row in result.fetchall()] + async def get_osparc_credits_aggregated_by_service( + self, + product_name: ProductName, + *, + user_id: UserID | None, + wallet_id: WalletID, + offset: int, + limit: int, + started_from: datetime | None = None, + started_until: datetime | None = None, + ) -> tuple[int, list[OsparcCreditsAggregatedByServiceKeyDB]]: + async with self.db_engine.begin() as conn: + base_query = ( + sa.select( + resource_tracker_service_runs.c.service_key, + sa.func.SUM( + resource_tracker_credit_transactions.c.osparc_credits + ).label("osparc_credits"), + ) + .select_from( + resource_tracker_service_runs.join( + resource_tracker_credit_transactions, + ( + resource_tracker_service_runs.c.product_name + == resource_tracker_credit_transactions.c.product_name + ) + & ( + resource_tracker_service_runs.c.service_run_id + == resource_tracker_credit_transactions.c.service_run_id + ), + isouter=True, + ) + ) + .where( + (resource_tracker_service_runs.c.product_name == product_name) + & ( + resource_tracker_credit_transactions.c.transaction_status + == CreditTransactionStatus.BILLED + ) + & ( + resource_tracker_credit_transactions.c.transaction_classification + == CreditClassification.DEDUCT_SERVICE_RUN + ) + & (resource_tracker_credit_transactions.c.wallet_id == wallet_id) + ) + .group_by(resource_tracker_service_runs.c.service_key) + ) + + if user_id: + base_query = base_query.where( + resource_tracker_service_runs.c.user_id == user_id + ) + if started_from: + base_query = base_query.where( + sa.func.DATE(resource_tracker_service_runs.c.started_at) + >= started_from.date() + ) + if started_until: + base_query = base_query.where( + sa.func.DATE(resource_tracker_service_runs.c.started_at) + <= started_until.date() + ) + + subquery = base_query.subquery() + count_query = sa.select(sa.func.count()).select_from(subquery) + count_result = await conn.execute(count_query) + + # Default ordering and pagination + list_query = ( + base_query.order_by(resource_tracker_service_runs.c.service_key.asc()) + .offset(offset) + .limit(limit) + ) + list_result = await conn.execute(list_query) + + return ( + cast(int, count_result.scalar()), + [ + OsparcCreditsAggregatedByServiceKeyDB.from_orm(row) + for row in list_result.fetchall() + ], + ) + async def export_service_runs_table_to_s3( self, product_name: ProductName, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/resource_tracker_service_runs.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/resource_tracker_service_runs.py index d704ef79fe5..75e17e28c96 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/resource_tracker_service_runs.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/resource_tracker_service_runs.py @@ -1,14 +1,20 @@ -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone import shortuuid from aws_library.s3 import SimcoreS3API from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedByServiceGet, + OsparcCreditsAggregatedUsagesPage, ServiceRunGet, ServiceRunPage, ) from models_library.api_schemas_storage import S3BucketName from models_library.products import ProductName -from models_library.resource_tracker import ServiceResourceUsagesFilters +from models_library.resource_tracker import ( + ServiceResourceUsagesFilters, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) from models_library.rest_ordering import OrderBy from models_library.users import UserID from models_library.wallets import WalletID @@ -40,7 +46,7 @@ async def list_service_runs( started_from = filters.started_at.from_ started_until = filters.started_at.until - # Situation when we want to see all usage of a specific user + # Situation when we want to see all usage of a specific user (ex. for Non billable product) if wallet_id is None and access_all_wallet_usage is False: total_service_runs: PositiveInt = await resource_tracker_repo.total_service_runs_by_product_and_user_and_wallet( product_name, @@ -177,3 +183,43 @@ async def export_service_runs( expiration_secs=_PRESIGNED_LINK_EXPIRATION_SEC, ) return generated_url + + +async def get_osparc_credits_aggregated_usages_page( + user_id: UserID, + product_name: ProductName, + resource_tracker_repo: ResourceTrackerRepository, + aggregated_by: ServicesAggregatedUsagesType, + time_period: ServicesAggregatedUsagesTimePeriod, + wallet_id: WalletID, + access_all_wallet_usage: bool = False, + limit: int = 20, + offset: int = 0, +) -> OsparcCreditsAggregatedUsagesPage: + current_datetime = datetime.now(tz=timezone.utc) + started_from = current_datetime - timedelta(days=time_period.value) + + assert aggregated_by == ServicesAggregatedUsagesType.services # nosec + + ( + count_output_list_db, + output_list_db, + ) = await resource_tracker_repo.get_osparc_credits_aggregated_by_service( + product_name=product_name, + user_id=user_id if access_all_wallet_usage is False else None, + wallet_id=wallet_id, + offset=offset, + limit=limit, + started_from=started_from, + started_until=None, + ) + output_api_model: list[OsparcCreditsAggregatedByServiceGet] = [] + for item in output_list_db: + output_api_model.append( + OsparcCreditsAggregatedByServiceGet.construct( + osparc_credits=item.osparc_credits, + service_key=item.service_key, + ) + ) + + return OsparcCreditsAggregatedUsagesPage(output_api_model, count_output_list_db) diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_api_resource_tracker_service_runs__list_aggregated_usages.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_api_resource_tracker_service_runs__list_aggregated_usages.py new file mode 100644 index 00000000000..36c9ef3feca --- /dev/null +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_api_resource_tracker_service_runs__list_aggregated_usages.py @@ -0,0 +1,226 @@ +from collections.abc import Iterator +from datetime import datetime, timedelta, timezone + +import pytest +import sqlalchemy as sa +from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedUsagesPage, +) +from models_library.resource_tracker import ( + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import service_runs +from simcore_postgres_database.models.resource_tracker_credit_transactions import ( + resource_tracker_credit_transactions, +) +from simcore_postgres_database.models.resource_tracker_service_runs import ( + resource_tracker_service_runs, +) + +pytest_simcore_core_services_selection = ["postgres", "rabbit"] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +_USER_ID_1 = 1 +_USER_ID_2 = 2 +_SERVICE_RUN_ID_1 = "1" +_SERVICE_RUN_ID_2 = "2" +_SERVICE_RUN_ID_3 = "3" +_SERVICE_RUN_ID_4 = "4" +_SERVICE_RUN_ID_5 = "5" +_SERVICE_RUN_ID_6 = "6" +_WALLET_ID = 6 + + +@pytest.fixture() +def resource_tracker_setup_db( + postgres_db: sa.engine.Engine, + random_resource_tracker_service_run, + random_resource_tracker_credit_transactions, +) -> Iterator[None]: + with postgres_db.connect() as con: + # Service run table + result = con.execute( + resource_tracker_service_runs.insert() + .values( + [ + random_resource_tracker_service_run( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_1, + started_at=datetime.now(tz=timezone.utc), + service_key="simcore/services/dynamic/jupyter-smash", + ), + random_resource_tracker_service_run( + user_id=_USER_ID_2, + service_run_id=_SERVICE_RUN_ID_2, + started_at=datetime.now(tz=timezone.utc), + service_key="simcore/services/dynamic/jupyter-smash", + ), + random_resource_tracker_service_run( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_3, + started_at=datetime.now(tz=timezone.utc), + service_key="simcore/services/dynamic/jupyter-smash", + ), + random_resource_tracker_service_run( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_4, + started_at=datetime.now(tz=timezone.utc), + service_key="simcore/services/dynamic/jupyter-smash", + ), + random_resource_tracker_service_run( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_5, + started_at=datetime.now(tz=timezone.utc) - timedelta(days=3), + service_key="simcore/services/dynamic/jupyter-smash", + ), + random_resource_tracker_service_run( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_6, + started_at=datetime.now(tz=timezone.utc) - timedelta(days=10), + service_key="simcore/services/dynamic/sim4life", + ), + ] + ) + .returning(resource_tracker_service_runs) + ) + row = result.first() + assert row + + # Transaction table + result = con.execute( + resource_tracker_credit_transactions.insert() + .values( + [ + random_resource_tracker_credit_transactions( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_1, + product_name="osparc", + transaction_status="BILLED", + transaction_classification="DEDUCT_SERVICE_RUN", + wallet_id=_WALLET_ID, + ), + random_resource_tracker_credit_transactions( + user_id=_USER_ID_2, + service_run_id=_SERVICE_RUN_ID_2, + product_name="osparc", + transaction_status="BILLED", + transaction_classification="DEDUCT_SERVICE_RUN", + wallet_id=_WALLET_ID, + ), + random_resource_tracker_credit_transactions( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_4, + product_name="osparc", + transaction_status="BILLED", + transaction_classification="DEDUCT_SERVICE_RUN", + wallet_id=_WALLET_ID, + ), + random_resource_tracker_credit_transactions( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_5, + product_name="osparc", + transaction_status="BILLED", + transaction_classification="DEDUCT_SERVICE_RUN", + wallet_id=_WALLET_ID, + ), + random_resource_tracker_credit_transactions( + user_id=_USER_ID_1, + service_run_id=_SERVICE_RUN_ID_6, + product_name="osparc", + transaction_status="BILLED", + transaction_classification="DEDUCT_SERVICE_RUN", + wallet_id=_WALLET_ID, + ), + ] + ) + .returning(resource_tracker_credit_transactions) + ) + row = result.first() + assert row + + yield + + con.execute(resource_tracker_credit_transactions.delete()) + con.execute(resource_tracker_service_runs.delete()) + + +async def test_rpc_get_osparc_credits_aggregated_usages_page( + mocked_redis_server: None, + postgres_db: sa.engine.Engine, + rpc_client: RabbitMQRPCClient, + resource_tracker_setup_db: dict, +): + result = await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=_USER_ID_1, + product_name="osparc", + aggregated_by=ServicesAggregatedUsagesType.services, + time_period=ServicesAggregatedUsagesTimePeriod.ONE_DAY, + wallet_id=123, # <-- testing non existing wallet + access_all_wallet_usage=False, + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) + assert len(result.items) == 0 + assert result.total == 0 + + result = await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=_USER_ID_1, + product_name="osparc", + aggregated_by=ServicesAggregatedUsagesType.services, + time_period=ServicesAggregatedUsagesTimePeriod.ONE_DAY, # <-- testing + wallet_id=_WALLET_ID, + access_all_wallet_usage=False, + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) + assert len(result.items) == 1 + assert result.total == 1 + first_osparc_credits = result.items[0].osparc_credits + + result = await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=_USER_ID_1, + product_name="osparc", + aggregated_by=ServicesAggregatedUsagesType.services, + time_period=ServicesAggregatedUsagesTimePeriod.ONE_DAY, + wallet_id=_WALLET_ID, + access_all_wallet_usage=True, # <-- testing + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) + assert len(result.items) == 1 + assert result.total == 1 + second_osparc_credits = result.items[0].osparc_credits + assert second_osparc_credits < first_osparc_credits + + result = await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=_USER_ID_1, + product_name="osparc", + aggregated_by=ServicesAggregatedUsagesType.services, + time_period=ServicesAggregatedUsagesTimePeriod.ONE_WEEK, # <-- testing + wallet_id=_WALLET_ID, + access_all_wallet_usage=False, + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) + assert len(result.items) == 1 + assert result.total == 1 + third_osparc_credits = result.items[0].osparc_credits + assert third_osparc_credits < first_osparc_credits + + result = await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=_USER_ID_1, + product_name="osparc", + aggregated_by=ServicesAggregatedUsagesType.services, + time_period=ServicesAggregatedUsagesTimePeriod.ONE_MONTH, # <-- testing + wallet_id=_WALLET_ID, + access_all_wallet_usage=False, + ) + assert isinstance(result, OsparcCreditsAggregatedUsagesPage) + assert len(result.items) == 2 + assert result.total == 2 diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 7413f09d4d8..7204cd82d4b 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3493,7 +3493,7 @@ paths: '403': description: ProjectInvalidRightsError '404': - description: ProjectNotFoundError, UserDefaultWalletNotFoundError + description: UserDefaultWalletNotFoundError, ProjectNotFoundError '409': description: ProjectTooManyProjectOpenedError '422': @@ -3733,6 +3733,55 @@ paths: application/json: schema: $ref: '#/components/schemas/Envelope_list_models_library.api_schemas_webserver.resource_usage.ServiceRunGet__' + /v0/services/-/aggregated-usages: + get: + tags: + - usage + summary: Used credits based on aggregate by type, currently supported `services`. + (user and product are taken from context, optionally wallet_id parameter might + be provided). + operationId: list_osparc_credits_aggregated_usages + parameters: + - required: true + schema: + $ref: '#/components/schemas/ServicesAggregatedUsagesType' + name: aggregated_by + in: query + - required: true + schema: + $ref: '#/components/schemas/ServicesAggregatedUsagesTimePeriod' + name: time_period + in: query + - required: true + schema: + title: Wallet Id + exclusiveMinimum: true + type: integer + minimum: 0 + name: wallet_id + in: query + - required: false + schema: + title: Limit + type: integer + default: 20 + name: limit + in: query + - required: false + schema: + title: Offset + minimum: 0 + type: integer + default: 0 + name: offset + in: query + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Envelope_list_models_library.api_schemas_webserver.resource_usage.ServiceRunGet__' /v0/services/-/usage-report: get: tags: @@ -11153,6 +11202,20 @@ components: - backend type: string description: An enumeration. + ServicesAggregatedUsagesTimePeriod: + title: ServicesAggregatedUsagesTimePeriod + enum: + - 1 + - 7 + - 30 + type: integer + description: An enumeration. + ServicesAggregatedUsagesType: + title: ServicesAggregatedUsagesType + enum: + - services + type: string + description: An enumeration. SimCoreFileLink: title: SimCoreFileLink required: diff --git a/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_api.py b/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_api.py index cbe3911f9dd..0b277177624 100644 --- a/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_api.py +++ b/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_api.py @@ -1,10 +1,15 @@ from aiohttp import web from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedUsagesPage, ServiceRunPage, ) from models_library.api_schemas_webserver.wallets import WalletGetPermissions from models_library.products import ProductName -from models_library.resource_tracker import ServiceResourceUsagesFilters +from models_library.resource_tracker import ( + ServiceResourceUsagesFilters, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) from models_library.rest_ordering import OrderBy from models_library.users import UserID from models_library.wallets import WalletID @@ -48,6 +53,39 @@ async def list_usage_services( ) +async def get_osparc_credits_aggregated_usages_page( + app: web.Application, + user_id: UserID, + product_name: ProductName, + wallet_id: WalletID, + aggregated_by: ServicesAggregatedUsagesType, + time_period: ServicesAggregatedUsagesTimePeriod, + offset: int, + limit: NonNegativeInt, +) -> OsparcCreditsAggregatedUsagesPage: + access_all_wallet_usage = False + if wallet_id: + wallet: WalletGetPermissions = ( + await wallet_api.get_wallet_with_permissions_by_user( + app=app, user_id=user_id, wallet_id=wallet_id, product_name=product_name + ) + ) + access_all_wallet_usage = wallet.write is True + + rpc_client = get_rabbitmq_rpc_client(app) + return await service_runs.get_osparc_credits_aggregated_usages_page( + rpc_client, + user_id=user_id, + product_name=product_name, + wallet_id=wallet_id, + access_all_wallet_usage=access_all_wallet_usage, + aggregated_by=aggregated_by, + time_period=time_period, + offset=offset, + limit=limit, + ) + + async def export_usage_services( app: web.Application, user_id: UserID, diff --git a/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_handlers.py b/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_handlers.py index a5efdd515f2..3f0120178de 100644 --- a/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_handlers.py +++ b/services/web/server/src/simcore_service_webserver/resource_usage/_service_runs_handlers.py @@ -3,14 +3,20 @@ from aiohttp import web from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedUsagesPage, ServiceRunPage, ) -from models_library.resource_tracker import ServiceResourceUsagesFilters +from models_library.resource_tracker import ( + ServiceResourceUsagesFilters, + ServicesAggregatedUsagesTimePeriod, + ServicesAggregatedUsagesType, +) from models_library.rest_ordering import OrderBy, OrderDirection from models_library.rest_pagination import ( DEFAULT_NUMBER_OF_ITEMS_PER_PAGE, MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE, Page, + PageQueryParameters, ) from models_library.rest_pagination_utils import paginate_data from models_library.users import UserID @@ -126,6 +132,15 @@ class Config: extra = Extra.forbid +class _ListServicesAggregatedUsagesQueryParams(PageQueryParameters): + aggregated_by: ServicesAggregatedUsagesType + time_period: ServicesAggregatedUsagesTimePeriod + wallet_id: WalletID + + class Config: + extra = Extra.forbid + + # # API handlers # @@ -169,6 +184,47 @@ async def list_resource_usage_services(request: web.Request): ) +@routes.get( + f"/{VTAG}/services/-/aggregated-usages", + name="list_osparc_credits_aggregated_usages", +) +@login_required +@permission_required("resource-usage.read") +@_handle_resource_usage_exceptions +async def list_osparc_credits_aggregated_usages(request: web.Request): + req_ctx = _RequestContext.parse_obj(request) + query_params = parse_request_query_parameters_as( + _ListServicesAggregatedUsagesQueryParams, request + ) + + aggregated_services: OsparcCreditsAggregatedUsagesPage = ( + await api.get_osparc_credits_aggregated_usages_page( + app=request.app, + user_id=req_ctx.user_id, + product_name=req_ctx.product_name, + wallet_id=query_params.wallet_id, + aggregated_by=query_params.aggregated_by, + time_period=query_params.time_period, + offset=query_params.offset, + limit=query_params.limit, + ) + ) + + page = Page[dict[str, Any]].parse_obj( + paginate_data( + chunk=aggregated_services.items, + request_url=request.url, + total=aggregated_services.total, + limit=query_params.limit, + offset=query_params.offset, + ) + ) + return web.Response( + text=page.json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) + + @routes.get(f"/{VTAG}/services/-/usage-report", name="export_resource_usage_services") @login_required @permission_required("resource-usage.read") diff --git a/services/web/server/tests/unit/with_dbs/03/resource_usage/test_list_osparc_credits_aggregated_usages.py b/services/web/server/tests/unit/with_dbs/03/resource_usage/test_list_osparc_credits_aggregated_usages.py new file mode 100644 index 00000000000..2ed80e165a6 --- /dev/null +++ b/services/web/server/tests/unit/with_dbs/03/resource_usage/test_list_osparc_credits_aggregated_usages.py @@ -0,0 +1,102 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-arguments +# pylint: disable=too-many-statements + + +from collections.abc import Iterator +from decimal import Decimal +from http import HTTPStatus +from typing import cast +from unittest.mock import MagicMock + +import pytest +import sqlalchemy as sa +from aiohttp.test_utils import TestClient +from models_library.api_schemas_resource_usage_tracker.service_runs import ( + OsparcCreditsAggregatedByServiceGet, + OsparcCreditsAggregatedUsagesPage, +) +from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.webserver_login import UserInfoDict +from servicelib.aiohttp import status +from simcore_postgres_database.models.wallets import wallets +from simcore_service_webserver.db.models import UserRole + +_SERVICE_RUN_GET = OsparcCreditsAggregatedUsagesPage( + items=[ + OsparcCreditsAggregatedByServiceGet( + **{ + "osparc_credits": Decimal(-50), + "service_key": "simcore/services/comp/itis/sleeper", + } + ) + ], + total=1, +) + + +@pytest.fixture +def mock_get_osparc_credits_aggregated_usages_page(mocker: MockerFixture) -> MagicMock: + return mocker.patch( + "simcore_service_webserver.resource_usage._service_runs_api.service_runs.get_osparc_credits_aggregated_usages_page", + spec=True, + return_value=_SERVICE_RUN_GET, + ) + + +@pytest.fixture() +def setup_wallets_db( + postgres_db: sa.engine.Engine, logged_user: UserInfoDict +) -> Iterator[int]: + with postgres_db.connect() as con: + result = con.execute( + wallets.insert() + .values( + name="My wallet", + owner=logged_user["primary_gid"], + status="ACTIVE", + product_name="osparc", + ) + .returning(sa.literal_column("*")) + ) + row = result.fetchone() + yield cast(int, row[0]) + con.execute(wallets.delete()) + + +@pytest.mark.parametrize( + "user_role,expected", + [ + (UserRole.ANONYMOUS, status.HTTP_401_UNAUTHORIZED), + (UserRole.GUEST, status.HTTP_403_FORBIDDEN), + (UserRole.USER, status.HTTP_200_OK), + (UserRole.TESTER, status.HTTP_200_OK), + (UserRole.PRODUCT_OWNER, status.HTTP_200_OK), + (UserRole.ADMIN, status.HTTP_200_OK), + ], +) +async def test_export_service_usage_redirection( + client: TestClient, + logged_user: UserInfoDict, + setup_wallets_db, + mock_get_osparc_credits_aggregated_usages_page, + user_role: UserRole, + expected: HTTPStatus, +): + assert client.app + url = ( + client.app.router["list_osparc_credits_aggregated_usages"] + .url_for() + .with_query( + wallet_id=f"{setup_wallets_db}", + aggregated_by="services", + time_period=1, + ) + ) + resp = await client.get(f"{url}") + assert resp.status == expected + + if resp.status == status.HTTP_200_OK: + assert mock_get_osparc_credits_aggregated_usages_page.called