mephisto.abstractions.crowd_provider

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from abc import ABC, abstractmethod, abstractproperty
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.abstractions.blueprint import AgentState, SharedTaskState
from mephisto.data_model.unit import Unit
from mephisto.data_model.requester import Requester
from mephisto.data_model.worker import Worker
from mephisto.data_model.agent import Agent

from typing import List, Optional, Tuple, Dict, Any, ClassVar, Type, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.task_run import TaskRun
    from argparse import _ArgumentGroup as ArgumentGroup


@dataclass
class ProviderArgs:
    """
    Base class for arguments to configure Crowd Providers.

    Both fields default to omegaconf's MISSING placeholder ('???'), which
    marks them as values that configuration is expected to fill in.
    """

    # Type name of the crowd provider these arguments configure.
    _provider_type: str = field(default=MISSING)
    # Requester name; presumably matches a registered Requester's
    # requester_name — confirm against callers.
    requester_name: str = field(default=MISSING)


class CrowdProvider(ABC):
    """
    Base class that defines the required functionality for
    the mephisto system to be able to interface with an
    external crowdsourcing vendor.

    Implementing the methods within, as well as supplying
    wrapped Unit, Requester, Worker, and Agent classes
    should ensure support for a vendor.
    """

    # Key under which this provider's datastore and storage path are looked
    # up in the MephistoDB; concrete providers override it with their own name.
    PROVIDER_TYPE = "__PROVIDER_BASE_CLASS__"

    # Data model classes used by this provider. The defaults are the generic
    # base classes; vendor implementations supply their own wrapped versions.
    UnitClass: ClassVar[Type[Unit]] = Unit

    RequesterClass: ClassVar[Type[Requester]] = Requester

    WorkerClass: ClassVar[Type[Worker]] = Worker

    AgentClass: ClassVar[Type[Agent]] = Agent

    # Dataclass describing the configuration arguments for this provider.
    ArgsClass: ClassVar[Type[ProviderArgs]] = ProviderArgs

    def __init__(self, db: "MephistoDB"):
        """
        Crowd provider classes should keep as much of their state
        as possible in their non-python datastore. This way
        the system can work even after shutdowns, and the
        state of the system can be managed or observed from
        other processes.

        To set up a datastore, init checks whether one is already
        registered for PROVIDER_TYPE (has_datastore_for_provider) and
        reuses it (get_datastore_for_provider) if so. Otherwise it creates
        a new one with initialize_provider_datastore under the provider's
        storage path and registers it with the database
        (set_datastore_for_provider).

        ``self.datastore_root`` is assigned on both paths, so it is always
        available after construction.
        """
        self.db = db
        has_existing = db.has_datastore_for_provider(self.PROVIDER_TYPE)
        # Resolve the storage path on both branches so `datastore_root` is
        # always defined, even when an existing datastore is reused.
        # NOTE(review): get_db_path_for_provider is now also called on the
        # reuse path; confirm the DB implementation treats it as idempotent.
        self.datastore_root = db.get_db_path_for_provider(self.PROVIDER_TYPE)
        if has_existing:
            self.datastore = db.get_datastore_for_provider(self.PROVIDER_TYPE)
        else:
            self.datastore = self.initialize_provider_datastore(self.datastore_root)
            db.set_datastore_for_provider(self.PROVIDER_TYPE, self.datastore)

    @classmethod
    def is_sandbox(cls) -> bool:
        """Determine if the given crowd provider is a sandbox provider"""
        # Delegated entirely to the provider's Requester class.
        return cls.RequesterClass.is_sandbox()

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Assert that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work.

        The base implementation accepts any arguments; subclasses override
        this to add provider-specific validation.
        """
        return

    @classmethod
    @abstractmethod
    def get_wrapper_js_path(cls):
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server
        """
        raise NotImplementedError()

    @abstractmethod
    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Do whatever is required to initialize this provider insofar
        as setting up local or external state is required to ensure
        that this vendor is usable.

        Local data storage should be put into the given root path.

        This method should return the local data storage component that
        is required to do any object initialization, as it will be available
        from the MephistoDB via db.get_datastore_for_provider(PROVIDER_TYPE).
        """
        raise NotImplementedError()

    @abstractmethod
    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: DictConfig,
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Set up any additional resources required to manage a specific
        task run.
        """
        raise NotImplementedError()

    @abstractmethod
    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """
        Destroy any resources set up specifically for this task run
        """
        raise NotImplementedError()

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the linked qualification from the crowd provider, if it exists.

        The base implementation is a no-op.
        """
        return None
#   class ProviderArgs:
View Source
class ProviderArgs:
    """
    Base class for arguments to configure Crowd Providers.

    Both fields default to omegaconf's MISSING placeholder ('???'), which
    marks them as values that configuration is expected to fill in.
    """

    # Type name of the crowd provider these arguments configure.
    _provider_type: str = MISSING
    # Requester name; presumably matches a registered Requester's
    # requester_name — confirm against callers.
    requester_name: str = MISSING

Base class for arguments to configure Crowd Providers

#   ProviderArgs(_provider_type: str = '???', requester_name: str = '???')
#   requester_name: str = '???'
#   class CrowdProvider(abc.ABC):
View Source
class CrowdProvider(ABC):
    """
    Base class that defines the required functionality for
    the mephisto system to be able to interface with an
    external crowdsourcing vendor.

    Implementing the methods within, as well as supplying
    wrapped Unit, Requester, Worker, and Agent classes
    should ensure support for a vendor.
    """

    # Key under which this provider's datastore and storage path are looked
    # up in the MephistoDB; concrete providers override it with their own name.
    PROVIDER_TYPE = "__PROVIDER_BASE_CLASS__"

    # Data model classes used by this provider. The defaults are the generic
    # base classes; vendor implementations supply their own wrapped versions.
    UnitClass: ClassVar[Type[Unit]] = Unit

    RequesterClass: ClassVar[Type[Requester]] = Requester

    WorkerClass: ClassVar[Type[Worker]] = Worker

    AgentClass: ClassVar[Type[Agent]] = Agent

    # Dataclass describing the configuration arguments for this provider.
    ArgsClass: ClassVar[Type[ProviderArgs]] = ProviderArgs

    def __init__(self, db: "MephistoDB"):
        """
        Crowd provider classes should keep as much of their state
        as possible in their non-python datastore. This way
        the system can work even after shutdowns, and the
        state of the system can be managed or observed from
        other processes.

        To set up a datastore, init checks whether one is already
        registered for PROVIDER_TYPE (has_datastore_for_provider) and
        reuses it (get_datastore_for_provider) if so. Otherwise it creates
        a new one with initialize_provider_datastore under the provider's
        storage path and registers it with the database
        (set_datastore_for_provider).

        ``self.datastore_root`` is assigned on both paths, so it is always
        available after construction.
        """
        self.db = db
        has_existing = db.has_datastore_for_provider(self.PROVIDER_TYPE)
        # Resolve the storage path on both branches so `datastore_root` is
        # always defined, even when an existing datastore is reused.
        # NOTE(review): get_db_path_for_provider is now also called on the
        # reuse path; confirm the DB implementation treats it as idempotent.
        self.datastore_root = db.get_db_path_for_provider(self.PROVIDER_TYPE)
        if has_existing:
            self.datastore = db.get_datastore_for_provider(self.PROVIDER_TYPE)
        else:
            self.datastore = self.initialize_provider_datastore(self.datastore_root)
            db.set_datastore_for_provider(self.PROVIDER_TYPE, self.datastore)

    @classmethod
    def is_sandbox(cls) -> bool:
        """Determine if the given crowd provider is a sandbox provider"""
        # Delegated entirely to the provider's Requester class.
        return cls.RequesterClass.is_sandbox()

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Assert that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work.

        The base implementation accepts any arguments; subclasses override
        this to add provider-specific validation.
        """
        return

    @classmethod
    @abstractmethod
    def get_wrapper_js_path(cls):
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server
        """
        raise NotImplementedError()

    @abstractmethod
    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Do whatever is required to initialize this provider insofar
        as setting up local or external state is required to ensure
        that this vendor is usable.

        Local data storage should be put into the given root path.

        This method should return the local data storage component that
        is required to do any object initialization, as it will be available
        from the MephistoDB via db.get_datastore_for_provider(PROVIDER_TYPE).
        """
        raise NotImplementedError()

    @abstractmethod
    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: DictConfig,
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Set up any additional resources required to manage a specific
        task run.
        """
        raise NotImplementedError()

    @abstractmethod
    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """
        Destroy any resources set up specifically for this task run
        """
        raise NotImplementedError()

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the linked qualification from the crowd provider, if it exists.

        The base implementation is a no-op.
        """
        return None

