mephisto.abstractions.providers.mturk.mturk_requester

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from uuid import uuid4
import time
import random

from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.requester import Requester, RequesterArgs
from mephisto.abstractions.providers.mturk.mturk_utils import (
    setup_aws_credentials,
    get_requester_balance,
    check_aws_credentials,
    find_or_create_qualification as find_or_create_mturk_qualification,
)
from mephisto.abstractions.providers.mturk.provider_type import PROVIDER_TYPE

from typing import List, Optional, Mapping, Dict, Any, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.providers.mturk.mturk_datastore import MTurkDatastore
    from argparse import _ArgumentGroup as ArgumentGroup


MAX_QUALIFICATION_ATTEMPTS = 300


@dataclass
class MTurkRequesterArgs(RequesterArgs):
    _group: str = field(
        default="MTurkRequester",
        metadata={
            "help": (
                "AWS is required to create a new Requester. "
                "Please create an IAM user with programmatic access and "
                "AmazonMechanicalTurkFullAccess policy at "
                'https://console.aws.amazon.com/iam/ (On the "Set permissions" '
                'page, choose "Attach existing policies directly" and then select '
                '"AmazonMechanicalTurkFullAccess" policy). After creating '
                "the IAM user, you should get an Access Key ID "
                "and Secret Access Key. "
            )
        },
    )
    access_key_id: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Access Key ID"}
    )
    secret_access_key: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Secret Access Key"}
    )


class MTurkRequester(Requester):
    """
    Wrapper for requester behavior as provided by MTurk. Makes
    all requests directly to MTurk through boto3.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE
    ArgsClass = MTurkRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        # Use _requester_name to preserve sandbox behavior which
        # utilizes a different requester_name
        self._requester_name = self.requester_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    # Required functions for a Requester implementation

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.
        """
        for req_field in ["access_key_id", "secret_access_key"]:
            if args is not None and req_field not in args:
                raise Exception(
                    f'Missing IAM "{req_field}" in requester registration args'
                )
        setup_aws_credentials(self._requester_name, args)

    def is_registered(self) -> bool:
        """Return whether or not this requester has registered yet"""
        return check_aws_credentials(self._requester_name)

    def get_available_budget(self) -> float:
        """Get the available budget from MTurk"""
        client = self._get_client(self._requester_name)
        return get_requester_balance(client)

    def _create_new_mturk_qualification(self, qualification_name: str) -> str:
        """
        Create a new qualification on MTurk owned by the requester provided
        """
        client = self._get_client(self._requester_name)
        qualification_desc = f"Equivalent qualification for {qualification_name}."
        use_qualification_name = qualification_name
        qualification_id = find_or_create_mturk_qualification(
            client, qualification_name, qualification_desc, must_be_owned=True
        )
        if qualification_id is None:
            # Try to append time to make the qualification unique
            use_qualification_name = f"{qualification_name}_{time.time()}"
            qualification_id = find_or_create_mturk_qualification(
                client, use_qualification_name, qualification_desc, must_be_owned=True
            )
            attempts = 0
            while qualification_id is None:
                # Append something somewhat random
                use_qualification_name = f"{qualification_name}_{str(uuid4())}"
                qualification_id = find_or_create_mturk_qualification(
                    client,
                    use_qualification_name,
                    qualification_desc,
                    must_be_owned=True,
                )
                attempts += 1
                if attempts > MAX_QUALIFICATION_ATTEMPTS:
                    raise Exception(
                        "Something has gone extremely wrong with creating qualification "
                        f"{qualification_name} for requester {self.requester_name}"
                    )
        # Store the new qualification in the datastore
        self.datastore.create_qualification_mapping(
            qualification_name, self.db_id, use_qualification_name, qualification_id
        )
        return qualification_id

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        return MTurkRequester._register_requester(db, requester_name, PROVIDER_TYPE)
View Source
class MTurkRequesterArgs(RequesterArgs):
    _group: str = field(
        default="MTurkRequester",
        metadata={
            "help": (
                "AWS is required to create a new Requester. "
                "Please create an IAM user with programmatic access and "
                "AmazonMechanicalTurkFullAccess policy at "
                'https://console.aws.amazon.com/iam/ (On the "Set permissions" '
                'page, choose "Attach existing policies directly" and then select '
                '"AmazonMechanicalTurkFullAccess" policy). After creating '
                "the IAM user, you should get an Access Key ID "
                "and Secret Access Key. "
            )
        },
    )
    access_key_id: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Access Key ID"}
    )
    secret_access_key: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Secret Access Key"}
    )

