mephisto.abstractions.blueprint

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from abc import ABC, abstractmethod
from typing import (
    ClassVar,
    List,
    Dict,
    Any,
    Type,
    Union,
    Iterable,
    Callable,
    TYPE_CHECKING,
)

from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig

from mephisto.abstractions._subcomponents.task_builder import TaskBuilder
from mephisto.abstractions._subcomponents.task_runner import TaskRunner
from mephisto.abstractions._subcomponents.agent_state import AgentState

if TYPE_CHECKING:
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.assignment import InitializationData
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker

from mephisto.utils.logger_core import get_logger

# Module-level logger, requested from `get_logger` under this module's `__name__`
logger = get_logger(name=__name__)


@dataclass
class BlueprintArgs:
    """
    Base argument dataclass for blueprints.

    Both fields default to omegaconf's `MISSING` marker.
    """

    _blueprint_type: str = MISSING
    # Exposed with help text describing it as the soft-block qualification name
    block_qualification: str = field(
        metadata={
            "help": "Specify the name of a qualification used to soft block workers."
        },
        default=MISSING,
    )


@dataclass
class SharedTaskState:
    """
    Container for additional run state that cannot simply be supplied
    as Hydra args, such as functions and objects
    """

    # Every instance receives its own fresh dict
    task_config: Dict[str, Any] = field(
        metadata={
            "help": (
                "Values to be included in the frontend "
                "MephistoTask.task_config object"
            ),
            "type": "Dict[str, Any]",
            "default": "{}",
        },
        default_factory=dict,
    )
    # Every instance receives its own fresh list
    qualifications: List[Any] = field(
        metadata={
            "help": (
                "List of qualification dicts of the form "
                "returned by mephisto.utils.qualifications.make_qualification_dict "
                "to be used with this task run."
            ),
            "type": "List[Dict]",
            "default": "[]",
        },
        default_factory=list,
    )
    # The default predicate accepts every (worker, unit) pair
    worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field(
        metadata={
            "help": "Function to evaluate if a worker is eligible for a given unit",
            "type": "Callable[[Worker, Unit], bool]",
            "default": "Returns True always",
        },
        default_factory=lambda: (lambda worker, unit: True),
    )
    # The default callback does nothing and returns None
    on_unit_submitted: Callable[["Unit"], None] = field(
        metadata={
            "help": "Function to evaluate on every unit completed or disconnected",
            "type": "Callable[[Unit], None]",
            "default": "No-op function",
        },
        default_factory=lambda: (lambda unit: None),
    )


