mephisto.abstractions.providers.mturk.mturk_provider

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
from mephisto.abstractions.providers.mturk.provider_type import PROVIDER_TYPE
from mephisto.abstractions.providers.mturk.mturk_datastore import MTurkDatastore
from mephisto.abstractions.crowd_provider import CrowdProvider, ProviderArgs
from mephisto.data_model.requester import RequesterArgs
from mephisto.abstractions.providers.mturk.mturk_agent import MTurkAgent
from mephisto.abstractions.providers.mturk.mturk_requester import MTurkRequester
from mephisto.abstractions.providers.mturk.mturk_unit import MTurkUnit
from mephisto.abstractions.providers.mturk.mturk_worker import MTurkWorker
from mephisto.abstractions.providers.mturk.mturk_utils import (
    create_hit_type,
    create_hit_config,
    delete_qualification,
)
from mephisto.operations.registry import register_mephisto_abstraction
from dataclasses import dataclass, field

from typing import ClassVar, Dict, Any, Optional, Type, List, cast, TYPE_CHECKING

from mephisto.data_model.requester import Requester

if TYPE_CHECKING:
    from mephisto.abstractions.blueprint import SharedTaskState
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent
    from omegaconf import DictConfig


@dataclass
class MTurkProviderArgs(ProviderArgs):
    """
    Provider args for an MTurk provider.

    Extends the base ProviderArgs and only overrides the default of
    `_provider_type` so that it points at this module's PROVIDER_TYPE.
    """

    # Provider identifier these args belong to; defaults to PROVIDER_TYPE
    _provider_type: str = PROVIDER_TYPE


@register_mephisto_abstraction()
class MTurkProvider(CrowdProvider):
    """
    Implementation of a crowdprovider that interfaces with MTurk.

    Binds the MTurk-specific Unit, Requester, Worker and Agent classes to
    the CrowdProvider interface and manages the per-run MTurk resources
    (the HIT type and any qualifications a task run needs).
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    UnitClass: ClassVar[Type["Unit"]] = MTurkUnit

    RequesterClass: ClassVar[Type["Requester"]] = MTurkRequester

    WorkerClass: ClassVar[Type["Worker"]] = MTurkWorker

    AgentClass: ClassVar[Type["Agent"]] = MTurkAgent

    ArgsClass = MTurkProviderArgs

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        MTurk itself is the source of truth for most data required to run
        tasks on MTurk. The datastore holds sessions to connect with
        MTurk as well as mappings between MTurk ids and Mephisto ids

        Returns a new MTurkDatastore rooted at `storage_path`.
        """
        return MTurkDatastore(datastore_root=storage_path)

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils

        The client is looked up in the datastore by `requester_name`.
        """
        return self.datastore.get_client_for_requester(requester_name)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Produce the HIT type for this task run.

        Selects the qualifications in `shared_state` that apply to this
        provider, creates an MTurk qualification for each one that has no
        mapping in the datastore yet, creates the HIT type, and registers
        the run (HIT type id, config directory, frame height) in the
        datastore.

        `server_url` is not used by this implementation.
        """
        requester = cast("MTurkRequester", task_run.get_requester())
        # NOTE(review): the returned session is never used here. The call is
        # kept in case the datastore relies on it to prepare the session
        # before a client is requested - confirm, then remove.
        self.datastore.get_session_for_requester(requester._requester_name)
        task_run_id = task_run.db_id

        # Set up HIT config
        config_dir = os.path.join(self.datastore.datastore_root, task_run_id)
        task_args = args.task

        # Find or create relevant qualifications
        def _applies_to_provider(providers: Any) -> bool:
            # None is treated as "applies to this provider"
            return providers is None or self.PROVIDER_TYPE in providers

        qualifications = [
            qualification
            for qualification in shared_state.qualifications
            if _applies_to_provider(qualification["applicable_providers"])
        ]
        # Sandbox requesters look up and create qualifications under a
        # "_sandbox"-suffixed name
        is_sandbox = requester.PROVIDER_TYPE == "mturk_sandbox"
        for qualification in qualifications:
            qualification_name = qualification["qualification_name"]
            if is_sandbox:
                qualification_name += "_sandbox"
            if self.datastore.get_qualification_mapping(qualification_name) is None:
                qualification[
                    "QualificationTypeId"
                ] = requester._create_new_mturk_qualification(qualification_name)

        if hasattr(shared_state, "mturk_specific_qualifications"):
            # TODO(OWN) standardize provider-specific qualifications
            qualifications += shared_state.mturk_specific_qualifications  # type: ignore

        # Set up HIT type
        client = self._get_client(requester._requester_name)
        hit_type_id = create_hit_type(client, task_args, qualifications)
        # Falls back to 0 when the frontend args carry no "frame_height"
        frame_height = (
            task_run.get_blueprint().get_frontend_args().get("frame_height", 0)
        )
        self.datastore.register_run(task_run_id, hit_type_id, config_dir, frame_height)

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No cleanup necessary for task type; this method does nothing."""

    @classmethod
    def get_wrapper_js_path(cls) -> str:
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server

        The path is resolved relative to the directory of this module.
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the MTurk qualification mapped to `qualification_name`, if any.

        Does nothing when the datastore holds no mapping for the name.
        Otherwise the qualification is deleted through the client of the
        requester recorded in the mapping.

        Raises AssertionError if that requester is not an MTurkRequester.
        """
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            return None

        requester_id = mapping["requester_id"]
        requester = Requester.get(self.db, requester_id)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O
        if not isinstance(requester, MTurkRequester):
            raise AssertionError("Must be an mturk requester")
        client = requester._get_client(requester._requester_name)
        delete_qualification(client, mapping["mturk_qualification_id"])
