mephisto.abstractions.providers.mturk.mturk_provider
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
from mephisto.abstractions.providers.mturk.provider_type import PROVIDER_TYPE
from mephisto.abstractions.providers.mturk.mturk_datastore import MTurkDatastore
from mephisto.abstractions.crowd_provider import CrowdProvider, ProviderArgs
from mephisto.data_model.requester import RequesterArgs
from mephisto.abstractions.providers.mturk.mturk_agent import MTurkAgent
from mephisto.abstractions.providers.mturk.mturk_requester import MTurkRequester
from mephisto.abstractions.providers.mturk.mturk_unit import MTurkUnit
from mephisto.abstractions.providers.mturk.mturk_worker import MTurkWorker
from mephisto.abstractions.providers.mturk.mturk_utils import (
    create_hit_type,
    create_hit_config,
    delete_qualification,
)
from mephisto.operations.registry import register_mephisto_abstraction
from dataclasses import dataclass, field

from typing import ClassVar, Dict, Any, Optional, Type, List, cast, TYPE_CHECKING

from mephisto.data_model.requester import Requester

if TYPE_CHECKING:
    from mephisto.abstractions.blueprint import SharedTaskState
    from mephisto.data_model.task_run import TaskRun
    from mephisto.data_model.unit import Unit
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent
    from omegaconf import DictConfig


@dataclass
class MTurkProviderArgs(ProviderArgs):
    """Provider args for an MTurk provider"""

    _provider_type: str = PROVIDER_TYPE


