mephisto.abstractions.blueprints.mock.mock_blueprint
View Source
#!/usr/bin/env python3 # Copyright (c) Meta Platforms and its affiliates. # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. from mephisto.abstractions.blueprint import ( Blueprint, BlueprintArgs, SharedTaskState, ) from mephisto.abstractions.blueprints.mixins.onboarding_required import ( OnboardingRequired, OnboardingSharedState, OnboardingRequiredArgs, ) from mephisto.abstractions.blueprints.mixins.screen_task_required import ( ScreenTaskRequired, ScreenTaskSharedState, ScreenTaskRequiredArgs, ) from dataclasses import dataclass, field from omegaconf import MISSING, DictConfig from mephisto.data_model.assignment import InitializationData from mephisto.abstractions.blueprints.mock.mock_agent_state import MockAgentState from mephisto.abstractions.blueprints.mock.mock_task_runner import MockTaskRunner from mephisto.abstractions.blueprints.mock.mock_task_builder import MockTaskBuilder from mephisto.operations.registry import register_mephisto_abstraction import os import time from typing import ClassVar, List, Type, Any, Dict, Iterable, TYPE_CHECKING, Optional if TYPE_CHECKING: from mephisto.data_model.agent import OnboardingAgent from mephisto.data_model.task_run import TaskRun from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder from mephisto.data_model.assignment import Assignment from mephisto.data_model.worker import Worker from mephisto.data_model.unit import Unit BLUEPRINT_TYPE_MOCK = "mock" @dataclass class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs): _blueprint_type: str = BLUEPRINT_TYPE_MOCK num_assignments: int = field( default=MISSING, metadata={ "help": "How many workers you want to do each assignment", "required": True, }, ) use_onboarding: bool = field( default=False, metadata={"help": "Whether onboarding should be required"} ) timeout_time: int = field( default=0, metadata={"help": "Whether acts in the run assignment should have a timeout"}, ) is_concurrent: bool = field( default=True, metadata={"help": "Whether to run this mock task as a concurrent task or not"}, ) # Mock tasks right now inherit all mixins, this way we can test them. # In the future, we'll likely want to compose mock tasks for mixin testing @dataclass class MockSharedState(SharedTaskState, OnboardingSharedState, ScreenTaskSharedState): pass @register_mephisto_abstraction() class MockBlueprint(Blueprint, OnboardingRequired, ScreenTaskRequired): """Mock of a task type, for use in testing""" AgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = MockTaskBuilder TaskRunnerClass: ClassVar[Type["TaskRunner"]] = MockTaskRunner ArgsClass: ClassVar[Type["BlueprintArgs"]] = MockBlueprintArgs SharedStateClass: ClassVar[Type["SharedTaskState"]] = MockSharedState BLUEPRINT_TYPE = BLUEPRINT_TYPE_MOCK # making Mypy happy, these aren't used in a blueprint, only mixins ArgsMixin: ClassVar[Any] SharedStateMixin: ClassVar[Any] def __init__( self, task_run: "TaskRun", args: "DictConfig", shared_state: "MockSharedState" ): super().__init__(task_run, args, shared_state) def get_initialization_data(self) -> Iterable[InitializationData]: """ Return the number of empty assignments specified in --num-assignments """ return [ MockTaskRunner.get_mock_assignment_data() for i in range(self.args.blueprint.num_assignments) ] def validate_onboarding( self, worker: "Worker", onboarding_agent: "OnboardingAgent" ) -> bool: """ Onboarding validation for MockBlueprints just returns the 'should_pass' field """ return onboarding_agent.state.get_data()["should_pass"]
View Source
class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs): _blueprint_type: str = BLUEPRINT_TYPE_MOCK num_assignments: int = field( default=MISSING, metadata={ "help": "How many workers you want to do each assignment", "required": True, }, ) use_onboarding: bool = field( default=False, metadata={"help": "Whether onboarding should be required"} ) timeout_time: int = field( default=0, metadata={"help": "Whether acts in the run assignment should have a timeout"}, ) is_concurrent: bool = field( default=True, metadata={"help": "Whether to run this mock task as a concurrent task or not"}, )
MockBlueprintArgs(passed_qualification_name: str = '???', max_screening_units: int = '???', use_screening_task: bool = False, onboarding_qualification: str = '???', _blueprint_type: str = 'mock', block_qualification: str = '???', num_assignments: int = '???', use_onboarding: bool = False, timeout_time: int = 0, is_concurrent: bool = True)
Inherited Members
View Source
class MockBlueprint(Blueprint, OnboardingRequired, ScreenTaskRequired): """Mock of a task type, for use in testing""" AgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = MockTaskBuilder TaskRunnerClass: ClassVar[Type["TaskRunner"]] = MockTaskRunner ArgsClass: ClassVar[Type["BlueprintArgs"]] = MockBlueprintArgs SharedStateClass: ClassVar[Type["SharedTaskState"]] = MockSharedState BLUEPRINT_TYPE = BLUEPRINT_TYPE_MOCK # making Mypy happy, these aren't used in a blueprint, only mixins ArgsMixin: ClassVar[Any] SharedStateMixin: ClassVar[Any] def __init__( self, task_run: "TaskRun", args: "DictConfig", shared_state: "MockSharedState" ): super().__init__(task_run, args, shared_state) def get_initialization_data(self) -> Iterable[InitializationData]: """ Return the number of empty assignments specified in --num-assignments """ return [ MockTaskRunner.get_mock_assignment_data() for i in range(self.args.blueprint.num_assignments) ] def validate_onboarding( self, worker: "Worker", onboarding_agent: "OnboardingAgent" ) -> bool: """ Onboarding validation for MockBlueprints just returns the 'should_pass' field """ return onboarding_agent.state.get_data()["should_pass"]
Mock of a task type, for use in testing
View Source
def __init__( self, task_run: "TaskRun", args: "DictConfig", shared_state: "MockSharedState" ): super().__init__(task_run, args, shared_state)
View Source
def get_initialization_data(self) -> Iterable[InitializationData]: """ Return the number of empty assignments specified in --num-assignments """ return [ MockTaskRunner.get_mock_assignment_data() for i in range(self.args.blueprint.num_assignments) ]
Return the number of empty assignments specified in --num-assignments
View Source
def validate_onboarding( self, worker: "Worker", onboarding_agent: "OnboardingAgent" ) -> bool: """ Onboarding validation for MockBlueprints just returns the 'should_pass' field """ return onboarding_agent.state.get_data()["should_pass"]
Onboarding validation for MockBlueprints just returns the 'should_pass' field
Inherited Members
- mephisto.abstractions.blueprint.Blueprint
- get_required_qualifications
- assert_task_args
- get_frontend_args
- mephisto.abstractions.blueprints.mixins.onboarding_required.OnboardingRequired
- init_mixin_config
- assert_mixin_args
- get_mixin_qualifications
- get_failed_qual
- init_onboarding_config
- get_onboarding_data
View Source
class MockAgentState(AgentState): """ Mock agent state that is to be used for testing """ def __init__(self, agent: "Agent"): """Mock agent states keep everything in local memory""" self.agent = weakref.proxy(agent) self.state: Dict[str, Any] = {} self.init_state: Any = None def set_init_state(self, data: Any) -> bool: """Set the initial state for this agent""" if self.init_state is not None: # Initial state is already set return False else: self.init_state = data self.save_data() return True def get_init_state(self) -> Optional[Dict[str, Any]]: """ Return the initial state for this agent, None if no such state exists """ return self.init_state def load_data(self) -> None: """Mock agent states have no data stored""" pass def get_data(self) -> Dict[str, Any]: """Return dict of this agent's state""" return self.state def save_data(self) -> None: """Mock agents don't save data (yet)""" pass def update_data(self, live_update: Dict[str, Any]) -> None: """Put new data into this mock state""" self.state = live_update def update_submit(self, submitted_data: Dict[str, Any]) -> None: """Move the submitted data into the live state""" self.state = submitted_data
Mock agent state that is to be used for testing
Inherited Members
- mephisto.abstractions.blueprints.mock.mock_agent_state.MockAgentState
- MockAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
- get_parsed_data
- get_task_start
- get_task_end
View Source
class MockAgentState(AgentState): """ Mock agent state that is to be used for testing """ def __init__(self, agent: "Agent"): """Mock agent states keep everything in local memory""" self.agent = weakref.proxy(agent) self.state: Dict[str, Any] = {} self.init_state: Any = None def set_init_state(self, data: Any) -> bool: """Set the initial state for this agent""" if self.init_state is not None: # Initial state is already set return False else: self.init_state = data self.save_data() return True def get_init_state(self) -> Optional[Dict[str, Any]]: """ Return the initial state for this agent, None if no such state exists """ return self.init_state def load_data(self) -> None: """Mock agent states have no data stored""" pass def get_data(self) -> Dict[str, Any]: """Return dict of this agent's state""" return self.state def save_data(self) -> None: """Mock agents don't save data (yet)""" pass def update_data(self, live_update: Dict[str, Any]) -> None: """Put new data into this mock state""" self.state = live_update def update_submit(self, submitted_data: Dict[str, Any]) -> None: """Move the submitted data into the live state""" self.state = submitted_data
Mock agent state that is to be used for testing
Inherited Members
- mephisto.abstractions.blueprints.mock.mock_agent_state.MockAgentState
- MockAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
- get_parsed_data
- get_task_start
- get_task_end
View Source
class MockTaskBuilder(TaskBuilder): """Builder for a mock task, for use in testing""" BUILT_FILE = "done.built" BUILT_MESSAGE = "built!" def build_in_dir(self, build_dir: str): """Mock tasks don't really build anything (yet)""" with open(os.path.join(build_dir, self.BUILT_FILE), "w+") as built_file: built_file.write(self.BUILT_MESSAGE)
Builder for a mock task, for use in testing
Inherited Members
- mephisto.abstractions._subcomponents.task_builder.TaskBuilder
- TaskBuilder
View Source
class MockTaskRunner(TaskRunner): """Mock of a task runner, for use in testing""" def __init__( self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState" ): super().__init__(task_run, args, shared_state) self.timeout = args.blueprint.timeout_time self.tracked_tasks: Dict[str, Union["Assignment", "Unit"]] = {} self.is_concurrent = args.blueprint.get("is_concurrent", True) @staticmethod def get_mock_assignment_data() -> InitializationData: return InitializationData(shared={}, unit_data=[{}, {}]) @staticmethod def get_data_for_assignment(assignment: "Assignment") -> InitializationData: """ Mock tasks have no data unless given during testing """ return MockTaskRunner.get_mock_assignment_data() def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]: """ Return the data for an agent already assigned to a particular unit """ # TODO(#97) implement pass def run_onboarding(self, onboarding_agent: "OnboardingAgent"): """ Mock runners simply wait for an act to come in with whether or not onboarding is complete """ onboarding_agent.await_submit(self.timeout) def run_unit(self, unit: "Unit", agent: "Agent"): """ Mock runners will pass the agents for the given assignment all of the required messages to finish a task. """ self.tracked_tasks[unit.db_id] = unit time.sleep(0.3) assigned_agent = unit.get_assigned_agent() assert assigned_agent is not None, "No agent was assigned" assert ( assigned_agent.db_id == agent.db_id ), "Task was not given to assigned agent" packet = agent.get_live_update(timeout=self.timeout) if packet is not None: agent.observe(packet) agent.await_submit(self.timeout) del self.tracked_tasks[unit.db_id] def run_assignment(self, assignment: "Assignment", agents: List["Agent"]): """ Mock runners will pass the agents for the given assignment all of the required messages to finish a task. """ self.tracked_tasks[assignment.db_id] = assignment agent_dict = {a.db_id: a for a in agents} time.sleep(0.3) agents = [] for unit in assignment.get_units(): assigned_agent = unit.get_assigned_agent() assert assigned_agent is not None, "Task was not fully assigned" agent = agent_dict.get(assigned_agent.db_id) assert agent is not None, "Task was not launched with assigned agents" agents.append(agent) for agent in agents: packet = agent.get_live_update(timeout=self.timeout) if packet is not None: agent.observe(packet) for agent in agents: agent.await_submit(self.timeout) del self.tracked_tasks[assignment.db_id] def cleanup_assignment(self, assignment: "Assignment"): """No cleanup required yet for ending mock runs""" pass def cleanup_unit(self, unit: "Unit"): """No cleanup required yet for ending mock runs""" pass def cleanup_onboarding(self, onboarding_agent: "OnboardingAgent"): """No cleanup required yet for ending onboarding in mocks""" pass
Mock of a task runner, for use in testing
Inherited Members
- mephisto.abstractions.blueprints.mock.mock_task_runner.MockTaskRunner
- MockTaskRunner
- get_mock_assignment_data
- get_data_for_assignment
- get_init_data_for_agent
- run_onboarding
- run_unit
- run_assignment
- cleanup_assignment
- cleanup_unit
- cleanup_onboarding
- mephisto.abstractions._subcomponents.task_runner.TaskRunner
- execute_onboarding
- execute_unit
- execute_assignment
- filter_units_for_worker
- shutdown
View Source
class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs): _blueprint_type: str = BLUEPRINT_TYPE_MOCK num_assignments: int = field( default=MISSING, metadata={ "help": "How many workers you want to do each assignment", "required": True, }, ) use_onboarding: bool = field( default=False, metadata={"help": "Whether onboarding should be required"} ) timeout_time: int = field( default=0, metadata={"help": "Whether acts in the run assignment should have a timeout"}, ) is_concurrent: bool = field( default=True, metadata={"help": "Whether to run this mock task as a concurrent task or not"}, )
MockBlueprintArgs(passed_qualification_name: str = '???', max_screening_units: int = '???', use_screening_task: bool = False, onboarding_qualification: str = '???', _blueprint_type: str = 'mock', block_qualification: str = '???', num_assignments: int = '???', use_onboarding: bool = False, timeout_time: int = 0, is_concurrent: bool = True)
Inherited Members
View Source
class OnboardingRequiredArgs: onboarding_qualification: str = field( default=MISSING, metadata={ "help": ( "Specify the name of a qualification used to block workers who fail onboarding, " "Empty will skip onboarding." ) }, )
OnboardingRequiredArgs(onboarding_qualification: str = '???')