mephisto.abstractions.blueprint
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from abc import ABC, abstractmethod
from typing import (
    ClassVar,
    List,
    Dict,
    Any,
    Type,
    Union,
    Iterable,
    Callable,
    TYPE_CHECKING,
)

from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.abstractions._subcomponents.task_builder import TaskBuilder
from mephisto.abstractions._subcomponents.task_runner import TaskRunner
from mephisto.abstractions._subcomponents.agent_state import AgentState

if TYPE_CHECKING:
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.assignment import InitializationData
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker

from mephisto.utils.logger_core import get_logger

logger = get_logger(name=__name__)


@dataclass
class BlueprintArgs:
    """Base set of arguments shared by every blueprint"""

    _blueprint_type: str = MISSING
    block_qualification: str = field(
        default=MISSING,
        metadata={
            "help": ("Specify the name of a qualification used to soft block workers.")
        },
    )


@dataclass
class SharedTaskState:
    """
    Base class for specifying additional state that can't just
    be passed as Hydra args, like functions and objects
    """

    task_config: Dict[str, Any] = field(
        default_factory=dict,
        metadata={
            "help": (
                "Values to be included in the frontend MephistoTask.task_config object"
            ),
            "type": "Dict[str, Any]",
            "default": "{}",
        },
    )
    qualifications: List[Any] = field(
        default_factory=list,
        metadata={
            "help": (
                "List of qualification dicts of the form returned by "
                "mephisto.utils.qualifications.make_qualification_dict "
                "to be used with this task run."
            ),
            "type": "List[Dict]",
            "default": "[]",
        },
    )
    worker_can_do_unit: Callable[["Worker", "Unit"], bool] = field(
        default_factory=lambda: (lambda worker, unit: True),
        metadata={
            "help": ("Function to evaluate if a worker is eligible for a given unit"),
            "type": "Callable[[Worker, Unit], bool]",
            "default": "Returns True always",
        },
    )
    on_unit_submitted: Callable[["Unit"], None] = field(
        default_factory=lambda: (lambda unit: None),
        metadata={
            "help": ("Function to evaluate on every unit completed or disconnected"),
            "type": "Callable[[Unit], None]",
            "default": "No-op function",
        },
    )