@register_mephisto_abstraction()
class MTurkProvider(CrowdProvider):
    """
    Implementation of a crowd provider that interfaces with MTurk
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    UnitClass: ClassVar[Type["Unit"]] = MTurkUnit

    RequesterClass: ClassVar[Type["Requester"]] = MTurkRequester

    WorkerClass: ClassVar[Type["Worker"]] = MTurkWorker

    AgentClass: ClassVar[Type["Agent"]] = MTurkAgent

    ArgsClass = MTurkProviderArgs

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        MTurk itself is the source of truth for most data required to run
        tasks on MTurk. The datastore holds sessions to connect with
        MTurk as well as mappings between MTurk ids and Mephisto ids
        """
        return MTurkDatastore(datastore_root=storage_path)

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Produce the HIT type for this task run.

        Selects the qualifications in ``shared_state.qualifications`` that apply
        to this provider. Ensures each one carries an MTurk
        ``QualificationTypeId``: the qualification is created on MTurk when no
        local mapping exists, and the locally mapped id is reused otherwise.
        Then creates the HIT type and registers the run in the MTurk datastore.
        """
        requester = cast("MTurkRequester", task_run.get_requester())
        # NOTE(review): the returned session is not used here; the call is kept
        # in case it has side effects on the datastore — TODO confirm
        self.datastore.get_session_for_requester(requester._requester_name)
        task_run_id = task_run.db_id

        # Set up HIT config
        config_dir = os.path.join(self.datastore.datastore_root, task_run_id)
        task_args = args.task

        # Find or create relevant qualifications
        qualifications = []
        for qualification in shared_state.qualifications:
            applicable_providers = qualification["applicable_providers"]
            if (
                applicable_providers is None
                or self.PROVIDER_TYPE in applicable_providers
            ):
                qualifications.append(qualification)
        for qualification in qualifications:
            qualification_name = qualification["qualification_name"]
            if requester.PROVIDER_TYPE == "mturk_sandbox":
                qualification_name += "_sandbox"
            mapping = self.datastore.get_qualification_mapping(qualification_name)
            if mapping is None:
                qualification[
                    "QualificationTypeId"
                ] = requester._create_new_mturk_qualification(qualification_name)
            elif qualification.get("QualificationTypeId") is None:
                # The MTurk-side name may differ from the Mephisto name (a
                # uniqueness suffix can be appended on creation), so pass the
                # stored MTurk id explicitly instead of leaving it unset.
                qualification["QualificationTypeId"] = mapping[
                    "mturk_qualification_id"
                ]

        if hasattr(shared_state, "mturk_specific_qualifications"):
            # TODO(OWN) standardize provider-specific qualifications
            qualifications += shared_state.mturk_specific_qualifications  # type: ignore

        # Set up HIT type
        client = self._get_client(requester._requester_name)
        hit_type_id = create_hit_type(client, task_args, qualifications)
        frame_height = (
            task_run.get_blueprint().get_frontend_args().get("frame_height", 0)
        )
        self.datastore.register_run(task_run_id, hit_type_id, config_dir, frame_height)

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No cleanup necessary for task type"""
        pass

    @classmethod
    def get_wrapper_js_path(cls):
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the qualification from MTurk, if a local mapping for it exists.

        The deletion uses a client for the requester recorded in the mapping.
        """
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            return None
        requester_id = mapping["requester_id"]
        requester = Requester.get(self.db, requester_id)
        assert isinstance(requester, MTurkRequester), "Must be an mturk requester"
        client = requester._get_client(requester._requester_name)
        delete_qualification(client, mapping["mturk_qualification_id"])
View Source
class MTurkProviderArgs(ProviderArgs):
    """
    Provider args for an MTurk provider.

    Adds no options beyond ProviderArgs; it only pins ``_provider_type``.
    """

    # Provider type these args belong to (the MTurk PROVIDER_TYPE constant)
    _provider_type: str = PROVIDER_TYPE
Provider args for an MTurk provider
Inherited Members
View Source
class MTurkProvider(CrowdProvider):
    """
    Implementation of a crowd provider that interfaces with MTurk
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    UnitClass: ClassVar[Type["Unit"]] = MTurkUnit

    RequesterClass: ClassVar[Type["Requester"]] = MTurkRequester

    WorkerClass: ClassVar[Type["Worker"]] = MTurkWorker

    AgentClass: ClassVar[Type["Agent"]] = MTurkAgent

    ArgsClass = MTurkProviderArgs

    def initialize_provider_datastore(self, storage_path: str) -> Any:
        """
        MTurk itself is the source of truth for most data required to run
        tasks on MTurk. The datastore holds sessions to connect with
        MTurk as well as mappings between MTurk ids and Mephisto ids
        """
        return MTurkDatastore(datastore_root=storage_path)

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def setup_resources_for_task_run(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        server_url: str,
    ) -> None:
        """
        Produce the HIT type for this task run.

        Selects the qualifications in ``shared_state.qualifications`` that apply
        to this provider. Ensures each one carries an MTurk
        ``QualificationTypeId``: the qualification is created on MTurk when no
        local mapping exists, and the locally mapped id is reused otherwise.
        Then creates the HIT type and registers the run in the MTurk datastore.
        """
        requester = cast("MTurkRequester", task_run.get_requester())
        # NOTE(review): the returned session is not used here; the call is kept
        # in case it has side effects on the datastore — TODO confirm
        self.datastore.get_session_for_requester(requester._requester_name)
        task_run_id = task_run.db_id

        # Set up HIT config
        config_dir = os.path.join(self.datastore.datastore_root, task_run_id)
        task_args = args.task

        # Find or create relevant qualifications
        qualifications = []
        for qualification in shared_state.qualifications:
            applicable_providers = qualification["applicable_providers"]
            if (
                applicable_providers is None
                or self.PROVIDER_TYPE in applicable_providers
            ):
                qualifications.append(qualification)
        for qualification in qualifications:
            qualification_name = qualification["qualification_name"]
            if requester.PROVIDER_TYPE == "mturk_sandbox":
                qualification_name += "_sandbox"
            mapping = self.datastore.get_qualification_mapping(qualification_name)
            if mapping is None:
                qualification[
                    "QualificationTypeId"
                ] = requester._create_new_mturk_qualification(qualification_name)
            elif qualification.get("QualificationTypeId") is None:
                # The MTurk-side name may differ from the Mephisto name (a
                # uniqueness suffix can be appended on creation), so pass the
                # stored MTurk id explicitly instead of leaving it unset.
                qualification["QualificationTypeId"] = mapping[
                    "mturk_qualification_id"
                ]

        if hasattr(shared_state, "mturk_specific_qualifications"):
            # TODO(OWN) standardize provider-specific qualifications
            qualifications += shared_state.mturk_specific_qualifications  # type: ignore

        # Set up HIT type
        client = self._get_client(requester._requester_name)
        hit_type_id = create_hit_type(client, task_args, qualifications)
        frame_height = (
            task_run.get_blueprint().get_frontend_args().get("frame_height", 0)
        )
        self.datastore.register_run(task_run_id, hit_type_id, config_dir, frame_height)

    def cleanup_resources_from_task_run(
        self, task_run: "TaskRun", server_url: str
    ) -> None:
        """No cleanup necessary for task type"""
        pass

    @classmethod
    def get_wrapper_js_path(cls):
        """
        Return the path to the `wrap_crowd_source.js` file for this
        provider to be deployed to the server
        """
        return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")

    def cleanup_qualification(self, qualification_name: str) -> None:
        """
        Remove the qualification from MTurk, if a local mapping for it exists.

        The deletion uses a client for the requester recorded in the mapping.
        """
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            return None
        requester_id = mapping["requester_id"]
        requester = Requester.get(self.db, requester_id)
        assert isinstance(requester, MTurkRequester), "Must be an mturk requester"
        client = requester._get_client(requester._requester_name)
        delete_qualification(client, mapping["mturk_qualification_id"])
Implementation of a crowd provider that interfaces with MTurk.
View Source
def initialize_provider_datastore(self, storage_path: str) -> Any:
    """
    Build the MTurk datastore rooted at ``storage_path``.

    MTurk itself remains the source of truth for most task data. The local
    datastore only keeps MTurk sessions and the mappings between MTurk ids
    and Mephisto ids.
    """
    datastore = MTurkDatastore(datastore_root=storage_path)
    return datastore
