mephisto.abstractions.blueprints.mock.mock_blueprint

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from mephisto.abstractions.blueprints.mixins.screen_task_required import (
    ScreenTaskRequired,
    ScreenTaskSharedState,
    ScreenTaskRequiredArgs,
)
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.mock.mock_agent_state import MockAgentState
from mephisto.abstractions.blueprints.mock.mock_task_runner import MockTaskRunner
from mephisto.abstractions.blueprints.mock.mock_task_builder import MockTaskBuilder
from mephisto.operations.registry import register_mephisto_abstraction

import os
import time

from typing import ClassVar, List, Type, Any, Dict, Iterable, TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from mephisto.data_model.agent import OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder
    from mephisto.data_model.assignment import Assignment
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.unit import Unit

BLUEPRINT_TYPE_MOCK = "mock"


@dataclass
class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs):
    """
    Arguments for the mock blueprint.

    Combines the base blueprint arguments with the onboarding and screening
    mixin arguments, and adds the mock-specific options declared below.
    """

    # Tags these args as belonging to the "mock" blueprint type
    _blueprint_type: str = BLUEPRINT_TYPE_MOCK
    # Required (no default).
    # NOTE(review): MockBlueprint.get_initialization_data uses this value as the
    # number of mock assignments to create, which does not match the help text
    # below - confirm which meaning is intended.
    num_assignments: int = field(
        default=MISSING,
        metadata={
            "help": "How many workers you want to do each assignment",
            "required": True,
        },
    )
    # Off by default
    use_onboarding: bool = field(
        default=False, metadata={"help": "Whether onboarding should be required"}
    )
    # NOTE(review): the help text reads like a boolean flag, but this is an int
    # defaulting to 0; presumably a timeout duration - confirm units and what 0 means.
    timeout_time: int = field(
        default=0,
        metadata={"help": "Whether acts in the run assignment should have a timeout"},
    )
    # On by default
    is_concurrent: bool = field(
        default=True,
        metadata={"help": "Whether to run this mock task as a concurrent task or not"},
    )


# Mock tasks currently inherit from every mixin so that each mixin can be tested.
# In the future, we'll likely want to compose mock tasks specifically for mixin testing.
@dataclass
class MockSharedState(SharedTaskState, OnboardingSharedState, ScreenTaskSharedState):
    """
    Shared state for the mock blueprint.

    Declares no fields of its own; it only combines the base shared task state
    with the onboarding and screening mixin states.
    """

    pass


