mephisto.abstractions.providers.mock.mock_provider

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.crowd_provider import CrowdProvider, ProviderArgs
from mephisto.abstractions.providers.mock.mock_agent import MockAgent
from mephisto.abstractions.providers.mock.mock_requester import MockRequester
from mephisto.abstractions.providers.mock.mock_unit import MockUnit
from mephisto.abstractions.providers.mock.mock_worker import MockWorker
from mephisto.abstractions.providers.mock.mock_datastore import MockDatastore
from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE
from mephisto.data_model.requester import RequesterArgs
from mephisto.operations.registry import register_mephisto_abstraction
from dataclasses import dataclass, field

from typing import ClassVar, Dict, Any, Optional, Type, List, TYPE_CHECKING

import os

if TYPE_CHECKING:
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.requester import Requester
    from mephisto.data_model.agent import Agent
    from mephisto.abstractions.blueprint import SharedTaskState
    from omegaconf import DictConfig


@dataclass
class MockProviderArgs(ProviderArgs):
    """
    Arguments used to configure the mock crowd provider.

    Extends ProviderArgs and sets the `_provider_type` default to this
    package's PROVIDER_TYPE constant.
    """

    # Identifies which provider these args belong to.
    _provider_type: str = PROVIDER_TYPE


@register_mephisto_abstraction()
class MockProvider(CrowdProvider):
    """
    CrowdProvider for tests: its unit, requester, worker and agent classes
    are all the mock variants, and provider state lives in a MockDatastore.
    """

    # Mock implementations of each data model class this provider exposes
    UnitClass: ClassVar[Type["Unit"]] = MockUnit

    RequesterClass: ClassVar[Type["Requester"]] = MockRequester

    WorkerClass: ClassVar[Type["Worker"]] = MockWorker

    AgentClass: ClassVar[Type["Agent"]] = MockAgent

    # Argument dataclass used to configure this provider
    ArgsClass = MockProviderArgs

    PROVIDER_TYPE = PROVIDER_TYPE

    # Declared for type checkers only; no value is assigned here
    curr_db_location: ClassVar[str]

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """Create and return the MockDatastore rooted at `storage_path`"""
        datastore = MockDatastore(datastore_root=storage_path)
        return datastore

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """No-op: the mock provider has nothing to set up for a task run"""

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No-op: the mock provider has nothing to clean up after a task run"""

    @classmethod
    def get_wrapper_js_path(cls):
        """
        Return the path of the `wrap_crowd_source.js` file that sits next
        to this module, for deployment to the server
        """
        provider_dir = os.path.dirname(__file__)
        return os.path.join(provider_dir, "wrap_crowd_source.js")
View Source
class MockProviderArgs(ProviderArgs):
    """
    Arguments used to configure the mock crowd provider.

    Extends ProviderArgs and sets the `_provider_type` default to this
    package's PROVIDER_TYPE constant.
    """

    # Identifies which provider these args belong to.
    _provider_type: str = PROVIDER_TYPE

Base class for arguments to configure Crowd Providers

#   MockProviderArgs(_provider_type: str = 'mock', requester_name: str = '???')
View Source
class MockProvider(CrowdProvider):
    """
    CrowdProvider for tests: its unit, requester, worker and agent classes
    are all the mock variants, and provider state lives in a MockDatastore.
    """

    # Mock implementations of each data model class this provider exposes
    UnitClass: ClassVar[Type["Unit"]] = MockUnit

    RequesterClass: ClassVar[Type["Requester"]] = MockRequester

    WorkerClass: ClassVar[Type["Worker"]] = MockWorker

    AgentClass: ClassVar[Type["Agent"]] = MockAgent

    # Argument dataclass used to configure this provider
    ArgsClass = MockProviderArgs

    PROVIDER_TYPE = PROVIDER_TYPE

    # Declared for type checkers only; no value is assigned here
    curr_db_location: ClassVar[str]

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """Create and return the MockDatastore rooted at `storage_path`"""
        datastore = MockDatastore(datastore_root=storage_path)
        return datastore

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """No-op: the mock provider has nothing to set up for a task run"""

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No-op: the mock provider has nothing to clean up after a task run"""

    @classmethod
    def get_wrapper_js_path(cls):
        """
        Return the path of the `wrap_crowd_source.js` file that sits next
        to this module, for deployment to the server
        """
        provider_dir = os.path.dirname(__file__)
        return os.path.join(provider_dir, "wrap_crowd_source.js")

Mock implementation of a CrowdProvider that stores everything in a local state in the class for use in tests.

