mephisto.abstractions.blueprints.abstract.static_task.static_blueprint
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.abstract.static_task.static_agent_state import (
    StaticAgentState,
)
from mephisto.abstractions.blueprints.abstract.static_task.static_task_runner import (
    StaticTaskRunner,
)
from mephisto.abstractions.blueprints.abstract.static_task.empty_task_builder import (
    EmptyStaticTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction

import os
import time
import csv
import json
import types

from typing import ClassVar, List, Type, Any, Dict, Iterable, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import (
        AgentState,
        TaskRunner,
        TaskBuilder,
        OnboardingAgent,
    )
    from mephisto.data_model.assignment import Assignment
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.unit import Unit

# Value stored in StaticBlueprintArgs._blueprint_type.
BLUEPRINT_TYPE_STATIC = "abstract_static"


# Shared state for static blueprints: adds an in-memory source of task data
# that StaticBlueprint falls back to when no data file argument is given.
@dataclass
class SharedStaticTaskState(OnboardingSharedState, SharedTaskState):
    static_task_data: Iterable[Any] = field(
        default_factory=list,
        metadata={
            "help": (
                "List or generator that returns dicts of task data. Generators can be "
                "used for tasks with lengths that aren't known at the start of a "
                "run, or are otherwise determined during the run. "
            ),
            "type": "Iterable[Dict[str, Any]]",
            "default": "[]",
        },
    )


# Arguments for static blueprints. The three data_* paths are checked by
# StaticBlueprint in the order csv, json, jsonl; the first one set is used.
@dataclass
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    _group: str = field(
        default="StaticBlueprint",
        metadata={
            "help": (
                "Abstract Static Blueprints should not be launched manually, but "
                "include all tasks with units containing just one input and output "
                "of arbitrary data, with no live component. "
            )
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the HTML may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    data_json: str = field(
        default=MISSING, metadata={"help": "Path to JSON file containing task data"}
    )
    data_jsonl: str = field(
        default=MISSING, metadata={"help": "Path to JSON-L file containing task data"}
    )
    data_csv: str = field(
        default=MISSING, metadata={"help": "Path to csv file containing task data"}
    )


class StaticBlueprint(OnboardingRequired, Blueprint):
    """
    Abstract blueprint for a task that runs without any extensive backend.
    These are generally one-off tasks sending data to the frontend and then
    awaiting a response.

    Task data comes from the first of these that is set: the ``data_csv``,
    ``data_json`` or ``data_jsonl`` blueprint argument, then
    ``shared_state.static_task_data``.
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = EmptyStaticTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = StaticTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = StaticBlueprintArgs
    SharedStateClass = SharedStaticTaskState

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedStaticTaskState",
    ):
        """
        Load the task data from the first configured source.

        File sources are read eagerly into a list; data supplied through
        ``shared_state.static_task_data`` is stored as given, so a generator
        stays lazy.
        """
        super().__init__(task_run, args, shared_state)

        # Rows read from a data file are collected here. The attribute starts
        # out pointing at this same list, so appending to it fills the
        # attribute while keeping a precise List type for the appends.
        file_rows: List[Dict[str, Any]] = []
        # Originally just a list of dicts, but can also be a generator of dicts
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = file_rows

        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            # utf-8-sig strips a leading byte-order mark if one is present
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    # Key each cell by its column header; a row with more
                    # cells than headers raises IndexError.
                    file_rows.append({headers[i]: col for i, col in enumerate(row)})
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                file_rows.extend(json.load(json_fp))
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                for line in jsonl_fp:
                    # A whitespace-only line holds no record; feeding it to
                    # json.loads would raise JSONDecodeError.
                    if line.strip():
                        file_rows.append(json.loads(line))
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        # else: instantiating a version of the blueprint, but not necessarily
        # needing the data

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Ensure that the data can be properly loaded.

        Raises AssertionError when the shared state has the wrong type, when
        the selected data file does not exist, when sized in-memory data is
        empty, or when no data source is provided at all.
        """
        # Local import: keeps this method self-contained without touching the
        # module-level import block.
        from collections.abc import Sized

        super().assert_task_args(args, shared_state)

        assert isinstance(
            shared_state, SharedStaticTaskState
        ), "Must use SharedStaticTaskState for static blueprints"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            task_data = shared_state.static_task_data
            if isinstance(task_data, Sized):
                # Only containers that report their own length are checked:
                # counting by iteration would drain a one-shot iterator and
                # leave nothing for the run itself.
                assert len(task_data) > 0, "Length of data dict provided was 0"
            # else: generator or other lazy iterable.
            # TODO(#97) can we check something about this?
            # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream.

        A generator source yields a generator, so items are produced on
        demand; any other source yields a list.
        """

        def build(shared: Dict[str, Any]) -> "InitializationData":
            # One fresh dict per unit: `[{}] * n` would repeat a single dict
            # object, so writing to one unit's data would show up in all.
            unit_count = self.args.blueprint.units_per_assignment
            return InitializationData(
                shared=shared,
                unit_data=[{} for _ in range(unit_count)],
            )

        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield build(item)

            return data_generator()

        return [build(d) for d in self._initialization_data_dicts]
View Source
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Identifies the blueprint type these arguments belong to.
    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    # Argument group name; its help text describes the blueprint family.
    _group: str = field(
        default="StaticBlueprint",
        metadata=dict(
            help=(
                "Abstract Static Blueprints should not be launched manually, "
                "but include all tasks with units containing just one input "
                "and output of arbitrary data, with no live component. "
            )
        ),
    )
    # Number of units created for every assignment.
    units_per_assignment: int = field(
        default=1,
        metadata=dict(help="How many workers you want to do each assignment"),
    )
    # Directory of extra static assets for the frontend.
    extra_source_dir: str = field(
        default=MISSING,
        metadata=dict(
            help=(
                "Optional path to sources that the HTML may refer to "
                "(such as images/video/css/scripts)"
            )
        ),
    )
    # Task data file locations; all unset (MISSING) by default.
    data_json: str = field(
        default=MISSING,
        metadata=dict(help="Path to JSON file containing task data"),
    )
    data_jsonl: str = field(
        default=MISSING,
        metadata=dict(help="Path to JSON-L file containing task data"),
    )
    data_csv: str = field(
        default=MISSING,
        metadata=dict(help="Path to csv file containing task data"),
    )
StaticBlueprintArgs(_blueprint_type: str = 'abstract_static', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'StaticBlueprint', units_per_assignment: int = 1, extra_source_dir: str = '???', data_json: str = '???', data_jsonl: str = '???', data_csv: str = '???')
View Source
class StaticBlueprint(OnboardingRequired, Blueprint):
    """
    Abstract blueprint for a task that runs without any extensive backend.
    These are generally one-off tasks sending data to the frontend and then
    awaiting a response.

    Task data comes from the first of these that is set: the ``data_csv``,
    ``data_json`` or ``data_jsonl`` blueprint argument, then
    ``shared_state.static_task_data``.
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = EmptyStaticTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = StaticTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = StaticBlueprintArgs
    SharedStateClass = SharedStaticTaskState

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedStaticTaskState",
    ):
        """
        Load the task data from the first configured source.

        File sources are read eagerly into a list; data supplied through
        ``shared_state.static_task_data`` is stored as given, so a generator
        stays lazy.
        """
        super().__init__(task_run, args, shared_state)

        # Rows read from a data file are collected here. The attribute starts
        # out pointing at this same list, so appending to it fills the
        # attribute while keeping a precise List type for the appends.
        file_rows: List[Dict[str, Any]] = []
        # Originally just a list of dicts, but can also be a generator of dicts
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = file_rows

        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            # utf-8-sig strips a leading byte-order mark if one is present
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    # Key each cell by its column header; a row with more
                    # cells than headers raises IndexError.
                    file_rows.append({headers[i]: col for i, col in enumerate(row)})
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                file_rows.extend(json.load(json_fp))
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                for line in jsonl_fp:
                    # A whitespace-only line holds no record; feeding it to
                    # json.loads would raise JSONDecodeError.
                    if line.strip():
                        file_rows.append(json.loads(line))
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        # else: instantiating a version of the blueprint, but not necessarily
        # needing the data

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Ensure that the data can be properly loaded.

        Raises AssertionError when the shared state has the wrong type, when
        the selected data file does not exist, when sized in-memory data is
        empty, or when no data source is provided at all.
        """
        # Local import: keeps this method self-contained without touching the
        # module-level import block.
        from collections.abc import Sized

        super().assert_task_args(args, shared_state)

        assert isinstance(
            shared_state, SharedStaticTaskState
        ), "Must use SharedStaticTaskState for static blueprints"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            task_data = shared_state.static_task_data
            if isinstance(task_data, Sized):
                # Only containers that report their own length are checked:
                # counting by iteration would drain a one-shot iterator and
                # leave nothing for the run itself.
                assert len(task_data) > 0, "Length of data dict provided was 0"
            # else: generator or other lazy iterable.
            # TODO(#97) can we check something about this?
            # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream.

        A generator source yields a generator, so items are produced on
        demand; any other source yields a list.
        """

        def build(shared: Dict[str, Any]) -> "InitializationData":
            # One fresh dict per unit: `[{}] * n` would repeat a single dict
            # object, so writing to one unit's data would show up in all.
            unit_count = self.args.blueprint.units_per_assignment
            return InitializationData(
                shared=shared,
                unit_data=[{} for _ in range(unit_count)],
            )

        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield build(item)

            return data_generator()

        return [build(d) for d in self._initialization_data_dicts]
Abstract blueprint for a task that runs without any extensive backend. These are generally one-off tasks sending data to the frontend and then awaiting a response.
View Source
def __init__(
    self,
    task_run: "TaskRun",
    args: "DictConfig",
    shared_state: "SharedStaticTaskState",
):
    """
    Load the task data from the first configured source.

    Sources are checked in this order: the ``data_csv``, ``data_json`` and
    ``data_jsonl`` blueprint arguments, then ``shared_state.static_task_data``.
    File sources are read eagerly into a list; shared-state data is stored as
    given, so a generator stays lazy.
    """
    super().__init__(task_run, args, shared_state)

    # Rows read from a data file are collected here. The attribute starts out
    # pointing at this same list, so appending to it fills the attribute while
    # keeping a precise List type for the appends.
    file_rows: List[Dict[str, Any]] = []
    # Originally just a list of dicts, but can also be a generator of dicts
    self._initialization_data_dicts: Iterable[Dict[str, Any]] = file_rows

    blue_args = args.blueprint
    if blue_args.get("data_csv", None) is not None:
        csv_file = os.path.expanduser(blue_args.data_csv)
        # utf-8-sig strips a leading byte-order mark if one is present
        with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
            csv_reader = csv.reader(csv_fp)
            headers = next(csv_reader)
            for row in csv_reader:
                # Key each cell by its column header; a row with more cells
                # than headers raises IndexError.
                file_rows.append({headers[i]: col for i, col in enumerate(row)})
    elif blue_args.get("data_json", None) is not None:
        json_file = os.path.expanduser(blue_args.data_json)
        with open(json_file, "r", encoding="utf-8-sig") as json_fp:
            file_rows.extend(json.load(json_fp))
    elif blue_args.get("data_jsonl", None) is not None:
        jsonl_file = os.path.expanduser(blue_args.data_jsonl)
        with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
            for line in jsonl_fp:
                # A whitespace-only line holds no record; feeding it to
                # json.loads would raise JSONDecodeError.
                if line.strip():
                    file_rows.append(json.loads(line))
    elif shared_state.static_task_data is not None:
        self._initialization_data_dicts = shared_state.static_task_data
    # else: instantiating a version of the blueprint, but not necessarily
    # needing the data
View Source
@classmethod
def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
    """
    Ensure that the data can be properly loaded.

    Raises AssertionError when the shared state has the wrong type, when the
    selected data file does not exist, when sized in-memory data is empty, or
    when no data source is provided at all.
    """
    # Local import: keeps this method self-contained without touching the
    # module-level import block.
    from collections.abc import Sized

    super().assert_task_args(args, shared_state)

    assert isinstance(
        shared_state, SharedStaticTaskState
    ), "Must use SharedStaticTaskState for static blueprints"
    blue_args = args.blueprint
    if blue_args.get("data_csv", None) is not None:
        csv_file = os.path.expanduser(blue_args.data_csv)
        assert os.path.exists(csv_file), f"Provided csv file {csv_file} doesn't exist"
    elif blue_args.get("data_json", None) is not None:
        json_file = os.path.expanduser(blue_args.data_json)
        assert os.path.exists(
            json_file
        ), f"Provided JSON file {json_file} doesn't exist"
    elif blue_args.get("data_jsonl", None) is not None:
        jsonl_file = os.path.expanduser(blue_args.data_jsonl)
        assert os.path.exists(
            jsonl_file
        ), f"Provided JSON-L file {jsonl_file} doesn't exist"
    elif shared_state.static_task_data is not None:
        task_data = shared_state.static_task_data
        if isinstance(task_data, Sized):
            # Only containers that report their own length are checked:
            # counting by iteration would drain a one-shot iterator and leave
            # nothing for the run itself.
            assert len(task_data) > 0, "Length of data dict provided was 0"
        # else: generator or other lazy iterable.
        # TODO(#97) can we check something about this?
        # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
    else:
        raise AssertionError(
            "Must provide one of a data csv, json, json-L, or a list of tasks"
        )
Ensure that the data can be properly loaded
View Source
def get_initialization_data(self) -> Iterable["InitializationData"]:
    """
    Return the InitializationData retrieved from the specified stream.

    A generator source yields a generator, so items are produced on demand;
    any other source yields a list. Every item carries
    ``units_per_assignment`` empty unit-data dicts.
    """

    def build(shared: Dict[str, Any]) -> "InitializationData":
        # One fresh dict per unit: `[{}] * n` would repeat a single dict
        # object, so writing to one unit's data would show up in all.
        unit_count = self.args.blueprint.units_per_assignment
        return InitializationData(
            shared=shared,
            unit_data=[{} for _ in range(unit_count)],
        )

    if isinstance(self._initialization_data_dicts, types.GeneratorType):

        def data_generator() -> Iterable["InitializationData"]:
            for item in self._initialization_data_dicts:
                yield build(item)

        return data_generator()

    return [build(d) for d in self._initialization_data_dicts]
Return the InitializationData retrieved from the specified stream
Inherited Members
View Source
class StaticAgentState(AgentState):
    """
    Agent state for static tasks.
    """

    def _get_empty_state(self) -> Dict[str, Optional[Dict[str, Any]]]:
        # Built fresh on every call, so no two states share the times dict.
        blank_times = {"task_start": 0, "task_end": 0}
        return {"inputs": None, "outputs": None, "times": blank_times}

    def __init__(self, agent: "Agent"):
        """
        Static agent states should store
        input dict -> output dict pairs to disc
        """
        # Weak proxy: the state does not keep its agent alive.
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Optional[Dict[str, Any]]] = self._get_empty_state()
        self.load_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent; False if it was already set"""
        if self.get_init_state() is not None:
            # Initial state is already set
            return False

        self.state["inputs"] = data
        recorded_times = self.state["times"]
        assert isinstance(recorded_times, dict)
        recorded_times["task_start"] = time.time()
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return a copy of the initial state for this agent,
        None if no such state exists
        """
        inputs = self.state["inputs"]
        return None if inputs is None else inputs.copy()

    def load_data(self) -> None:
        """Load data for this agent from disk, or reset to the empty state"""
        data_path = os.path.join(self.agent.get_data_dir(), DATA_FILE)
        if not os.path.exists(data_path):
            self.state = self._get_empty_state()
            return
        with open(data_path, "r") as data_file:
            self.state = json.load(data_file)

    def get_data(self) -> Dict[str, Any]:
        """Return a shallow copy of this agent's state dict"""
        return self.state.copy()

    def save_data(self) -> None:
        """Save static agent data to disk"""
        target_dir = self.agent.get_data_dir()
        os.makedirs(target_dir, exist_ok=True)
        out_filename = os.path.join(target_dir, DATA_FILE)
        with open(out_filename, "w+") as data_file:
            json.dump(self.state, data_file)
        logger.info(f"SAVED_DATA_TO_DISC at {out_filename}")

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Static tasks accept no live updates, so this always raises
        """
        raise Exception("Static tasks should only have final act, but got live update")

    def update_submit(self, submission_data: Dict[str, Any]) -> None:
        """Move the submitted output to the local dict"""
        uploaded = submission_data.get("files")
        if uploaded is not None:
            # Keep only each file's name in the stored submission.
            submission_data["files"] = [entry["filename"] for entry in uploaded]
        self.state["outputs"] = submission_data
        recorded_times = self.state["times"]
        assert isinstance(recorded_times, dict)
        recorded_times["task_end"] = time.time()
        self.save_data()

    def get_task_start(self) -> Optional[float]:
        """
        Extract out and return the start time recorded for this task.
        """
        recorded_times = self.state["times"]
        assert recorded_times is not None
        return recorded_times["task_start"]

    def get_task_end(self) -> Optional[float]:
        """
        Extract out and return the end time recorded for this task.
        """
        recorded_times = self.state["times"]
        assert recorded_times is not None
        return recorded_times["task_end"]
Agent state for static tasks.
Inherited Members
- mephisto.abstractions.blueprints.abstract.static_task.static_agent_state.StaticAgentState
- StaticAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- save_data
- update_data
- update_submit
- get_task_start
- get_task_end
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
- get_parsed_data
View Source
class StaticAgentState(AgentState):
    """
    Agent state for static tasks.
    """

    def _get_empty_state(self) -> Dict[str, Optional[Dict[str, Any]]]:
        # Built fresh on every call, so no two states share the times dict.
        blank_times = {"task_start": 0, "task_end": 0}
        return {"inputs": None, "outputs": None, "times": blank_times}

    def __init__(self, agent: "Agent"):
        """
        Static agent states should store
        input dict -> output dict pairs to disc
        """
        # Weak proxy: the state does not keep its agent alive.
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Optional[Dict[str, Any]]] = self._get_empty_state()
        self.load_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent; False if it was already set"""
        if self.get_init_state() is not None:
            # Initial state is already set
            return False

        self.state["inputs"] = data
        recorded_times = self.state["times"]
        assert isinstance(recorded_times, dict)
        recorded_times["task_start"] = time.time()
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return a copy of the initial state for this agent,
        None if no such state exists
        """
        inputs = self.state["inputs"]
        return None if inputs is None else inputs.copy()

    def load_data(self) -> None:
        """Load data for this agent from disk, or reset to the empty state"""
        data_path = os.path.join(self.agent.get_data_dir(), DATA_FILE)
        if not os.path.exists(data_path):
            self.state = self._get_empty_state()
            return
        with open(data_path, "r") as data_file:
            self.state = json.load(data_file)

    def get_data(self) -> Dict[str, Any]:
        """Return a shallow copy of this agent's state dict"""
        return self.state.copy()

    def save_data(self) -> None:
        """Save static agent data to disk"""
        target_dir = self.agent.get_data_dir()
        os.makedirs(target_dir, exist_ok=True)
        out_filename = os.path.join(target_dir, DATA_FILE)
        with open(out_filename, "w+") as data_file:
            json.dump(self.state, data_file)
        logger.info(f"SAVED_DATA_TO_DISC at {out_filename}")

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Static tasks accept no live updates, so this always raises
        """
        raise Exception("Static tasks should only have final act, but got live update")

    def update_submit(self, submission_data: Dict[str, Any]) -> None:
        """Move the submitted output to the local dict"""
        uploaded = submission_data.get("files")
        if uploaded is not None:
            # Keep only each file's name in the stored submission.
            submission_data["files"] = [entry["filename"] for entry in uploaded]
        self.state["outputs"] = submission_data
        recorded_times = self.state["times"]
        assert isinstance(recorded_times, dict)
        recorded_times["task_end"] = time.time()
        self.save_data()

    def get_task_start(self) -> Optional[float]:
        """
        Extract out and return the start time recorded for this task.
        """
        recorded_times = self.state["times"]
        assert recorded_times is not None
        return recorded_times["task_start"]

    def get_task_end(self) -> Optional[float]:
        """
        Extract out and return the end time recorded for this task.
        """
        recorded_times = self.state["times"]
        assert recorded_times is not None
        return recorded_times["task_end"]
Agent state for static tasks.
Inherited Members
- mephisto.abstractions.blueprints.abstract.static_task.static_agent_state.StaticAgentState
- StaticAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- save_data
- update_data
- update_submit
- get_task_start
- get_task_end
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
- get_parsed_data
View Source
class EmptyStaticTaskBuilder(TaskBuilder):
    """
    Abstract class for a task builder for static tasks
    """

    def build_in_dir(self, build_dir: str):
        """Always raise: concrete static blueprints must supply their own builder"""
        explanation = (
            "Classes that extend the abstract StaticBlueprint must define a custom "
            "TaskBuilder class that pulls the correct frontend together. Examples "
            "can be seen in the static_react_task and static_html_task folders. "
            "Note that extra static content will be provided in `args.blueprint.extra_source_dir` "
        )
        raise AssertionError(explanation)
Abstract class for a task builder for static tasks
Inherited Members
- mephisto.abstractions._subcomponents.task_builder.TaskBuilder
- TaskBuilder
View Source
class StaticTaskRunner(TaskRunner):
    """
    Task runner for a static task

    Static tasks always assume single unit assignments,
    as only one person can work on them at a time
    """

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        super().__init__(task_run, args, shared_state)
        self.is_concurrent = False
        # Reused as the submit timeout for both onboarding and regular units.
        task_args = task_run.get_task_args()
        self.assignment_duration_in_seconds = task_args.assignment_duration_in_seconds

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Return the data for an agent already assigned to a particular unit
        """
        existing = agent.state.get_init_state()
        if existing is not None:
            # reconnecting agent, give what we've got
            return existing

        # First request for this agent: fetch the assignment's data and
        # record its shared part as the agent's initial state.
        assignment = agent.get_unit().get_assignment()
        assignment_data = self.get_data_for_assignment(assignment)
        agent.state.set_init_state(assignment_data.shared)
        return assignment_data.shared

    def run_onboarding(self, agent: "OnboardingAgent"):
        """
        Static onboarding flows exactly like a regular task, waiting for
        the submit to come through
        """
        agent.await_submit(self.assignment_duration_in_seconds)

    def cleanup_onboarding(self, agent: "OnboardingAgent"):
        """Nothing to clean up in a static onboarding"""
        return

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        Static runners will get the task data, send it to the user, then
        wait for the agent to act (the data to be completed)
        """
        agent.await_submit(self.assignment_duration_in_seconds)

    def cleanup_unit(self, unit: "Unit") -> None:
        """There is currently no cleanup associated with killing an incomplete task"""
        return
Task runner for a static task
Static tasks always assume single unit assignments, as only one person can work on them at a time
Inherited Members
- mephisto.abstractions.blueprints.abstract.static_task.static_task_runner.StaticTaskRunner
- StaticTaskRunner
- get_init_data_for_agent
- run_onboarding
- cleanup_onboarding
- run_unit
- cleanup_unit
- mephisto.abstractions._subcomponents.task_runner.TaskRunner
- execute_onboarding
- execute_unit
- execute_assignment
- get_data_for_assignment
- filter_units_for_worker
- shutdown
- run_assignment
- cleanup_assignment
View Source
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Identifies the blueprint type these arguments belong to.
    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    # Argument group name; its help text describes the blueprint family.
    _group: str = field(
        default="StaticBlueprint",
        metadata=dict(
            help=(
                "Abstract Static Blueprints should not be launched manually, "
                "but include all tasks with units containing just one input "
                "and output of arbitrary data, with no live component. "
            )
        ),
    )
    # Number of units created for every assignment.
    units_per_assignment: int = field(
        default=1,
        metadata=dict(help="How many workers you want to do each assignment"),
    )
    # Directory of extra static assets for the frontend.
    extra_source_dir: str = field(
        default=MISSING,
        metadata=dict(
            help=(
                "Optional path to sources that the HTML may refer to "
                "(such as images/video/css/scripts)"
            )
        ),
    )
    # Task data file locations; all unset (MISSING) by default.
    data_json: str = field(
        default=MISSING,
        metadata=dict(help="Path to JSON file containing task data"),
    )
    data_jsonl: str = field(
        default=MISSING,
        metadata=dict(help="Path to JSON-L file containing task data"),
    )
    data_csv: str = field(
        default=MISSING,
        metadata=dict(help="Path to csv file containing task data"),
    )
StaticBlueprintArgs(_blueprint_type: str = 'abstract_static', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'StaticBlueprint', units_per_assignment: int = 1, extra_source_dir: str = '???', data_json: str = '???', data_jsonl: str = '???', data_csv: str = '???')