MTurk itself is the source of truth for most data required to run tasks on MTurk. The datastore holds sessions to connect with MTurk as well as mappings between MTurk ids and Mephisto ids
View Source
def setup_resources_for_task_run(
    self,
    task_run: "TaskRun",
    args: "DictConfig",
    shared_state: "SharedTaskState",
    server_url: str,
) -> None:
    """
    Produce the HIT type for this task run.

    Selects the qualifications in ``shared_state.qualifications`` that apply
    to this provider. Ensures each one carries an MTurk
    ``QualificationTypeId``: the qualification is created on MTurk when no
    local mapping exists, and the locally mapped id is reused otherwise.
    Then creates the HIT type and registers the run in the MTurk datastore.
    """
    requester = cast("MTurkRequester", task_run.get_requester())
    # NOTE(review): the returned session is not used here; the call is kept
    # in case it has side effects on the datastore — TODO confirm
    self.datastore.get_session_for_requester(requester._requester_name)
    task_run_id = task_run.db_id

    # Set up HIT config
    config_dir = os.path.join(self.datastore.datastore_root, task_run_id)
    task_args = args.task

    # Find or create relevant qualifications
    qualifications = []
    for qualification in shared_state.qualifications:
        applicable_providers = qualification["applicable_providers"]
        if (
            applicable_providers is None
            or self.PROVIDER_TYPE in applicable_providers
        ):
            qualifications.append(qualification)
    for qualification in qualifications:
        qualification_name = qualification["qualification_name"]
        if requester.PROVIDER_TYPE == "mturk_sandbox":
            qualification_name += "_sandbox"
        mapping = self.datastore.get_qualification_mapping(qualification_name)
        if mapping is None:
            qualification[
                "QualificationTypeId"
            ] = requester._create_new_mturk_qualification(qualification_name)
        elif qualification.get("QualificationTypeId") is None:
            # The MTurk-side name may differ from the Mephisto name (a
            # uniqueness suffix can be appended on creation), so pass the
            # stored MTurk id explicitly instead of leaving it unset.
            qualification["QualificationTypeId"] = mapping["mturk_qualification_id"]

    if hasattr(shared_state, "mturk_specific_qualifications"):
        # TODO(OWN) standardize provider-specific qualifications
        qualifications += shared_state.mturk_specific_qualifications  # type: ignore

    # Set up HIT type
    client = self._get_client(requester._requester_name)
    hit_type_id = create_hit_type(client, task_args, qualifications)
    frame_height = (
        task_run.get_blueprint().get_frontend_args().get("frame_height", 0)
    )
    self.datastore.register_run(task_run_id, hit_type_id, config_dir, frame_height)
Produce the HIT type for this task run.
View Source
def cleanup_resources_from_task_run(
    self, task_run: "TaskRun", server_url: str
) -> None:
    """
    Release per-run resources; the MTurk provider has none to release.

    Nothing created in ``setup_resources_for_task_run`` is removed here, so
    this is intentionally a no-op.
    """
    return None
No cleanup necessary for task type
View Source
@classmethod def get_wrapper_js_path(cls): """ Return the path to the `wrap_crowd_source.js` file for this provider to be deployed to the server """ return os.path.join(os.path.dirname(__file__), "wrap_crowd_source.js")
Return the path to the `wrap_crowd_source.js` file for this provider to be deployed to the server.
View Source
def cleanup_qualification(self, qualification_name: str) -> None:
    """
    Delete the MTurk qualification mapped to ``qualification_name``.

    If there is no local qualification mapping, this is a no-op. Otherwise
    the qualification is deleted on MTurk using a client for the requester
    recorded in the mapping.
    """
    mapping = self.datastore.get_qualification_mapping(qualification_name)
    if mapping is not None:
        owner = Requester.get(self.db, mapping["requester_id"])
        assert isinstance(owner, MTurkRequester), "Must be an mturk requester"
        mturk_client = owner._get_client(owner._requester_name)
        delete_qualification(mturk_client, mapping["mturk_qualification_id"])
    return None