class BlueprintMixin(ABC):
    """
    Base class for compositional mixins for blueprints

    We expect mixins that subclass other mixins to handle subinitialization
    work, such that only the highest class needs to be called.
    """

    # Concrete mixins are expected to provide both attributes below.
    # ArgsMixin should be a dataclass, to extend BlueprintArgs
    ArgsMixin: ClassVar[Type[object]]
    # SharedStateMixin also should be a dataclass, to extend SharedTaskState
    SharedStateMixin: ClassVar[Type[object]]

    @staticmethod
    def extract_unique_mixins(blueprint_class: Type["Blueprint"]):
        """
        Return the unique mixin classes that are used in the given blueprint class

        The returned list follows the order of `blueprint_class.mro()`, so
        it is the same on every call. It leaves out `BlueprintMixin` itself,
        the blueprint class, any `MixedInBlueprint` wrapper produced by
        `mixin_args_and_state`, and any mixin that another listed mixin
        already subclasses. A class without any mixin yields an empty list.
        """
        mixin_subclasses = [
            clazz
            for clazz in blueprint_class.mro()
            if issubclass(clazz, BlueprintMixin)
        ]
        target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class
        # Remove magic created with `mixin_args_and_state`. The emptiness
        # guard covers a wrapper around a class that has no mixin in its MRO,
        # where there is nothing left to pop.
        while target_class.__name__ == "MixedInBlueprint" and mixin_subclasses:
            target_class = mixin_subclasses.pop(0)
        # `mro()` never repeats a class, so a list keeps the entries unique
        # while preserving their order.
        candidates = [
            clazz
            for clazz in mixin_subclasses
            if "MixedInBlueprint" not in clazz.__name__
            and clazz != BlueprintMixin
            and clazz != target_class
        ]

        # We also want to make sure that we don't double-count extensions of
        # mixins, so remove classes that other classes are subclasses of
        def is_subclassed(clazz):
            return any(
                other != clazz and issubclass(other, clazz) for other in candidates
            )

        return [clazz for clazz in candidates if not is_subclassed(clazz)]

    @abstractmethod
    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def assert_mixin_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """Method to provide any required qualifications to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    def mixin_args_and_state(
        mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
    ):
        """
        Magic utility decorator that can be used to inject mixin configurations
        (BlueprintArgs and SharedTaskState) without the user needing to define new
        classes for these. Should only be used by tasks that aren't already specifying
        new versions of these, which should just inherit otherwise.

        Returns a subclass of `target_cls` named `MixedInBlueprint` whose
        `ArgsClass` and `SharedStateClass` combine the mixin's dataclasses
        with the ones already set on `target_cls`.

        Usage:
          @register_mephisto_abstraction()
          @ABlueprintMixin.mixin_args_and_state
          class MyBlueprint(ABlueprintMixin, Blueprint):
              pass
        """
        # Ignore typing on most of this, as mypy is not able to parse what's happening
        @dataclass
        class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
            pass

        @dataclass
        class MixedInSharedStateClass(
            mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
        ):
            pass

        # `extract_unique_mixins` recognises this wrapper by its class name
        class MixedInBlueprint(target_cls):  # type: ignore
            ArgsClass = MixedInArgsClass
            SharedStateClass = MixedInSharedStateClass

        return MixedInBlueprint


class Blueprint(ABC):
    """
    Configuration class tying together the pieces needed to build, launch,
    and run a task of a specific type. Offers utility functions for
    coordinating the three main components, which live in separate
    classes in acknowledgement that some tasks may have particularly
    complicated processes for them
    """

    AgentStateClass: ClassVar[Type["AgentState"]]
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = AgentState  # type: ignore
    TaskRunnerClass: ClassVar[Type["TaskRunner"]]
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]]
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = BlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = SharedTaskState
    BLUEPRINT_TYPE: str

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        """
        Store the args and shared state, expose the shared state's
        `task_config` as `frontend_task_config`, then let every unique
        mixin of this class run its `init_mixin_config`.
        """
        self.args = args
        self.shared_state = shared_state
        self.frontend_task_config = shared_state.task_config

        # Each mixin's `init_mixin_config` is invoked automatically here.
        for mixin_class in BlueprintMixin.extract_unique_mixins(self.__class__):
            mixin_class.init_mixin_config(self, task_run, args, shared_state)

    @classmethod
    def get_required_qualifications(
        cls, args: DictConfig, shared_state: "SharedTaskState"
    ):
        """
        Gather the qualifications reported by every unique mixin of this
        blueprint into a single list
        """
        required = []
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            required.extend(mixin_class.get_mixin_qualifications(args, shared_state))
        return required

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Assert that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work
        """
        # Each mixin's `assert_mixin_args` is invoked automatically here.
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            mixin_class.assert_mixin_args(args, shared_state)

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Return a copy of the options that should be forwarded
        to the client for use by the task's frontend
        """
        frontend_args = self.frontend_task_config.copy()
        return frontend_args

    @abstractmethod
    def get_initialization_data(
        self,
    ) -> Iterable["InitializationData"]:
        """
        Produce all of the data used to initialize tasks from this blueprint.
        May be a simple iterable when every assignment can be processed
        at once, or a Generator when the number of tasks is unknown or
        changes based on something running concurrently with the job.
        """
        raise NotImplementedError
#   class BlueprintArgs:
View Source
class BlueprintArgs:
    """
    Base argument dataclass for blueprints.

    Both fields default to omegaconf's `MISSING` marker.
    """

    _blueprint_type: str = MISSING
    # Exposed with help text describing it as the soft-block qualification name
    block_qualification: str = field(
        metadata={
            "help": "Specify the name of a qualification used to soft block workers."
        },
        default=MISSING,
    )

BlueprintArgs(_blueprint_type: str = '???', block_qualification: str = '???')

#   BlueprintArgs(_blueprint_type: str = '???', block_qualification: str = '???')
#   block_qualification: str = '???'
#   class SharedTaskState:
View Source
class SharedTaskState:
    """
    Container for additional run state that cannot simply be supplied
    as Hydra args, such as functions and objects
    """

    # Default factory is `dict`
    task_config: Dict[str, Any] = field(
        metadata={
            "help": (
                "Values to be included in the frontend "
                "MephistoTask.task_config object"
            ),
            "type": "Dict[str, Any]",
            "default": "{}",
        },
        default_factory=dict,
    )
    # Default factory is `list`
    qualifications: List[Any] = field(
        metadata={
            "help": (
                "List of qualification dicts of the form "
                "returned by mephisto.utils.qualifications.make_qualification_dict "
                "to be used with this task run."
            ),
            "type": "List[Dict]",
            "default": "[]",
        },
        default_factory=list,
    )
    # The default predicate accepts every (worker, unit) pair
    worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field(
        metadata={
            "help": "Function to evaluate if a worker is eligible for a given unit",
            "type": "Callable[[Worker, Unit], bool]",
            "default": "Returns True always",
        },
        default_factory=lambda: (lambda worker, unit: True),
    )
    # The default callback does nothing and returns None
    on_unit_submitted: Callable[["Unit"], None] = field(
        metadata={
            "help": "Function to evaluate on every unit completed or disconnected",
            "type": "Callable[[Unit], None]",
            "default": "No-op function",
        },
        default_factory=lambda: (lambda unit: None),
    )

Base class for specifying additional state that can't just be passed as Hydra args, like functions and objects

#   SharedTaskState( task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: collections.abc.Callable[[mephisto.data_model.worker.Worker, mephisto.data_model.unit.Unit], bool] = <factory>, on_unit_submitted: collections.abc.Callable[[mephisto.data_model.unit.Unit], NoneType] = <factory> )
#   class BlueprintMixin(abc.ABC):
View Source
class BlueprintMixin(ABC):
    """
    Base class for compositional mixins for blueprints

    We expect mixins that subclass other mixins to handle subinitialization
    work, such that only the highest class needs to be called.
    """

    # Concrete mixins are expected to provide both attributes below.
    # ArgsMixin should be a dataclass, to extend BlueprintArgs
    ArgsMixin: ClassVar[Type[object]]
    # SharedStateMixin also should be a dataclass, to extend SharedTaskState
    SharedStateMixin: ClassVar[Type[object]]

    @staticmethod
    def extract_unique_mixins(blueprint_class: Type["Blueprint"]):
        """
        Return the unique mixin classes that are used in the given blueprint class

        The returned list follows the order of `blueprint_class.mro()`, so
        it is the same on every call. It leaves out `BlueprintMixin` itself,
        the blueprint class, any `MixedInBlueprint` wrapper produced by
        `mixin_args_and_state`, and any mixin that another listed mixin
        already subclasses. A class without any mixin yields an empty list.
        """
        mixin_subclasses = [
            clazz
            for clazz in blueprint_class.mro()
            if issubclass(clazz, BlueprintMixin)
        ]
        target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class
        # Remove magic created with `mixin_args_and_state`. The emptiness
        # guard covers a wrapper around a class that has no mixin in its MRO,
        # where there is nothing left to pop.
        while target_class.__name__ == "MixedInBlueprint" and mixin_subclasses:
            target_class = mixin_subclasses.pop(0)
        # `mro()` never repeats a class, so a list keeps the entries unique
        # while preserving their order.
        candidates = [
            clazz
            for clazz in mixin_subclasses
            if "MixedInBlueprint" not in clazz.__name__
            and clazz != BlueprintMixin
            and clazz != target_class
        ]

        # We also want to make sure that we don't double-count extensions of
        # mixins, so remove classes that other classes are subclasses of
        def is_subclassed(clazz):
            return any(
                other != clazz and issubclass(other, clazz) for other in candidates
            )

        return [clazz for clazz in candidates if not is_subclassed(clazz)]

    @abstractmethod
    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def assert_mixin_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """Method to provide any required qualifications to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    def mixin_args_and_state(
        mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
    ):
        """
        Magic utility decorator that can be used to inject mixin configurations
        (BlueprintArgs and SharedTaskState) without the user needing to define new
        classes for these. Should only be used by tasks that aren't already specifying
        new versions of these, which should just inherit otherwise.

        Returns a subclass of `target_cls` named `MixedInBlueprint` whose
        `ArgsClass` and `SharedStateClass` combine the mixin's dataclasses
        with the ones already set on `target_cls`.

        Usage:
          @register_mephisto_abstraction()
          @ABlueprintMixin.mixin_args_and_state
          class MyBlueprint(ABlueprintMixin, Blueprint):
              pass
        """
        # Ignore typing on most of this, as mypy is not able to parse what's happening
        @dataclass
        class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
            pass

        @dataclass
        class MixedInSharedStateClass(
            mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
        ):
            pass

        # `extract_unique_mixins` recognises this wrapper by its class name
        class MixedInBlueprint(target_cls):  # type: ignore
            ArgsClass = MixedInArgsClass
            SharedStateClass = MixedInSharedStateClass

        return MixedInBlueprint

