mephisto.abstractions.providers.mock.mock_worker
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.data_model.worker import Worker
from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE
from typing import List, Optional, Tuple, Dict, Mapping, Type, Any, TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for annotations only.
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.agent import Agent
    from mephisto.data_model.requester import Requester
    from mephisto.abstractions.providers.mock.mock_datastore import MockDatastore


class MockWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains
    components of ongoing identity for a user.

    In this mock implementation the blocked flag is read from and written to
    the datastore registered for the mock provider type; bonuses and
    eligibility checks always succeed.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """Initialize the base Worker, then look up the mock provider datastore."""
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        provider_datastore = db.get_datastore_for_provider(PROVIDER_TYPE)
        self.datastore: "MockDatastore" = provider_datastore

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """
        Bonus this worker for any reason. Return success of bonus.

        The mock ignores every argument and always returns ``(True, "")``.
        """
        succeeded, message = True, ""
        return succeeded, message

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """
        Block this worker for a specified reason. Return success of block.

        Sets the blocked flag for this worker's ``db_id`` in the datastore;
        ``reason``, ``unit`` and ``requester`` are not used by the mock.
        """
        datastore = self.datastore
        datastore.set_worker_blocked(self.db_id, True)
        return (True, "")

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """
        Unblock a blocked worker for the specified reason. Return success
        of unblock.

        Clears the blocked flag for this worker's ``db_id`` in the datastore.
        """
        datastore = self.datastore
        datastore.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """
        Determine if a worker is blocked.

        The answer comes from the datastore's flag for this worker's
        ``db_id``; ``requester`` is not used by the mock.
        """
        blocked = self.datastore.get_worker_blocked(self.db_id)
        return blocked

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """
        Determine if this worker is eligible for the given task run.

        The mock treats every worker as eligible.
        """
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        """
        Register a new mock worker and return the result of
        ``_register_worker``.

        The name passed to ``_register_worker`` is ``worker_id`` with
        ``"_sandbox"`` appended.
        """
        registered_name = worker_id + "_sandbox"
        return MockWorker._register_worker(db, registered_name, PROVIDER_TYPE)
View Source
class MockWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains
    components of ongoing identity for a user.

    The mock keeps the worker's blocked flag in the datastore obtained for
    the mock provider type. Bonusing and eligibility checks always succeed.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """Run the base Worker initializer and attach the mock datastore."""
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        mock_datastore = db.get_datastore_for_provider(PROVIDER_TYPE)
        self.datastore: "MockDatastore" = mock_datastore

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """
        Bonus this worker for any reason. Return success of bonus.

        No argument is inspected; the result is always ``(True, "")``.
        """
        result = (True, "")
        return result

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """
        Block this worker for a specified reason. Return success of block.

        Marks this worker's ``db_id`` as blocked in the datastore and
        returns ``(True, "")``.
        """
        store = self.datastore
        store.set_worker_blocked(self.db_id, True)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """
        Unblock a blocked worker for the specified reason. Return success
        of unblock.

        Marks this worker's ``db_id`` as not blocked in the datastore.
        """
        store = self.datastore
        store.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """
        Determine if a worker is blocked.

        Returns whatever the datastore reports for this worker's ``db_id``.
        """
        store = self.datastore
        return store.get_worker_blocked(self.db_id)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """
        Determine if this worker is eligible for the given task run.

        Always ``True`` for the mock.
        """
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        """
        Create a new mock worker by delegating to ``_register_worker``.

        ``"_sandbox"`` is appended to ``worker_id`` before registration.
        """
        suffixed_id = worker_id + "_sandbox"
        return MockWorker._register_worker(db, suffixed_id, PROVIDER_TYPE)
