mephisto.abstractions.providers.mock.mock_agent

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.data_model.agent import Agent
from mephisto.abstractions.blueprint import AgentState
from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE

from typing import List, Optional, Tuple, Dict, Mapping, Any, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.unit import Unit
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.packet import Packet
    from mephisto.abstractions.providers.mock.mock_datastore import MockDatastore


class MockAgent(Agent):
    """
    This class encompasses a worker as they are working on an individual assignment.
    It maintains details for the current task at hand such as start and end time,
    connection status, etc.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)
        if db_id not in self.datastore.agent_data:
            self.datastore.agent_data[db_id] = {
                "observed": [],
                "pending_acts": [],
                "acts": [],
                "pending_submit": None,
            }

    def observe(self, live_update: Dict[str, Any]) -> None:
        """Put observations into this mock agent's observation list"""
        self.datastore.agent_data[self.db_id]["observed"].append(live_update)
        super().observe(live_update)

    def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
        """Add a fake observation to pull off on the next act call"""
        self.datastore.agent_data[self.db_id]["pending_acts"] = data

    def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
        """
        Add a final submit event to put in the queue for this agent
        to be called on completion
        """
        self.datastore.agent_data[self.db_id]["pending_submit"] = data

    def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
        """
        Either take an act from this mock agent's act queue (for use
        by tests and other mock purposes) or request a regular act
        (for use in manual testing).
        """
        if len(self.datastore.agent_data[self.db_id]["pending_acts"]) > 0:
            act = self.datastore.agent_data[self.db_id]["pending_acts"].pop(0)
        else:
            act = super().get_live_update(timeout=timeout)

        if act is not None:
            self.datastore.agent_data[self.db_id]["acts"].append(act)
        return act

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit

        Mock Units
        """
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit
        """
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self):
        """No need to tell mock crowd provider about doneness"""
        pass

    def mark_disconnected(self) -> None:
        """Mark this mock agent as having disconnected"""
        self.update_status(AgentState.STATUS_DISCONNECT)

    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Check the submission status of this agent, first popping off
        and triggering a local submit if there is one on a timeout submit
        """
        if self.did_submit.is_set():
            return True
        if timeout is not None:
            local_submit = self.datastore.agent_data[self.db_id]["pending_submit"]
            if local_submit is not None:
                self.handle_submit(local_submit)
        return super().await_submit(timeout)

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MockAgent._register_agent(db, worker, unit, PROVIDER_TYPE)
View Source
class MockAgent(Agent):
    """
    This class encompasses a worker as they are working on an individual assignment.
    It maintains details for the current task at hand such as start and end time,
    connection status, etc.
    """

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MockDatastore" = db.get_datastore_for_provider(PROVIDER_TYPE)
        if db_id not in self.datastore.agent_data:
            self.datastore.agent_data[db_id] = {
                "observed": [],
                "pending_acts": [],
                "acts": [],
                "pending_submit": None,
            }

    def observe(self, live_update: Dict[str, Any]) -> None:
        """Put observations into this mock agent's observation list"""
        self.datastore.agent_data[self.db_id]["observed"].append(live_update)
        super().observe(live_update)

    def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
        """Add a fake observation to pull off on the next act call"""
        self.datastore.agent_data[self.db_id]["pending_acts"] = data

    def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
        """
        Add a final submit event to put in the queue for this agent
        to be called on completion
        """
        self.datastore.agent_data[self.db_id]["pending_submit"] = data

    def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
        """
        Either take an act from this mock agent's act queue (for use
        by tests and other mock purposes) or request a regular act
        (for use in manual testing).
        """
        if len(self.datastore.agent_data[self.db_id]["pending_acts"]) > 0:
            act = self.datastore.agent_data[self.db_id]["pending_acts"].pop(0)
        else:
            act = super().get_live_update(timeout=timeout)

        if act is not None:
            self.datastore.agent_data[self.db_id]["acts"].append(act)
        return act

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit

        Mock Units
        """
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit
        """
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self):
        """No need to tell mock crowd provider about doneness"""
        pass

    def mark_disconnected(self) -> None:
        """Mark this mock agent as having disconnected"""
        self.update_status(AgentState.STATUS_DISCONNECT)

    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Check the submission status of this agent, first popping off
        and triggering a local submit if there is one on a timeout submit
        """
        if self.did_submit.is_set():
            return True
        if timeout is not None:
            local_submit = self.datastore.agent_data[self.db_id]["pending_submit"]
            if local_submit is not None:
                self.handle_submit(local_submit)
        return super().await_submit(timeout)

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MockAgent._register_agent(db, worker, unit, PROVIDER_TYPE)

This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.

#   MockAgent( db: mephisto.abstractions.database.MephistoDB, db_id: str, row: Union[Mapping[str, Any], NoneType] = None, _used_new_call: bool = False )
View Source
    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Agent":
        """
        The new method is overridden to be able to automatically generate
        the expected Agent class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Agent
        as you will instead be returned the correct Agent class according to
        the crowdprovider associated with this Agent.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Agent:
            # We are trying to construct a Agent, find what type to use and
            # create that instead
            if row is None:
                row = db.get_agent(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).AgentClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