Base class for compositional mixins for blueprints

We expect mixins that subclass other mixins to handle subinitialization work, such that only the highest class needs to be called.

#  
@staticmethod
def extract_unique_mixins(blueprint_class: type[mephisto.abstractions.blueprint.Blueprint]):
View Source
    @staticmethod
    def extract_unique_mixins(blueprint_class: Type["Blueprint"]):
        """Return the unique mixin classes that are used in the given blueprint class"""
        mixin_subclasses = [
            clazz
            for clazz in blueprint_class.mro()
            if issubclass(clazz, BlueprintMixin)
        ]
        target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class
        # Remove magic created with `mixin_args_and_state`
        while target_class.__name__ == "MixedInBlueprint":
            target_class = mixin_subclasses.pop(0)
        removed_locals = [
            clazz
            for clazz in mixin_subclasses
            if "MixedInBlueprint" not in clazz.__name__
        ]
        filtered_subclasses = set(
            clazz
            for clazz in removed_locals
            if clazz != BlueprintMixin and clazz != target_class
        )
        # we also want to make sure that we don't double-count extensions of mixins, so remove classes that other classes are subclasses of
        def is_subclassed(clazz):
            return True in [
                issubclass(x, clazz) and x != clazz for x in filtered_subclasses
            ]

        unique_subclasses = [
            clazz for clazz in filtered_subclasses if not is_subclassed(clazz)
        ]
        return unique_subclasses