Base class that defines the required functionality for the mephisto system to be able to interface with an external crowdsourcing vendor.

Implementing the methods within, as well as supplying wrapped Unit, Requester, Worker, and Agent classes should ensure support for a vendor.

View Source
    def __init__(self, db: "MephistoDB"):
        """
        Crowd provider classes should keep as much of their state
        as possible in their non-python datastore. This way
        the system can work even after shutdowns, and the
        state of the system can be managed or observed from
        other processes.

        To set up a datastore, init checks whether one is already
        registered for PROVIDER_TYPE (has_datastore_for_provider) and
        reuses it (get_datastore_for_provider) if so. Otherwise it creates
        a new one with initialize_provider_datastore under the provider's
        storage path and registers it with the database
        (set_datastore_for_provider).

        ``self.datastore_root`` is assigned on both paths, so it is always
        available after construction.
        """
        self.db = db
        has_existing = db.has_datastore_for_provider(self.PROVIDER_TYPE)
        # Resolve the storage path on both branches so `datastore_root` is
        # always defined, even when an existing datastore is reused.
        # NOTE(review): get_db_path_for_provider is now also called on the
        # reuse path; confirm the DB implementation treats it as idempotent.
        self.datastore_root = db.get_db_path_for_provider(self.PROVIDER_TYPE)
        if has_existing:
            self.datastore = db.get_datastore_for_provider(self.PROVIDER_TYPE)
        else:
            self.datastore = self.initialize_provider_datastore(self.datastore_root)
            db.set_datastore_for_provider(self.PROVIDER_TYPE, self.datastore)

Crowd provider classes should keep as much of their state as possible in their non-python datastore. This way the system can work even after shutdowns, and the state of the system can be managed or observed from other processes.

In order to set up a datastore, init checks whether one is already registered (using has_datastore_for_provider) and, if so, reuses it via get_datastore_for_provider; otherwise it makes a new one and registers it with the database.

#   PROVIDER_TYPE = '__PROVIDER_BASE_CLASS__'
#  
@classmethod
def is_sandbox(cls) -> bool:
View Source
    @classmethod
    def is_sandbox(cls) -> bool:
        """
        Report whether this crowd provider targets a sandbox environment.

        The answer comes from the provider's configured RequesterClass.
        """
        requester_cls = cls.RequesterClass
        return requester_cls.is_sandbox()

Determine if the given crowd provider is a sandbox provider

#  
@classmethod
def assert_task_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ):
View Source
    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Validate ``args`` and ``shared_state`` before a task is launched.

        Subclasses should raise if a task launched with these arguments
        would not work. This base version accepts everything.
        """
        # Nothing to check at the base-class level.
        return None

Assert that the provided arguments are valid. Should fail if a task launched with these arguments would not work

#  
@classmethod
@abstractmethod
def get_wrapper_js_path(cls):
View Source
    @classmethod
    @abstractmethod
    def get_wrapper_js_path(cls):
        """
        Give the location of this provider's `wrap_crowd_source.js` file,
        which gets deployed to the server.

        Must be implemented by each concrete provider.
        """
        raise NotImplementedError()

Return the path to the wrap_crowd_source.js file for this provider to be deployed to the server

#  
@abstractmethod
def initialize_provider_datastore(self, storage_path: str) -> Any:
View Source
    @abstractmethod
    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Do whatever is required to initialize this provider insofar
        as setting up local or external state is required to ensure
        that this vendor is usable.

        Local data storage should be put into the given root path.

        This method should return the local data storage component that
        is required to do any object initialization, as it will be available
        from the MephistoDB via db.get_datastore_for_provider(PROVIDER_TYPE).

        Args:
            storage_path: directory (from db.get_db_path_for_provider) under
                which this provider should keep its local data.

        Returns:
            The provider-specific datastore object; its type is defined by
            each implementation.
        """
        raise NotImplementedError()

Do whatever is required to initialize this provider insofar as setting up local or external state is required to ensure that this vendor is usable.

Local data storage should be put into the given root path.

This method should return the local data storage component that is required to do any object initialization, as it will be available from the MephistoDB via db.get_datastore_for_provider(PROVIDER_TYPE).

#  
@abstractmethod
def setup_resources_for_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState, server_url: str ) -> None:
View Source
    @abstractmethod
    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: DictConfig,
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Prepare any provider-side resources needed to manage ``task_run``.

        Args:
            task_run: the run the resources are being created for.
            args: the task configuration.
            shared_state: state shared across the task's units.
            server_url: URL of the deployed task server.
        """
        raise NotImplementedError()

Set up any additional resources required to manage a specific task run.

#  
@abstractmethod
def cleanup_resources_from_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, server_url: str ) -> None:
View Source
    @abstractmethod
    def cleanup_resources_from_task_run(self, task_run: "TaskRun", server_url: str) -> None:
        """
        Tear down whatever resources were created specifically for
        ``task_run`` (the counterpart of setup_resources_for_task_run).
        """
        raise NotImplementedError()

Destroy any resources set up specifically for this task run

#   def cleanup_qualification(self, qualification_name: str) -> None:
View Source
    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Drop the qualification named ``qualification_name`` from the crowd
        provider, if the provider has one.

        The base class has nothing to remove, so this does nothing.
        """
        return