class BlueprintMixin(ABC):
    """
    Base class for compositional mixins for blueprints

    We expect mixins that subclass other mixins to handle subinitialization
    work, such that only the highest class needs to be called.
    """

    # ArgsMixin should be a dataclass, to extend BlueprintArgs
    ArgsMixin: ClassVar[Type[object]]
    # SharedStateMixin should also be a dataclass, to extend SharedTaskState
    SharedStateMixin: ClassVar[Type[object]]

    @staticmethod
    def extract_unique_mixins(blueprint_class: Type["Blueprint"]):
        """
        Return the unique mixin classes that are used in the given blueprint class

        The result is a list in MRO order of `blueprint_class`. It excludes
        `BlueprintMixin` itself, the blueprint class, any class whose name
        contains "MixedInBlueprint", and any mixin that another returned
        candidate already subclasses.
        """
        mixin_subclasses = [
            clazz
            for clazz in blueprint_class.mro()
            if issubclass(clazz, BlueprintMixin)
        ]
        target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class
        # Remove magic created with `mixin_args_and_state`. The emptiness guard
        # avoids an IndexError from `pop(0)` when nothing is left to unwrap.
        while target_class.__name__ == "MixedInBlueprint" and mixin_subclasses:
            target_class = mixin_subclasses.pop(0)
        # Keep a list (not a set) so callers see a stable, MRO-based order;
        # iteration order over a set of classes is not guaranteed to be stable.
        candidates = [
            clazz
            for clazz in mixin_subclasses
            if "MixedInBlueprint" not in clazz.__name__
            and clazz != BlueprintMixin
            and clazz != target_class
        ]

        # we also want to make sure that we don't double-count extensions of
        # mixins, so remove classes that other classes are subclasses of
        def is_subclassed(clazz):
            return any(
                issubclass(other, clazz) and other != clazz for other in candidates
            )

        return [clazz for clazz in candidates if not is_subclassed(clazz)]

    @abstractmethod
    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def assert_mixin_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """Method to provide any required qualifications to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    def mixin_args_and_state(
        mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
    ):
        """
        Magic utility decorator that can be used to inject mixin configurations
        (BlueprintArgs and SharedTaskState) without the user needing to define new
        classes for these. Should only be used by tasks that aren't already specifying
        new versions of these, which should just inherit otherwise.

        Usage:
          @register_mephisto_abstraction()
          @ABlueprintMixin.mixin_args_and_state
          class MyBlueprint(ABlueprintMixin, Blueprint):
              pass
        """
        # Ignore typing on most of this, as mypy is not able to parse what's happening
        @dataclass
        class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
            pass

        @dataclass
        class MixedInSharedStateClass(
            mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
        ):
            pass

        # The name `MixedInBlueprint` is matched by `extract_unique_mixins`
        class MixedInBlueprint(target_cls):  # type: ignore
            ArgsClass = MixedInArgsClass
            SharedStateClass = MixedInSharedStateClass

        return MixedInBlueprint


class Blueprint(ABC):
    """
    Configuration class for the various parts of building, launching,
    and running a task of a specific task. Provides utility functions
    for managing between the three main components, which are separated
    into separate classes in acknowledgement that some tasks may have
    particularly complicated processes for them
    """

    AgentStateClass: ClassVar[Type["AgentState"]]
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = AgentState  # type: ignore
    TaskRunnerClass: ClassVar[Type["TaskRunner"]]
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]]
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = BlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = SharedTaskState
    BLUEPRINT_TYPE: str

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        """Store the args and shared state, then initialize every unique mixin"""
        self.args = args
        self.shared_state = shared_state
        self.frontend_task_config = shared_state.task_config

        # We automatically call all mixins `init_mixin_config` methods available.
        mixin_subclasses = BlueprintMixin.extract_unique_mixins(self.__class__)
        for clazz in mixin_subclasses:
            clazz.init_mixin_config(self, task_run, args, shared_state)

    @classmethod
    def get_required_qualifications(
        cls, args: DictConfig, shared_state: "SharedTaskState"
    ):
        """Return the concatenated qualifications requested by every unique mixin"""
        quals = []
        for clazz in BlueprintMixin.extract_unique_mixins(cls):
            quals += clazz.get_mixin_qualifications(args, shared_state)
        return quals

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Assert that the provided arguments are valid. Should
        fail if a task launched with these arguments would
        not work
        """
        # We automatically call all mixins `assert_mixin_args` methods available.
        mixin_subclasses = BlueprintMixin.extract_unique_mixins(cls)
        for clazz in mixin_subclasses:
            clazz.assert_mixin_args(args, shared_state)

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Specifies what options should be forwarded
        to the client for use by the task's frontend
        """
        return self.frontend_task_config.copy()

    @abstractmethod
    def get_initialization_data(
        self,
    ) -> Iterable["InitializationData"]:
        """
        Get all of the data used to initialize tasks from this blueprint.
        Can either be a simple iterable if all the assignments can
        be processed at once, or a Generator if the number
        of tasks is unknown or changes based on something running
        concurrently with the job.
        """
        raise NotImplementedError
View Source
@dataclass
class BlueprintArgs:
    """
    Base set of arguments shared by every blueprint.

    Declared as a dataclass so the `field(...)` default below is turned into
    a real attribute default and an `__init__` is generated.
    """

    # Both fields default to omegaconf's MISSING marker
    _blueprint_type: str = MISSING
    block_qualification: str = field(
        default=MISSING,
        metadata={
            "help": ("Specify the name of a qualification used to soft block workers.")
        },
    )
BlueprintArgs(_blueprint_type: str = '???', block_qualification: str = '???')
View Source
class BlueprintMixin(ABC):
    """
    Base class for compositional mixins for blueprints

    We expect mixins that subclass other mixins to handle subinitialization
    work, such that only the highest class needs to be called.
    """

    # ArgsMixin should be a dataclass, to extend BlueprintArgs
    ArgsMixin: ClassVar[Type[object]]
    # SharedStateMixin should also be a dataclass, to extend SharedTaskState
    SharedStateMixin: ClassVar[Type[object]]

    @staticmethod
    def extract_unique_mixins(blueprint_class: Type["Blueprint"]):
        """
        Return the unique mixin classes that are used in the given blueprint class

        The result is a list in MRO order of `blueprint_class`. It excludes
        `BlueprintMixin` itself, the blueprint class, any class whose name
        contains "MixedInBlueprint", and any mixin that another returned
        candidate already subclasses.
        """
        mixin_subclasses = [
            clazz
            for clazz in blueprint_class.mro()
            if issubclass(clazz, BlueprintMixin)
        ]
        target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class
        # Remove magic created with `mixin_args_and_state`. The emptiness guard
        # avoids an IndexError from `pop(0)` when nothing is left to unwrap.
        while target_class.__name__ == "MixedInBlueprint" and mixin_subclasses:
            target_class = mixin_subclasses.pop(0)
        # Keep a list (not a set) so callers see a stable, MRO-based order;
        # iteration order over a set of classes is not guaranteed to be stable.
        candidates = [
            clazz
            for clazz in mixin_subclasses
            if "MixedInBlueprint" not in clazz.__name__
            and clazz != BlueprintMixin
            and clazz != target_class
        ]

        # we also want to make sure that we don't double-count extensions of
        # mixins, so remove classes that other classes are subclasses of
        def is_subclassed(clazz):
            return any(
                issubclass(other, clazz) and other != clazz for other in candidates
            )

        return [clazz for clazz in candidates if not is_subclassed(clazz)]

    @abstractmethod
    def init_mixin_config(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to initialize any required attributes to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def assert_mixin_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Method to validate the incoming args and throw if something won't work"""
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def get_mixin_qualifications(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> List[Dict[str, Any]]:
        """Method to provide any required qualifications to make this mixin function"""
        raise NotImplementedError()

    @classmethod
    def mixin_args_and_state(
        mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
    ):
        """
        Magic utility decorator that can be used to inject mixin configurations
        (BlueprintArgs and SharedTaskState) without the user needing to define new
        classes for these. Should only be used by tasks that aren't already specifying
        new versions of these, which should just inherit otherwise.

        Usage:
          @register_mephisto_abstraction()
          @ABlueprintMixin.mixin_args_and_state
          class MyBlueprint(ABlueprintMixin, Blueprint):
              pass
        """
        # Ignore typing on most of this, as mypy is not able to parse what's happening
        @dataclass
        class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
            pass

        @dataclass
        class MixedInSharedStateClass(
            mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
        ):
            pass

        # The name `MixedInBlueprint` is matched by `extract_unique_mixins`
        class MixedInBlueprint(target_cls):  # type: ignore
            ArgsClass = MixedInArgsClass
            SharedStateClass = MixedInSharedStateClass

        return MixedInBlueprint
Base class for compositional mixins for blueprints
We expect mixins that subclass other mixins to handle subinitialization work, such that only the highest class needs to be called.
View Source
@staticmethod def extract_unique_mixins(blueprint_class: Type["Blueprint"]): """Return the unique mixin classes that are used in the given blueprint class""" mixin_subclasses = [ clazz for clazz in blueprint_class.mro() if issubclass(clazz, BlueprintMixin) ] target_class: Union[Type["Blueprint"], Type["BlueprintMixin"]] = blueprint_class # Remove magic created with `mixin_args_and_state` while target_class.__name__ == "MixedInBlueprint": target_class = mixin_subclasses.pop(0) removed_locals = [ clazz for clazz in mixin_subclasses if "MixedInBlueprint" not in clazz.__name__ ] filtered_subclasses = set( clazz for clazz in removed_locals if clazz != BlueprintMixin and clazz != target_class ) # we also want to make sure that we don't double-count extensions of mixins, so remove classes that other classes are subclasses of def is_subclassed(clazz): return True in [ issubclass(x, clazz) and x != clazz for x in filtered_subclasses ] unique_subclasses = [ clazz for clazz in filtered_subclasses if not is_subclassed(clazz) ] return unique_subclasses
Return the unique mixin classes that are used in the given blueprint class
View Source
@abstractmethod
def init_mixin_config(
    self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
) -> None:
    """
    Set up whatever attributes this mixin needs in order to function.

    Abstract hook: this base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError
Method to initialize any required attributes to make this mixin function
View Source
@classmethod @abstractmethod def assert_mixin_args( cls, args: "DictConfig", shared_state: "SharedTaskState" ) -> None: """Method to validate the incoming args and throw if something won't work""" raise NotImplementedError()
Method to validate the incoming args and throw if something won't work
View Source
@classmethod @abstractmethod def get_mixin_qualifications( cls, args: "DictConfig", shared_state: "SharedTaskState" ) -> List[Dict[str, Any]]: """Method to provide any required qualifications to make this mixin function""" raise NotImplementedError()
Method to provide any required qualifications to make this mixin function
View Source
@classmethod
def mixin_args_and_state(
    mixin_cls: Type["BlueprintMixin"], target_cls: Type["Blueprint"]
):
    """
    Magic utility decorator that can be used to inject mixin configurations
    (BlueprintArgs and SharedTaskState) without the user needing to define new
    classes for these. Should only be used by tasks that aren't already specifying
    new versions of these, which should just inherit otherwise.

    Returns a new subclass of `target_cls` whose `ArgsClass` and
    `SharedStateClass` are dataclasses combining the mixin's `ArgsMixin` /
    `SharedStateMixin` with the target's existing classes.

    Usage:
      @register_mephisto_abstraction()
      @ABlueprintMixin.mixin_args_and_state
      class MyBlueprint(ABlueprintMixin, Blueprint):
          pass
    """
    # Ignore typing on most of this, as mypy is not able to parse what's happening
    # The mixin's class is listed first, so it comes first in the MRO
    @dataclass
    class MixedInArgsClass(mixin_cls.ArgsMixin, target_cls.ArgsClass):  # type: ignore
        pass

    @dataclass
    class MixedInSharedStateClass(
        mixin_cls.SharedStateMixin, target_cls.SharedStateClass  # type: ignore
    ):
        pass

    # NOTE: `extract_unique_mixins` matches on the literal class name
    # "MixedInBlueprint", so this class must keep that name
    class MixedInBlueprint(target_cls):  # type: ignore
        ArgsClass = MixedInArgsClass
        SharedStateClass = MixedInSharedStateClass

    return MixedInBlueprint
Magic utility decorator that can be used to inject mixin configurations (BlueprintArgs and SharedTaskState) without the user needing to define new classes for these. Should only be used by tasks that aren't already specifying new versions of these, which should just inherit otherwise.
Usage: @register_mephisto_abstraction() @ABlueprintMixin.mixin_args_and_state class MyBlueprint(ABlueprintMixin, Blueprint): pass
View Source
class Blueprint(ABC):
    """
    Configuration class tying together the pieces needed to build, launch,
    and run one kind of task. It offers utilities for coordinating the three
    main components, which live in separate classes because some tasks may
    have particularly complicated processes for them.
    """

    AgentStateClass: ClassVar[Type["AgentState"]]
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = AgentState  # type: ignore
    TaskRunnerClass: ClassVar[Type["TaskRunner"]]
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]]
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = BlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = SharedTaskState
    BLUEPRINT_TYPE: str

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        """Record the args and shared state, then run every mixin's initializer"""
        self.args = args
        self.shared_state = shared_state
        self.frontend_task_config = shared_state.task_config

        # Each unique mixin gets its `init_mixin_config` invoked on this instance
        for mixin in BlueprintMixin.extract_unique_mixins(self.__class__):
            mixin.init_mixin_config(self, task_run, args, shared_state)

    @classmethod
    def get_required_qualifications(
        cls, args: DictConfig, shared_state: "SharedTaskState"
    ):
        """Collect, in one flat list, the qualifications of every unique mixin"""
        mixins = BlueprintMixin.extract_unique_mixins(cls)
        return [
            qualification
            for mixin in mixins
            for qualification in mixin.get_mixin_qualifications(args, shared_state)
        ]

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Validate the provided arguments. Should fail if a task
        launched with these arguments would not work.
        """
        # Each unique mixin gets its `assert_mixin_args` invoked
        for mixin in BlueprintMixin.extract_unique_mixins(cls):
            mixin.assert_mixin_args(args, shared_state)

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Return a copy of the options that should be forwarded
        to the client for use by the task's frontend
        """
        frontend_args = self.frontend_task_config.copy()
        return frontend_args

    @abstractmethod
    def get_initialization_data(
        self,
    ) -> Iterable["InitializationData"]:
        """
        Produce all of the data used to initialize tasks from this blueprint.
        May be a simple iterable when every assignment can be processed at
        once, or a Generator when the number of tasks is unknown or changes
        based on something running concurrently with the job.
        """
        raise NotImplementedError()