Remove the qualification from MTurk, if a local mapping for it exists.
View Source
class MTurkUnit(Unit):
    """
    This class tracks the status of an individual worker's contribution to a
    higher level assignment. It is the smallest 'unit' of work to complete
    the assignment, and this class is only responsible for checking
    the status of that work itself being done.

    The MTurk HIT id, MTurk assignment id and assignment duration for this
    unit are cached locally. They are refreshed from the MTurk datastore by
    ``_sync_hit_mapping`` whenever the datastore reports the cache as stale.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        self.hit_id: Optional[str] = None
        # 0.0 forces the first _sync_hit_mapping call to consult the datastore
        # (assuming is_hit_mapping_in_sync compares against later update times
        # — TODO confirm). mturk_assignment_id and assignment_time_in_seconds
        # are only ever assigned inside _sync_hit_mapping.
        self._last_sync_time = 0.0
        self._sync_hit_mapping()
        # Lazily filled by get_requester()
        self.__requester: Optional["MTurkRequester"] = None

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def _sync_hit_mapping(self) -> None:
        """
        Sync with the datastore to see if any mappings have updated.

        On success, sets ``hit_id``, ``mturk_assignment_id`` and
        ``assignment_time_in_seconds`` from the stored mapping. If the
        datastore lookup raises IndexError, they are reset to
        ``None``/``None``/``-1``.
        """
        if self.datastore.is_hit_mapping_in_sync(self.db_id, self._last_sync_time):
            return
        try:
            mapping = dict(self.datastore.get_hit_mapping(self.db_id))
            self.hit_id = mapping["hit_id"]
            self.mturk_assignment_id = mapping.get("assignment_id")
            self.assignment_time_in_seconds = mapping.get("assignment_time_in_seconds")
        except IndexError:
            # HIT does not appear to exist
            self.hit_id = None
            self.mturk_assignment_id = None
            self.assignment_time_in_seconds = -1
        # We update to a time slightly earlier than now, in order
        # to reduce the risk of a race condition caching an old
        # value the moment it's registered
        self._last_sync_time = time.monotonic() - 1

    def register_from_provider_data(
        self, hit_id: str, mturk_assignment_id: str
    ) -> None:
        """Update the datastore and local information from this registration"""
        self.datastore.register_assignment_to_hit(
            hit_id, self.db_id, mturk_assignment_id
        )
        self._sync_hit_mapping()

    def get_mturk_assignment_id(self) -> Optional[str]:
        """
        Return the MTurk assignment id associated with this unit
        """
        self._sync_hit_mapping()
        return self.mturk_assignment_id

    def get_mturk_hit_id(self) -> Optional[str]:
        """
        Return the MTurk hit id associated with this unit
        """
        self._sync_hit_mapping()
        return self.hit_id

    def get_requester(self) -> "MTurkRequester":
        """Wrapper around regular Requester as this will be MTurkRequesters"""
        # Cached after the first lookup
        if self.__requester is None:
            self.__requester = cast("MTurkRequester", super().get_requester())
        return self.__requester

    def set_db_status(self, status: str) -> None:
        """
        Set the status reflected in the database for this Unit.

        When moving to COMPLETED while the assigned agent is still in-task,
        this tries to pull the submitted data directly from MTurk. Any error
        there is logged rather than raised.
        """
        super().set_db_status(status)
        if status == AssignmentState.COMPLETED:
            agent = cast("MTurkAgent", self.get_assigned_agent())
            if agent is not None:
                agent_status = agent.get_status()
                if agent_status == AgentState.STATUS_IN_TASK:
                    # Oh no, MTurk has completed the unit, but we don't have
                    # the data. We need to reconcile
                    logger.warning(
                        f"Unit {self} moved to completed, but the agent didn't... "
                        f"Attempting to reconcile with MTurk directly"
                    )
                    try:
                        hit_id = self.get_mturk_hit_id()
                        assert (
                            hit_id is not None
                        ), f"This unit does not have an ID! {self}"

                        agent.attempt_to_reconcile_submitted_data(hit_id)
                    except Exception as e:
                        # Best-effort: a failed reconcile must not block the
                        # status update that already happened above
                        logger.warning(
                            f"Was not able to reconcile due to an error, {e}. "
                            f"You may need to reconcile this specific Agent manually "
                            f"after the task is completed. See here for details: "
                            f"https://github.com/facebookresearch/Mephisto/pull/442"
                        )
                elif agent_status == AgentState.STATUS_TIMEOUT:
                    # Oh no, this is also bad. we shouldn't be completing for a timed out agent.
                    logger.warning(
                        "Found a timeout that's trying to be pushed to completed with a timed out agent"
                    )
                    pass
            else:
                logger.warning(f"No agent found for completed unit {self}...")

    def clear_assigned_agent(self) -> None:
        """
        Additionally to clearing the agent, we also need to dissociate the
        hit_id from this unit in the MTurkDatastore.

        Ignored (with a warning) when the unit is already COMPLETED. An
        ASSIGNED unit is moved back to LAUNCHED afterwards.
        """
        if self.db_status == AssignmentState.COMPLETED:
            logger.warning(
                f"Clearing an agent when COMPLETED, it's likely a submit happened "
                f"but could not be received by the Mephisto backend. This "
                f"assignment clear is thus being ignored, but this message "
                f"is indicative of some data loss. "
            )
            # TODO(OWN) how can we reconcile missing data here? Marking this agent as
            # COMPLETED will pollute the data, but not marking it means that
            # it will have to be the auto-approve deadline.
            return
        super().clear_assigned_agent()
        mturk_hit_id = self.get_mturk_hit_id()
        if mturk_hit_id is not None:
            self.datastore.clear_hit_from_unit(self.db_id)
            self._sync_hit_mapping()

        if self.db_status == AssignmentState.ASSIGNED:
            self.set_db_status(AssignmentState.LAUNCHED)

    # Required Unit functions

    def get_status(self) -> str:
        """
        Get status for this unit directly from MTurk, fall back on local info.

        Terminal and agent-driven statuses are resolved locally. Live statuses
        are reconciled against the HIT's ``HITStatus`` on MTurk, and the local
        db status may be updated as a side effect.
        """
        if self.db_status == AssignmentState.CREATED:
            return super().get_status()
        elif self.db_status in [
            AssignmentState.ACCEPTED,
            AssignmentState.EXPIRED,
            AssignmentState.SOFT_REJECTED,
        ]:
            # These statuses don't change with a get_status call
            return self.db_status

        if self.db_status in [AssignmentState.COMPLETED, AssignmentState.REJECTED]:
            # These statuses only change on agent dependent changes
            agent = self.get_assigned_agent()
            found_status = self.db_status
            if agent is not None:
                agent_status = agent.get_status()
                if agent_status == AgentState.STATUS_APPROVED:
                    found_status = AssignmentState.ACCEPTED
                elif agent_status == AgentState.STATUS_REJECTED:
                    found_status = AssignmentState.REJECTED
                elif agent_status == AgentState.STATUS_SOFT_REJECTED:
                    found_status = AssignmentState.SOFT_REJECTED
            else:
                logger.warning(f"Agent for unit {self} is None")
            if found_status != self.db_status:
                self.set_db_status(found_status)
            return self.db_status

        # Remaining statuses are tracking a live HIT

        mturk_hit_id = self.get_mturk_hit_id()
        if mturk_hit_id is None:
            # If the hit_id is None and there's an agent still assigned,
            # then that agent has timed out and we should expire
            agent = self.get_assigned_agent()
            if agent is not None:
                if agent.get_status() != AgentState.STATUS_EXPIRED:
                    agent.update_status(AgentState.STATUS_EXPIRED)

            # Can't determine anything else if there is no HIT on this unit
            return self.db_status

        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        hit = get_hit(client, mturk_hit_id)
        if hit is None:
            # NOTE(review): reports EXPIRED without persisting it to the db
            return AssignmentState.EXPIRED
        hit_data = hit["HIT"]

        local_status = self.db_status
        external_status = self.db_status

        # Map MTurk's HITStatus onto Mephisto's AssignmentState
        if hit_data["HITStatus"] == "Assignable":
            external_status = AssignmentState.LAUNCHED
        elif hit_data["HITStatus"] == "Unassignable":
            external_status = AssignmentState.ASSIGNED
        elif hit_data["HITStatus"] in ["Reviewable", "Reviewing"]:
            external_status = AssignmentState.COMPLETED
            # Assignments still available means nobody completed it
            if hit_data["NumberOfAssignmentsAvailable"] != 0:
                external_status = AssignmentState.EXPIRED
        elif hit_data["HITStatus"] == "Disposed":
            # The HIT was deleted, must rely on what we have
            external_status = local_status
        else:
            raise Exception(f"Unexpected HIT status {hit_data['HITStatus']}")

        if external_status != local_status:
            if local_status == AssignmentState.ASSIGNED and external_status in [
                AssignmentState.LAUNCHED,
                AssignmentState.EXPIRED,
            ]:
                # Treat this as a return event, this hit may be doable by someone else
                agent = self.get_assigned_agent()
                if agent is not None and agent.get_status() in [
                    AgentState.STATUS_IN_TASK,
                    AgentState.STATUS_ONBOARDING,
                    AgentState.STATUS_WAITING,
                    AgentState.STATUS_PARTNER_DISCONNECT,
                ]:
                    # mark the in-task agent as having returned the HIT, to
                    # free any running tasks and have Blueprint decide on cleanup.
                    agent.update_status(AgentState.STATUS_RETURNED)
                # ASSIGNED -> LAUNCHED is deliberately not written to the db here
                if external_status == AssignmentState.EXPIRED:
                    # If we're expired, then it won't be doable, and we should update
                    self.set_db_status(external_status)
            else:
                self.set_db_status(external_status)

        return self.db_status

    def launch(self, task_url: str) -> None:
        """Create this HIT on MTurk (making it available) and register the ids in the local db"""
        task_run = self.get_assignment().get_task_run()
        duration = task_run.get_task_args().assignment_duration_in_seconds
        run_id = task_run.db_id
        run_details = self.datastore.get_run(run_id)
        hit_type_id = run_details["hit_type_id"]
        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        frame_height = run_details["frame_height"]
        hit_link, hit_id, response = create_hit_with_hit_type(
            client, frame_height, task_url, hit_type_id
        )
        # TODO(OWN) get this link to the mephisto frontend
        print(hit_link)

        # We create a hit for this unit, but note that this unit may not
        # necessarily match with the same HIT that was launched for it.
        self.datastore.new_hit(hit_id, hit_link, duration, run_id)
        self.set_db_status(AssignmentState.LAUNCHED)
        return None

    def expire(self) -> float:
        """
        Send a request to expire the HIT, and if it's not assigned return,
        otherwise just return the maximum assignment duration.

        Returns the delay (in seconds) to wait before the unit can be
        considered expired: 0 unless the unit is currently ASSIGNED.
        """
        delay = 0
        status = self.get_status()
        if status in [AssignmentState.EXPIRED, AssignmentState.COMPLETED]:
            return delay
        if status == AssignmentState.ASSIGNED:
            # The assignment is currently being worked on,
            # so we will set the wait time to be the
            # amount of time we granted for working on this assignment
            # NOTE(review): _sync_hit_mapping sets this to -1 when no HIT
            # mapping exists, which would yield a negative delay
            if self.assignment_time_in_seconds is not None:
                delay = self.assignment_time_in_seconds
            logger.debug(f"Expiring a unit that is ASSIGNED after delay {delay}")
        mturk_hit_id = self.get_mturk_hit_id()
        requester = self.get_requester()
        client = self._get_client(requester._requester_name)
        if mturk_hit_id is not None:
            expire_hit(client, mturk_hit_id)
            return delay
        else:
            # No HIT is mapped to this unit: expire one of the run's unassigned
            # HITs instead and bind it to this unit
            unassigned_hit_ids = self.datastore.get_unassigned_hit_ids(self.task_run_id)

            if len(unassigned_hit_ids) == 0:
                self.set_db_status(AssignmentState.EXPIRED)
                return delay
            hit_id = unassigned_hit_ids[0]
            expire_hit(client, hit_id)
            self.datastore.register_assignment_to_hit(hit_id, self.db_id)
            self.set_db_status(AssignmentState.EXPIRED)
            return delay

    def is_expired(self) -> bool:
        """
        Determine if this unit is expired as according to the vendor.

        In this case, we keep track of the expiration locally by refreshing
        the hit's status and seeing if we've expired.
        """
        return self.get_status() == AssignmentState.EXPIRED

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """Create a Unit for the given assignment"""
        return MTurkUnit._register_unit(
            db, assignment, index, pay_amount, PROVIDER_TYPE
        )

    def __repr__(self) -> str:
        # Note: get_mturk_hit_id() may re-sync with the datastore
        return f"{self.__class__.__name__}({self.db_id}, {self.get_mturk_hit_id()}, {self.db_status})"
This class tracks the status of an individual worker's contribution to a higher level assignment. It is the smallest 'unit' of work to complete the assignment, and this class is only responsible for checking the status of that work itself being done.
Inherited Members
View Source
class MTurkRequester(Requester):
    """
    Wrapper for requester behavior as provided by MTurk. Makes
    all requests directly to MTurk through boto3.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    ArgsClass = MTurkRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        # Use _requester_name to preserve sandbox behavior which
        # utilizes a different requester_name
        self._requester_name = self.requester_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    # Required functions for a Requester implementation

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any
        required credentials or such. If no args are provided, assume the
        registration is already made and try to assert it as such.

        Raises an Exception if ``args`` is given but lacks ``access_key_id``
        or ``secret_access_key``.
        """
        for req_field in ["access_key_id", "secret_access_key"]:
            if args is not None and req_field not in args:
                raise Exception(
                    f'Missing IAM "{req_field}" in requester registration args'
                )
        setup_aws_credentials(self._requester_name, args)

    def is_registered(self) -> bool:
        """Return whether or not this requester has registered yet"""
        return check_aws_credentials(self._requester_name)

    def get_available_budget(self) -> float:
        """Get the available budget from MTurk"""
        client = self._get_client(self._requester_name)
        return get_requester_balance(client)

    def _create_new_mturk_qualification(self, qualification_name: str) -> str:
        """
        Create a new qualification on MTurk owned by the requester provided.

        First tries ``qualification_name`` itself. If that yields no id, it
        retries with a time-based suffix, then with random uuid suffixes (up to
        MAX_QUALIFICATION_ATTEMPTS + 1 extra tries). The resulting MTurk name
        and id are stored in the local qualification mapping under
        ``qualification_name``.

        Returns the MTurk qualification id; raises an Exception if no id could
        be obtained.
        """
        client = self._get_client(self._requester_name)
        qualification_desc = f"Equivalent qualification for {qualification_name}."
        use_qualification_name = qualification_name
        qualification_id = find_or_create_mturk_qualification(
            client, qualification_name, qualification_desc, must_be_owned=True
        )
        if qualification_id is None:
            # Try to append time to make the qualification unique
            use_qualification_name = f"{qualification_name}_{time.time()}"
            qualification_id = find_or_create_mturk_qualification(
                client, use_qualification_name, qualification_desc, must_be_owned=True
            )
            attempts = 0
            while qualification_id is None:
                # Append something somewhat random
                use_qualification_name = f"{qualification_name}_{str(uuid4())}"
                qualification_id = find_or_create_mturk_qualification(
                    client,
                    use_qualification_name,
                    qualification_desc,
                    must_be_owned=True,
                )
                attempts += 1
                # Only give up if this final attempt also failed; raising after
                # a successful creation would leave an unmapped qualification
                # behind on MTurk.
                if qualification_id is None and attempts > MAX_QUALIFICATION_ATTEMPTS:
                    raise Exception(
                        "Something has gone extremely wrong with creating qualification "
                        f"{qualification_name} for requester {self.requester_name}"
                    )
        # Store the new qualification in the datastore
        self.datastore.create_qualification_mapping(
            qualification_name, self.db_id, use_qualification_name, qualification_id
        )
        return qualification_id

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        return MTurkRequester._register_requester(db, requester_name, PROVIDER_TYPE)
Wrapper for requester behavior as provided by MTurk. Makes all requests directly to MTurk through boto3.
Inherited Members
View Source
class MTurkWorker(Worker):
    """
    This class represents an individual - namely a person.
    It maintains components of ongoing identity for a user.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        self._worker_name = self.worker_name  # sandbox workers use a different name

    @classmethod
    def get_from_mturk_worker_id(
        cls, db: "MephistoDB", mturk_worker_id: str
    ) -> Optional["MTurkWorker"]:
        """
        Get the MTurkWorker for the given MTurk worker id, or None (with a
        warning) if no matching Mephisto worker exists.

        When ``cls.PROVIDER_TYPE`` differs from this module's PROVIDER_TYPE
        (presumably the sandbox subclass), the lookup uses the
        ``_sandbox``-suffixed name.
        """
        if cls.PROVIDER_TYPE != PROVIDER_TYPE:
            mturk_worker_id += "_sandbox"
        workers = db.find_workers(
            worker_name=mturk_worker_id, provider_type=cls.PROVIDER_TYPE
        )
        if len(workers) == 0:
            logger.warning(
                f"Could not find a Mephisto Worker for mturk_id {mturk_worker_id}"
            )
            return None
        return cast("MTurkWorker", workers[0])

    def get_mturk_worker_id(self):
        """Return the worker name used when talking to MTurk"""
        return self._worker_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    def grant_crowd_qualification(
        self, qualification_name: str, value: int = 1
    ) -> None:
        """
        Grant a qualification by the given name to this worker.

        Check the local MTurk db to find the matching MTurk qualification to
        grant, and pass that. If no qualification exists, try to create one.

        In creating a new qualification, Mephisto resolves the ambiguity over
        which requester to associate that qualification with by using the LAST
        requester returned by ``find_requesters`` for the given account type:
        ``mturk_sandbox`` when the qualification name ends with "sandbox",
        otherwise ``mturk``.

        Raises an Exception if a new qualification is needed but no requester
        of that account type is registered.
        """
        mturk_qual_details = self.datastore.get_qualification_mapping(
            qualification_name
        )
        if mturk_qual_details is not None:
            requester = Requester.get(self.db, mturk_qual_details["requester_id"])
            qualification_id = mturk_qual_details["mturk_qualification_id"]
        else:
            target_type = (
                "mturk_sandbox" if qualification_name.endswith("sandbox") else "mturk"
            )
            candidate_requesters = self.db.find_requesters(provider_type=target_type)
            if len(candidate_requesters) == 0:
                # Previously an opaque IndexError from indexing an empty list
                raise Exception(
                    f"Cannot create qualification {qualification_name}: no "
                    f"{target_type} requester is registered"
                )
            requester = candidate_requesters[-1]
            assert isinstance(
                requester, MTurkRequester
            ), "find_requesters must return mturk requester for given provider types"
            qualification_id = requester._create_new_mturk_qualification(
                qualification_name
            )
        assert isinstance(
            requester, MTurkRequester
        ), "Must be an MTurk requester for MTurk quals"
        client = self._get_client(requester._requester_name)
        give_worker_qualification(
            client, self.get_mturk_worker_id(), qualification_id, value
        )
        return None

    def revoke_crowd_qualification(self, qualification_name: str) -> None:
        """
        Revoke the qualification by the given name from this worker.

        Check the local MTurk db to find the matching MTurk qualification to
        revoke; log an error and return if no such qualification exists.
        """
        mturk_qual_details = self.datastore.get_qualification_mapping(
            qualification_name
        )
        if mturk_qual_details is None:
            logger.error(
                f"No locally stored MTurk qualification to revoke for name {qualification_name}"
            )
            return None

        requester = Requester.get(self.db, mturk_qual_details["requester_id"])
        assert isinstance(
            requester, MTurkRequester
        ), "Must be an MTurk requester from MTurk quals"
        client = self._get_client(requester._requester_name)
        qualification_id = mturk_qual_details["mturk_qualification_id"]
        remove_worker_qualification(
            client, self.get_mturk_worker_id(), qualification_id
        )
        return None

    def bonus_worker(
        self, amount: float, reason: str, unit: Optional["Unit"] = None
    ) -> Tuple[bool, str]:
        """Bonus this worker for any reason. Return tuple of success and failure message"""
        if unit is None:
            # TODO(#652) implement. The content in scripts/mturk/launch_makeup_hits.py
            # may prove useful for this.
            return False, "bonusing via compensation tasks not yet available"

        unit = cast("MTurkUnit", unit)
        requester = cast(
            "MTurkRequester", unit.get_assignment().get_task_run().get_requester()
        )
        client = self._get_client(requester._requester_name)
        mturk_assignment_id = unit.get_mturk_assignment_id()
        assert mturk_assignment_id is not None, "Cannot bonus for a unit with no agent"
        # A fresh uuid4 is passed on every call — presumably used as the MTurk
        # unique request token; TODO confirm
        pay_bonus(
            client, self._worker_name, amount, mturk_assignment_id, reason, str(uuid4())
        )
        return True, ""

    def block_worker(
        self,
        reason: str,
        unit: Optional["Unit"] = None,
        requester: Optional["Requester"] = None,
    ) -> Tuple[bool, str]:
        """
        Block this worker for a specified reason. Return success of block.

        The requester is taken from ``requester`` if given, otherwise from the
        task run of ``unit``; blocking with neither is not supported.
        """
        if unit is None and requester is None:
            # TODO(WISH) soft block from all requesters? Maybe have the main
            # requester soft block?
            return (
                False,
                "Blocking without a unit or requester not yet supported for MTurkWorkers",
            )
        elif unit is not None and requester is None:
            requester = unit.get_assignment().get_task_run().get_requester()
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        # Module-level mturk_utils helper, not this method
        block_worker(client, self._worker_name, reason)
        return True, ""

    def unblock_worker(self, reason: str, requester: "Requester") -> bool:
        """unblock a blocked worker for the specified reason. Return success of unblock"""
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        # Module-level mturk_utils helper, not this method
        unblock_worker(client, self._worker_name, reason)
        return True

    def is_blocked(self, requester: "Requester") -> bool:
        """Determine if a worker is blocked"""
        requester = cast("MTurkRequester", requester)
        client = self._get_client(requester._requester_name)
        return is_worker_blocked(client, self._worker_name)

    def is_eligible(self, task_run: "TaskRun") -> bool:
        """
        Qualifications are handled primarily by MTurk, so if a worker is able to get
        through to be able to access the task, they should be eligible
        """
        return True

    @staticmethod
    def new(db: "MephistoDB", worker_id: str) -> "Worker":
        return MTurkWorker._register_worker(db, worker_id, PROVIDER_TYPE)