View Source
class MTurkProviderArgs(ProviderArgs):
    """
    Provider args for an MTurk provider.

    Extends the base ProviderArgs and only overrides the default of
    `_provider_type` so that it points at this module's PROVIDER_TYPE.
    """

    # Provider identifier these args belong to; defaults to PROVIDER_TYPE
    _provider_type: str = PROVIDER_TYPE

Provider args for an MTurk provider

#   MTurkProviderArgs(_provider_type: str = 'mturk', requester_name: str = '???')
View Source
class MTurkProvider(CrowdProvider):
    """
    Implementation of a crowdprovider that interfaces with MTurk.

    Binds the MTurk-specific Unit, Requester, Worker and Agent classes to
    the CrowdProvider interface and manages the per-run MTurk resources
    (the HIT type and any qualifications a task run needs).
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    UnitClass: ClassVar[Type["Unit"]] = MTurkUnit

    RequesterClass: ClassVar[Type["Requester"]] = MTurkRequester

    WorkerClass: ClassVar[Type["Worker"]] = MTurkWorker

    AgentClass: ClassVar[Type["Agent"]] = MTurkAgent

    ArgsClass = MTurkProviderArgs

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        MTurk itself is the source of truth for most data required to run
        tasks on MTurk. The datastore holds sessions to connect with
        MTurk as well as mappings between MTurk ids and Mephisto ids

        Returns a new MTurkDatastore rooted at `storage_path`.
        """
        return MTurkDatastore(datastore_root=storage_path)

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils

        The client is looked up in the datastore by `requester_name`.
        """
        return self.datastore.get_client_for_requester(requester_name)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Produce the HIT type for this task run.

        Selects the qualifications in `shared_state` that apply to this
        provider, creates an MTurk qualification for each one that has no
        mapping in the datastore yet, creates the HIT type, and registers
        the run (HIT type id, config directory, frame height) in the
        datastore.

        `server_url` is not used by this implementation.
        """
        requester = cast("MTurkRequester", task_run.get_requester())
        # NOTE(review): the returned session is never used here. The call is
        # kept in case the datastore relies on it to prepare the session
        # before a client is requested - confirm, then remove.
        self.datastore.get_session_for_requester(requester._requester_name)
        task_run_id = task_run.db_id

        # Set up HIT config
        config_dir = os.path.join(self.datastore.datastore_root, task_run_id)
        task_args = args.task

        # Find or create relevant qualifications
        def _applies_to_provider(providers: Any) -> bool:
            # None is treated as "applies to this provider"
            return providers is None or self.PROVIDER_TYPE in providers

        qualifications = [
            qualification
            for qualification in shared_state.qualifications
            if _applies_to_provider(qualification["applicable_providers"])
        ]
        # Sandbox requesters look up and create qualifications under a
        # "_sandbox"-suffixed name
        is_sandbox = requester.PROVIDER_TYPE == "mturk_sandbox"
        for qualification in qualifications:
            qualification_name = qualification["qualification_name"]
            if is_sandbox:
                qualification_name += "_sandbox"
            if self.datastore.get_qualification_mapping(qualification_name) is None:
                qualification[
                    "QualificationTypeId"
                ] = requester._create_new_mturk_qualification(qualification_name)

        if hasattr(shared_state, "mturk_specific_qualifications"):
            # TODO(OWN) standardize provider-specific qualifications
            qualifications += shared_state.mturk_specific_qualifications  # type: ignore

        # Set up HIT type
        client = self._get_client(requester._requester_name)
        hit_type_id = create_hit_type(client, task_args, qualifications)
        # Falls back to 0 when the frontend args carry no "frame_height"
        frame_height = (
            task_run.get_blueprint().get_frontend_args().get("frame_height", 0)
        )
        self.datastore.register_run(task_run_id, hit_type_id, config_dir, frame_height)

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No cleanup necessary for task type; this method does nothing."""

    @classmethod
    def get_wrapper_js_path(cls) -> str:
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server

        The path is resolved relative to the directory of this module.
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the MTurk qualification mapped to `qualification_name`, if any.

        Does nothing when the datastore holds no mapping for the name.
        Otherwise the qualification is deleted through the client of the
        requester recorded in the mapping.

        Raises AssertionError if that requester is not an MTurkRequester.
        """
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            return None

        requester_id = mapping["requester_id"]
        requester = Requester.get(self.db, requester_id)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O
        if not isinstance(requester, MTurkRequester):
            raise AssertionError("Must be an mturk requester")
        client = requester._get_client(requester._requester_name)
        delete_qualification(client, mapping["mturk_qualification_id"])

Implementation of a crowdprovider that interfaces with MTurk