The new method is overridden to be able to automatically generate the expected Agent class without needing to specifically find it for a given db_id. As such it is impossible to create a base Agent as you will instead be returned the correct Agent class according to the crowdprovider associated with this Agent.

#   def observe(self, live_update: Dict[str, Any]) -> None:
View Source
    def observe(self, live_update: Dict[str, Any]) -> None:
        """Put observations into this mock agent's observation list"""
        self.datastore.agent_data[self.db_id]["observed"].append(live_update)
        super().observe(live_update)

Put observations into this mock agent's observation list

#   def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
View Source
    def enqueue_mock_live_update(self, data: Dict[str, Any]) -> None:
        """Add a fake observation to pull off on the next act call"""
        self.datastore.agent_data[self.db_id]["pending_acts"] = data

Add a fake observation to pull off on the next act call

#   def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
View Source
    def enqueue_mock_submit_event(self, data: Dict[str, Any]) -> None:
        """
        Add a final submit event to put in the queue for this agent
        to be called on completion
        """
        self.datastore.agent_data[self.db_id]["pending_submit"] = data

Add a final submit event to put in the queue for this agent to be called on completion

#   def get_live_update(self, timeout=None) -> Union[Dict[str, Any], NoneType]:
View Source
    def get_live_update(self, timeout=None) -> Optional[Dict[str, Any]]:
        """
        Either take an act from this mock agent's act queue (for use
        by tests and other mock purposes) or request a regular act
        (for use in manual testing).
        """
        if len(self.datastore.agent_data[self.db_id]["pending_acts"]) > 0:
            act = self.datastore.agent_data[self.db_id]["pending_acts"].pop(0)
        else:
            act = super().get_live_update(timeout=timeout)

        if act is not None:
            self.datastore.agent_data[self.db_id]["acts"].append(act)
        return act

Either take an act from this mock agent's act queue (for use by tests and other mock purposes) or request a regular act (for use in manual testing).

#   def approve_work(self) -> None:
View Source
    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit

        Mock Units
        """
        self.update_status(AgentState.STATUS_APPROVED)

Approve the work done on this specific Unit

Mock Units

#   def reject_work(self, reason) -> None:
View Source
    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit
        """
        self.update_status(AgentState.STATUS_REJECTED)

Reject the work done on this specific Unit

#   def mark_done(self):
View Source
    def mark_done(self):
        """No need to tell mock crowd provider about doneness"""
        pass

No need to tell mock crowd provider about doneness

#   def mark_disconnected(self) -> None:
View Source
    def mark_disconnected(self) -> None:
        """Mark this mock agent as having disconnected"""
        self.update_status(AgentState.STATUS_DISCONNECT)

Mark this mock agent as having disconnected

#   def await_submit(self, timeout: Union[int, NoneType] = None) -> bool:
View Source
    def await_submit(self, timeout: Optional[int] = None) -> bool:
        """
        Check the submission status of this agent, first popping off
        and triggering a local submit if there is one on a timeout submit
        """
        if self.did_submit.is_set():
            return True
        if timeout is not None:
            local_submit = self.datastore.agent_data[self.db_id]["pending_submit"]
            if local_submit is not None:
                self.handle_submit(local_submit)
        return super().await_submit(timeout)

Check the submission status of this agent, first popping off and triggering a local submit if there is one on a timeout submit

View Source
    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MockAgent._register_agent(db, worker, unit, PROVIDER_TYPE)

Create an agent for this worker to be used for work on the given Unit.