This class represents an individual - namely a person. It maintains components of ongoing identity for a user.
Inherited Members
View Source
class MTurkAgent(Agent):
    """
    This class encompasses a worker as they are working on an individual
    assignment. It maintains details for the current task at hand such as
    start and end time, connection status, etc.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        """
        Load the agent, attach the MTurk datastore for this provider type, and
        cache the MTurk assignment id of the agent's unit.

        The cached id may be None at construction time; see
        ``_get_mturk_assignment_id`` for the lazy refresh.
        """
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        unit: "MTurkUnit" = cast("MTurkUnit", self.get_unit())
        self.mturk_assignment_id = unit.get_mturk_assignment_id()

    def _get_mturk_assignment_id(self):
        """
        Return the MTurk assignment id for this agent's unit.

        If no id was available when the agent was constructed, re-query the
        unit and cache the result.
        """
        if self.mturk_assignment_id is None:
            self.mturk_assignment_id = self.get_unit().get_mturk_assignment_id()
        return self.mturk_assignment_id

    def _get_client(self) -> Any:
        """
        Get an mturk client for usage with mturk_utils for this agent.

        The client is looked up in the datastore by the name of the requester
        that owns this agent's unit.
        """
        unit = self.get_unit()
        requester: "MTurkRequester" = cast("MTurkRequester", unit.get_requester())
        return self.datastore.get_client_for_requester(requester._requester_name)

    @classmethod
    def new_from_provider_data(
        cls,
        db: "MephistoDB",
        worker: "Worker",
        unit: "Unit",
        provider_data: Dict[str, Any],
    ) -> "Agent":
        """
        Wrapper around the new method that allows registering additional
        bookkeeping information from a crowd provider for this agent.

        Reads ``provider_data["hit_id"]`` and ``provider_data["assignment_id"]``
        and registers them on the (MTurk) unit before delegating to the base
        implementation.
        """
        # Local import, as in the original; presumably avoids an import cycle.
        from mephisto.abstractions.providers.mturk.mturk_unit import MTurkUnit

        assert isinstance(
            unit, MTurkUnit
        ), "Can only register mturk agents to mturk units"
        unit.register_from_provider_data(
            provider_data["hit_id"], provider_data["assignment_id"]
        )
        return super().new_from_provider_data(db, worker, unit, provider_data)

    def attempt_to_reconcile_submitted_data(self, mturk_hit_id: str):
        """
        Hacky attempt to load the data directly from MTurk to handle data
        submitted that we missed somehow. Chance of failure is certainly
        non-zero.

        Fetches the first assignment for ``mturk_hit_id``, parses its
        ``Answer`` XML into a ``{QuestionIdentifier: FreeText}`` mapping, tags
        it with ``MEPHISTO_MTURK_RECONCILED = True``, and passes it to
        ``handle_submit``.
        """
        client = self._get_client()
        # NOTE(review): raises IndexError if MTurk reports no assignments for
        # this HIT; callers presumably treat that as a failed reconciliation.
        assignment = get_assignments_for_hit(client, mturk_hit_id)[0]
        xml_data = xmltodict.parse(assignment["Answer"])
        # Round-trip through JSON to turn xmltodict's mapping types into plain
        # dicts/lists.
        paired_data = json.loads(json.dumps(xml_data["QuestionFormAnswers"]["Answer"]))
        # xmltodict yields a bare dict (not a one-element list) when the form
        # contains only a single <Answer>. Iterating that dict would walk its
        # string keys and fail on entry["QuestionIdentifier"], so normalize.
        if isinstance(paired_data, dict):
            paired_data = [paired_data]
        parsed_data = {
            entry["QuestionIdentifier"]: entry["FreeText"] for entry in paired_data
        }
        parsed_data["MEPHISTO_MTURK_RECONCILED"] = True
        self.handle_submit(parsed_data)

    # Required functions for Agent Interface

    def approve_work(self) -> None:
        """
        Approve the work done on this specific Unit.

        Does nothing (beyond logging) if this agent is already approved.
        Otherwise approves on MTurk with ``override_rejection=True``, so a
        previously rejected assignment can still be approved.
        """
        if self.get_status() == AgentState.STATUS_APPROVED:
            logger.info(f"Approving already approved agent {self}, skipping")
            return
        client = self._get_client()
        approve_work(client, self._get_mturk_assignment_id(), override_rejection=True)
        self.update_status(AgentState.STATUS_APPROVED)

    def reject_work(self, reason) -> None:
        """
        Reject the work done on this specific Unit for the given reason.

        Approved work cannot be rejected: in that case a warning is logged
        and nothing else happens.
        """
        if self.get_status() == AgentState.STATUS_APPROVED:
            logger.warning(f"Cannot reject {self}, it is already approved")
            return
        client = self._get_client()
        reject_work(client, self._get_mturk_assignment_id(), reason)
        self.update_status(AgentState.STATUS_REJECTED)

    def mark_done(self) -> None:
        """
        MTurk agents are marked as done on the side of MTurk, so if this agent
        is marked as done there's nothing else we need to do as the task has
        been submitted.

        Locally, the agent's status is set to completed unless it is in the
        disconnect state.
        """
        if self.get_status() != AgentState.STATUS_DISCONNECT:
            self.db.update_agent(
                agent_id=self.db_id, status=AgentState.STATUS_COMPLETED
            )

    @staticmethod
    def new(db: "MephistoDB", worker: "Worker", unit: "Unit") -> "Agent":
        """Create an agent for this worker to be used for work on the given Unit."""
        return MTurkAgent._register_agent(db, worker, unit, PROVIDER_TYPE)
This class encompasses a worker as they are working on an individual assignment. It maintains details for the current task at hand such as start and end time, connection status, etc.
Inherited Members
View Source
class MTurkProviderArgs(ProviderArgs):
    """
    Provider arguments for the MTurk crowd provider.

    The only difference from the base ``ProviderArgs`` is that the
    ``_provider_type`` default is the MTurk ``PROVIDER_TYPE`` constant.
    """

    # Default provider type identifier for this args class.
    _provider_type: str = PROVIDER_TYPE
Provider args for an MTurk provider