@register_mephisto_abstraction()
class MockBlueprint(Blueprint, OnboardingRequired, ScreenTaskRequired):
    """Blueprint stand-in for tests, wired to the mock agent state, builder and runner"""

    AgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = MockTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = MockTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = MockBlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = MockSharedState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_MOCK

    # Declared only to satisfy mypy; mixins use these, the blueprint itself does not
    ArgsMixin: ClassVar[Any]
    SharedStateMixin: ClassVar[Any]

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "MockSharedState",
    ):
        """Forward all construction arguments unchanged to the parent initializer"""
        super().__init__(task_run, args, shared_state)

    def get_initialization_data(self) -> Iterable[InitializationData]:
        """
        Build one mock assignment's initialization data for each of the
        `num_assignments` requested in the blueprint args
        """
        requested = self.args.blueprint.num_assignments
        return [MockTaskRunner.get_mock_assignment_data() for _ in range(requested)]

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Report the 'should_pass' entry of the onboarding agent's state data;
        the worker argument is not consulted
        """
        onboarding_data = onboarding_agent.state.get_data()
        return onboarding_data["should_pass"]
View Source
class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs):
    """
    Arguments for the mock blueprint.

    Combines the base blueprint arguments with the onboarding and screening
    mixin arguments, and adds the mock-specific options declared below.
    """

    # Tags these args as belonging to the "mock" blueprint type
    _blueprint_type: str = BLUEPRINT_TYPE_MOCK
    # Required (no default).
    # NOTE(review): MockBlueprint.get_initialization_data uses this value as the
    # number of mock assignments to create, which does not match the help text
    # below - confirm which meaning is intended.
    num_assignments: int = field(
        default=MISSING,
        metadata={
            "help": "How many workers you want to do each assignment",
            "required": True,
        },
    )
    # Off by default
    use_onboarding: bool = field(
        default=False, metadata={"help": "Whether onboarding should be required"}
    )
    # NOTE(review): the help text reads like a boolean flag, but this is an int
    # defaulting to 0; presumably a timeout duration - confirm units and what 0 means.
    timeout_time: int = field(
        default=0,
        metadata={"help": "Whether acts in the run assignment should have a timeout"},
    )
    # On by default
    is_concurrent: bool = field(
        default=True,
        metadata={"help": "Whether to run this mock task as a concurrent task or not"},
    )

MockBlueprintArgs(passed_qualification_name: str = '???', max_screening_units: int = '???', use_screening_task: bool = False, onboarding_qualification: str = '???', _blueprint_type: str = 'mock', block_qualification: str = '???', num_assignments: int = '???', use_onboarding: bool = False, timeout_time: int = 0, is_concurrent: bool = True)

#   MockBlueprintArgs( passed_qualification_name: str = '???', max_screening_units: int = '???', use_screening_task: bool = False, onboarding_qualification: str = '???', _blueprint_type: str = 'mock', block_qualification: str = '???', num_assignments: int = '???', use_onboarding: bool = False, timeout_time: int = 0, is_concurrent: bool = True )
#   num_assignments: int = '???'
#   use_onboarding: bool = False
#   timeout_time: int = 0
#   is_concurrent: bool = True
View Source
class MockSharedState(SharedTaskState, OnboardingSharedState, ScreenTaskSharedState):
    """
    Shared state for the mock blueprint.

    Declares no fields of its own; it only combines the base shared task state
    with the onboarding and screening mixin states.
    """

    pass

MockSharedState(screening_data_factory: Tuple[bool, Generator[Dict[str, Any], NoneType, NoneType]] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = <factory>, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = <factory>)

#   MockSharedState( screening_data_factory: Tuple[bool, Generator[Dict[str, Any], NoneType, NoneType]] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: collections.abc.Callable[mephisto.data_model.worker.Worker, mephisto.data_model.unit.Unit, bool] = <factory>, on_unit_submitted: collections.abc.Callable[mephisto.data_model.unit.Unit, NoneType] = <factory> )
View Source
class MockBlueprint(Blueprint, OnboardingRequired, ScreenTaskRequired):
    """Blueprint stand-in for tests, wired to the mock agent state, builder and runner"""

    AgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = MockAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = MockTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = MockTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = MockBlueprintArgs
    SharedStateClass: ClassVar[Type["SharedTaskState"]] = MockSharedState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_MOCK

    # Declared only to satisfy mypy; mixins use these, the blueprint itself does not
    ArgsMixin: ClassVar[Any]
    SharedStateMixin: ClassVar[Any]

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "MockSharedState",
    ):
        """Forward all construction arguments unchanged to the parent initializer"""
        super().__init__(task_run, args, shared_state)

    def get_initialization_data(self) -> Iterable[InitializationData]:
        """
        Build one mock assignment's initialization data for each of the
        `num_assignments` requested in the blueprint args
        """
        requested = self.args.blueprint.num_assignments
        return [MockTaskRunner.get_mock_assignment_data() for _ in range(requested)]

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Report the 'should_pass' entry of the onboarding agent's state data;
        the worker argument is not consulted
        """
        onboarding_data = onboarding_agent.state.get_data()
        return onboarding_data["should_pass"]

Mock of a task type, for use in testing

#   MockBlueprint( task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprints.mock.mock_blueprint.MockSharedState )
View Source
    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "MockSharedState",
    ):
        """Forward all construction arguments unchanged to the parent initializer"""
        super().__init__(task_run, args, shared_state)
#   BLUEPRINT_TYPE: str = 'mock'
#   def get_initialization_data(self) -> Iterable[mephisto.data_model.assignment.InitializationData]:
View Source
    def get_initialization_data(self) -> Iterable[InitializationData]:
        """
        Build one mock assignment's initialization data for each of the
        `num_assignments` requested in the blueprint args
        """
        requested = self.args.blueprint.num_assignments
        return [MockTaskRunner.get_mock_assignment_data() for _ in range(requested)]