Remove the linked qualification from the crowd provider, if it exists.

View Source
class Unit(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    This class tracks the status of an individual worker's contribution to a
    higher level assignment. It is the smallest 'unit' of work to complete
    the assignment, and this class is only responsible for checking
    the status of that work itself being done.

    It should be extended for usage with a specific crowd provider
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Populate this Unit from its database row.

        Direct construction is rejected unless ``_used_new_call`` is set;
        use ``Unit.get(db, db_id)`` instead. When ``row`` is not supplied it
        is fetched with ``db.get_unit(db_id)``.

        Raises:
            AssertionError: if called directly, or if no row exists for db_id.
        """
        if not _used_new_call:
            raise AssertionError(
                "Direct Unit and data model access via ...Unit(db, id) is "
                "now deprecated in favor of calling Unit.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_unit(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["unit_id"]
        self.assignment_id: str = row["assignment_id"]
        self.unit_index: int = row["unit_index"]
        self.pay_amount: float = row["pay_amount"]
        self.agent_id: Optional[str] = row["agent_id"]
        self.provider_type: str = row["provider_type"]
        self.db_status: str = row["status"]
        self.task_type: str = row["task_type"]
        self.task_id: str = row["task_id"]
        self.task_run_id: str = row["task_run_id"]
        self.sandbox: bool = row["sandbox"]
        self.requester_id: str = row["requester_id"]
        self.worker_id: str = row["worker_id"]

        # Deferred loading of related entities
        self.__task: Optional["Task"] = None
        self.__task_run: Optional["TaskRun"] = None
        self.__assignment: Optional["Assignment"] = None
        self.__requester: Optional["Requester"] = None
        self.__agent: Optional["Agent"] = None
        self.__worker: Optional["Worker"] = None

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Unit":
        """
        The new method is overridden to be able to automatically generate
        the expected Unit class without needing to specifically find it
        for a given db_id. As such it is impossible to create a Unit
        as you will instead be returned the correct Unit class according to
        the crowdprovider associated with this Unit.
        """
        if cls == Unit:
            # We are trying to construct a Unit, find what type to use and
            # create that instead
            from mephisto.operations.registry import get_crowd_provider_from_type

            if row is None:
                row = db.get_unit(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(row["provider_type"]).UnitClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    def get_crowd_provider_class(self) -> Type["CrowdProvider"]:
        """Get the CrowdProvider class that manages this Unit"""
        from mephisto.operations.registry import get_crowd_provider_from_type

        return get_crowd_provider_from_type(self.provider_type)

    def get_assignment_data(self) -> "InitializationData":
        """Return the specific assignment data for this assignment"""
        return self.get_assignment().get_assignment_data()

    def sync_status(self) -> None:
        """
        Ensure that the queried status from this unit and the db status
        are up to date
        """
        # TODO(#102) this will need to be run periodically/on crashes
        # to sync any lost state
        self.set_db_status(self.get_status())

    def get_db_status(self) -> str:
        """
        Return the status as currently stored in the database.

        A cached status that is already final (per
        AssignmentState.final_unit()) is returned without a db query.
        """
        if self.db_status in AssignmentState.final_unit():
            return self.db_status
        row = self.db.get_unit(self.db_id)
        assert row is not None, f"Unit {self.db_id} stopped existing in the db..."
        return row["status"]

    def set_db_status(self, status: str) -> None:
        """
        Set the status reflected in the database for this Unit.

        No-op if ``status`` equals the cached status. Otherwise the
        ACTIVE_UNIT_STATUSES gauge is moved from the old status label to the
        new one before the cached value and the db row are updated.

        Raises:
            AssertionError: if ``status`` is not in AssignmentState.valid_unit().
        """
        assert (
            status in AssignmentState.valid_unit()
        ), f"{status} not valid Assignment Status, not in {AssignmentState.valid_unit()}"
        if status == self.db_status:
            return
        logger.debug(f"Updating status for {self} to {status}")
        ACTIVE_UNIT_STATUSES.labels(
            status=self.db_status, unit_type=INDEX_TO_TYPE_MAP[self.unit_index]
        ).dec()
        ACTIVE_UNIT_STATUSES.labels(
            status=status, unit_type=INDEX_TO_TYPE_MAP[self.unit_index]
        ).inc()
        self.db_status = status
        self.db.update_unit(self.db_id, status=status)

    def _mark_agent_assignment(self) -> None:
        """
        Special helper to mark the transition from LAUNCHED to ASSIGNED.

        Only the ACTIVE_UNIT_STATUSES metric is adjusted here; neither
        ``self.db_status`` nor the database row is modified.
        NOTE(review): presumably the caller updates db_status itself —
        otherwise a later set_db_status(ASSIGNED) would move the gauge a
        second time. Confirm against callers.
        """
        assert (
            self.db_status == AssignmentState.LAUNCHED
        ), "can only mark LAUNCHED units"
        ACTIVE_UNIT_STATUSES.labels(
            status=AssignmentState.LAUNCHED,
            unit_type=INDEX_TO_TYPE_MAP[self.unit_index],
        ).dec()
        ACTIVE_UNIT_STATUSES.labels(
            status=AssignmentState.ASSIGNED,
            unit_type=INDEX_TO_TYPE_MAP[self.unit_index],
        ).inc()

    def get_assignment(self) -> "Assignment":
        """
        Return the assignment that this Unit is part of (loaded once, then cached).
        """
        if self.__assignment is None:
            from mephisto.data_model.assignment import Assignment

            self.__assignment = Assignment.get(self.db, self.assignment_id)
        return self.__assignment

    def get_task_run(self) -> TaskRun:
        """
        Return the task run that this assignment is part of
        """
        if self.__task_run is None:
            # Prefer an already-loaded assignment over a fresh db lookup
            if self.__assignment is not None:
                self.__task_run = self.__assignment.get_task_run()
            else:
                self.__task_run = TaskRun.get(self.db, self.task_run_id)
        return self.__task_run

    def get_task(self) -> Task:
        """
        Return the task that this assignment is part of
        """
        if self.__task is None:
            # Reuse whichever related object is already cached, if any
            if self.__assignment is not None:
                self.__task = self.__assignment.get_task()
            elif self.__task_run is not None:
                self.__task = self.__task_run.get_task()
            else:
                self.__task = Task.get(self.db, self.task_id)
        return self.__task

    def get_requester(self) -> "Requester":
        """
        Return the requester who offered this Unit
        """
        if self.__requester is None:
            # Reuse whichever related object is already cached, if any
            if self.__assignment is not None:
                self.__requester = self.__assignment.get_requester()
            elif self.__task_run is not None:
                self.__requester = self.__task_run.get_requester()
            else:
                self.__requester = Requester.get(self.db, self.requester_id)
        return self.__requester

    def clear_assigned_agent(self) -> None:
        """
        Clear the agent that is assigned to this unit, both in the db and
        in the task run's reservation, then reset the cached agent fields.
        """
        logger.debug(f"Clearing assigned agent {self.agent_id} from {self}")
        self.db.clear_unit_agent_assignment(self.db_id)
        self.get_task_run().clear_reservation(self)
        self.agent_id = None
        self.__agent = None

    def get_assigned_agent(self) -> Optional[Agent]:
        """
        Get the agent assigned to this Unit if there is one, else return None
        """
        # In these statuses, we know the agent isn't changing anymore, and thus will
        # not need to be re-queried
        if self.db_status in AssignmentState.final_unit():
            if self.agent_id is None:
                return None
            return Agent.get(self.db, self.agent_id)

        # Query the database to get the most up-to-date assignment, as this can
        # change after instantiation if the Unit status isn't final
        unit_copy = Unit.get(self.db, self.db_id)
        self.agent_id = unit_copy.agent_id
        if self.agent_id is not None:
            return Agent.get(self.db, self.agent_id)
        return None

    @staticmethod
    def _register_unit(
        db: "MephistoDB",
        assignment: "Assignment",
        index: int,
        pay_amount: float,
        provider_type: str,
    ) -> "Unit":
        """
        Create an entry for this unit in the database, count it under the
        CREATED status in ACTIVE_UNIT_STATUSES, and return the loaded Unit.
        """
        db_id = db.new_unit(
            assignment.task_id,
            assignment.task_run_id,
            assignment.requester_id,
            assignment.db_id,
            index,
            pay_amount,
            provider_type,
            assignment.task_type,
            sandbox=assignment.sandbox,
        )
        unit = Unit.get(db, db_id)
        ACTIVE_UNIT_STATUSES.labels(
            status=AssignmentState.CREATED, unit_type=INDEX_TO_TYPE_MAP[index]
        ).inc()
        logger.debug(f"Registered new unit {unit} for {assignment}.")
        return unit

    def get_pay_amount(self) -> float:
        """
        Return the amount that this Unit is costing against the budget,
        calculating additional fees as relevant.

        The base implementation returns ``pay_amount`` unchanged.
        """
        return self.pay_amount

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id}, {self.db_status})"

    # Children classes may need to override the following

    def get_status(self) -> str:
        """
        Get the status of this unit, as determined by whether there's
        a worker working on it at the moment, and any other possible states. Should
        return one of UNIT_STATUSES

        Accurate status is crowd-provider dependent, and thus this method should be
        defined in the child class to ensure that the local record matches
        the ground truth in the provider

        Side effect: if the computed status differs from the cached
        ``db_status``, it is written back via set_db_status.

        Raises:
            AssertionError: if no agent is assigned and the unit's row no
                longer exists in the db.
        """
        from mephisto.abstractions.blueprint import AgentState

        db_status = self.db_status

        # Expiration is a terminal state, and shouldn't be changed
        if db_status == AssignmentState.EXPIRED:
            return db_status

        # Fallback for agent statuses not matched by any branch below
        computed_status = AssignmentState.LAUNCHED

        agent = self.get_assigned_agent()
        if agent is None:
            # No agent: trust whatever status is currently stored in the db
            row = self.db.get_unit(self.db_id)
            assert row is not None, f"Unit {self.db_id} stopped existing in the db..."
            computed_status = row["status"]
        else:
            agent_status = agent.get_status()
            if agent_status == AgentState.STATUS_NONE:
                computed_status = AssignmentState.LAUNCHED
            elif agent_status in [
                AgentState.STATUS_ACCEPTED,
                AgentState.STATUS_ONBOARDING,
                AgentState.STATUS_PARTNER_DISCONNECT,
                AgentState.STATUS_WAITING,
                AgentState.STATUS_IN_TASK,
            ]:
                computed_status = AssignmentState.ASSIGNED
            elif agent_status in [AgentState.STATUS_COMPLETED]:
                computed_status = AssignmentState.COMPLETED
            elif agent_status in [AgentState.STATUS_SOFT_REJECTED]:
                computed_status = AssignmentState.SOFT_REJECTED
            elif agent_status in [AgentState.STATUS_EXPIRED]:
                computed_status = AssignmentState.EXPIRED
            elif agent_status in [
                AgentState.STATUS_DISCONNECT,
                AgentState.STATUS_RETURNED,
                AgentState.STATUS_TIMEOUT,
            ]:
                # Still assigned, as we expect the task launcher to explicitly
                # update our status to expired or to remove the agent
                computed_status = AssignmentState.ASSIGNED
            elif agent_status == AgentState.STATUS_APPROVED:
                computed_status = AssignmentState.ACCEPTED
            elif agent_status == AgentState.STATUS_REJECTED:
                computed_status = AssignmentState.REJECTED

        if computed_status != db_status:
            self.set_db_status(computed_status)

        return computed_status

    # Children classes should implement the below methods

    def launch(self, task_url: str) -> None:
        """
        Make this Unit available on the crowdsourcing vendor. Depending on
        the task type, this could mean a number of different setup steps.

        Some crowd providers require setting up a configuration for the
        very first launch, and this method should call a helper to manage
        that step if necessary.
        """
        raise NotImplementedError()

    def expire(self) -> float:
        """
        Expire this unit, removing it from being workable on the vendor.
        Return the maximum time needed to wait before we know it's taken down.
        """
        raise NotImplementedError()

    def is_expired(self) -> bool:
        """Determine if this unit is expired as according to the vendor."""
        raise NotImplementedError()

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """
        Create a Unit for the given assignment

        Implementation should return the result of _register_unit when sure the unit
        can be successfully created to have it put into the db.
        """
        raise NotImplementedError()

This class tracks the status of an individual worker's contribution to a higher level assignment. It is the smallest 'unit' of work to complete the assignment, and this class is only responsible for checking the status of that work itself being done.

It should be extended for usage with a specific crowd provider

View Source
class Requester(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    High level class representing a requester on some kind of crowd provider. Sets some default
    initializations, but mostly should be extended by the specific requesters for crowd providers
    with whatever implementation details are required to get those to work.
    """

    ArgsClass: ClassVar[Type["RequesterArgs"]] = RequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Populate this Requester from its database row.

        Direct construction is rejected unless ``_used_new_call`` is set;
        use ``Requester.get(db, db_id)`` instead. When ``row`` is not
        supplied it is fetched with ``db.get_requester(db_id)``.

        Raises:
            AssertionError: if called directly, or if no row exists for db_id.
        """
        if not _used_new_call:
            raise AssertionError(
                "Direct Requester and data model access via Requester(db, id) is "
                "now deprecated in favor of calling Requester.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_requester(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["requester_id"]
        self.provider_type: str = row["provider_type"]
        self.requester_name: str = row["requester_name"]

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Requester":
        """
        The new method is overridden to be able to automatically generate
        the expected Requester class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Requester
        as you will instead be returned the correct Requester class according to
        the crowdprovider associated with this Requester.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Requester:
            # We are trying to construct a Requester, find what type to use and
            # create that instead
            if row is None:
                row = db.get_requester(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).RequesterClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    def get_task_runs(self) -> List["TaskRun"]:
        """
        Return the list of task runs that are run by this requester
        """
        return self.db.find_task_runs(requester_id=self.db_id)

    def get_total_spend(self) -> float:
        """
        Return the total amount of funding spent by this requester
        across all tasks (the sum of each task run's get_total_spend()).

        Returns 0.0 when the requester has no task runs.
        """
        # Reuse get_task_runs() rather than repeating its db query; the 0.0
        # start value keeps the result a float for an empty list of runs.
        return sum((run.get_total_spend() for run in self.get_task_runs()), 0.0)

    @classmethod
    def is_sandbox(cls) -> bool:
        """
        Determine if this is a requester on a sandbox/test account.

        The base implementation always returns False.
        """
        return False

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id})"

    @staticmethod
    def _register_requester(
        db: "MephistoDB", requester_id: str, provider_type: str
    ) -> "Requester":
        """
        Create an entry for this requester in the database via
        ``db.new_requester(requester_id, provider_type)`` and return the
        Requester loaded from the new row.
        """
        db_id = db.new_requester(requester_id, provider_type)
        requester = Requester.get(db, db_id)
        logger.debug(f"Registered new requester {requester}")
        return requester

    # Children classes should implement the following methods

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.

        Should throw an exception if something is wrong with provided registration arguments.
        """
        raise NotImplementedError()

    def is_registered(self) -> bool:
        """Check to see if this requester has already been registered"""
        raise NotImplementedError()

    def get_available_budget(self) -> float:
        """
        Return the funds that this requester profile has available for usage with
        its crowdsourcing vendor
        """
        raise NotImplementedError()

    def to_dict(self) -> Dict[str, Any]:
        """
        Produce a dict of this requester and important features for json serialization.

        Note that the "registered" entry calls is_registered(), which child
        classes must implement.
        """
        return {
            "requester_id": self.db_id,
            "provider_type": self.provider_type,
            "requester_name": self.requester_name,
            "registered": self.is_registered(),
        }

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """
        Try to create a new requester by this name, raise an exception if
        the name already exists.

        Implementation should call _register_requester(db, requester_name, provider_type)
        (its second parameter is named requester_id) when sure the requester can be
        successfully created, to have it put into the db, and return the result.
        """
        raise NotImplementedError()

High level class representing a requester on some kind of crowd provider. Sets some default initializations, but mostly should be extended by the specific requesters for crowd providers with whatever implementation details are required to get those to work.

View Source
class Worker(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    This class represents an individual - namely a person. It maintains components of ongoing identity for a user.

    Instances should be obtained via ``Worker.get(db, db_id)``. ``__new__``
    dispatches to the ``WorkerClass`` of the crowd provider recorded for the
    worker, so the concrete type always matches the worker's provider.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Populate this worker from its db row.

        ``row`` may be supplied to avoid a lookup; otherwise it is fetched with
        ``db.get_worker(db_id)``. Direct construction without
        ``_used_new_call`` is rejected with an AssertionError.
        """
        if not _used_new_call:
            raise AssertionError(
                "Direct Worker and data model access via ...Worker(db, id) is "
                "now deprecated in favor of calling Worker.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_worker(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["worker_id"]
        self.provider_type = row["provider_type"]
        self.worker_name = row["worker_name"]
        # TODO(#568) Do we want any other attributes here?

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Worker":
        """
        The new method is overridden to be able to automatically generate
        the expected Worker class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Worker
        as you will instead be returned the correct Worker class according to
        the crowdprovider associated with this Worker.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Worker:
            # We are trying to construct a Worker, find what type to use and
            # create that instead
            if row is None:
                row = db.get_worker(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class: Type[Worker] = get_crowd_provider_from_type(
                row["provider_type"]
            ).WorkerClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    # TODO(#568) make getters for helpful worker statistics

    def get_agents(self, status: Optional[str] = None) -> List["Agent"]:
        """
        Get the list of agents that this worker was responsible for, optionally
        filtered to those with the given status (which must be a valid
        AgentState status).
        """
        assert status is None or status in AgentState.valid(), "Invalid agent status"
        return self.db.find_agents(worker_id=self.db_id, status=status)

    @staticmethod
    def _register_worker(
        db: "MephistoDB", worker_name: str, provider_type: str
    ) -> "Worker":
        """
        Create an entry for this worker in the database and return the
        loaded Worker for it.
        """
        db_id = db.new_worker(worker_name, provider_type)
        worker = Worker.get(db, db_id)
        logger.debug(f"Registered new worker {worker}")
        return worker

    @classmethod
    def new_from_provider_data(
        cls, db: "MephistoDB", creation_data: Dict[str, Any]
    ) -> "Worker":
        """
        Given the parameters passed through wrap_crowd_source.js, construct
        a new worker.

        The basic case simply takes ``creation_data["worker_name"]`` and
        registers it via ``cls.new``.
        """
        return cls.new(db, creation_data["worker_name"])

    def get_granted_qualification(
        self, qualification_name: str
    ) -> Optional["GrantedQualification"]:
        """
        Return this worker's granted qualification for ``qualification_name``.

        Returns None if no qualification by that name exists in the db, or if
        it exists but has not been granted to this worker. If several entries
        match, the first one returned by the db is used.
        """
        found_qualifications = self.db.find_qualifications(qualification_name)
        if not found_qualifications:
            return None
        qualification = found_qualifications[0]
        granted_qualifications = self.db.check_granted_qualifications(
            qualification.db_id, self.db_id
        )
        if not granted_qualifications:
            return None
        return granted_qualifications[0]

    def is_disqualified(self, qualification_name: str) -> bool:
        """
        Find out if the given worker has been disqualified by the given qualification

        Returns True if the qualification exists and has a falsey value
        Returns False if the qualification doesn't exist or has a truthy value
        """
        qualification = self.get_granted_qualification(qualification_name)
        if qualification is None:
            return False
        return not qualification.value

    def is_qualified(self, qualification_name: str) -> bool:
        """
        Find out if the given worker has qualified by the given qualification

        Returns True if the qualification exists and is truthy value
        Returns False if the qualification doesn't exist or falsey value
        """
        qualification = self.get_granted_qualification(qualification_name)
        if qualification is None:
            return False
        return bool(qualification.value)

    def revoke_qualification(self, qualification_name: str) -> bool:
        """
        Remove this worker's qualification ``qualification_name`` if it exists.

        The grant is revoked in the Mephisto db first, then with the crowd
        provider via ``revoke_crowd_qualification``.

        Returns True if removal happened both locally and with the crowd
        provider. Returns False if the worker did not hold the qualification
        (nothing is changed), or if the crowd provider raised. In that case
        the local revocation has already been applied and the error is logged.
        """
        granted_qualification = self.get_granted_qualification(qualification_name)
        if granted_qualification is None:
            return False

        logger.debug(f"Revoking qualification {qualification_name} from worker {self}.")
        self.db.revoke_qualification(granted_qualification.qualification_id, self.db_id)
        try:
            self.revoke_crowd_qualification(qualification_name)
        except Exception as e:
            # Provider sync is best-effort: report failure instead of raising.
            # logger.exception already attaches the traceback.
            logger.exception(
                f"Found error while trying to revoke qualification: {repr(e)}"
            )
            return False
        return True

    def grant_qualification(
        self, qualification_name: str, value: int = 1, skip_crowd=False
    ) -> bool:
        """
        Grant a positive or negative qualification to this worker.

        The grant (with ``value``) is written to the Mephisto db, then mirrored
        to the crowd provider via ``grant_crowd_qualification`` unless
        ``skip_crowd`` is set.

        Returns True if granting succeeded locally and, when not skipped, with
        the crowd provider. Returns False if the crowd provider raised. In that
        case the local grant has already been applied and the error is logged.

        Raises Exception if no qualification named ``qualification_name``
        exists in the db.
        """
        found_qualifications = self.db.find_qualifications(qualification_name)
        if not found_qualifications:
            raise Exception(
                f"No qualification by the name {qualification_name} found in the db"
            )

        logger.debug(
            f"Granting worker {self} qualification {qualification_name}: {value}"
        )
        qualification = found_qualifications[0]
        self.db.grant_qualification(qualification.db_id, self.db_id, value=value)
        if skip_crowd:
            # Nothing to sync with the provider; the local grant succeeded.
            return True
        try:
            self.grant_crowd_qualification(qualification_name, value)
        except Exception as e:
            # Provider sync is best-effort: report failure instead of raising.
            logger.exception(
                f"Found error while trying to grant qualification: {repr(e)}"
            )
            return False
        return True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id})"

    # Children classes can implement the following methods

    def grant_crowd_qualification(
        self, qualification_name: str, value: int = 1
    ) -> None:
        """
        Grant a qualification by the given name to this worker

        If the CrowdProvider has a notion of qualifications, they can be granted
        in sync with Mephisto's qualifications. The base implementation does nothing.
        """
        return None

    def revoke_crowd_qualification(self, qualification_name: str) -> None:
        """
        Revoke the qualification by the given name from this worker

        If the CrowdProvider has a notion of qualifications, they can be revoked
        in sync with Mephisto's qualifications. The base implementation does nothing.
        """
        return None

    # Children classes should implement the following methods

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """
        Pay this worker a bonus of ``amount`` for the given ``reason``,
        optionally tied to ``unit``. Return a ``(success, detail)`` tuple
        (the string's content is provider-defined).
        """
        raise NotImplementedError()

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """Block this worker for a specified reason. Return success of block"""
        raise NotImplementedError()

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """Unblock a blocked worker for the specified reason"""
        raise NotImplementedError()

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        raise NotImplementedError()

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Determine if this worker is eligible for the given task run"""
        raise NotImplementedError()

    def register(self, args: Optional[Dict[str, str]] = None) -> None:
        """Register this worker with the crowdprovider, if necessary"""
        pass

    @staticmethod
    def new(db: "MephistoDB", worker_name: str) -> "Worker":
        """
        Create a new worker attached to the given identifier, assuming it doesn't already
        exist in the database.

        Implementation should return the result of _register_worker when sure the worker
        can be successfully created to have it put into the db.
        """
        raise NotImplementedError()

This class represents an individual — namely, a person. It maintains components of ongoing identity for a user.

View Source
class Agent(MephistoDataModelComponentMixin, metaclass=MephistoDBBackedABCMeta):
    """
    This class encompasses a worker as they are working on an individual assignment.
    It maintains details for the current task at hand such as start and end time,
    connection status, etc.

    Instances should be obtained via ``Agent.get(db, db_id)``. ``__new__``
    dispatches to the ``AgentClass`` of the crowd provider recorded in the
    agent's db row. Related entities (worker, unit, assignment, task run, task)
    are loaded lazily on first access and cached on the instance.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Populate this agent from its db row.

        ``row`` may be supplied to avoid a lookup; otherwise it is fetched with
        ``db.get_agent(db_id)``. Direct construction without ``_used_new_call``
        is rejected with an AssertionError (use ``Agent.get`` instead).
        """
        if not _used_new_call:
            raise AssertionError(
                "Direct Agent and data model access via ...Agent(db, id) was "
                "now deprecated in favor of calling Agent.get(db, id). "
            )
        self.db: "MephistoDB" = db
        if row is None:
            row = db.get_agent(db_id)
        assert row is not None, f"Given db_id {db_id} did not exist in given db"
        self.db_id: str = row["agent_id"]
        # Locally cached status; get_status() re-reads it from the db until it is final
        self.db_status: str = row["status"]
        self.worker_id: str = row["worker_id"]
        self.unit_id: str = row["unit_id"]
        self.task_type: str = row["task_type"]
        self.provider_type: str = row["provider_type"]
        # Incoming frontend updates, consumed one at a time by get_live_update().
        # NOTE(review): the producer is not in this class - presumably the live run.
        self.pending_actions: "Queue[Dict[str, Any]]" = Queue()
        # Waited on by get_live_update(). Set in this class on disconnect-like
        # statuses and on shutdown (and presumably by whoever fills
        # pending_actions); cleared once pending_actions is drained.
        self.has_live_update = threading.Event()
        self.has_live_update.clear()
        self.assignment_id: str = row["assignment_id"]
        self.task_run_id: str = row["task_run_id"]
        self.task_id: str = row["task_id"]
        # Set by handle_submit() or by update_status() on a disconnect-like
        # status; checked/awaited by await_submit()
        self.did_submit = threading.Event()
        # Once True, a get_live_update() woken with no pending action raises AgentShutdownError
        self.is_shutdown = False

        # Deferred loading of related entities
        self._worker: Optional["Worker"] = None
        self._unit: Optional["Unit"] = None
        self._assignment: Optional["Assignment"] = None
        self._task_run: Optional["TaskRun"] = None
        self._task: Optional["Task"] = None

        # Related entity set by a live run
        self._associated_live_run: Optional["LiveTaskRun"] = None

        # Follow-up initialization is deferred
        self._state = None  # type: ignore

    @property
    def state(self) -> "AgentState":
        """Return this agent's AgentState, constructing and caching it on first access."""
        if self._state is None:
            self._state = AgentState(self)  # type: ignore
        return cast("AgentState", self._state)

    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Agent":
        """
        The new method is overridden to be able to automatically generate
        the expected Agent class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Agent
        as you will instead be returned the correct Agent class according to
        the crowdprovider associated with this Agent.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Agent:
            # We are trying to construct a Agent, find what type to use and
            # create that instead
            if row is None:
                row = db.get_agent(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).AgentClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

    def set_live_run(self, live_run: "LiveTaskRun") -> None:
        """Set an associated live run for this agent"""
        self._associated_live_run = live_run

    def get_live_run(self) -> "LiveTaskRun":
        """Return the associated live run for this agent. Raise AssertionError if not set"""
        if self._associated_live_run is None:
            raise AssertionError(
                "Should not be getting the live run, not set for given agent"
            )
        return self._associated_live_run

    def get_agent_id(self) -> str:
        """Return this agent's id"""
        return self.db_id

    def get_worker(self) -> Worker:
        """
        Return the worker that is using this agent for a task
        (loaded on first call, then cached)
        """
        if self._worker is None:
            self._worker = Worker.get(self.db, self.worker_id)
        return self._worker

    def get_unit(self) -> "Unit":
        """
        Return the Unit that this agent is working on (loaded on first call, then cached).
        """
        if self._unit is None:
            # Local import, presumably to avoid a circular import at module load
            from mephisto.data_model.unit import Unit

            self._unit = Unit.get(self.db, self.unit_id)
        return self._unit

    def get_assignment(self) -> "Assignment":
        """
        Return the assignment this agent is working on. Reuses the cached
        unit's assignment when a unit is already loaded, else loads by id.
        """
        if self._assignment is None:
            if self._unit is not None:
                self._assignment = self._unit.get_assignment()
            else:
                from mephisto.data_model.assignment import Assignment

                self._assignment = Assignment.get(self.db, self.assignment_id)
        return self._assignment

    def get_task_run(self) -> "TaskRun":
        """
        Return the TaskRun this agent is working within, preferring an
        already-loaded unit or assignment over a fresh db lookup
        """
        if self._task_run is None:
            if self._unit is not None:
                self._task_run = self._unit.get_task_run()
            elif self._assignment is not None:
                self._task_run = self._assignment.get_task_run()
            else:
                from mephisto.data_model.task_run import TaskRun

                self._task_run = TaskRun.get(self.db, self.task_run_id)
        return self._task_run

    def get_task(self) -> "Task":
        """
        Return the Task this agent is working within, preferring an
        already-loaded unit, assignment or task run over a fresh db lookup
        """
        if self._task is None:
            if self._unit is not None:
                self._task = self._unit.get_task()
            elif self._assignment is not None:
                self._task = self._assignment.get_task()
            elif self._task_run is not None:
                self._task = self._task_run.get_task()
            else:
                from mephisto.data_model.task import Task

                self._task = Task.get(self.db, self.task_id)
        return self._task

    def get_data_dir(self) -> str:
        """
        Return the directory to be storing any agent state for
        this agent into: a subdirectory named after this agent's
        db_id inside its assignment's data directory
        """
        assignment_dir = self.get_assignment().get_data_dir()
        return os.path.join(assignment_dir, self.db_id)

    def update_status(self, new_status: str) -> None:
        """
        Set this agent's status in the db and in the local cache.

        No-op if the status is unchanged. When a live run is attached, the
        change is pushed through its worker pool. Moving to RETURNED,
        DISCONNECT or TIMEOUT releases anyone blocked in get_live_update()
        or await_submit(), and if the agent was still WAITING its unit's
        assigned agent is cleared so the unit can be reassigned. Also keeps
        the active agent/worker metrics in step with the transition.
        """
        if self.db_status == new_status:
            return  # Noop, this is already the case
        logger.debug(f"Updating {self} to {new_status}")
        if self.db_status in AgentState.complete():
            logger.info(f"Updating {self} from final status to {new_status}")

        old_status = self.db_status
        self.db.update_agent(self.db_id, status=new_status)
        self.db_status = new_status
        if self._associated_live_run is not None:
            live_run = self.get_live_run()
            live_run.loop_wrap.execute_coro(
                live_run.worker_pool.push_status_update(self)
            )
        if new_status in [
            AgentState.STATUS_RETURNED,
            AgentState.STATUS_DISCONNECT,
            AgentState.STATUS_TIMEOUT,
        ]:
            # Disconnect statuses should free any pending acts
            self.has_live_update.set()
            self.did_submit.set()
            if old_status == AgentState.STATUS_WAITING:
                # Waiting agents' unit can be reassigned, as no work
                # has been done yet.
                unit = self.get_unit()
                logger.debug(f"Clearing {self} from {unit} for update to {new_status}")
                unit.clear_assigned_agent()

        # Metrics changes: move this agent's count from the old status to the new one,
        # and drop it from the active-worker gauge the first time it reaches a final status
        ACTIVE_AGENT_STATUSES.labels(status=old_status, agent_type="main").dec()
        ACTIVE_AGENT_STATUSES.labels(status=new_status, agent_type="main").inc()
        if (
            old_status not in AgentState.complete()
            and new_status in AgentState.complete()
        ):
            ACTIVE_WORKERS.labels(worker_id=self.worker_id, agent_type="main").dec()

    @staticmethod
    def _register_agent(
        db: "MephistoDB", worker: Worker, unit: "Unit", provider_type: str
    ) -> "Agent":
        """
        Create this agent in the mephisto db with the correct setup, then
        load it, update metrics, and move it to STATUS_ACCEPTED.
        """
        # NOTE(review): semantics of _mark_agent_assignment live on Unit - not visible here
        unit._mark_agent_assignment()
        db_id = db.new_agent(
            worker.db_id,
            unit.db_id,
            unit.task_id,
            unit.task_run_id,
            unit.assignment_id,
            unit.task_type,
            provider_type,
        )
        a = Agent.get(db, db_id)
        # Count the new agent under STATUS_NONE; the update_status() call below
        # decrements its old status (presumably STATUS_NONE for a fresh row - confirm)
        ACTIVE_AGENT_STATUSES.labels(
            status=AgentState.STATUS_NONE, agent_type="main"
        ).inc()
        ACTIVE_WORKERS.labels(worker_id=worker.db_id, agent_type="main").inc()
        logger.debug(f"Registered new agent {a} for {unit}.")
        a.update_status(AgentState.STATUS_ACCEPTED)
        return a

    # Specialized child cases may need to implement the following

    @classmethod
    def new_from_provider_data(
        cls,
        db: "MephistoDB",
        worker: Worker,
        unit: "Unit",
        provider_data: Dict[str, Any],
    ) -> "Agent":
        """
        Wrapper around the new method that allows registering additional
        bookkeeping information from a crowd provider for this agent.

        The base version ignores ``provider_data``. It records the worker on
        the given unit object and caches that unit on the new agent.
        """
        agent = cls.new(db, worker, unit)
        unit.worker_id = worker.db_id
        agent._unit = unit
        return agent

    def observe(self, live_update: "Dict[str, Any]") -> None:
        """
        Record ``live_update`` in this agent's AgentState and, when a live run
        is attached, send it to the frontend via the live run's client_io.

        If the update has no ``update_id``, a fresh uuid4 string is assigned
        (this mutates the passed dict).
        """
        if live_update.get("update_id") is None:
            live_update["update_id"] = str(uuid4())
        self.state.update_data(live_update)

        if self._associated_live_run is not None:
            live_run = self.get_live_run()
            live_run.client_io.send_live_update(self.get_agent_id(), live_update)

    def get_live_update(
        self, timeout: Optional[int] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Request information from the Agent's frontend.

        If nothing is pending and ``timeout`` is None or 0, return None
        immediately (non-blocking). Otherwise wait up to ``timeout`` seconds
        for an update. The returned update is also recorded in this agent's
        AgentState.

        Raises (when still nothing is pending after waiting):
            AgentShutdownError: the agent was shut down.
            AgentDisconnectedError: the agent's status is DISCONNECT.
            AgentReturnedError: the agent's status is RETURNED.
            AgentTimeoutError: otherwise; the status is first set to TIMEOUT.
        """
        if self.pending_actions.empty():
            if timeout is None or timeout == 0:
                return None
            self.has_live_update.wait(timeout)

        if self.pending_actions.empty():
            if self.is_shutdown:
                raise AgentShutdownError(self.db_id)
            # various disconnect cases
            status = self.get_status()
            if status == AgentState.STATUS_DISCONNECT:
                raise AgentDisconnectedError(self.db_id)
            elif status == AgentState.STATUS_RETURNED:
                raise AgentReturnedError(self.db_id)
            self.update_status(AgentState.STATUS_TIMEOUT)
            raise AgentTimeoutError(timeout, self.db_id)
        assert (
            not self.pending_actions.empty()
        ), "has_live_update released without an action!"

        act = self.pending_actions.get()

        # Re-arm the wait only once the queue is fully drained
        if self.pending_actions.empty():
            self.has_live_update.clear()
        self.state.update_data(act)
        return act

    def act(self, timeout: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """
        Deprecated alias for get_live_update(timeout); emits a one-time
        deprecation warning and then delegates.
        """
        warn_once(
            "As of Mephisto 1.0 Agent.act is being deprecated in favor of Agent.get_live_update. "
            "This functionality will no longer work in 1.1"
        )
        return self.get_live_update(timeout)

    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Wait up to ``timeout`` seconds for this agent to submit their task,
        then return whether the submit event is set. With ``timeout=None``
        this does not block and only reports the current state. The event is
        also set by disconnect-like statuses in update_status().
        """
        if timeout is not None:
            self.did_submit.wait(timeout=timeout)
        return self.did_submit.is_set()

    def handle_submit(self, submit_data: Dict[str, Any]) -> None:
        """Handle final submission for this agent: mark it submitted and record submit_data in its AgentState"""
        self.did_submit.set()
        self.state.update_submit(submit_data)

    def get_status(self) -> str:
        """
        Get the status of this agent in their work on their unit.

        While the cached status is not final, re-read it from the db. On a
        change, wake get_live_update() waiters for RETURNED/DISCONNECT and
        push the change through the attached live run, if any.
        """
        if self.db_status not in AgentState.complete():
            row = self.db.get_agent(self.db_id)
            if row["status"] != self.db_status:
                if row["status"] in [
                    AgentState.STATUS_RETURNED,
                    AgentState.STATUS_DISCONNECT,
                ]:
                    # Disconnect statuses should free any pending acts
                    self.has_live_update.set()
                if self._associated_live_run is not None:
                    live_run = self.get_live_run()
                    live_run.loop_wrap.execute_coro(
                        live_run.worker_pool.push_status_update(self)
                    )
            self.db_status = row["status"]
        return self.db_status

    def shutdown(self) -> None:
        """
        Force the given agent to end any polling threads and throw an AgentShutdownError
        from any acts called on it, ensuring tasks using this agent can be cleaned up.
        """
        logger.debug(f"{self} is shutting down")
        self.has_live_update.set()
        self.is_shutdown = True

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.db_id}, {self.db_status})"

    # Children classes should implement the following methods

    def approve_work(self) -> None:
        """Approve the work done on this agent's specific Unit"""
        raise NotImplementedError()

    def soft_reject_work(self) -> None:
        """
        Pay a worker for attempted work, but mark it as below the
        quality bar for this assignment: approves the work, then sets
        the status to STATUS_SOFT_REJECTED
        """
        # TODO(OWN) extend this method to assign a soft block
        # qualification automatically if a threshold of
        # soft rejects as a proportion of total accepts
        # is exceeded
        self.approve_work()
        self.update_status(AgentState.STATUS_SOFT_REJECTED)

    def reject_work(self, reason) -> None:
        """Reject the work done on this agent's specific Unit"""
        raise NotImplementedError()

    def mark_done(self) -> None:
        """
        Take any required step with the crowd_provider to ensure that
        the worker can submit their work and be marked as complete via
        a call to get_status
        """
        raise NotImplementedError()

    @staticmethod
    def new(db: "MephistoDB", worker: Worker, unit: "Unit") -> "Agent":
        """
        Create an agent for this worker to be used for work on the given Unit.

        Implementation should return the result of _register_agent when sure the agent
        can be successfully created to have it put into the db.
        """
        raise NotImplementedError()

This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.

#   class CrowdProvider.ArgsClass:
View Source
class ProviderArgs:
    """
    Common configuration fields for every crowd provider.

    Provider-specific argument classes extend this base with their own
    settings.
    """

    # Which crowd provider these arguments are for; omegaconf's MISSING until set
    _provider_type: str = MISSING
    # Name of the requester account to use with that provider; MISSING until set
    requester_name: str = MISSING

Base class for arguments to configure Crowd Providers