Return the unique mixin classes that are used in the given blueprint class

#  
@abstractmethod
def init_mixin_config( self, task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ) -> None:
View Source
    @abstractmethod
    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        raise NotImplementedError()

Method to initialize any required attributes to make this mixin function

#  
@classmethod
@abstractmethod
def assert_mixin_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ) -> None:
View Source
    @classmethod
    @abstractmethod
    def assert_mixin_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        raise NotImplementedError()

Method to validate the incoming args and throw if something won't work

#  
@classmethod
@abstractmethod
def get_mixin_qualifications( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ) -> List[Dict[str, Any]]:
View Source
    @classmethod
    @abstractmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """Method to provide any required qualifications to make this mixin function"""
        raise NotImplementedError()

Method to provide any required qualifications to make this mixin function

#  
@classmethod
def mixin_args_and_state( mixin_cls: type[mephisto.abstractions.blueprint.BlueprintMixin], target_cls: type[mephisto.abstractions.blueprint.Blueprint] ):
View Source
    @classmethod
    def mixin_args_and_state(
        mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
    ):
        """
        Magic utility decorator that can be used to inject mixin configurations
        (BlueprintArgs and SharedTaskState) without the user needing to define new
        classes for these. Should only be used by tasks that aren't already specifying
        new versions of these, which should just inherit otherwise.

        `mixin_cls` is the mixin the decorator is accessed through and
        `target_cls` is the decorated blueprint class. Returns a subclass of
        `target_cls` named `MixedInBlueprint` whose `ArgsClass` and
        `SharedStateClass` combine the mixin's dataclasses with the ones
        already set on `target_cls`.

        Usage:
          @register_mephisto_abstraction()
          @ABlueprintMixin.mixin_args_and_state
          class MyBlueprint(ABlueprintMixin, Blueprint):
              pass
        """
        # Ignore typing on most of this, as mypy is not able to parse what's happening
        # Args dataclass: the mixin's args listed ahead of the target's existing args
        @dataclass
        class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
            pass

        # Shared state dataclass, combined in the same way
        @dataclass
        class MixedInSharedStateClass(
            mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
        ):
            pass

        # Wrapper subclass that only swaps in the two combined dataclasses
        class MixedInBlueprint(target_cls):  # type: ignore
            ArgsClass = MixedInArgsClass
            SharedStateClass = MixedInSharedStateClass

        return MixedInBlueprint