#   PROVIDER_TYPE = 'mturk'
#   def initialize_provider_datastore(self, storage_path: str) -> Any:
View Source
    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        Build the datastore backing this provider.

        MTurk remains the source of truth for most of the data needed to
        run tasks; the datastore keeps the sessions used to talk to MTurk
        and the mappings between MTurk ids and Mephisto ids.
        """
        provider_datastore = MTurkDatastore(datastore_root=storage_path)
        return provider_datastore

MTurk itself is the source of truth for most data required to run tasks on MTurk. The datastore holds sessions to connect with MTurk as well as mappings between MTurk ids and Mephisto ids

#   def setup_resources_for_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState, server_url: str ) -> None:
View Source
    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Create the HIT type for `task_run` and register the run.

        Qualifications from `shared_state` that apply to this provider are
        collected, an MTurk qualification is created for each one without
        a datastore mapping, and the resulting HIT type id is stored with
        the run's config directory and frame height.
        """
        run_requester = cast("MTurkRequester", task_run.get_requester())
        # The session itself is not used below; the lookup is retained as-is
        self.datastore.get_session_for_requester(run_requester._requester_name)
        run_id = task_run.db_id

        # Location of the HIT config for this run, plus the task arguments
        run_config_dir = os.path.join(self.datastore.datastore_root, run_id)
        run_task_args = args.task

        def _is_applicable(providers):
            # None is accepted, as is any collection naming this provider
            return providers is None or self.PROVIDER_TYPE in providers

        # Keep only the qualifications relevant to this provider
        qualifications = [
            entry
            for entry in shared_state.qualifications
            if _is_applicable(entry["applicable_providers"])
        ]

        # Create any qualification that has no datastore mapping yet
        sandbox_requester = run_requester.PROVIDER_TYPE == "mturk_sandbox"
        for entry in qualifications:
            lookup_name = entry["qualification_name"]
            if sandbox_requester:
                lookup_name += "_sandbox"
            if self.datastore.get_qualification_mapping(lookup_name) is not None:
                continue
            new_type_id = run_requester._create_new_mturk_qualification(lookup_name)
            entry["QualificationTypeId"] = new_type_id

        if hasattr(shared_state, "mturk_specific_qualifications"):
            # TODO(OWN) standardize provider-specific qualifications
            qualifications += shared_state.mturk_specific_qualifications  # type: ignore

        # Create the HIT type and record everything for this run
        mturk_client = self._get_client(run_requester._requester_name)
        new_hit_type_id = create_hit_type(mturk_client, run_task_args, qualifications)
        frontend_args = task_run.get_blueprint().get_frontend_args()
        height = frontend_args.get("frame_height", 0)
        self.datastore.register_run(run_id, new_hit_type_id, run_config_dir, height)

Produce the HIT type for this task run.

#   def cleanup_resources_from_task_run( self, task_run: mephisto.data_model.task_run.TaskRun, server_url: str ) -> None:
View Source
    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """Nothing to clean up for this provider; intentionally a no-op."""
        return None

No cleanup necessary for task type

#   @classmethod def get_wrapper_js_path(cls):
View Source
    @classmethod
    def get_wrapper_js_path(cls):
        """
        Locate the `wrap_crowd_source.js` file shipped next to this module,
        which is the file deployed to the server for this provider.
        """
        module_dir = os.path.dirname(__file__)
        wrapper_path = os.path.join(module_dir, "wrap_crowd_source.js")
        return wrapper_path

Return the path to the wrap_crowd_source.js file for this provider to be deployed to the server