This class represents an individual - namely a person. It maintains components of ongoing identity for a user.
View Source
def __new__(
    cls,
    db: "MephistoDB",
    db_id: str,
    row: Optional[Mapping[str, Any]] = None,
    _used_new_call: bool = False,
) -> "Worker":
    """
    Allocate an instance of the provider-specific Worker class.

    Constructing the base ``Worker`` never yields a base instance: the
    worker's row is consulted (loaded from ``db`` when ``row`` is not given)
    and the ``WorkerClass`` of the crowd provider named by its
    ``provider_type`` is allocated instead. Constructing any other class
    allocates that class directly.
    """
    from mephisto.operations.registry import get_crowd_provider_from_type

    if cls != Worker:
        # A concrete class was requested explicitly; no lookup is needed.
        return super().__new__(cls)

    # Base Worker requested: resolve the concrete class from the stored row.
    if row is None:
        row = db.get_worker(db_id)
    assert row is not None, f"Given db_id {db_id} did not exist in given db"
    provider = get_crowd_provider_from_type(row["provider_type"])
    correct_class: Type[Worker] = provider.WorkerClass
    return super().__new__(correct_class)
The `__new__` method is overridden so that the expected Worker class is generated automatically, without needing to look it up for a given db_id. As such it is impossible to create a base Worker; you will instead be returned the correct Worker class according to the crowdprovider associated with this Worker.
View Source
def bonus_worker(
    self, amount: float, reason: str, unit: Optional["Unit"] = None
) -> Tuple[bool, str]:
    """
    Bonus this worker for any reason. Return success of bonus.

    The mock does not look at ``amount``, ``reason`` or ``unit``; the
    returned ``(success, message)`` pair is always ``(True, "")``.
    """
    succeeded, message = True, ""
    return succeeded, message
Bonus this worker for any reason. Return the success of the bonus.
View Source
def block_worker(
    self,
    reason: str,
    unit: Optional["Unit"] = None,
    requester: Optional["Requester"] = None,
) -> Tuple[bool, str]:
    """
    Block this worker for a specified reason. Return success of block.

    Sets the blocked flag for this worker's ``db_id`` in ``self.datastore``.
    ``reason``, ``unit`` and ``requester`` are accepted but not used here.
    The returned ``(success, message)`` pair is always ``(True, "")``.
    """
    datastore = self.datastore
    datastore.set_worker_blocked(self.db_id, True)
    return (True, "")
Block this worker for a specified reason. Return success of block
View Source
def unblock_worker(self, reason: str, requester: "Requester") -> bool:
    """
    Unblock a blocked worker for the specified reason. Return success of
    unblock.

    Clears the blocked flag for this worker's ``db_id`` in
    ``self.datastore``; ``reason`` and ``requester`` are not used here.
    Always returns ``True``.
    """
    datastore = self.datastore
    datastore.set_worker_blocked(self.db_id, False)
    return True
Unblock a blocked worker for the specified reason. Return the success of the unblock.
View Source
def is_blocked(self, requester: "Requester") -> bool:
    """
    Determine if a worker is blocked.

    Returns what ``self.datastore`` reports for this worker's ``db_id``;
    ``requester`` is not used here.
    """
    blocked = self.datastore.get_worker_blocked(self.db_id)
    return blocked
Determine if a worker is blocked
View Source
def is_eligible(self, task_run: "TaskRun") -> bool:
    """
    Determine if this worker is eligible for the given task run.

    The mock does not inspect ``task_run``; every worker is eligible.
    """
    eligible = True
    return eligible
Determine if this worker is eligible for the given task run
View Source
@staticmethod
def new(db: "MephistoDB", worker_id: str) -> "Worker":
    """
    Create a new mock worker and return the result of ``_register_worker``.

    The name handed to ``_register_worker`` is ``worker_id`` with
    ``"_sandbox"`` appended, registered under the mock provider type.
    """
    registered_name = worker_id + "_sandbox"
    return MockWorker._register_worker(db, registered_name, PROVIDER_TYPE)
Create a new worker attached to the given identifier, assuming it doesn't already exist in the database.
Implementation should return the result of _register_worker when sure the worker can be successfully created to have it put into the db.