MTurkRequesterArgs(name: str = '???', _group: str = 'MTurkRequester', access_key_id: str = '???', secret_access_key: str = '???')

#   MTurkRequesterArgs( name: str = '???', _group: str = 'MTurkRequester', access_key_id: str = '???', secret_access_key: str = '???' )
#   access_key_id: str = '???'
#   secret_access_key: str = '???'
View Source
class MTurkRequester(Requester):
    """
    Wrapper for requester behavior as provided by MTurk. Makes
    all requests directly to MTurk through boto3.
    """

    # Ensure inherited methods use this level's provider type
    PROVIDER_TYPE = PROVIDER_TYPE
    ArgsClass = MTurkRequesterArgs

    def __init__(
        self,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ):
        super().__init__(db, db_id, row=row, _used_new_call=_used_new_call)
        self.datastore: "MTurkDatastore" = self.db.get_datastore_for_provider(
            self.PROVIDER_TYPE
        )
        # Use _requester_name to preserve sandbox behavior which
        # utilizes a different requester_name
        self._requester_name = self.requester_name

    def _get_client(self, requester_name: str) -> Any:
        """
        Get an mturk client for usage with mturk_utils
        """
        return self.datastore.get_client_for_requester(requester_name)

    # Required functions for a Requester implementation

    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.
        """
        for req_field in ["access_key_id", "secret_access_key"]:
            if args is not None and req_field not in args:
                raise Exception(
                    f'Missing IAM "{req_field}" in requester registration args'
                )
        setup_aws_credentials(self._requester_name, args)

    def is_registered(self) -> bool:
        """Return whether or not this requester has registered yet"""
        return check_aws_credentials(self._requester_name)

    def get_available_budget(self) -> float:
        """Get the available budget from MTurk"""
        client = self._get_client(self._requester_name)
        return get_requester_balance(client)

    def _create_new_mturk_qualification(self, qualification_name: str) -> str:
        """
        Create a new qualification on MTurk owned by the requester provided
        """
        client = self._get_client(self._requester_name)
        qualification_desc = f"Equivalent qualification for {qualification_name}."
        use_qualification_name = qualification_name
        qualification_id = find_or_create_mturk_qualification(
            client, qualification_name, qualification_desc, must_be_owned=True
        )
        if qualification_id is None:
            # Try to append time to make the qualification unique
            use_qualification_name = f"{qualification_name}_{time.time()}"
            qualification_id = find_or_create_mturk_qualification(
                client, use_qualification_name, qualification_desc, must_be_owned=True
            )
            attempts = 0
            while qualification_id is None:
                # Append something somewhat random
                use_qualification_name = f"{qualification_name}_{str(uuid4())}"
                qualification_id = find_or_create_mturk_qualification(
                    client,
                    use_qualification_name,
                    qualification_desc,
                    must_be_owned=True,
                )
                attempts += 1
                if attempts > MAX_QUALIFICATION_ATTEMPTS:
                    raise Exception(
                        "Something has gone extremely wrong with creating qualification "
                        f"{qualification_name} for requester {self.requester_name}"
                    )
        # Store the new qualification in the datastore
        self.datastore.create_qualification_mapping(
            qualification_name, self.db_id, use_qualification_name, qualification_id
        )
        return qualification_id

    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        return MTurkRequester._register_requester(db, requester_name, PROVIDER_TYPE)

Wrapper for requester behavior as provided by MTurk. Makes all requests directly to MTurk through boto3.

#   MTurkRequester( db: mephisto.abstractions.database.MephistoDB, db_id: str, row: Union[Mapping[str, Any], NoneType] = None, _used_new_call: bool = False )
View Source
    def __new__(
        cls,
        db: "MephistoDB",
        db_id: str,
        row: Optional[Mapping[str, Any]] = None,
        _used_new_call: bool = False,
    ) -> "Requester":
        """
        The new method is overridden to be able to automatically generate
        the expected Requester class without needing to specifically find it
        for a given db_id. As such it is impossible to create a base Requester
        as you will instead be returned the correct Requester class according to
        the crowdprovider associated with this Requester.
        """
        from mephisto.operations.registry import get_crowd_provider_from_type

        if cls == Requester:
            # We are trying to construct a Requester, find what type to use and
            # create that instead
            if row is None:
                row = db.get_requester(db_id)
            assert row is not None, f"Given db_id {db_id} did not exist in given db"
            correct_class = get_crowd_provider_from_type(
                row["provider_type"]
            ).RequesterClass
            return super().__new__(correct_class)
        else:
            # We are constructing another instance directly
            return super().__new__(cls)

The new method is overridden to be able to automatically generate the expected Requester class without needing to specifically find it for a given db_id. As such it is impossible to create a base Requester as you will instead be returned the correct Requester class according to the crowdprovider associated with this Requester.

#   PROVIDER_TYPE = 'mturk'
#   def register( self, args: Union[omegaconf.dictconfig.DictConfig, NoneType] = None ) -> None:
View Source
    def register(self, args: Optional[DictConfig] = None) -> None:
        """
        Register this requester with the crowd provider by providing any required credentials
        or such. If no args are provided, assume the registration is already made and try
        to assert it as such.
        """
        for req_field in ["access_key_id", "secret_access_key"]:
            if args is not None and req_field not in args:
                raise Exception(
                    f'Missing IAM "{req_field}" in requester registration args'
                )
        setup_aws_credentials(self._requester_name, args)

Register this requester with the crowd provider by providing any required credentials or such. If no args are provided, assume the registration is already made and try to assert it as such.

#   def is_registered(self) -> bool:
View Source
    def is_registered(self) -> bool:
        """Return whether or not this requester has registered yet"""
        return check_aws_credentials(self._requester_name)

Return whether or not this requester has registered yet

#   def get_available_budget(self) -> float:
View Source
    def get_available_budget(self) -> float:
        """Get the available budget from MTurk"""
        client = self._get_client(self._requester_name)
        return get_requester_balance(client)

Get the available budget from MTurk

#  
@staticmethod
def new( db: mephisto.abstractions.database.MephistoDB, requester_name: str ) -> mephisto.data_model.requester.Requester:
View Source
    @staticmethod
    def new(db: "MephistoDB", requester_name: str) -> "Requester":
        return MTurkRequester._register_requester(db, requester_name, PROVIDER_TYPE)

Try to create a new requester by this name, raise an exception if the name already exists.

Implementation should call _register_requester(db, requester_id) when sure the requester can be successfully created to have it put into the db and return the result.

#   class MTurkRequester.ArgsClass(mephisto.data_model.requester.RequesterArgs):
View Source
class MTurkRequesterArgs(RequesterArgs):
    _group: str = field(
        default="MTurkRequester",
        metadata={
            "help": (
                "AWS is required to create a new Requester. "
                "Please create an IAM user with programmatic access and "
                "AmazonMechanicalTurkFullAccess policy at "
                'https://console.aws.amazon.com/iam/ (On the "Set permissions" '
                'page, choose "Attach existing policies directly" and then select '
                '"AmazonMechanicalTurkFullAccess" policy). After creating '
                "the IAM user, you should get an Access Key ID "
                "and Secret Access Key. "
            )
        },
    )
    access_key_id: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Access Key ID"}
    )
    secret_access_key: str = field(
        default=MISSING, metadata={"required": True, "help": "IAM Secret Access Key"}
    )

MTurkRequesterArgs(name: str = '???', _group: str = 'MTurkRequester', access_key_id: str = '???', secret_access_key: str = '???')