#   def cleanup_qualification(self, qualification_name: str) -> None:
View Source
    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the MTurk qualification mapped to `qualification_name`, if any.

        Does nothing when the datastore holds no mapping for the name.
        Otherwise the qualification is deleted through the client of the
        requester recorded in the mapping.

        Raises AssertionError if that requester is not an MTurkRequester.
        """
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            return None

        requester_id = mapping["requester_id"]
        requester = Requester.get(self.db, requester_id)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O
        if not isinstance(requester, MTurkRequester):
            raise AssertionError("Must be an mturk requester")
        client = requester._get_client(requester._requester_name)
        delete_qualification(client, mapping["mturk_qualification_id"])

Remove the qualification from the sandbox server, if it exists

#   class MTurkProvider.UnitClass(mephisto.data_model.unit.Unit):
View Source
class MTurkUnit(Unit):
    """
    This class tracks the status of an individual worker's contribution to a
    higher level assignment. It is the smallest 'unit' of work to complete
    the assignment, and this class is only responsible for checking
    the status of that work itself being done.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Initialize the unit, attach the provider datastore, and load any
        HIT mapping already recorded for this unit.
        """
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        self.hit_id: Optional[str] = None
        # 0.0 marks "never synced"; updated by _sync_hit_mapping
        self._last_sync_time = 0.0
        self._sync_hit_mapping()
        # Filled in on first use by get_requester()
        self.__requester: Optional["MTurkRequester"] = None

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def _sync_hit_mapping(self) -> None:
        """
        Sync with the datastore to see if any mappings have updated

        Refreshes `hit_id`, `mturk_assignment_id` and
        `assignment_time_in_seconds` from the datastore unless the datastore
        reports the cached mapping as still in sync. When the lookup raises
        IndexError the ids are reset to None and the time to -1.
        """
        if self.datastore.is_hit_mapping_in_sync(self.db_id, self._last_sync_time):
            return
        try:
            mapping = dict(self.datastore.get_hit_mapping(self.db_id))
            self.hit_id = mapping["hit_id"]
            self.mturk_assignment_id = mapping.get("assignment_id")
            self.assignment_time_in_seconds = mapping.get("assignment_time_in_seconds")
        except IndexError:
            # HIT does not appear to exist
            self.hit_id = None
            self.mturk_assignment_id = None
            self.assignment_time_in_seconds = -1
        # We update to a time slightly earlier than now, in order
        # to reduce the risk of a race condition caching an old
        # value the moment it's registered
        self._last_sync_time = time.monotonic() - 1

    def register_from_provider_data(
        self, hit_id: str, mturk_assignment_id: str
    ) -> None:
        """Update the datastore and local information from this registration"""
        self.datastore.register_assignment_to_hit(
            hit_id, self.db_id, mturk_assignment_id
        )
        self._sync_hit_mapping()

    def get_mturk_assignment_id(self) -> Optional[str]:
        """
        Return the MTurk assignment id associated with this unit
        """
        self._sync_hit_mapping()
        return self.mturk_assignment_id

    def get_mturk_hit_id(self) -> Optional[str]:
        """
        Return the MTurk hit id associated with this unit
        """
        self._sync_hit_mapping()
        return self.hit_id

    def get_requester(self) -> "MTurkRequester":
        """
        Wrapper around regular Requester as this will be MTurkRequesters

        The requester is fetched once and cached on the instance.
        """
        if self.__requester is None:
            self.__requester = cast("MTurkRequester", super().get_requester())
        return self.__requester

    def set_db_status(self, status: str) -> None:
        """
        Set the status reflected in the database for this Unit

        When the new status is COMPLETED, the assigned agent is also
        inspected: if it is still in the task, a reconciliation of the
        submitted data is attempted, and failures are only logged.
        """
        super().set_db_status(status)
        if status == AssignmentState.COMPLETED:
            agent = cast("MTurkAgent", self.get_assigned_agent())
            if agent is not None:
                agent_status = agent.get_status()
                if agent_status == AgentState.STATUS_IN_TASK:
                    # Oh no, MTurk has completed the unit, but we don't have
                    # the data. We need to reconcile
                    logger.warning(
                        f"Unit {self} moved to completed, but the agent didn't... "
                        f"Attempting to reconcile with MTurk directly"
                    )
                    try:
                        hit_id = self.get_mturk_hit_id()
                        assert (
                            hit_id is not None
                        ), f"This unit does not have an ID! {self}"

                        agent.attempt_to_reconcile_submitted_data(hit_id)
                    except Exception as e:
                        # Best effort: reconciliation errors are logged, not raised
                        logger.warning(
                            f"Was not able to reconcile due to an error, {e}. "
                            f"You may need to reconcile this specific Agent manually "
                            f"after the task is completed. See here for details: "
                            f"https://github.com/facebookresearch/Mephisto/pull/442"
                        )
                elif agent_status == AgentState.STATUS_TIMEOUT:
                    # Oh no, this is also bad. we shouldn't be completing for a timed out agent.
                    logger.warning(
                        "Found a timeout that's trying to be pushed to completed with a timed out agent"
                    )
                    pass
            else:
                logger.warning(f"No agent found for completed unit {self}...")

    def clear_assigned_agent(self) -> None:
        """
        Additionally to clearing the agent, we also need to dissociate the
        hit_id from this unit in the MTurkDatastore

        A unit that is already COMPLETED is left untouched (a warning is
        logged instead). A unit still marked ASSIGNED afterwards is moved
        back to LAUNCHED.
        """
        if self.db_status == AssignmentState.COMPLETED:
            logger.warning(
                f"Clearing an agent when COMPLETED, it's likely a submit happened "
                f"but could not be received by the Mephisto backend. This "
                f"assignment clear is thus being ignored, but this message "
                f"is indicative of some data loss. "
            )
            # TODO(OWN) how can we reconcile missing data here? Marking this agent as
            # COMPLETED will pollute the data, but not marking it means that
            # it will have to be the auto-approve deadline.
            return
        super().clear_assigned_agent()
        mturk_hit_id = self.get_mturk_hit_id()
        if mturk_hit_id is not None:
            self.datastore.clear_hit_from_unit(self.db_id)
            self._sync_hit_mapping()

        if self.db_status == AssignmentState.ASSIGNED:
            self.set_db_status(AssignmentState.LAUNCHED)

    # Required Unit functions

    def get_status(self) -> str:
        """
        Get status for this unit directly from MTurk, fall back on local info

        CREATED units defer to the parent implementation; ACCEPTED, EXPIRED
        and SOFT_REJECTED are returned unchanged; COMPLETED and REJECTED are
        updated from the assigned agent's status. Any other status is
        compared against the HIT's status on MTurk, which may update the
        stored status and the assigned agent's status as a side effect.
        """
        if self.db_status == AssignmentState.CREATED:
            return super().get_status()
        elif self.db_status in [
            AssignmentState.ACCEPTED,
            AssignmentState.EXPIRED,
            AssignmentState.SOFT_REJECTED,
        ]:
            # These statuses don't change with a get_status call
            return self.db_status

        if self.db_status in [AssignmentState.COMPLETED, AssignmentState.REJECTED]:
            # These statuses only change on agent dependent changes
            agent = self.get_assigned_agent()
            found_status = self.db_status
            if agent is not None:
                agent_status = agent.get_status()
                if agent_status == AgentState.STATUS_APPROVED:
                    found_status = AssignmentState.ACCEPTED
                elif agent_status == AgentState.STATUS_REJECTED:
                    found_status = AssignmentState.REJECTED
                elif agent_status == AgentState.STATUS_SOFT_REJECTED:
                    found_status = AssignmentState.SOFT_REJECTED
            else:
                logger.warning(f"Agent for unit {self} is None")
            if found_status != self.db_status:
                self.set_db_status(found_status)
            return self.db_status

        # Remaining statuses are tracking a live HIT

        mturk_hit_id = self.get_mturk_hit_id()
        if mturk_hit_id is None:
            # If the hit_id is None and there's an agent still assigned,
            # then that agent has timed out and we should expire
            agent = self.get_assigned_agent()
            if agent is not None:
                if agent.get_status() != AgentState.STATUS_EXPIRED:
                    agent.update_status(AgentState.STATUS_EXPIRED)

            # Can't determine anything else if there is no HIT on this unit
            return self.db_status

        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        hit = get_hit(client, mturk_hit_id)
        if hit is None:
            # Reported as expired without updating the stored status
            return AssignmentState.EXPIRED
        hit_data = hit["HIT"]

        local_status = self.db_status
        external_status = self.db_status

        # Translate the MTurk HIT status into an AssignmentState
        if hit_data["HITStatus"] == "Assignable":
            external_status = AssignmentState.LAUNCHED
        elif hit_data["HITStatus"] == "Unassignable":
            external_status = AssignmentState.ASSIGNED
        elif hit_data["HITStatus"] in ["Reviewable", "Reviewing"]:
            external_status = AssignmentState.COMPLETED
            # Assignments still available on a reviewable HIT count as expired
            if hit_data["NumberOfAssignmentsAvailable"] != 0:
                external_status = AssignmentState.EXPIRED
        elif hit_data["HITStatus"] == "Disposed":
            # The HIT was deleted, must rely on what we have
            external_status = local_status
        else:
            raise Exception(f"Unexpected HIT status {hit_data['HITStatus']}")

        if external_status != local_status:
            if local_status == AssignmentState.ASSIGNED and external_status in [
                AssignmentState.LAUNCHED,
                AssignmentState.EXPIRED,
            ]:
                # Treat this as a return event, this hit may be doable by someone else
                agent = self.get_assigned_agent()
                if agent is not None and agent.get_status() in [
                    AgentState.STATUS_IN_TASK,
                    AgentState.STATUS_ONBOARDING,
                    AgentState.STATUS_WAITING,
                    AgentState.STATUS_PARTNER_DISCONNECT,
                ]:
                    # mark the in-task agent as having returned the HIT, to
                    # free any running tasks and have Blueprint decide on cleanup.
                    agent.update_status(AgentState.STATUS_RETURNED)
                if external_status == AssignmentState.EXPIRED:
                    # If we're expired, then it won't be doable, and we should update
                    self.set_db_status(external_status)
            else:
                self.set_db_status(external_status)

        return self.db_status

    def launch(self, task_url: str) -> None:
        """Create this HIT on MTurk (making it available) and register the ids in the local db"""
        task_run = self.get_assignment().get_task_run()
        duration = task_run.get_task_args().assignment_duration_in_seconds
        run_id = task_run.db_id
        # Run details were stored when the run's resources were set up
        run_details = self.datastore.get_run(run_id)
        hit_type_id = run_details["hit_type_id"]
        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        frame_height = run_details["frame_height"]
        hit_link, hit_id, response = create_hit_with_hit_type(
            client, frame_height, task_url, hit_type_id
        )
        # TODO(OWN) get this link to the mephisto frontend
        print(hit_link)

        # We create a hit for this unit, but note that this unit may not
        # necessarily match with the same HIT that was launched for it.
        self.datastore.new_hit(hit_id, hit_link, duration, run_id)
        self.set_db_status(AssignmentState.LAUNCHED)
        return None

    def expire(self) -> float:
        """
        Send a request to expire the HIT, and if it's not assigned return,
        otherwise just return the maximum assignment duration

        The returned value is the delay to wait: 0 unless the unit is
        currently ASSIGNED and an assignment time is known.
        """
        delay = 0
        status = self.get_status()
        if status in [AssignmentState.EXPIRED, AssignmentState.COMPLETED]:
            return delay
        if status == AssignmentState.ASSIGNED:
            # The assignment is currently being worked on,
            # so we will set the wait time to be the
            # amount of time we granted for working on this assignment
            if self.assignment_time_in_seconds is not None:
                delay = self.assignment_time_in_seconds
            logger.debug(f"Expiring a unit that is ASSIGNED after delay {delay}")
        mturk_hit_id = self.get_mturk_hit_id()
        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        if mturk_hit_id is not None:
            expire_hit(client, mturk_hit_id)
            return delay
        else:
            # No HIT is mapped to this unit: expire one of the run's
            # unassigned HITs instead, if any exist
            unassigned_hit_ids = self.datastore.get_unassigned_hit_ids(self.task_run_id)

            if len(unassigned_hit_ids) == 0:
                self.set_db_status(AssignmentState.EXPIRED)
                return delay
            hit_id = unassigned_hit_ids[0]
            expire_hit(client, hit_id)
            self.datastore.register_assignment_to_hit(hit_id, self.db_id)
            self.set_db_status(AssignmentState.EXPIRED)
            return delay

    def is_expired(self) -> bool:
        """
        Determine if this unit is expired as according to the vendor.

        In this case, we keep track of the expiration locally by refreshing
        the hit's status and seeing if we've expired.
        """
        return self.get_status() == AssignmentState.EXPIRED

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """Create a Unit for the given assignment"""
        return MTurkUnit._register_unit(
            db, assignment, index, pay_amount, PROVIDER_TYPE
        )

    def __repr__(self) -> str:
        """Show the class name, db id, MTurk HIT id and stored status."""
        return f"{self.__class__.__name__}({self.db_id}, {self.get_mturk_hit_id()}, {self.db_status})"

This class tracks the status of an individual worker's contribution to a higher level assignment. It is the smallest 'unit' of work to complete the assignment, and this class is only responsible for checking the status of that work itself being done.

#   class MTurkProvider.RequesterClass(mephisto.data_model.requester.Requester):
View Source
class MTurkRequester(Requester):
    """
    Wrapper for requester behavior as provided by MTurk. Makes
    all requests directly to MTurk through boto3.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE
    ArgsClass = MTurkRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """Initialize the requester and attach the provider datastore."""
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        # Use _requester_name to preserve sandbox behavior which
        # utilizes a different requester_name
        self._requester_name = self.requester_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    # Required functions for a Requester implementation

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.

        Raises Exception when args are given but lack "access_key_id" or
        "secret_access_key".
        """
        if args is not None:
            for req_field in ["access_key_id", "secret_access_key"]:
                if req_field not in args:
                    raise Exception(
                        f'Missing IAM "{req_field}" in requester registration args'
                    )
        setup_aws_credentials(self._requester_name, args)

    def is_registered(self) -> bool:
        """Return whether or not this requester has registered yet"""
        return check_aws_credentials(self._requester_name)

    def get_available_budget(self) -> float:
        """Get the available budget from MTurk"""
        client = self._get_client(self._requester_name)
        return get_requester_balance(client)

    def _create_new_mturk_qualification(self, qualification_name: str) -> str:
        """
        Create a new qualification on MTurk owned by the requester provided

        The plain name is tried first, then the name suffixed with the
        current time, then names suffixed with random UUIDs. The successful
        MTurk name and id are stored in the datastore under
        `qualification_name`, and the id is returned.

        Raises Exception when no owned qualification could be obtained
        after more than MAX_QUALIFICATION_ATTEMPTS UUID-suffixed attempts.
        """
        client = self._get_client(self._requester_name)
        qualification_desc = f"Equivalent qualification for {qualification_name}."
        use_qualification_name = qualification_name
        qualification_id = find_or_create_mturk_qualification(
            client, qualification_name, qualification_desc, must_be_owned=True
        )
        if qualification_id is None:
            # Try to append time to make the qualification unique
            use_qualification_name = f"{qualification_name}_{time.time()}"
            qualification_id = find_or_create_mturk_qualification(
                client, use_qualification_name, qualification_desc, must_be_owned=True
            )
            attempts = 0
            while qualification_id is None:
                # Append something somewhat random
                use_qualification_name = f"{qualification_name}_{str(uuid4())}"
                qualification_id = find_or_create_mturk_qualification(
                    client,
                    use_qualification_name,
                    qualification_desc,
                    must_be_owned=True,
                )
                attempts += 1
                # Only give up if this attempt also failed; a success on the
                # final attempt must still be recorded and returned
                if (
                    qualification_id is None
                    and attempts > MAX_QUALIFICATION_ATTEMPTS
                ):
                    raise Exception(
                        "Something has gone extremely wrong with creating qualification "
                        f"{qualification_name} for requester {self.requester_name}"
                    )
        # Store the new qualification in the datastore
        self.datastore.create_qualification_mapping(
            qualification_name, self.db_id, use_qualification_name, qualification_id
        )
        return qualification_id

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        """Register and return a new requester named `requester_name` for this provider type."""
        return MTurkRequester._register_requester(db, requester_name, PROVIDER_TYPE)

Wrapper for requester behavior as provided by MTurk. Makes all requests directly to MTurk through boto3.

#   class MTurkProvider.WorkerClass(mephisto.data_model.worker.Worker):
View Source
class MTurkWorker(Worker):
    """
    This class represents an individual - namely a person. It maintains components
    of ongoing identity for a user.

    MTurk-side operations (qualifications, bonuses, blocks) are carried out through
    a client fetched from the provider datastore for the relevant requester.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        self._worker_name = self.worker_name  # sandbox workers use a different name

    @classmethod
    def get_from_mturk_worker_id(
        cls, db: "MephistoDB", mturk_worker_id: str
    ) -> Optional["MTurkWorker"]:
        """
        Get the MTurkWorker from the given worker_id.

        When called on a class whose PROVIDER_TYPE differs from this module's,
        "_sandbox" is appended to the id before the lookup. Returns None (after
        logging a warning) when no matching worker is found.
        """
        if cls.PROVIDER_TYPE != PROVIDER_TYPE:
            mturk_worker_id += "_sandbox"
        workers = db.find_workers(
            worker_name=mturk_worker_id, provider_type=cls.PROVIDER_TYPE
        )
        if len(workers) == 0:
            logger.warning(
                f"Could not find a Mephisto Worker for mturk_id {mturk_worker_id}"
            )
            return None
        return cast("MTurkWorker", workers[0])

    def get_mturk_worker_id(self) -> str:
        """Return the worker name to use when talking to MTurk about this worker"""
        return self._worker_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def grant_crowd_qualification(
        self, qualification_name: str, value: int = 1
    ) -> None:
        """
        Grant a qualification by the given name to this worker. Check the local
        MTurk db to find the matching MTurk qualification to grant, and pass
        that. If no qualification exists, try to create one.

        In creating a new qualification, Mephisto resolves the ambiguity over which
        requester to associate that qualification with by using the LAST requester
        returned by `find_requesters` for the given account type (`mturk_sandbox`
        when the qualification name ends with "sandbox", otherwise `mturk`).

        Raises IndexError if a qualification has to be created but no requester of
        the target account type is registered.
        """
        mturk_qual_details = self.datastore.get_qualification_mapping(
            qualification_name
        )
        if mturk_qual_details is not None:
            requester = Requester.get(self.db, mturk_qual_details["requester_id"])
            qualification_id = mturk_qual_details["mturk_qualification_id"]
        else:
            target_type = (
                "mturk_sandbox" if qualification_name.endswith("sandbox") else "mturk"
            )
            candidates = self.db.find_requesters(provider_type=target_type)
            if not candidates:
                # Indexing the empty result used to raise a bare IndexError; keep
                # the exception type but say what is actually missing.
                raise IndexError(
                    f"Cannot create MTurk qualification {qualification_name}: "
                    f"no requester with provider type {target_type} is registered"
                )
            requester = candidates[-1]
            assert isinstance(
                requester, MTurkRequester
            ), "find_requesters must return mturk requester for given provider types"
            qualification_id = requester._create_new_mturk_qualification(
                qualification_name
            )
        assert isinstance(
            requester, MTurkRequester
        ), "Must be an MTurk requester for MTurk quals"
        client = self._get_client(requester._requester_name)
        give_worker_qualification(
            client, self.get_mturk_worker_id(), qualification_id, value
        )
        return None

    def revoke_crowd_qualification(self, qualification_name: str) -> None:
        """
        Revoke the qualification by the given name from this worker. Check the local
        MTurk db to find the matching MTurk qualification to revoke, pass if
        no such qualification exists.
        """
        mturk_qual_details = self.datastore.get_qualification_mapping(
            qualification_name
        )
        if mturk_qual_details is None:
            # Nothing was ever mapped locally, so there is nothing to revoke
            logger.error(
                f"No locally stored MTurk qualification to revoke for name {qualification_name}"
            )
            return None

        requester = Requester.get(self.db, mturk_qual_details["requester_id"])
        assert isinstance(
            requester, MTurkRequester
        ), "Must be an MTurk requester from MTurk quals"
        client = self._get_client(requester._requester_name)
        qualification_id = mturk_qual_details["mturk_qualification_id"]
        remove_worker_qualification(
            client, self.get_mturk_worker_id(), qualification_id
        )
        return None

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """
        Bonus this worker for any reason. Return tuple of success and failure message.

        A unit is currently required: the bonus is paid against that unit's MTurk
        assignment through the client of the requester that launched it.
        """
        if unit is None:
            # TODO(#652) implement. The content in scripts/mturk/launch_makeup_hits.py
            # may prove useful for this.
            return False, "bonusing via compensation tasks not yet available"

        unit = cast("MTurkUnit", unit)
        requester = cast(
            "MTurkRequester", unit.get_assignment().get_task_run().get_requester()
        )
        client = self._get_client(requester._requester_name)
        mturk_assignment_id = unit.get_mturk_assignment_id()
        assert mturk_assignment_id is not None, "Cannot bonus for a unit with no agent"
        # A fresh uuid is passed as the final argument on every call
        pay_bonus(
            client, self._worker_name, amount, mturk_assignment_id, reason, str(uuid4())
        )
        return True, ""

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """
        Block this worker for a specified reason. Return success of block.

        An explicitly given requester takes precedence; otherwise the requester is
        looked up from the unit's task run. With neither, the block is refused.
        """
        if unit is None and requester is None:
            # TODO(WISH) soft block from all requesters? Maybe have the main
            # requester soft block?
            return (
                False,
                "Blocking without a unit or requester not yet supported for MTurkWorkers",
            )
        elif unit is not None and requester is None:
            requester = unit.get_assignment().get_task_run().get_requester()
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        block_worker(client, self._worker_name, reason)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """Unblock a blocked worker for the specified reason. Return success of unblock"""
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        unblock_worker(client, self._worker_name, reason)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        return is_worker_blocked(client, self._worker_name)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """
        Qualifications are handled primarily by MTurk, so if a worker is able to get
        through to be able to access the task, they should be eligible
        """
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        """Register a worker with the given id under this module's provider type"""
        return MTurkWorker._register_worker(db, worker_id, PROVIDER_TYPE)