Configuration class for the various parts of building, launching, and running a task of a specific type. Provides utility functions for managing between the three main components, which are separated into separate classes in acknowledgement that some tasks may have particularly complicated processes for them
View Source
@classmethod
def get_required_qualifications(
    cls, args: DictConfig, shared_state: "SharedTaskState"
):
    """Collect, in one flat list, the qualifications of every unique mixin"""
    mixins = BlueprintMixin.extract_unique_mixins(cls)
    return [
        qualification
        for mixin in mixins
        for qualification in mixin.get_mixin_qualifications(args, shared_state)
    ]
View Source
@classmethod
def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
    """
    Validate the provided arguments. Should fail if a task
    launched with these arguments would not work.
    """
    # Each unique mixin gets its `assert_mixin_args` invoked
    for mixin in BlueprintMixin.extract_unique_mixins(cls):
        mixin.assert_mixin_args(args, shared_state)
Assert that the provided arguments are valid. Should fail if a task launched with these arguments would not work
View Source
def get_frontend_args(self) -> Dict[str, Any]:
    """
    Return a copy of the options that should be forwarded
    to the client for use by the task's frontend
    """
    frontend_args = self.frontend_task_config.copy()
    return frontend_args
Specifies what options should be forwarded to the client for use by the task's frontend
View Source
@abstractmethod
def get_initialization_data(
    self,
) -> Iterable["InitializationData"]:
    """
    Produce all of the data used to initialize tasks from this blueprint.
    May be a simple iterable when every assignment can be processed at
    once, or a Generator when the number of tasks is unknown or changes
    based on something running concurrently with the job.
    """
    raise NotImplementedError()