Return the number of empty assignments specified in --num-assignments

#   def validate_onboarding( self, worker: mephisto.data_model.worker.Worker, onboarding_agent: mephisto.data_model.agent.OnboardingAgent ) -> bool:
View Source
    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Report the 'should_pass' entry of the onboarding agent's state data;
        the worker argument is not consulted
        """
        onboarding_data = onboarding_agent.state.get_data()
        return onboarding_data["should_pass"]

Onboarding validation for MockBlueprints just returns the 'should_pass' field

#   class MockBlueprint.AgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class MockAgentState(AgentState):
    """
    In-memory agent state stand-in for use in tests
    """

    def __init__(self, agent: "Agent"):
        """Keep a weak proxy to the agent and start with empty in-memory state"""
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Any] = {}
        self.init_state: Any = None

    def set_init_state(self, data: Any) -> bool:
        """
        Record `data` as the initial state unless one is already present.

        Returns True when the state was stored, False when an initial
        state had been set earlier and was left untouched.
        """
        already_set = self.init_state is not None
        if already_set:
            return False
        self.init_state = data
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Give back whatever initial state was stored,
        or None when none has been set
        """
        return self.init_state

    def load_data(self) -> None:
        """Nothing is persisted for mock agent states, so loading is a no-op"""

    def get_data(self) -> Dict[str, Any]:
        """Give back the current in-memory state dict"""
        return self.state

    def save_data(self) -> None:
        """Saving is a no-op for mock agent states (for now)"""

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """Replace the in-memory state with `live_update`"""
        self.state = live_update

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Replace the in-memory state with `submitted_data`"""
        self.state = submitted_data

Mock agent state that is to be used for testing

Inherited Members
mephisto.abstractions.blueprints.mock.mock_agent_state.MockAgentState
MockAgentState
set_init_state
get_init_state
load_data
get_data
save_data
update_data
update_submit
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
get_parsed_data
get_task_start
get_task_end
#   class MockBlueprint.OnboardingAgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class MockAgentState(AgentState):
    """
    In-memory agent state stand-in for use in tests
    """

    def __init__(self, agent: "Agent"):
        """Keep a weak proxy to the agent and start with empty in-memory state"""
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Any] = {}
        self.init_state: Any = None

    def set_init_state(self, data: Any) -> bool:
        """
        Record `data` as the initial state unless one is already present.

        Returns True when the state was stored, False when an initial
        state had been set earlier and was left untouched.
        """
        already_set = self.init_state is not None
        if already_set:
            return False
        self.init_state = data
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Give back whatever initial state was stored,
        or None when none has been set
        """
        return self.init_state

    def load_data(self) -> None:
        """Nothing is persisted for mock agent states, so loading is a no-op"""

    def get_data(self) -> Dict[str, Any]:
        """Give back the current in-memory state dict"""
        return self.state

    def save_data(self) -> None:
        """Saving is a no-op for mock agent states (for now)"""

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """Replace the in-memory state with `live_update`"""
        self.state = live_update

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Replace the in-memory state with `submitted_data`"""
        self.state = submitted_data

Mock agent state that is to be used for testing

Inherited Members
mephisto.abstractions.blueprints.mock.mock_agent_state.MockAgentState
MockAgentState
set_init_state
get_init_state
load_data
get_data
save_data
update_data
update_submit
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
get_parsed_data
get_task_start
get_task_end
#   class MockBlueprint.TaskBuilderClass(mephisto.abstractions._subcomponents.task_builder.TaskBuilder):
View Source
class MockTaskBuilder(TaskBuilder):
    """Task builder stand-in for use in tests"""

    BUILT_FILE = "done.built"
    BUILT_MESSAGE = "built!"

    def build_in_dir(self, build_dir: str):
        """Write BUILT_MESSAGE into a BUILT_FILE marker inside `build_dir`"""
        marker_path = os.path.join(build_dir, self.BUILT_FILE)
        with open(marker_path, "w+") as marker:
            marker.write(self.BUILT_MESSAGE)

Builder for a mock task, for use in testing

Inherited Members
mephisto.abstractions._subcomponents.task_builder.TaskBuilder
TaskBuilder
mephisto.abstractions.blueprints.mock.mock_task_builder.MockTaskBuilder
BUILT_FILE
BUILT_MESSAGE
build_in_dir
#   class MockBlueprint.TaskRunnerClass(mephisto.abstractions._subcomponents.task_runner.TaskRunner):
View Source
class MockTaskRunner(TaskRunner):
    """Task runner stand-in for use in tests"""

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
    ):
        """Read the timeout and concurrency options from the blueprint args"""
        super().__init__(task_run, args, shared_state)
        blueprint_args = args.blueprint
        self.timeout = blueprint_args.timeout_time
        # Units/assignments currently being run, keyed by their db_id
        self.tracked_tasks: Dict[str, Union["Assignment", "Unit"]] = {}
        self.is_concurrent = blueprint_args.get("is_concurrent", True)

    @staticmethod
    def get_mock_assignment_data() -> InitializationData:
        """Build initialization data with empty shared data and two empty units"""
        empty_units = [{}, {}]
        return InitializationData(shared={}, unit_data=empty_units)

    @staticmethod
    def get_data_for_assignment(assignment: "Assignment") -> InitializationData:
        """
        Give back the standard mock assignment data; the `assignment`
        argument itself is not consulted
        """
        return MockTaskRunner.get_mock_assignment_data()

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Meant to return the data for an agent already assigned to a unit;
        not implemented yet, so this currently returns None
        """
        # TODO(#97) implement
        return None

    def run_onboarding(self, onboarding_agent: "OnboardingAgent"):
        """
        Wait (up to the configured timeout) for the onboarding agent's submit
        """
        onboarding_agent.await_submit(self.timeout)

    def run_unit(self, unit: "Unit", agent: "Agent"):
        """
        Track the unit, check it is assigned to `agent`, relay one live
        update (if any arrives) back to the agent, then wait for its submit
        """
        unit_id = unit.db_id
        self.tracked_tasks[unit_id] = unit
        time.sleep(0.3)
        assigned = unit.get_assigned_agent()
        assert assigned is not None, "No agent was assigned"
        assert assigned.db_id == agent.db_id, "Task was not given to assigned agent"
        update = agent.get_live_update(timeout=self.timeout)
        if update is not None:
            agent.observe(update)
        agent.await_submit(self.timeout)
        del self.tracked_tasks[unit_id]

    def run_assignment(self, assignment: "Assignment", agents: List["Agent"]):
        """
        Track the assignment, match every unit's assigned agent to one of the
        launched `agents`, relay one live update (if any) to each of them in
        unit order, then wait for each agent's submit
        """
        assignment_id = assignment.db_id
        self.tracked_tasks[assignment_id] = assignment
        launched_by_id = {candidate.db_id: candidate for candidate in agents}
        time.sleep(0.3)
        # Agents ordered by the assignment's units rather than by the input list
        ordered_agents = []
        for unit in assignment.get_units():
            assigned = unit.get_assigned_agent()
            assert assigned is not None, "Task was not fully assigned"
            launched = launched_by_id.get(assigned.db_id)
            assert launched is not None, "Task was not launched with assigned agents"
            ordered_agents.append(launched)
        for participant in ordered_agents:
            update = participant.get_live_update(timeout=self.timeout)
            if update is not None:
                participant.observe(update)
        for participant in ordered_agents:
            participant.await_submit(self.timeout)
        del self.tracked_tasks[assignment_id]

    def cleanup_assignment(self, assignment: "Assignment"):
        """Ending a mock assignment needs no cleanup yet"""

    def cleanup_unit(self, unit: "Unit"):
        """Ending a mock unit needs no cleanup yet"""

    def cleanup_onboarding(self, onboarding_agent: "OnboardingAgent"):
        """Ending mock onboarding needs no cleanup yet"""