#   PROVIDER_TYPE = 'mock'
#   def initialize_provider_datastore(self, storage_path: str) -> Any:
View Source
    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """Create and return the MockDatastore rooted at `storage_path`"""
        datastore = MockDatastore(datastore_root=storage_path)
        return datastore

Mocks don't need any initialization

#   def setup_resources_for_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState, server_url: str ) -> None:
View Source
    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """No-op: the mock provider has nothing to set up for a task run"""

Mocks don't do any initialization

#   def cleanup_resources_from_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, server_url: str ) -> None:
View Source
    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No-op: the mock provider has nothing to clean up after a task run"""

Mocks don't need any cleanup

#  
@classmethod
def get_wrapper_js_path(cls):
View Source
    @classmethod
    def get_wrapper_js_path(cls):
        """
        Return the path of the `wrap_crowd_source.js` file that sits next
        to this module, for deployment to the server
        """
        provider_dir = os.path.dirname(__file__)
        return os.path.join(provider_dir, "wrap_crowd_source.js")

Return the path to the `wrap_crowd_source.js` file for this provider to be deployed to the server

#   class MockProvider.UnitClass(mephisto.data_model.unit.Unit):
View Source
class MockUnit(Unit):
    """
    Mock Unit used for testing. Launching only updates the stored status
    and reports a local url, and expiration is tracked as a flag in the
    mock provider's datastore.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    @staticmethod
    def _port_from_url(task_url: str) -> str:
        """
        Return the port component of `task_url`, or an empty string when
        the url does not carry one.

        Accepts both bare `host:port[/path]` urls and urls carrying a
        `scheme://` prefix.
        """
        # Drop an optional "scheme://" prefix so that its colon is not
        # mistaken for the host/port separator.
        _scheme, scheme_sep, remainder = task_url.partition("://")
        authority = (remainder if scheme_sep else task_url).split("/", 1)[0]
        _host, port_sep, port = authority.rpartition(":")
        return port if port_sep else ""

    def launch(self, task_url: str) -> None:
        """
        Mark this unit as launched, then print and log the local urls that
        can be used to preview or work on it.
        """
        self.set_db_status(status=AssignmentState.LAUNCHED)

        # TODO(OWN) get this link to the frontend
        port = self._port_from_url(task_url)
        print(task_url)
        launch_message = (
            f"Mock task launched: localhost:{port} for preview, "
            f"localhost:{port}/?worker_id=x&assignment_id={self.db_id}"
        )
        print(launch_message)
        logger.info(f"{launch_message} for assignment {self.assignment_id}")

        return None

    def expire(self) -> float:
        """
        Expire this unit immediately and return 0.0.

        The stored status is only moved to EXPIRED when the unit is not
        already EXPIRED or COMPLETED; the datastore expired flag is always set.
        """
        if self.get_status() not in [
            AssignmentState.EXPIRED,
            AssignmentState.COMPLETED,
        ]:
            self.set_db_status(AssignmentState.EXPIRED)
        self.datastore.set_unit_expired(self.db_id, True)
        return 0.0

    def is_expired(self) -> bool:
        """Return the expired flag stored for this unit in the mock datastore"""
        return self.datastore.get_unit_expired(self.db_id)

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """Create a Unit for the given assignment"""
        return MockUnit._register_unit(db, assignment, index, pay_amount, PROVIDER_TYPE)

This class tracks the status of an individual worker's contribution to a higher level assignment. It is the smallest 'unit' of work to complete the assignment, and this class is only responsible for checking the status of that work itself being done.

It should be extended for usage with a specific crowd provider

#   class MockProvider.RequesterClass(mephisto.data_model.requester.Requester):
View Source
class MockRequester(Requester):
    """
    Mock Requester used for testing. Registration is tracked as a flag in
    the mock provider's datastore rather than against real credentials.
    """

    ArgsClass = MockRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    def register(self, args: Optional["DictConfig"] = None) -> None:
        """
        Mark this requester as registered; no credentials are involved.

        Raises an Exception when `args` sets `force_fail` to True, which
        lets tests exercise registration failures.
        """
        if args is not None and args.get("force_fail") is True:
            raise Exception("Forced failure test exception was set")
        # Register whether or not args were supplied. Previously a call
        # with args but without force_fail left the requester unregistered.
        self.datastore.set_requester_registered(self.db_id, True)

    def is_registered(self) -> bool:
        """Return the registration status"""
        return self.datastore.get_requester_registered(self.db_id)

    def get_available_budget(self) -> float:
        """Return the fixed MOCK_BUDGET amount"""
        return MOCK_BUDGET

    @classmethod
    def is_sandbox(cls) -> bool:
        """MockRequesters are for testing only, and are thus treated as sandbox"""
        return True

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """Create a mock requester with the given name"""
        return MockRequester._register_requester(db, requester_name, PROVIDER_TYPE)

