mephisto.abstractions.providers.mock.mock_provider
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.crowd_provider import CrowdProvider, ProviderArgs
from mephisto.abstractions.providers.mock.mock_agent import MockAgent
from mephisto.abstractions.providers.mock.mock_requester import MockRequester
from mephisto.abstractions.providers.mock.mock_unit import MockUnit
from mephisto.abstractions.providers.mock.mock_worker import MockWorker
from mephisto.abstractions.providers.mock.mock_datastore import MockDatastore
from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE
from mephisto.data_model.requester import RequesterArgs
from mephisto.operations.registry import register_mephisto_abstraction
from dataclasses import dataclass, field
from typing import ClassVar, Dict, Any, Optional, Type, List, TYPE_CHECKING

import os

if TYPE_CHECKING:
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.requester import Requester
    from mephisto.data_model.agent import Agent
    from mephisto.abstractions.blueprint import SharedTaskState
    from omegaconf import DictConfig


@dataclass
class MockProviderArgs(ProviderArgs):
    """Arguments to configure the mock crowd provider.

    Adds no fields beyond pinning the provider type to the mock PROVIDER_TYPE.
    """

    _provider_type: str = PROVIDER_TYPE


@register_mephisto_abstraction()
class MockProvider(CrowdProvider):
    """
    Mock implementation of a CrowdProvider that stores everything
    in a local state in the class for use in tests.
    """

    # Concrete data-model classes this provider hands out.
    UnitClass: ClassVar[Type["Unit"]] = MockUnit
    RequesterClass: ClassVar[Type["Requester"]] = MockRequester
    WorkerClass: ClassVar[Type["Worker"]] = MockWorker
    AgentClass: ClassVar[Type["Agent"]] = MockAgent
    ArgsClass = MockProviderArgs

    # Re-expose the module-level provider type constant on the class.
    PROVIDER_TYPE = PROVIDER_TYPE

    # Declared here but never assigned anywhere in this module.
    curr_db_location: ClassVar[str]

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Create the mock provider's datastore rooted at ``storage_path``.

        Beyond creating the datastore, mocks need no initialization.

        :param storage_path: directory passed to MockDatastore as its root
        :return: the newly created MockDatastore
        """
        return MockDatastore(datastore_root=storage_path)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """Mocks don't do any per-task-run setup; all arguments are ignored."""
        return None

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """Mocks have no task-run resources to clean up; arguments are ignored."""
        return None

    @classmethod
    def get_wrapper_js_path(cls) -> str:
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server.

        The file is expected to sit next to this module.
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")
View Source
class MockProviderArgs(ProviderArgs):
    """Arguments to configure the mock crowd provider.

    Adds no fields beyond pinning the provider type to the mock PROVIDER_TYPE.
    """

    _provider_type: str = PROVIDER_TYPE
Arguments to configure the mock crowd provider.
Inherited Members
View Source
class MockProvider(CrowdProvider):
    """
    Mock implementation of a CrowdProvider that stores everything
    in a local state in the class for use in tests.
    """

    # Concrete data-model classes this provider hands out.
    UnitClass: ClassVar[Type["Unit"]] = MockUnit
    RequesterClass: ClassVar[Type["Requester"]] = MockRequester
    WorkerClass: ClassVar[Type["Worker"]] = MockWorker
    AgentClass: ClassVar[Type["Agent"]] = MockAgent
    ArgsClass = MockProviderArgs

    # Re-expose the module-level provider type constant on the class.
    PROVIDER_TYPE = PROVIDER_TYPE

    # NOTE(review): declared without a value; no assignment is visible here.
    curr_db_location: ClassVar[str]

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Create the mock provider's datastore rooted at ``storage_path``.

        Beyond creating the datastore, mocks need no initialization.

        :param storage_path: directory passed to MockDatastore as its root
        :return: the newly created MockDatastore
        """
        return MockDatastore(datastore_root=storage_path)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """Mocks don't do any per-task-run setup; all arguments are ignored."""
        return None

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """Mocks have no task-run resources to clean up; arguments are ignored."""
        return None

    @classmethod
    def get_wrapper_js_path(cls) -> str:
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server.

        The file is expected to sit next to this module.
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")
Mock implementation of a CrowdProvider that stores everything in a local state in the class for use in tests.
View Source
def initialize_provider_datastore(self, storage_path: str) -> Any:
    """
    Create the mock provider's datastore rooted at ``storage_path``.

    Beyond creating the datastore, mocks need no initialization.

    :param storage_path: directory passed to MockDatastore as its root
    :return: the newly created MockDatastore
    """
    return MockDatastore(datastore_root=storage_path)