Mock of a task runner, for use in testing

Inherited Members
mephisto.abstractions.blueprints.mock.mock_task_runner.MockTaskRunner
MockTaskRunner
get_mock_assignment_data
get_data_for_assignment
get_init_data_for_agent
run_onboarding
run_unit
run_assignment
cleanup_assignment
cleanup_unit
cleanup_onboarding
mephisto.abstractions._subcomponents.task_runner.TaskRunner
execute_onboarding
execute_unit
execute_assignment
filter_units_for_worker
shutdown
View Source
class MockBlueprintArgs(BlueprintArgs, OnboardingRequiredArgs, ScreenTaskRequiredArgs):
    """
    Arguments for the mock blueprint.

    Combines the base blueprint arguments with the onboarding and screening
    mixin arguments, and adds the mock-specific options declared below.
    """

    # Tags these args as belonging to the "mock" blueprint type
    _blueprint_type: str = BLUEPRINT_TYPE_MOCK
    # Required (no default).
    # NOTE(review): MockBlueprint.get_initialization_data uses this value as the
    # number of mock assignments to create, which does not match the help text
    # below - confirm which meaning is intended.
    num_assignments: int = field(
        default=MISSING,
        metadata={
            "help": "How many workers you want to do each assignment",
            "required": True,
        },
    )
    # Off by default
    use_onboarding: bool = field(
        default=False, metadata={"help": "Whether onboarding should be required"}
    )
    # NOTE(review): the help text reads like a boolean flag, but this is an int
    # defaulting to 0; presumably a timeout duration - confirm units and what 0 means.
    timeout_time: int = field(
        default=0,
        metadata={"help": "Whether acts in the run assignment should have a timeout"},
    )
    # On by default
    is_concurrent: bool = field(
        default=True,
        metadata={"help": "Whether to run this mock task as a concurrent task or not"},
    )

