mephisto.data_model.requester

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from abc import abstractmethod, abstractstaticmethod
from mephisto.data_model._db_backed_meta import (
    MephistoDBBackedABCMeta,
    MephistoDataModelComponentMixin,
)
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig

from typing import List, Optional, Mapping, Dict, TYPE_CHECKING, Any, Type, ClassVar

if TYPE_CHECKING:
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.task_run import TaskRun
    from argparse import _ArgumentGroup as ArgumentGroup

from mephisto.utils.logger_core import get_logger

logger = get_logger(name=__name__)


@dataclass
class RequesterArgs:
    """Base class for arguments to register a requester"""

    name: str = field(
        default=MISSING,
        metadata={
            "help": "Name for the requester in the Mephisto DB.",
            "required": True,
        },
    )


class Requester(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    High level class representing a requester on some kind of crowd provider. Sets some default
    initializations, but mostly should be extended by the specific requesters for crowd providers
    with whatever implementation details are required to get those to work.
    """

    ArgsClass: ClassVar[Type["RequesterArgs"]] = RequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        if not _used_new_call:
            raise AssertionError(
                "Direct Requester and data model access via Requester(db, id) is "
                "now deprecated in favor of calling Requester.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_requester(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["requester_id"]
        self.provider_type: str = row["provider_type"]
        self.requester_name: str = row["requester_name"]

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Requester":
        """
        The new method is overridden to be able to automatically generate
        the expected Requester class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Requester
        as you will instead be returned the correct Requester class according to
        the crowdprovider associated with this Requester.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Requester:
            # We are trying to construct a Requester, find what type to use and
            # create that instead
            if row is None:
                row = db.get_requester(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).RequesterClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    def get_task_runs(self) -> List["TaskRun"]:
        """
        Return the list of task runs that are run by this requester
        """
        return self.db.find_task_runs(requester_id=self.db_id)

    def get_total_spend(self) -> float:
        """
        Return the total amount of funding spent by this requester
        across all tasks.
        """
        task_runs = self.db.find_task_runs(requester_id=self.db_id)
        total_spend = 0.0
        for run in task_runs:
            total_spend += run.get_total_spend()
        return total_spend

    @classmethod
    def is_sandbox(self) -> bool:
        """
        Determine if this is a requester on a sandbox/test account
        """
        return False

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id})"

    @staticmethod
    def _register_requester(
        db: "MephistoDB", requester_id: str, provider_type: str
    ) -> "Requester":
        """
        Create an entry for this requester in the database
        """
        db_id = db.new_requester(requester_id, provider_type)
        requester = Requester.get(db, db_id)
        logger.debug(f"Registered new requester {requester}")
        return requester

    # Children classes should implement the following methods

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.

        Should throw an exception if something is wrong with provided registration arguments.
        """
        raise NotImplementedError()

    def is_registered(self) -> bool:
        """Check to see if this requester has already been registered"""
        raise NotImplementedError()

    def get_available_budget(self) -> float:
        """
        Return the funds that this requester profile has available for usage with
        its crowdsourcing vendor
        """
        raise NotImplementedError()

    def to_dict(self) -> Dict[str, Any]:
        """
        Produce a dict of this requester and important features for json serialization
        """
        return {
            "requester_id": self.db_id,
            "provider_type": self.provider_type,
            "requester_name": self.requester_name,
            "registered": self.is_registered(),
        }

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """
        Try to create a new requester by this name, raise an exception if
        the name already exists.

        Implementation should call _register_requester(db, requester_id) when sure the requester
        can be successfully created to have it put into the db and return the result.
        """
        raise NotImplementedError()
#   class RequesterArgs:
View Source
class RequesterArgs:
    """Base class for arguments to register a requester"""

    name: str = field(
        default=MISSING,
        metadata={
            "help": "Name for the requester in the Mephisto DB.",
            "required": True,
        },
    )

Base class for arguments to register a requester

#   RequesterArgs(name: str = '???')
#   name: str = '???'
View Source
class Requester(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    High level class representing a requester on some kind of crowd provider. Sets some default
    initializations, but mostly should be extended by the specific requesters for crowd providers
    with whatever implementation details are required to get those to work.
    """

    ArgsClass: ClassVar[Type["RequesterArgs"]] = RequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        if not _used_new_call:
            raise AssertionError(
                "Direct Requester and data model access via Requester(db, id) is "
                "now deprecated in favor of calling Requester.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_requester(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["requester_id"]
        self.provider_type: str = row["provider_type"]
        self.requester_name: str = row["requester_name"]

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Requester":
        """
        The new method is overridden to be able to automatically generate
        the expected Requester class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Requester
        as you will instead be returned the correct Requester class according to
        the crowdprovider associated with this Requester.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Requester:
            # We are trying to construct a Requester, find what type to use and
            # create that instead
            if row is None:
                row = db.get_requester(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).RequesterClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    def get_task_runs(self) -> List["TaskRun"]:
        """
        Return the list of task runs that are run by this requester
        """
        return self.db.find_task_runs(requester_id=self.db_id)

    def get_total_spend(self) -> float:
        """
        Return the total amount of funding spent by this requester
        across all tasks.
        """
        task_runs = self.db.find_task_runs(requester_id=self.db_id)
        total_spend = 0.0
        for run in task_runs:
            total_spend += run.get_total_spend()
        return total_spend

    @classmethod
    def is_sandbox(self) -> bool:
        """
        Determine if this is a requester on a sandbox/test account
        """
        return False

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id})"

    @staticmethod
    def _register_requester(
        db: "MephistoDB", requester_id: str, provider_type: str
    ) -> "Requester":
        """
        Create an entry for this requester in the database
        """
        db_id = db.new_requester(requester_id, provider_type)
        requester = Requester.get(db, db_id)
        logger.debug(f"Registered new requester {requester}")
        return requester

    # Children classes should implement the following methods

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.

        Should throw an exception if something is wrong with provided registration arguments.
        """
        raise NotImplementedError()

    def is_registered(self) -> bool:
        """Check to see if this requester has already been registered"""
        raise NotImplementedError()

    def get_available_budget(self) -> float:
        """
        Return the funds that this requester profile has available for usage with
        its crowdsourcing vendor
        """
        raise NotImplementedError()

    def to_dict(self) -> Dict[str, Any]:
        """
        Produce a dict of this requester and important features for json serialization
        """
        return {
            "requester_id": self.db_id,
            "provider_type": self.provider_type,
            "requester_name": self.requester_name,
            "registered": self.is_registered(),
        }

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """
        Try to create a new requester by this name, raise an exception if
        the name already exists.

        Implementation should call _register_requester(db, requester_id) when sure the requester
        can be successfully created to have it put into the db and return the result.
        """
        raise NotImplementedError()

High level class representing a requester on some kind of crowd provider. Sets some default initializations, but mostly should be extended by the specific requesters for crowd providers with whatever implementation details are required to get those to work.

#   Requester( db: mephisto.abstractions.database.MephistoDB, db_id: str, row: Union[Mapping[str, Any], NoneType] = None, _used_new_call: bool = False )
View Source
    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Requester":
        """
        The new method is overridden to be able to automatically generate
        the expected Requester class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Requester
        as you will instead be returned the correct Requester class according to
        the crowdprovider associated with this Requester.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Requester:
            # We are trying to construct a Requester, find what type to use and
            # create that instead
            if row is None:
                row = db.get_requester(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).RequesterClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

The new method is overridden to be able to automatically generate the expected Requester class without needing to specifically find it for a given db_id. As such it is impossible to create a base Requester as you will instead be returned the correct Requester class according to the crowdprovider associated with this Requester.

#   def get_task_runs(self) -> list[mephisto.data_model.task_run.TaskRun]:
View Source
    def get_task_runs(self) -> List["TaskRun"]:
        """
        Return the list of task runs that are run by this requester
        """
        return self.db.find_task_runs(requester_id=self.db_id)

Return the list of task runs that are run by this requester

#   def get_total_spend(self) -> float:
View Source
    def get_total_spend(self) -> float:
        """
        Return the total amount of funding spent by this requester
        across all tasks.
        """
        task_runs = self.db.find_task_runs(requester_id=self.db_id)
        total_spend = 0.0
        for run in task_runs:
            total_spend += run.get_total_spend()
        return total_spend

Return the total amount of funding spent by this requester across all tasks.

#  
@classmethod
def is_sandbox(self) -> bool:
View Source
    @classmethod
    def is_sandbox(self) -> bool:
        """
        Determine if this is a requester on a sandbox/test account
        """
        return False

Determine if this is a requester on a sandbox/test account

#   def register( self, args: Union[omegaconf.dictconfig.DictConfig, NoneType] = None ) -> None:
View Source
    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.

        Should throw an exception if something is wrong with provided registration arguments.
        """
        raise NotImplementedError()

Register this requester with the crowd provider by providing any required credentials or such. If no args are provided, assume the registration is already made and try to assert it as such.

Should throw an exception if something is wrong with provided registration arguments.

#   def is_registered(self) -> bool:
View Source
    def is_registered(self) -> bool:
        """Check to see if this requester has already been registered"""
        raise NotImplementedError()

Check to see if this requester has already been registered

#   def get_available_budget(self) -> float:
View Source
    def get_available_budget(self) -> float:
        """
        Return the funds that this requester profile has available for usage with
        its crowdsourcing vendor
        """
        raise NotImplementedError()

Return the funds that this requester profile has available for usage with its crowdsourcing vendor

#   def to_dict(self) -> Dict[str, Any]:
View Source
    def to_dict(self) -> Dict[str, Any]:
        """
        Produce a dict of this requester and important features for json serialization
        """
        return {
            "requester_id": self.db_id,
            "provider_type": self.provider_type,
            "requester_name": self.requester_name,
            "registered": self.is_registered(),
        }

Produce a dict of this requester and important features for json serialization

#  
@staticmethod
def new( db: mephisto.abstractions.database.MephistoDB, requester_name: str ) -> mephisto.data_model.requester.Requester:
View Source
    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """
        Try to create a new requester by this name, raise an exception if
        the name already exists.

        Implementation should call _register_requester(db, requester_id) when sure the requester
        can be successfully created to have it put into the db and return the result.
        """
        raise NotImplementedError()

Try to create a new requester by this name, raise an exception if the name already exists.

Implementation should call _register_requester(db, requester_id) when sure the requester can be successfully created to have it put into the db and return the result.

#   class Requester.ArgsClass:
View Source
class RequesterArgs:
    """Base class for arguments to register a requester"""

    name: str = field(
        default=MISSING,
        metadata={
            "help": "Name for the requester in the Mephisto DB.",
            "required": True,
        },
    )

Base class for arguments to register a requester

Inherited Members
RequesterArgs
RequesterArgs
name