Get all of the data used to initialize tasks from this blueprint. Can either be a simple iterable if all the assignments can be processed at once, or a Generator if the number of tasks is unknown or changes based on something running concurrently with the job.
View Source
class AgentState(ABC):
    """
    Holds the state of an Agent's work on a Unit, currently stored
    as current task work into a json file.

    Each Task Type needs its own concrete state implementation, since
    different tasks store and load differing data.
    """

    # Possible Agent Status Values
    STATUS_NONE = "none"
    STATUS_ACCEPTED = "accepted"
    STATUS_ONBOARDING = "onboarding"
    STATUS_WAITING = "waiting"
    STATUS_IN_TASK = "in task"
    STATUS_COMPLETED = "completed"
    STATUS_DISCONNECT = "disconnect"
    STATUS_TIMEOUT = "timeout"
    STATUS_PARTNER_DISCONNECT = "partner disconnect"
    STATUS_EXPIRED = "expired"
    STATUS_RETURNED = "returned"
    STATUS_APPROVED = "approved"
    STATUS_SOFT_REJECTED = "soft_rejected"
    STATUS_REJECTED = "rejected"

    def __new__(cls, agent: Union["Agent", "OnboardingAgent"]) -> "AgentState":
        """Return the correct agent state for the given agent"""
        if cls != AgentState:
            # A concrete subclass is being constructed directly
            return super().__new__(cls)

        from mephisto.data_model.agent import Agent
        from mephisto.operations.registry import get_blueprint_from_type

        # The base class itself was requested: look up which state class the
        # agent's blueprint declares and build that instead
        blueprint = get_blueprint_from_type(agent.task_type)
        if isinstance(agent, Agent):
            state_class = blueprint.AgentStateClass
        else:
            state_class = blueprint.OnboardingAgentStateClass
        return super().__new__(state_class)

    @staticmethod
    def complete() -> List[str]:
        """Return all final Agent statuses which will not be updated by the WorkerPool"""
        states = AgentState
        return [
            states.STATUS_COMPLETED,
            states.STATUS_DISCONNECT,
            states.STATUS_TIMEOUT,
            states.STATUS_EXPIRED,
            states.STATUS_RETURNED,
            states.STATUS_SOFT_REJECTED,
            states.STATUS_APPROVED,
            states.STATUS_REJECTED,
        ]

    @staticmethod
    def valid() -> List[str]:
        """Return all valid Agent statuses"""
        states = AgentState
        return [
            states.STATUS_NONE,
            states.STATUS_ACCEPTED,
            states.STATUS_ONBOARDING,
            states.STATUS_WAITING,
            states.STATUS_IN_TASK,
            states.STATUS_COMPLETED,
            states.STATUS_DISCONNECT,
            states.STATUS_TIMEOUT,
            states.STATUS_PARTNER_DISCONNECT,
            states.STATUS_EXPIRED,
            states.STATUS_RETURNED,
            states.STATUS_SOFT_REJECTED,
            states.STATUS_APPROVED,
            states.STATUS_REJECTED,
        ]

    # Implementations of an AgentState must implement the following:

    @abstractmethod
    def __init__(self, agent: "Agent"):
        """
        Create an AgentState to track the state of an agent's work on a Unit

        Implementations should initialize any required files for saving and
        loading state data somewhere. If said file already exists based on
        the given agent, load that data instead.
        """
        raise NotImplementedError

    @abstractmethod
    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent"""
        raise NotImplementedError

    @abstractmethod
    def get_init_state(self) -> Optional[Any]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        raise NotImplementedError

    @abstractmethod
    def load_data(self) -> None:
        """
        Load stored data from a file to this object
        """
        raise NotImplementedError

    @abstractmethod
    def get_data(self) -> Dict[str, Any]:
        """
        Return the currently stored data for this task in the format
        expected by any frontend displays
        """
        raise NotImplementedError

    def get_parsed_data(self) -> Any:
        """
        Return the portion of the data that is relevant to a human
        who wants to parse or analyze the data

        Utility function to handle stripping the data of any
        context that is only important for reproducing the task
        exactly. By default is just `get_data`
        """
        parsed = self.get_data()
        return parsed

    @abstractmethod
    def save_data(self) -> None:
        """
        Save the relevant data from this Unit to a file in the expected location
        """
        raise NotImplementedError

    @abstractmethod
    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Put new live update data into this AgentState. Keep only what is
        necessary to recreate the task for evaluation and later use.
        """
        raise NotImplementedError

    @abstractmethod
    def update_submit(self, submit_data: Dict[str, Any]) -> None:
        """
        Update this AgentState with the final submission data.
        """
        raise NotImplementedError

    def get_task_start(self) -> Optional[float]:
        """
        Return the start time for this task, if it is available
        (this base implementation returns 0.0)
        """
        return 0.0

    def get_task_end(self) -> Optional[float]:
        """
        Return the end time for this task, if it is available
        (this base implementation returns 0.0)
        """
        return 0.0
Class for holding state information about work by an Agent on a Unit, currently stored as current task work into a json file.
Specific state implementations will need to be created for different Task Types, as different tasks store and load differing data.
Inherited Members
- mephisto.abstractions._subcomponents.agent_state.AgentState
- AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
- set_init_state
- get_init_state
- load_data
- get_data
- get_parsed_data
- save_data
- update_data
- update_submit
- get_task_start
- get_task_end
View Source
@dataclass
class BlueprintArgs:
    """
    Base set of arguments shared by every blueprint.

    Declared as a dataclass so the `field(...)` default below is turned into
    a real attribute default and an `__init__` is generated.
    """

    # Both fields default to omegaconf's MISSING marker
    _blueprint_type: str = MISSING
    block_qualification: str = field(
        default=MISSING,
        metadata={
            "help": ("Specify the name of a qualification used to soft block workers.")
        },
    )
BlueprintArgs(_blueprint_type: str = '???', block_qualification: str = '???')