Create the mock provider's datastore rooted at the given storage path; mocks need no other initialization.
View Source
def setup_resources_for_task_run( self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState", server_url: str, ) -> None: """Mocks don't do any initialization""" return None
Mocks don't do any initialization
View Source
def cleanup_resources_from_task_run(
    self, task_run: "TaskRun", server_url: str
) -> None:
    """
    Mocks have no task-run resources to clean up.

    Both arguments are accepted for interface compatibility and ignored.
    """
    return None
Mocks have no task-run resources to clean up.
View Source
@classmethod
def get_wrapper_js_path(cls) -> str:
    """
    Return the path to the `wrap_crowd_source.js` file for this
    provider to be deployed to the server.

    The file is expected to sit in the same directory as this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "wrap_crowd_source.js")
Return the path to the `wrap_crowd_source.js` file for this provider to be deployed to the server.
View Source
class MockUnit(Unit):
    """
    This class tracks the status of an individual worker's contribution to a
    higher level assignment. It is the smallest 'unit' of work to complete
    the assignment, and this class is only responsible for checking
    the status of that work itself being done.

    It should be extended for usage with a specific crowd provider
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        # Mock-only state (e.g. the expired flag) is kept in the mock datastore.
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    @staticmethod
    def _extract_port(task_url: str) -> str:
        """
        Return the port of ``task_url`` as a string, or "" if it has none.

        Handles both scheme-qualified URLs (``http://localhost:3000/``) and
        bare ``host:port/...`` strings. The previous ``split(":")[1]`` parse
        returned "" for scheme-qualified URLs because index 1 is "//host".
        """
        from urllib.parse import urlsplit

        # Without a scheme, urlsplit would read "host" as the scheme; a
        # leading "//" forces the host:port part into the netloc instead.
        if "://" not in task_url and not task_url.startswith("//"):
            task_url = "//" + task_url
        try:
            port = urlsplit(task_url).port
        except ValueError:
            # Non-numeric or out-of-range port: treat as missing.
            port = None
        return "" if port is None else str(port)

    def launch(self, task_url: str) -> None:
        """
        Mock launches do nothing beyond updating state and reporting the
        local preview / work URLs on stdout and in the log.

        :param task_url: URL the task server was deployed to
        """
        self.set_db_status(status=AssignmentState.LAUNCHED)
        # TODO(OWN) get this link to the frontend
        port = self._extract_port(task_url)
        print(task_url)
        print(
            f"Mock task launched: localhost:{port} for preview, "
            f"localhost:{port}/?worker_id=x&assignment_id={self.db_id}"
        )
        logger.info(
            f"Mock task launched: localhost:{port} for preview, "
            f"localhost:{port}/?worker_id=x&assignment_id={self.db_id} for assignment {self.assignment_id}"
        )
        return None

    def expire(self) -> float:
        """
        Expiration is immediate on Mocks.

        Moves the unit to EXPIRED unless it is already EXPIRED or COMPLETED,
        and always records the expired flag in the mock datastore.

        :return: 0.0, as there is nothing to wait for on a mock
        """
        if self.get_status() not in [
            AssignmentState.EXPIRED,
            AssignmentState.COMPLETED,
        ]:
            self.set_db_status(AssignmentState.EXPIRED)
        self.datastore.set_unit_expired(self.db_id, True)
        return 0.0

    def is_expired(self) -> bool:
        """Determine if this unit is expired, per the mock datastore's flag."""
        return self.datastore.get_unit_expired(self.db_id)

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """Create and register a mock Unit for the given assignment."""
        return MockUnit._register_unit(db, assignment, index, pay_amount, PROVIDER_TYPE)
This class tracks the status of an individual worker's contribution to a higher level assignment. It is the smallest 'unit' of work to complete the assignment, and this class is only responsible for checking the status of that work itself being done.
It should be extended for usage with a specific crowd provider
Inherited Members
View Source
class MockRequester(Requester):
    """
    High level class representing a requester on some kind of crowd provider. Sets some
    default initializations, but mostly should be extended by the specific requesters
    for crowd providers with whatever implementation details are required to get those
    to work.
    """

    ArgsClass = MockRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        # Registration state for mocks is kept in the mock datastore.
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    def register(self, args: Optional["DictConfig"] = None) -> None:
        """
        Mock requesters don't actually register credentials.

        Marks this requester as registered in the mock datastore, unless
        ``args`` has ``force_fail`` set to True, in which case an exception is
        raised so the failure path can be exercised.

        :param args: optional config; only its ``force_fail`` key is read
        :raises Exception: if ``args.get("force_fail")`` is True
        """
        # Guard clause replaces the nested if/else, which could leave the
        # requester unregistered when args were given without force_fail.
        if args is not None and args.get("force_fail") is True:
            raise Exception("Forced failure test exception was set")
        self.datastore.set_requester_registered(self.db_id, True)

    def is_registered(self) -> bool:
        """Return the registration status recorded in the mock datastore"""
        return self.datastore.get_requester_registered(self.db_id)

    def get_available_budget(self) -> float:
        """MockRequesters have $100000 to spend (the MOCK_BUDGET constant)"""
        return MOCK_BUDGET

    @classmethod
    def is_sandbox(cls) -> bool:
        """MockRequesters are for testing only, and are thus treated as sandbox"""
        return True

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """Create and register a mock requester with the given name."""
        return MockRequester._register_requester(db, requester_name, PROVIDER_TYPE)
High level class representing a requester on some kind of crowd provider. Sets some default initializations, but mostly should be extended by the specific requesters for crowd providers with whatever implementation details are required to get those to work.
Inherited Members
View Source
class MockWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains components
    of ongoing identity for a user.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        # Block state for mocks is kept in the mock datastore.
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """
        Bonus this worker for any reason. Return success of bonus.

        Mock bonuses always succeed and nothing is recorded.
        """
        return True, ""

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """
        Block this worker for a specified reason. Return success of block.

        The block is recorded in the mock datastore regardless of
        ``unit`` / ``requester``.
        """
        self.datastore.set_worker_blocked(self.db_id, True)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """Unblock a blocked worker for the specified reason. Return success of unblock"""
        self.datastore.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked (``requester`` is not consulted)"""
        return self.datastore.get_worker_blocked(self.db_id)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Determine if this worker is eligible for the given task run (always True for mocks)"""
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        """Create and register a mock worker; its name gets a "_sandbox" suffix."""
        return MockWorker._register_worker(db, worker_id + "_sandbox", PROVIDER_TYPE)
This class represents an individual - namely a person. It maintains components of ongoing identity for a user.
Inherited Members
View Source
class MockAgent(Agent):
    """
    This class encompasses a worker as they are working on an individual assignment.
    It maintains details for the current task at hand such as start and end time,
    connection status, etc.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)
        # Lazily create this agent's mock bookkeeping entry:
        #   observed      - live updates sent to the agent via observe()
        #   pending_acts  - FIFO of fake live updates for get_live_update()
        #   acts          - every live update returned by get_live_update()
        #   pending_submit - fake submit event consumed by await_submit()
        if db_id not in self.datastore.agent_data:
            self.datastore.agent_data[db_id] = {
                "observed": [],
                "pending_acts": [],
                "acts": [],
                "pending_submit": None,
            }

    def observe(self, live_update: Dict[str, Any]) -> None:
        """Put observations into this mock agent's observation list"""
        self.datastore.agent_data[self.db_id]["observed"].append(live_update)
        super().observe(live_update)

    def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
        """Add a fake live update to be returned by a later get_live_update call"""
        # Append rather than assign: get_live_update treats "pending_acts" as a
        # FIFO list (len() check + pop(0)); overwriting it with a single dict
        # made that pop(0) raise KeyError.
        self.datastore.agent_data[self.db_id]["pending_acts"].append(data)

    def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
        """
        Add a final submit event to put in the queue for this agent
        to be called on completion
        """
        self.datastore.agent_data[self.db_id]["pending_submit"] = data

    def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
        """
        Either take an act from this mock agent's act queue (for use
        by tests and other mock purposes) or request a regular act
        (for use in manual testing).

        Any non-None act returned is also recorded in the "acts" history.
        """
        agent_state = self.datastore.agent_data[self.db_id]
        if len(agent_state["pending_acts"]) > 0:
            act = agent_state["pending_acts"].pop(0)
        else:
            act = super().get_live_update(timeout=timeout)

        if act is not None:
            agent_state["acts"].append(act)
        return act

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit.

        Mocks only update the agent status to approved.
        """
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit.

        Mocks only update the agent status to rejected; ``reason`` is unused.
        """
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self):
        """No need to tell mock crowd provider about doneness"""
        pass

    def mark_disconnected(self) -> None:
        """Mark this mock agent as having disconnected"""
        self.update_status(AgentState.STATUS_DISCONNECT)

    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Check the submission status of this agent.

        Returns True right away if a submit was already recorded. Otherwise,
        when ``timeout`` is given and a mock submit event was enqueued via
        enqueue_mock_submit_event, that event is handled first; then this
        defers to the base Agent.await_submit.

        NOTE(review): the enqueued submit event is not cleared after handling.
        """
        if self.did_submit.is_set():
            return True
        if timeout is not None:
            local_submit = self.datastore.agent_data[self.db_id]["pending_submit"]
            if local_submit is not None:
                self.handle_submit(local_submit)
        return super().await_submit(timeout)

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MockAgent._register_agent(db, worker, unit, PROVIDER_TYPE)
This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.
Inherited Members
View Source
class MockProviderArgs(ProviderArgs):
    """Arguments to configure the mock crowd provider.

    Adds no fields beyond pinning the provider type to the mock PROVIDER_TYPE.
    """

    _provider_type: str = PROVIDER_TYPE
Arguments to configure the mock crowd provider.