Magic utility decorator that can be used to inject mixin configurations (BlueprintArgs and SharedTaskState) without the user needing to define new classes for these. Should only be used by tasks that aren't already specifying new versions of these, which should just inherit otherwise.

Usage:

    @register_mephisto_abstraction()
    @ABlueprintMixin.mixin_args_and_state
    class MyBlueprint(ABlueprintMixin, Blueprint):
        pass

#   class Blueprint(abc.ABC):
View Source
class Blueprint(ABC):
    """
    Configuration class tying together the pieces needed to build, launch,
    and run a task of a specific type. Offers utility functions for
    coordinating the three main components, which live in separate
    classes in acknowledgement that some tasks may have particularly
    complicated processes for them
    """

    AgentStateClass: ClassVar[Type["AgentState"]]
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = AgentState  # type: ignore
    TaskRunnerClass: ClassVar[Type["TaskRunner"]]
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]]
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = BlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = SharedTaskState
    BLUEPRINT_TYPE: str

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        """
        Store the args and shared state, expose the shared state's
        `task_config` as `frontend_task_config`, then let every unique
        mixin of this class run its `init_mixin_config`.
        """
        self.args = args
        self.shared_state = shared_state
        self.frontend_task_config = shared_state.task_config

        # Each mixin's `init_mixin_config` is invoked automatically here.
        for mixin_class in BlueprintMixin.extract_unique_mixins(self.__class__):
            mixin_class.init_mixin_config(self, task_run, args, shared_state)

    @classmethod
    def get_required_qualifications(
        cls, args: DictConfig, shared_state: "SharedTaskState"
    ):
        """
        Gather the qualifications reported by every unique mixin of this
        blueprint into a single list
        """
        required = []
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            required.extend(mixin_class.get_mixin_qualifications(args, shared_state))
        return required

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Assert that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work
        """
        # Each mixin's `assert_mixin_args` is invoked automatically here.
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            mixin_class.assert_mixin_args(args, shared_state)

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Return a copy of the options that should be forwarded
        to the client for use by the task's frontend
        """
        frontend_args = self.frontend_task_config.copy()
        return frontend_args

    @abstractmethod
    def get_initialization_data(
        self,
    ) -> Iterable["InitializationData"]:
        """
        Produce all of the data used to initialize tasks from this blueprint.
        May be a simple iterable when every assignment can be processed
        at once, or a Generator when the number of tasks is unknown or
        changes based on something running concurrently with the job.
        """
        raise NotImplementedError

Configuration class for the various parts of building, launching, and running a task of a specific type. Provides utility functions for coordinating the three main components, which are split into separate classes in acknowledgement that some tasks may have particularly complicated processes for them.

#  
@classmethod
def get_required_qualifications( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ):
View Source
    @classmethod
    def get_required_qualifications(
        cls, args: DictConfig, shared_state: "SharedTaskState"
    ):
        """
        Gather the qualifications reported by every unique mixin of this
        blueprint into a single list
        """
        required = []
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            required.extend(mixin_class.get_mixin_qualifications(args, shared_state))
        return required
#  
@classmethod
def assert_task_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ):
View Source
    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Check that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work
        """
        # Each mixin's `assert_mixin_args` is invoked automatically here.
        for mixin_class in BlueprintMixin.extract_unique_mixins(cls):
            mixin_class.assert_mixin_args(args, shared_state)

Assert that the provided arguments are valid. Should fail if a task launched with these arguments would not work

#   def get_frontend_args(self) -> Dict[str, Any]:
View Source
    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Return a copy of the options that should be forwarded
        to the client for use by the task's frontend
        """
        frontend_args = self.frontend_task_config.copy()
        return frontend_args