MockBlueprintArgs(passed_qualification_name: str = '???', max_screening_units: int = '???', use_screening_task: bool = False, onboarding_qualification: str = '???', _blueprint_type: str = 'mock', block_qualification: str = '???', num_assignments: int = '???', use_onboarding: bool = False, timeout_time: int = 0, is_concurrent: bool = True)

View Source
class MockSharedState(SharedTaskState, OnboardingSharedState, ScreenTaskSharedState):
    """
    Shared state for the mock blueprint.

    Declares no fields of its own; it only combines the base shared task state
    with the onboarding and screening mixin states.
    """

    pass

MockSharedState(screening_data_factory: Tuple[bool, Generator[Dict[str, Any], NoneType, NoneType]] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = <factory>, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = <factory>)

Inherited Members
MockSharedState
MockSharedState
#   class MockBlueprint.ArgsMixin:
View Source
class OnboardingRequiredArgs:
    """
    Argument mixin that adds the onboarding qualification option.
    """

    # No default value is provided (MISSING); per the help text, an empty
    # value skips onboarding.
    onboarding_qualification: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Specify the name of a qualification used to block workers who fail onboarding, "
                "Empty will skip onboarding."
            )
        },
    )

OnboardingRequiredArgs(onboarding_qualification: str = '???')

#   class MockBlueprint.SharedStateMixin:
View Source
class OnboardingSharedState:
    """
    Shared-state mixin carrying the onboarding task data and the
    onboarding validation callback.
    """

    # Defaults to a fresh empty dict per instance
    onboarding_data: Dict[str, Any] = field(
        default_factory=dict,
        metadata={
            "help": (
                "Task data to send for the initialTaskData available in the frontend during onboarding."
            ),
            "type": "Dict[str, Any]",
            "default": "{}",
        },
    )
    # The default factory produces a callback that accepts any input and returns True
    validate_onboarding: Callable[[Any], bool] = field(
        default_factory=lambda: (lambda x: True),
        metadata={
            "help": (
                "Function that takes in the result task data (from AgentState.get_data()) "
                "and returns whether or not the onboarding was successful."
            ),
            "type": "Callable[[Any], bool]",
            "default": "Returns true always",
        },
    )

OnboardingSharedState(onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>)