This class represents an individual - namely a person. It maintains components of ongoing identity for a user.

#   class MTurkProvider.AgentClass(mephisto.data_model.agent.Agent):
View Source
class MTurkAgent(Agent):
    """
    This class encompasses a worker as they are working on an individual assignment.
    It maintains details for the current task at hand such as start and end time,
    connection status, etc.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        unit: "MTurkUnit" = cast("MTurkUnit", self.get_unit())
        # May be None at this point; _get_mturk_assignment_id re-queries the unit
        self.mturk_assignment_id = unit.get_mturk_assignment_id()

    def _get_mturk_assignment_id(self):
        """
        Return the cached MTurk assignment id, asking the unit again first if
        nothing has been cached yet.
        """
        if self.mturk_assignment_id is None:
            self.mturk_assignment_id = self.get_unit().get_mturk_assignment_id()
        return self.mturk_assignment_id

    def _get_client(self) -> Any:
        """
        Get an mturk client for usage with mturk_utils for this agent
        """
        unit = self.get_unit()
        requester: "MTurkRequester" = cast("MTurkRequester", unit.get_requester())
        return self.datastore.get_client_for_requester(requester._requester_name)

    @classmethod
    def new_from_provider_data(
        cls,
        db: "MephistoDB",
        worker: "Worker",
        unit: "Unit",
        provider_data: Dict[str, Any],
    ) -> "Agent":
        """
        Wrapper around the new method that allows registering additional
        bookkeeping information from a crowd provider for this agent.

        `provider_data` must contain "hit_id" and "assignment_id", which are
        registered on the unit before the agent is created.
        """
        from mephisto.abstractions.providers.mturk.mturk_unit import MTurkUnit

        assert isinstance(
            unit, MTurkUnit
        ), "Can only register mturk agents to mturk units"
        unit.register_from_provider_data(
            provider_data["hit_id"], provider_data["assignment_id"]
        )
        return super().new_from_provider_data(db, worker, unit, provider_data)

    def attempt_to_reconcile_submitted_data(self, mturk_hit_id: str):
        """
        Hacky attempt to load the data directly from MTurk to handle
        data submitted that we missed somehow. Chance of failure is
        certainly non-zero.

        Takes the first assignment returned for the HIT, flattens its answer XML
        into a {QuestionIdentifier: FreeText} dict, marks it with
        MEPHISTO_MTURK_RECONCILED and passes it to `handle_submit`.
        """
        client = self._get_client()
        assignment = get_assignments_for_hit(client, mturk_hit_id)[0]
        xml_data = xmltodict.parse(assignment["Answer"])
        # Round-trip through json to end up with plain dicts/lists
        paired_data = json.loads(json.dumps(xml_data["QuestionFormAnswers"]["Answer"]))
        if isinstance(paired_data, dict):
            # xmltodict yields a single mapping (not a one-item list) when there
            # is exactly one <Answer> element; normalize so iteration below
            # always walks answer entries rather than that mapping's keys.
            paired_data = [paired_data]
        parsed_data = {
            entry["QuestionIdentifier"]: entry["FreeText"] for entry in paired_data
        }
        parsed_data["MEPHISTO_MTURK_RECONCILED"] = True
        self.handle_submit(parsed_data)

    # Required functions for Agent Interface

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit.

        Skipped (with an info log) when the agent is already approved.
        """
        if self.get_status() == AgentState.STATUS_APPROVED:
            logger.info(f"Approving already approved agent {self}, skipping")
            return
        client = self._get_client()
        approve_work(client, self._get_mturk_assignment_id(), override_rejection=True)
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit.

        Refused (with a warning log) when the agent is already approved.
        """
        if self.get_status() == AgentState.STATUS_APPROVED:
            logger.warning(f"Cannot reject {self}, it is already approved")
            return
        client = self._get_client()
        reject_work(client, self._get_mturk_assignment_id(), reason)
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self) -> None:
        """
        MTurk agents are marked as done on the side of MTurk, so if this agent
        is marked as done there's nothing else we need to do as the task has been
        submitted.
        """
        # A disconnected agent keeps its disconnect status rather than completing
        if self.get_status() != AgentState.STATUS_DISCONNECT:
            self.db.update_agent(
                agent_id=self.db_id, status=AgentState.STATUS_COMPLETED
            )

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MTurkAgent._register_agent(db, worker, unit, PROVIDER_TYPE)

This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.

View Source
class MTurkProviderArgs(ProviderArgs):
    """
    Provider args for an MTurk provider.

    The only field declared at this level is `_provider_type`.
    """

    # Defaults to this module's PROVIDER_TYPE constant
    _provider_type: str = PROVIDER_TYPE

Provider args for an MTurk provider