mephisto.abstractions.providers.mock.mock_worker

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.data_model.worker import Worker
from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE
from typing import List, Optional, Tuple, Dict, Mapping, Type, Any, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.agent import Agent
    from mephisto.data_model.requester import Requester
    from mephisto.abstractions.providers.mock.mock_datastore import MockDatastore


class MockWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains components of ongoing identity for a user.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """Bonus this worker for work any reason. Return success of bonus"""
        return True, ""

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """Block this worker for a specified reason. Return success of block"""
        self.datastore.set_worker_blocked(self.db_id, True)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """unblock a blocked worker for the specified reason. Return success of unblock"""
        self.datastore.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        return self.datastore.get_worker_blocked(self.db_id)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Determine if this worker is eligible for the given task run"""
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        return MockWorker._register_worker(db, worker_id + "_sandbox", PROVIDER_TYPE)
View Source
class MockWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains components of ongoing identity for a user.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """Bonus this worker for work any reason. Return success of bonus"""
        return True, ""

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """Block this worker for a specified reason. Return success of block"""
        self.datastore.set_worker_blocked(self.db_id, True)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """unblock a blocked worker for the specified reason. Return success of unblock"""
        self.datastore.set_worker_blocked(self.db_id, False)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        return self.datastore.get_worker_blocked(self.db_id)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Determine if this worker is eligible for the given task run"""
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        return MockWorker._register_worker(db, worker_id + "_sandbox", PROVIDER_TYPE)

This class represents an individual - namely a person. It maintains components of ongoing identity for a user.

#   MockWorker( db: mephisto.abstractions.database.MephistoDB, db_id: str, row: Union[Mapping[str, Any], NoneType] = None, _used_new_call: bool = False )
View Source
    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Worker":
        """
        The new method is overridden to be able to automatically generate
        the expected Worker class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Worker
        as you will instead be returned the correct Worker class according to
        the crowdprovider associated with this Worker.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Worker:
            # We are trying to construct a Worker, find what type to use and
            # create that instead
            if row is None:
                row = db.get_worker(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class: Type[Worker] = get_crowd_provider_from_type(
                row["provider_type"]
            ).WorkerClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

The new method is overridden to be able to automatically generate the expected Worker class without needing to specifically find it for a given db_id. As such it is impossible to create a base Worker as you will instead be returned the correct Worker class according to the crowdprovider associated with this Worker.

#   def bonus_worker( self, amount: float, reason: str, unit: Union[mephisto.data_model.unit.Unit, NoneType] = None ) -> Tuple[bool, str]:
View Source
    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """Bonus this worker for work any reason. Return success of bonus"""
        return True, ""

Bonus this worker for work any reason. Return success of bonus

#   def block_worker( self, reason: str, unit: Union[mephisto.data_model.unit.Unit, NoneType] = None, requester: Union[mephisto.data_model.requester.Requester, NoneType] = None ) -> Tuple[bool, str]:
View Source
    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """Block this worker for a specified reason. Return success of block"""
        self.datastore.set_worker_blocked(self.db_id, True)
        return True, ""

Block this worker for a specified reason. Return success of block

#   def unblock_worker( self, reason: str, requester: mephisto.data_model.requester.Requester ) -> bool:
View Source
    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """unblock a blocked worker for the specified reason. Return success of unblock"""
        self.datastore.set_worker_blocked(self.db_id, False)
        return True

unblock a blocked worker for the specified reason. Return success of unblock

#   def is_blocked(self, requester: mephisto.data_model.requester.Requester) -> bool:
View Source
    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        return self.datastore.get_worker_blocked(self.db_id)

Determine if a worker is blocked

#   def is_eligible(self, task_run: mephisto.data_model.task_run.TaskRun) -> bool:
View Source
    def is_eligible(self, task_run: "TaskRun") -> bool:
        """Determine if this worker is eligible for the given task run"""
        return True

Determine if this worker is eligible for the given task run

View Source
    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        return MockWorker._register_worker(db, worker_id + "_sandbox", PROVIDER_TYPE)

Create a new worker attached to the given identifier, assuming it doesn't already exist in the database.

Implementation should return the result of _register_worker when sure the worker can be successfully created to have it put into the db.