Specifies what options should be forwarded to the client for use by the task's frontend

#  
@abstractmethod
def get_initialization_data( self ) -> collections.abc.Iterable[mephisto.data_model.assignment.InitializationData]:
View Source
    @abstractmethod
    def get_initialization_data(
        self,
    ) -> Iterable["InitializationData"]:
        """
        Produce all of the data used to initialize tasks from this blueprint.
        May be a simple iterable when every assignment can be processed
        at once, or a Generator when the number of tasks is unknown or
        changes based on something running concurrently with the job.
        """
        raise NotImplementedError

Get all of the data used to initialize tasks from this blueprint. Can either be a simple iterable if all the assignments can be processed at once, or a Generator if the number of tasks is unknown or changes based on something running concurrently with the job.

#   class Blueprint.OnboardingAgentStateClass(abc.ABC):
View Source
class AgentState(ABC):
    """
    Holds state information about the work an Agent does on a Unit, currently
    stored as current task work into a json file.

    Different Task Types store and load differing data, so each needs its
    own concrete state implementation.
    """

    # Possible Agent Status Values
    STATUS_NONE = "none"
    STATUS_ACCEPTED = "accepted"
    STATUS_ONBOARDING = "onboarding"
    STATUS_WAITING = "waiting"
    STATUS_IN_TASK = "in task"
    STATUS_COMPLETED = "completed"
    STATUS_DISCONNECT = "disconnect"
    STATUS_TIMEOUT = "timeout"
    STATUS_PARTNER_DISCONNECT = "partner disconnect"
    STATUS_EXPIRED = "expired"
    STATUS_RETURNED = "returned"
    STATUS_APPROVED = "approved"
    STATUS_SOFT_REJECTED = "soft_rejected"
    STATUS_REJECTED = "rejected"

    def __new__(cls, agent: Union["Agent", "OnboardingAgent"]) -> "AgentState":
        """Build an instance of the agent state class that fits the given agent"""
        if cls != AgentState:
            # A concrete subclass was requested directly
            return super().__new__(cls)

        from mephisto.data_model.agent import Agent
        from mephisto.operations.registry import get_blueprint_from_type

        # The base AgentState was requested, so look up which class the
        # agent's blueprint wants and create that instead
        is_full_agent = isinstance(agent, Agent)
        blueprint = get_blueprint_from_type(agent.task_type)
        if is_full_agent:
            correct_class = blueprint.AgentStateClass
        else:
            correct_class = blueprint.OnboardingAgentStateClass
        return super().__new__(correct_class)

    @staticmethod
    def complete() -> List[str]:
        """List every final Agent status, which will not be updated by the WorkerPool"""
        states = AgentState
        final_statuses = [
            states.STATUS_COMPLETED,
            states.STATUS_DISCONNECT,
            states.STATUS_TIMEOUT,
            states.STATUS_EXPIRED,
            states.STATUS_RETURNED,
            states.STATUS_SOFT_REJECTED,
            states.STATUS_APPROVED,
            states.STATUS_REJECTED,
        ]
        return final_statuses

    @staticmethod
    def valid() -> List[str]:
        """List every valid Agent status"""
        states = AgentState
        valid_statuses = [
            states.STATUS_NONE,
            states.STATUS_ACCEPTED,
            states.STATUS_ONBOARDING,
            states.STATUS_WAITING,
            states.STATUS_IN_TASK,
            states.STATUS_COMPLETED,
            states.STATUS_DISCONNECT,
            states.STATUS_TIMEOUT,
            states.STATUS_PARTNER_DISCONNECT,
            states.STATUS_EXPIRED,
            states.STATUS_RETURNED,
            states.STATUS_SOFT_REJECTED,
            states.STATUS_APPROVED,
            states.STATUS_REJECTED,
        ]
        return valid_statuses

    # Everything below marked abstract must be provided by implementations

    @abstractmethod
    def __init__(self, agent: "Agent"):
        """
        Create an AgentState that tracks the state of an agent's work on a Unit

        Implementations should set up any files required for saving and
        loading state data somewhere.

        When such a file already exists for the given agent, that data
        should be loaded instead.
        """
        raise NotImplementedError()

    @abstractmethod
    def set_init_state(self, data: Any) -> bool:
        """Store the initial state for this agent"""
        raise NotImplementedError()

    @abstractmethod
    def get_init_state(self) -> Optional[Any]:
        """
        Fetch the initial state for this agent,
        None if no such state exists
        """
        raise NotImplementedError()

    @abstractmethod
    def load_data(self) -> None:
        """
        Read stored data from a file into this object
        """
        raise NotImplementedError()

    @abstractmethod
    def get_data(self) -> Dict[str, Any]:
        """
        Fetch the currently stored data for this task in the format
        expected by any frontend displays
        """
        raise NotImplementedError()

    def get_parsed_data(self) -> Any:
        """
        Fetch the portion of the data that is relevant to a human
        who wants to parse or analyze the data

        Utility hook for stripping the data of any context that is only
        important for reproducing the task exactly. Unless overridden
        this is just `get_data`
        """
        parsed = self.get_data()
        return parsed

    @abstractmethod
    def save_data(self) -> None:
        """
        Write the relevant data from this Unit to a file in the expected location
        """
        raise NotImplementedError()

    @abstractmethod
    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Fold new live update data into this AgentState. Keep only what is
        necessary to recreate the task for evaluation and later use.
        """
        raise NotImplementedError()

    @abstractmethod
    def update_submit(self, submit_data: Dict[str, Any]) -> None:
        """
        Record the final submission data on this AgentState.
        """
        raise NotImplementedError()

    def get_task_start(self) -> Optional[float]:
        """
        Report the start time for this task, if it is available.
        This base version always returns 0.0
        """
        return 0.0

    def get_task_end(self) -> Optional[float]:
        """
        Report the end time for this task, if it is available.
        This base version always returns 0.0
        """
        return 0.0

Class for holding state information about work by an Agent on a Unit, currently stored as current task work into a json file.

Specific state implementations will need to be created for different Task Types, as different tasks store and load differing data.

Inherited Members
mephisto.abstractions._subcomponents.agent_state.AgentState
AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
set_init_state
get_init_state
load_data
get_data
get_parsed_data
save_data
update_data
update_submit
get_task_start
get_task_end
#   class Blueprint.ArgsClass:
View Source
class BlueprintArgs:
    """
    Base argument dataclass for blueprints.

    Both fields default to omegaconf's `MISSING` marker.
    """

    _blueprint_type: str = MISSING
    # Exposed with help text describing it as the soft-block qualification name
    block_qualification: str = field(
        metadata={
            "help": "Specify the name of a qualification used to soft block workers."
        },
        default=MISSING,
    )

BlueprintArgs(_blueprint_type: str = '???', block_qualification: str = '???')

#   class Blueprint.SharedStateClass:
View Source
class SharedTaskState:
    """
    Container for additional run state that cannot simply be supplied
    as Hydra args, such as functions and objects
    """

    # Default factory is `dict`
    task_config: Dict[str, Any] = field(
        metadata={
            "help": (
                "Values to be included in the frontend "
                "MephistoTask.task_config object"
            ),
            "type": "Dict[str, Any]",
            "default": "{}",
        },
        default_factory=dict,
    )
    # Default factory is `list`
    qualifications: List[Any] = field(
        metadata={
            "help": (
                "List of qualification dicts of the form "
                "returned by mephisto.utils.qualifications.make_qualification_dict "
                "to be used with this task run."
            ),
            "type": "List[Dict]",
            "default": "[]",
        },
        default_factory=list,
    )
    # The default predicate accepts every (worker, unit) pair
    worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field(
        metadata={
            "help": "Function to evaluate if a worker is eligible for a given unit",
            "type": "Callable[[Worker, Unit], bool]",
            "default": "Returns True always",
        },
        default_factory=lambda: (lambda worker, unit: True),
    )
    # The default callback does nothing and returns None
    on_unit_submitted: Callable[["Unit"], None] = field(
        metadata={
            "help": "Function to evaluate on every unit completed or disconnected",
            "type": "Callable[[Unit], None]",
            "default": "No-op function",
        },
        default_factory=lambda: (lambda unit: None),
    )

Base class for specifying additional state that can't just be passed as Hydra args, like functions and objects

Inherited Members
SharedTaskState
SharedTaskState