mephisto.abstractions.providers.mturk_sandbox.sandbox_mturk_unit
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from datetime import datetime

from mephisto.abstractions.providers.mturk.mturk_unit import MTurkUnit
from mephisto.abstractions.providers.mturk_sandbox.provider_type import PROVIDER_TYPE
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.unit import Unit
    from mephisto.abstractions.database import MephistoDB
    from mephisto.data_model.assignment import Assignment


class SandboxMTurkUnit(MTurkUnit):
    """
    Sandbox flavour of ``MTurkUnit``.

    A unit tracks the status of one worker's contribution to a higher-level
    assignment. It is the smallest piece of work needed to complete that
    assignment, and this class is only responsible for checking the status
    of that piece of work.
    """

    # Shadow the inherited value so that methods defined on the parent class
    # pick up the sandbox provider type when called on this class.
    PROVIDER_TYPE = PROVIDER_TYPE

    def _get_client(self, requester_name: str) -> Any:
        """
        Return the sandbox client that this unit's datastore provides for
        ``requester_name``, for usage with mturk_utils.
        """
        datastore = self.datastore
        return datastore.get_sandbox_client_for_requester(requester_name)

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """
        Create a Unit for the given assignment.

        Delegates to ``_register_unit``, forwarding every argument unchanged
        together with the module-level sandbox ``PROVIDER_TYPE``, and returns
        whatever that call returns.
        """
        unit = SandboxMTurkUnit._register_unit(
            db, assignment, index, pay_amount, PROVIDER_TYPE
        )
        return unit
View Source
class SandboxMTurkUnit(MTurkUnit):
    """
    Sandbox flavour of ``MTurkUnit``.

    A unit tracks the status of one worker's contribution to a higher-level
    assignment. It is the smallest piece of work needed to complete that
    assignment, and this class is only responsible for checking the status
    of that piece of work.
    """

    # Shadow the inherited value so that methods defined on the parent class
    # pick up the sandbox provider type when called on this class.
    PROVIDER_TYPE = PROVIDER_TYPE

    def _get_client(self, requester_name: str) -> Any:
        """
        Return the sandbox client that this unit's datastore provides for
        ``requester_name``, for usage with mturk_utils.
        """
        datastore = self.datastore
        return datastore.get_sandbox_client_for_requester(requester_name)

    @staticmethod
    def new(
        db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
    ) -> "Unit":
        """
        Create a Unit for the given assignment.

        Delegates to ``_register_unit``, forwarding every argument unchanged
        together with the module-level sandbox ``PROVIDER_TYPE``, and returns
        whatever that call returns.
        """
        unit = SandboxMTurkUnit._register_unit(
            db, assignment, index, pay_amount, PROVIDER_TYPE
        )
        return unit
This class tracks the status of an individual worker's contribution to a higher-level assignment. It is the smallest 'unit' of work needed to complete the assignment, and this class is responsible only for checking the status of that work.
#  
@staticmethod
def
new(
db: mephisto.abstractions.database.MephistoDB,
assignment: mephisto.data_model.assignment.Assignment,
index: int,
pay_amount: float
) -> mephisto.data_model.unit.Unit:
View Source
@staticmethod
def new(
    db: "MephistoDB", assignment: "Assignment", index: int, pay_amount: float
) -> "Unit":
    """
    Create a Unit for the given assignment.

    Delegates to ``SandboxMTurkUnit._register_unit``, forwarding every
    argument unchanged together with the module-level ``PROVIDER_TYPE``,
    and returns whatever that call returns.
    """
    unit = SandboxMTurkUnit._register_unit(
        db, assignment, index, pay_amount, PROVIDER_TYPE
    )
    return unit
Create a Unit for the given assignment