High level class representing a requester on some kind of crowd provider. Sets some default initializations, but mostly should be extended by the specific requesters for crowd providers with whatever implementation details are required to get those to work.

#   class MockProvider.WorkerClass(mephisto.data_model.worker.Worker):
View Source
class MockWorker(Worker):
    """
    Mock Worker used for testing. The only provider-side state it keeps is
    a blocked flag stored in the mock provider's datastore.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        mock_datastore = db.get_datastore_for_provider(PROVIDER_TYPE)
        self.datastore: "MockDatastore" = mock_datastore

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """Pretend to bonus this worker; always reports success with no message"""
        return (True, "")

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """Set the blocked flag for this worker and report success"""
        self.datastore.set_worker_blocked(self.db_id, True)
        outcome = (True, "")
        return outcome

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """Clear the blocked flag for this worker and report success"""
        self.datastore.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Return the blocked flag stored for this worker"""
        blocked = self.datastore.get_worker_blocked(self.db_id)
        return blocked

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Mock workers are eligible for every task run"""
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        """Register a mock worker under `worker_id` with a `_sandbox` suffix"""
        sandbox_name = worker_id + "_sandbox"
        return MockWorker._register_worker(db, sandbox_name, PROVIDER_TYPE)

This class represents an individual - namely a person. It maintains components of ongoing identity for a user.

#   class MockProvider.AgentClass(mephisto.data_model.agent.Agent):
View Source
class MockAgent(Agent):
    """
    Mock Agent used for testing. Observations, queued acts, consumed acts
    and a pending submit event are kept per agent id in the mock
    provider's datastore so tests can script and inspect the interaction.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)
        # Only create the record once so existing data for this id is kept
        if db_id not in self.datastore.agent_data:
            self.datastore.agent_data[db_id] = {
                "observed": [],
                "pending_acts": [],
                "acts": [],
                "pending_submit": None,
            }

    def observe(self, live_update: Dict[str, Any]) -> None:
        """Put observations into this mock agent's observation list"""
        self.datastore.agent_data[self.db_id]["observed"].append(live_update)
        super().observe(live_update)

    def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
        """Queue a fake live update to be returned by a later get_live_update call"""
        # Append rather than assign: "pending_acts" is a list that
        # get_live_update consumes with pop(0). Assigning the dict here
        # replaced the list and made that pop fail.
        self.datastore.agent_data[self.db_id]["pending_acts"].append(data)

    def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
        """
        Add a final submit event to put in the queue for this agent
        to be called on completion
        """
        self.datastore.agent_data[self.db_id]["pending_submit"] = data

    def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
        """
        Either take an act from this mock agent's act queue (for use
        by tests and other mock purposes) or request a regular act
        (for use in manual testing).

        Any act that is returned is also recorded in the "acts" list.
        """
        if len(self.datastore.agent_data[self.db_id]["pending_acts"]) > 0:
            act = self.datastore.agent_data[self.db_id]["pending_acts"].pop(0)
        else:
            act = super().get_live_update(timeout=timeout)

        if act is not None:
            self.datastore.agent_data[self.db_id]["acts"].append(act)
        return act

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit by setting the
        agent status to approved
        """
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit by setting the
        agent status to rejected. The reason is not used.
        """
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self):
        """No need to tell mock crowd provider about doneness"""
        pass

    def mark_disconnected(self) -> None:
        """Mark this mock agent as having disconnected"""
        self.update_status(AgentState.STATUS_DISCONNECT)

    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Check the submission status of this agent. When a timeout is given
        and a mock submit event is pending, that event is handled first.
        """
        if self.did_submit.is_set():
            return True
        if timeout is not None:
            local_submit = self.datastore.agent_data[self.db_id]["pending_submit"]
            if local_submit is not None:
                self.handle_submit(local_submit)
        return super().await_submit(timeout)

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MockAgent._register_agent(db, worker, unit, PROVIDER_TYPE)

This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.

View Source
class MockProviderArgs(ProviderArgs):
    """
    Arguments used to configure the mock crowd provider.

    Extends ProviderArgs and sets the `_provider_type` default to this
    package's PROVIDER_TYPE constant.
    """

    # Identifies which provider these args belong to.
    _provider_type: str = PROVIDER_TYPE

Base class for arguments to configure Crowd Providers