mephisto.abstractions.blueprints.abstract.static_task.static_blueprint

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from omegaconf import MISSING, DictConfig
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.abstract.static_task.static_agent_state import (
    StaticAgentState,
)
from mephisto.abstractions.blueprints.abstract.static_task.static_task_runner import (
    StaticTaskRunner,
)
from mephisto.abstractions.blueprints.abstract.static_task.empty_task_builder import (
    EmptyStaticTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction

import collections.abc
import csv
import json
import os
import time
import types

from typing import ClassVar, List, Type, Any, Dict, Iterable, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import (
        AgentState,
        TaskRunner,
        TaskBuilder,
        OnboardingAgent,
    )
    from mephisto.data_model.assignment import Assignment
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.unit import Unit


BLUEPRINT_TYPE_STATIC = "abstract_static"


@dataclass
class SharedStaticTaskState(OnboardingSharedState, SharedTaskState):
    """
    Shared state for static blueprints.

    Adds ``static_task_data`` on top of the onboarding shared state. It is an
    in-memory alternative to the data_csv / data_json / data_jsonl file
    arguments; StaticBlueprint only uses it when none of those is set.
    """

    # Defaults to an empty list (default_factory=list), so it is never None
    # unless a caller explicitly sets it to None.
    static_task_data: Iterable[Any] = field(
        default_factory=list,
        metadata={
            "help": (
                "List or generator that returns dicts of task data. Generators can be "
                "used for tasks with lengths that aren't known at the start of a "
                "run, or are otherwise determined during the run. "
            ),
            "type": "Iterable[Dict[str, Any]]",
            "default": "[]",
        },
    )


@dataclass
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    """
    Arguments shared by every static blueprint.

    The data_* fields name alternative task-data files. StaticBlueprint uses
    the first one that is set, checking data_csv, then data_json, then
    data_jsonl. MISSING ("???") marks a field as not provided.
    """

    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    _group: str = field(
        default="StaticBlueprint",
        metadata={
            "help": (
                "Abstract Static Blueprints should not be launched manually, but "
                "include all tasks with units containing just one input and output "
                "of arbitrary data, with no live component. "
            )
        },
    )
    # Also sets how many per-unit entries each InitializationData gets.
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the HTML may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    # Expected to hold a top-level list of task dicts.
    data_json: str = field(
        default=MISSING, metadata={"help": "Path to JSON file containing task data"}
    )
    # One JSON value (task dict) per line.
    data_jsonl: str = field(
        default=MISSING, metadata={"help": "Path to JSON-L file containing task data"}
    )
    # The first row is the header; each later row becomes one {header: cell} dict.
    data_csv: str = field(
        default=MISSING, metadata={"help": "Path to csv file containing task data"}
    )


class StaticBlueprint(OnboardingRequired, Blueprint):
    """
    Abstract blueprint for a task that runs without any extensive backend.
    These are generally one-off tasks sending data to the frontend and then
    awaiting a response.

    Task data comes from the first configured source, checked in order:
    ``blueprint.data_csv``, ``blueprint.data_json``, ``blueprint.data_jsonl``,
    then ``shared_state.static_task_data`` (a list, or a generator/iterator
    for tasks whose length isn't known at the start of the run).
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = EmptyStaticTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = StaticTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = StaticBlueprintArgs
    SharedStateClass = SharedStaticTaskState

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedStaticTaskState",
    ):
        """
        Load task data from the first configured source.

        File sources (CSV, JSON, JSON-L) are read fully into a list of dicts.
        ``shared_state.static_task_data`` is stored as-is, so a generator
        stays lazy.

        Raises:
            ValueError: if a CSV row has more cells than the header row, or
                the JSON file's top-level value is not a list.
        """
        super().__init__(task_run, args, shared_state)

        # Originally just a list of dicts, but can also be a generator of dicts
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            csv_rows: List[Dict[str, Any]] = []
            # newline="" lets the csv module handle newlines inside quoted fields
            with open(csv_file, "r", encoding="utf-8-sig", newline="") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                # An empty file has no header row, and therefore no tasks
                headers = next(csv_reader, [])
                for row in csv_reader:
                    if not row:
                        # csv.reader yields [] for a blank line; don't turn it into an empty task
                        continue
                    if len(row) > len(headers):
                        raise ValueError(
                            f"Line {csv_reader.line_num} of {csv_file} has {len(row)} "
                            f"cells but the header only has {len(headers)} columns"
                        )
                    # Short rows only fill in their leading columns
                    csv_rows.append(dict(zip(headers, row)))
            self._initialization_data_dicts = csv_rows
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            if not isinstance(json_data, list):
                # Iterating an object or string would yield keys/characters, not tasks
                raise ValueError(
                    f"Expected a JSON list of task data in {json_file}, "
                    f"got {type(json_data).__name__}"
                )
            self._initialization_data_dicts = list(json_data)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                # Skip blank lines (e.g. extra trailing newlines); json.loads rejects them
                self._initialization_data_dicts = [
                    json.loads(line) for line in jsonl_fp if line.strip()
                ]
        elif shared_state.static_task_data is not None:
            # Kept as-is so that generators/iterators are consumed lazily
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Ensure that the data can be properly loaded.

        Checks that the first configured data file exists. Otherwise, checks
        that ``shared_state.static_task_data`` is non-empty, but only when
        that can be done without consuming it.

        Raises:
            AssertionError: if the shared state has the wrong type, a data
                file is missing, the provided task data is empty, or no data
                source is configured.
        """
        super().assert_task_args(args, shared_state)

        assert isinstance(
            shared_state, SharedStaticTaskState
        ), "Must use SharedStaticTaskState for static blueprints"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            task_data = shared_state.static_task_data
            if isinstance(task_data, collections.abc.Iterator):
                # Generators and other one-shot iterators can't be inspected
                # without consuming items that the run still needs.
                # TODO(#97) can we check something about this?
                # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
                pass
            elif isinstance(task_data, collections.abc.Sized):
                assert len(task_data) > 0, "Length of data dict provided was 0"
            else:
                # Re-iterable but unsized: seeing one item is enough
                assert any(
                    True for _ in task_data
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream.

        A lazy source (a generator or other iterator) is wrapped in a
        generator, so items are converted as they are pulled. Any other
        iterable is converted eagerly into a list.
        """
        units_per_assignment = self.args.blueprint.units_per_assignment

        def build(item: Dict[str, Any]) -> "InitializationData":
            # A fresh dict per unit: `[{}] * n` would alias one dict across all units
            return InitializationData(
                shared=item,
                unit_data=[{} for _ in range(units_per_assignment)],
            )

        source = self._initialization_data_dicts
        if isinstance(source, collections.abc.Iterator):
            return (build(item) for item in source)
        return [build(d) for d in source]
View Source
class SharedStaticTaskState(OnboardingSharedState, SharedTaskState):
    """
    Shared state for static blueprints.

    Adds ``static_task_data`` on top of the onboarding shared state. It is an
    in-memory alternative to the data_csv / data_json / data_jsonl file
    arguments; StaticBlueprint only uses it when none of those is set.
    """

    # Defaults to an empty list (default_factory=list).
    static_task_data: Iterable[Any] = field(
        default_factory=list,
        metadata={
            "help": (
                "List or generator that returns dicts of task data. Generators can be "
                "used for tasks with lengths that aren't known at the start of a "
                "run, or are otherwise determined during the run. "
            ),
            "type": "Iterable[Dict[str, Any]]",
            "default": "[]",
        },
    )

SharedStaticTaskState(task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = <factory>, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, static_task_data: Iterable[Any] = <factory>)

#   SharedStaticTaskState( task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: collections.abc.Callable[mephisto.data_model.worker.Worker, mephisto.data_model.unit.Unit, bool] = <factory>, on_unit_submitted: collections.abc.Callable[mephisto.data_model.unit.Unit, NoneType] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, static_task_data: Iterable[Any] = <factory> )
View Source
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    """
    Arguments shared by every static blueprint.

    The data_* fields name alternative task-data files. StaticBlueprint uses
    the first one that is set, checking data_csv, then data_json, then
    data_jsonl. MISSING ("???") marks a field as not provided.
    """

    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    _group: str = field(
        default="StaticBlueprint",
        metadata={
            "help": (
                "Abstract Static Blueprints should not be launched manually, but "
                "include all tasks with units containing just one input and output "
                "of arbitrary data, with no live component. "
            )
        },
    )
    # Also sets how many per-unit entries each InitializationData gets.
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the HTML may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    # Expected to hold a top-level list of task dicts.
    data_json: str = field(
        default=MISSING, metadata={"help": "Path to JSON file containing task data"}
    )
    # One JSON value (task dict) per line.
    data_jsonl: str = field(
        default=MISSING, metadata={"help": "Path to JSON-L file containing task data"}
    )
    # The first row is the header; each later row becomes one {header: cell} dict.
    data_csv: str = field(
        default=MISSING, metadata={"help": "Path to csv file containing task data"}
    )

StaticBlueprintArgs(_blueprint_type: str = 'abstract_static', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'StaticBlueprint', units_per_assignment: int = 1, extra_source_dir: str = '???', data_json: str = '???', data_jsonl: str = '???', data_csv: str = '???')

#   StaticBlueprintArgs( _blueprint_type: str = 'abstract_static', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'StaticBlueprint', units_per_assignment: int = 1, extra_source_dir: str = '???', data_json: str = '???', data_jsonl: str = '???', data_csv: str = '???' )
#   units_per_assignment: int = 1
#   extra_source_dir: str = '???'
#   data_json: str = '???'
#   data_jsonl: str = '???'
#   data_csv: str = '???'
View Source
class StaticBlueprint(OnboardingRequired, Blueprint):
    """
    Abstract blueprint for a task that runs without any extensive backend.
    These are generally one-off tasks sending data to the frontend and then
    awaiting a response.

    Task data comes from the first configured source, checked in order:
    ``blueprint.data_csv``, ``blueprint.data_json``, ``blueprint.data_jsonl``,
    then ``shared_state.static_task_data`` (a list, or a generator/iterator
    for tasks whose length isn't known at the start of the run).
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = StaticAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = EmptyStaticTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = StaticTaskRunner
    ArgsClass: ClassVar[Type["BlueprintArgs"]] = StaticBlueprintArgs
    SharedStateClass = SharedStaticTaskState

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedStaticTaskState",
    ):
        """
        Load task data from the first configured source.

        File sources (CSV, JSON, JSON-L) are read fully into a list of dicts.
        ``shared_state.static_task_data`` is stored as-is, so a generator
        stays lazy.

        Raises:
            ValueError: if a CSV row has more cells than the header row, or
                the JSON file's top-level value is not a list.
        """
        super().__init__(task_run, args, shared_state)

        # Originally just a list of dicts, but can also be a generator of dicts
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            csv_rows: List[Dict[str, Any]] = []
            # newline="" lets the csv module handle newlines inside quoted fields
            with open(csv_file, "r", encoding="utf-8-sig", newline="") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                # An empty file has no header row, and therefore no tasks
                headers = next(csv_reader, [])
                for row in csv_reader:
                    if not row:
                        # csv.reader yields [] for a blank line; don't turn it into an empty task
                        continue
                    if len(row) > len(headers):
                        raise ValueError(
                            f"Line {csv_reader.line_num} of {csv_file} has {len(row)} "
                            f"cells but the header only has {len(headers)} columns"
                        )
                    # Short rows only fill in their leading columns
                    csv_rows.append(dict(zip(headers, row)))
            self._initialization_data_dicts = csv_rows
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            if not isinstance(json_data, list):
                # Iterating an object or string would yield keys/characters, not tasks
                raise ValueError(
                    f"Expected a JSON list of task data in {json_file}, "
                    f"got {type(json_data).__name__}"
                )
            self._initialization_data_dicts = list(json_data)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                # Skip blank lines (e.g. extra trailing newlines); json.loads rejects them
                self._initialization_data_dicts = [
                    json.loads(line) for line in jsonl_fp if line.strip()
                ]
        elif shared_state.static_task_data is not None:
            # Kept as-is so that generators/iterators are consumed lazily
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass

    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Ensure that the data can be properly loaded.

        Checks that the first configured data file exists. Otherwise, checks
        that ``shared_state.static_task_data`` is non-empty, but only when
        that can be done without consuming it.

        Raises:
            AssertionError: if the shared state has the wrong type, a data
                file is missing, the provided task data is empty, or no data
                source is configured.
        """
        super().assert_task_args(args, shared_state)

        assert isinstance(
            shared_state, SharedStaticTaskState
        ), "Must use SharedStaticTaskState for static blueprints"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            task_data = shared_state.static_task_data
            if isinstance(task_data, collections.abc.Iterator):
                # Generators and other one-shot iterators can't be inspected
                # without consuming items that the run still needs.
                # TODO(#97) can we check something about this?
                # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
                pass
            elif isinstance(task_data, collections.abc.Sized):
                assert len(task_data) > 0, "Length of data dict provided was 0"
            else:
                # Re-iterable but unsized: seeing one item is enough
                assert any(
                    True for _ in task_data
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream.

        A lazy source (a generator or other iterator) is wrapped in a
        generator, so items are converted as they are pulled. Any other
        iterable is converted eagerly into a list.
        """
        units_per_assignment = self.args.blueprint.units_per_assignment

        def build(item: Dict[str, Any]) -> "InitializationData":
            # A fresh dict per unit: `[{}] * n` would alias one dict across all units
            return InitializationData(
                shared=item,
                unit_data=[{} for _ in range(units_per_assignment)],
            )

        source = self._initialization_data_dicts
        if isinstance(source, collections.abc.Iterator):
            return (build(item) for item in source)
        return [build(d) for d in source]

Abstract blueprint for a task that runs without any extensive backend. These are generally one-off tasks sending data to the frontend and then awaiting a response.

View Source
    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedStaticTaskState",
    ):
        """
        Load task data from the first configured source.

        Sources are checked in order: ``blueprint.data_csv``,
        ``blueprint.data_json``, ``blueprint.data_jsonl``, then
        ``shared_state.static_task_data``. File sources are read fully into a
        list of dicts; ``static_task_data`` is stored as-is, so a generator
        stays lazy.

        Raises:
            ValueError: if a CSV row has more cells than the header row, or
                the JSON file's top-level value is not a list.
        """
        super().__init__(task_run, args, shared_state)

        # Originally just a list of dicts, but can also be a generator of dicts
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            csv_rows: List[Dict[str, Any]] = []
            # newline="" lets the csv module handle newlines inside quoted fields
            with open(csv_file, "r", encoding="utf-8-sig", newline="") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                # An empty file has no header row, and therefore no tasks
                headers = next(csv_reader, [])
                for row in csv_reader:
                    if not row:
                        # csv.reader yields [] for a blank line; don't turn it into an empty task
                        continue
                    if len(row) > len(headers):
                        raise ValueError(
                            f"Line {csv_reader.line_num} of {csv_file} has {len(row)} "
                            f"cells but the header only has {len(headers)} columns"
                        )
                    # Short rows only fill in their leading columns
                    csv_rows.append(dict(zip(headers, row)))
            self._initialization_data_dicts = csv_rows
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            if not isinstance(json_data, list):
                # Iterating an object or string would yield keys/characters, not tasks
                raise ValueError(
                    f"Expected a JSON list of task data in {json_file}, "
                    f"got {type(json_data).__name__}"
                )
            self._initialization_data_dicts = list(json_data)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                # Skip blank lines (e.g. extra trailing newlines); json.loads rejects them
                self._initialization_data_dicts = [
                    json.loads(line) for line in jsonl_fp if line.strip()
                ]
        elif shared_state.static_task_data is not None:
            # Kept as-is so that generators/iterators are consumed lazily
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass
#  
@classmethod
def assert_task_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ):
View Source
    @classmethod
    def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"):
        """
        Ensure that the data can be properly loaded.

        Checks that the first configured data file exists. Otherwise, checks
        that ``shared_state.static_task_data`` is non-empty, but only when
        that can be done without consuming it.

        Raises:
            AssertionError: if the shared state has the wrong type, a data
                file is missing, the provided task data is empty, or no data
                source is configured.
        """
        super().assert_task_args(args, shared_state)

        assert isinstance(
            shared_state, SharedStaticTaskState
        ), "Must use SharedStaticTaskState for static blueprints"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            task_data = shared_state.static_task_data
            if isinstance(task_data, collections.abc.Iterator):
                # Generators and other one-shot iterators can't be inspected
                # without consuming items that the run still needs.
                # TODO(#97) can we check something about this?
                # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start
                pass
            elif isinstance(task_data, collections.abc.Sized):
                assert len(task_data) > 0, "Length of data dict provided was 0"
            else:
                # Re-iterable but unsized: seeing one item is enough
                assert any(
                    True for _ in task_data
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )

Ensure that the data can be properly loaded

#   def get_initialization_data( self ) -> collections.abc.Iterable[mephisto.data_model.assignment.InitializationData]:
View Source
    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream.

        A lazy source (a generator or other iterator) is wrapped in a
        generator, so items are converted as they are pulled. Any other
        iterable is converted eagerly into a list.
        """
        units_per_assignment = self.args.blueprint.units_per_assignment

        def build(item: Dict[str, Any]) -> "InitializationData":
            # A fresh dict per unit: `[{}] * n` would alias one dict across all units
            return InitializationData(
                shared=item,
                unit_data=[{} for _ in range(units_per_assignment)],
            )

        source = self._initialization_data_dicts
        if isinstance(source, collections.abc.Iterator):
            return (build(item) for item in source)
        return [build(d) for d in source]

Return the InitializationData retrieved from the specified stream

#   class StaticBlueprint.AgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class StaticAgentState(AgentState):
    """
    Agent state for static tasks.

    The state is a dict with three keys:

    - ``inputs``: the initial task data given to the agent.
    - ``outputs``: the submitted data.
    - ``times``: the ``task_start`` and ``task_end`` timestamps, taken from
      ``time.time()``.

    It is saved as JSON to ``DATA_FILE`` inside the agent's data directory.
    """

    def _get_empty_state(self) -> Dict[str, Optional[Dict[str, Any]]]:
        """Return a fresh state with no inputs or outputs and zeroed timestamps."""
        return {
            "inputs": None,
            "outputs": None,
            "times": {"task_start": 0, "task_end": 0},
        }

    def __init__(self, agent: "Agent"):
        """
        Static agent states should store
        input dict -> output dict pairs to disc.

        Any state previously saved in the agent's data directory is loaded.
        """
        # A weak proxy, so this state holds no strong reference back to its agent
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Optional[Dict[str, Any]]] = self._get_empty_state()
        self.load_data()

    def set_init_state(self, data: Any) -> bool:
        """
        Set the initial state for this agent.

        Stores ``data`` as the inputs, stamps ``task_start`` and saves to disk.
        Returns False and changes nothing if inputs were already set;
        otherwise returns True.
        """
        if self.get_init_state() is not None:
            # Initial state is already set
            return False
        self.state["inputs"] = data
        times_dict = self.state["times"]
        assert isinstance(times_dict, dict)
        times_dict["task_start"] = time.time()
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return a shallow copy of the initial state for this agent,
        None if no such state exists
        """
        if self.state["inputs"] is None:
            return None
        return self.state["inputs"].copy()

    def load_data(self) -> None:
        """Load data for this agent from disk, or reset to an empty state if none is saved"""
        data_dir = self.agent.get_data_dir()
        data_path = os.path.join(data_dir, DATA_FILE)
        if os.path.exists(data_path):
            with open(data_path, "r", encoding="utf-8") as data_file:
                self.state = json.load(data_file)
        else:
            self.state = self._get_empty_state()

    def get_data(self) -> Dict[str, Any]:
        """Return a shallow copy of this agent's state dict"""
        return self.state.copy()

    def save_data(self) -> None:
        """Save static agent data to disk, creating the data directory if needed"""
        data_dir = self.agent.get_data_dir()
        os.makedirs(data_dir, exist_ok=True)
        out_filename = os.path.join(data_dir, DATA_FILE)
        with open(out_filename, "w", encoding="utf-8") as data_file:
            json.dump(self.state, data_file)
        logger.info(f"SAVED_DATA_TO_DISC at {out_filename}")

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Process the incoming data packet, and handle updating the state.

        Static tasks have no live updates, so this always raises.
        """
        raise Exception("Static tasks should only have final act, but got live update")

    def update_submit(self, submission_data: Dict[str, Any]) -> None:
        """
        Move the submitted output to the local dict.

        Uploaded file entries are reduced to their ``filename`` values. A
        shallow copy of ``submission_data`` is stored, so the caller's dict is
        left unmodified. Also stamps ``task_end`` and saves to disk.
        """
        outputs = dict(submission_data)
        output_files = outputs.get("files")
        if output_files is not None:
            outputs["files"] = [f["filename"] for f in output_files]
        self.state["outputs"] = outputs
        times_dict = self.state["times"]
        assert isinstance(times_dict, dict)
        times_dict["task_end"] = time.time()
        self.save_data()

    def get_task_start(self) -> Optional[float]:
        """
        Extract out and return the start time recorded for this task.
        """
        stored_times = self.state["times"]
        assert stored_times is not None
        return stored_times["task_start"]

    def get_task_end(self) -> Optional[float]:
        """
        Extract out and return the end time recorded for this task.
        """
        stored_times = self.state["times"]
        assert stored_times is not None
        return stored_times["task_end"]

Agent state for static tasks.

Inherited Members
mephisto.abstractions.blueprints.abstract.static_task.static_agent_state.StaticAgentState
StaticAgentState
set_init_state
get_init_state
load_data
get_data
save_data
update_data
update_submit
get_task_start
get_task_end
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
get_parsed_data
#   class StaticBlueprint.OnboardingAgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class StaticAgentState(AgentState):
    """
    Agent state for static tasks.

    The state is a dict with three keys:

    - ``inputs``: the initial task data given to the agent.
    - ``outputs``: the submitted data.
    - ``times``: the ``task_start`` and ``task_end`` timestamps, taken from
      ``time.time()``.

    It is saved as JSON to ``DATA_FILE`` inside the agent's data directory.
    """

    def _get_empty_state(self) -> Dict[str, Optional[Dict[str, Any]]]:
        """Return a fresh state with no inputs or outputs and zeroed timestamps."""
        return {
            "inputs": None,
            "outputs": None,
            "times": {"task_start": 0, "task_end": 0},
        }

    def __init__(self, agent: "Agent"):
        """
        Static agent states should store
        input dict -> output dict pairs to disc.

        Any state previously saved in the agent's data directory is loaded.
        """
        # A weak proxy, so this state holds no strong reference back to its agent
        self.agent = weakref.proxy(agent)
        self.state: Dict[str, Optional[Dict[str, Any]]] = self._get_empty_state()
        self.load_data()

    def set_init_state(self, data: Any) -> bool:
        """
        Set the initial state for this agent.

        Stores ``data`` as the inputs, stamps ``task_start`` and saves to disk.
        Returns False and changes nothing if inputs were already set;
        otherwise returns True.
        """
        if self.get_init_state() is not None:
            # Initial state is already set
            return False
        self.state["inputs"] = data
        times_dict = self.state["times"]
        assert isinstance(times_dict, dict)
        times_dict["task_start"] = time.time()
        self.save_data()
        return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return a shallow copy of the initial state for this agent,
        None if no such state exists
        """
        if self.state["inputs"] is None:
            return None
        return self.state["inputs"].copy()

    def load_data(self) -> None:
        """Load data for this agent from disk, or reset to an empty state if none is saved"""
        data_dir = self.agent.get_data_dir()
        data_path = os.path.join(data_dir, DATA_FILE)
        if os.path.exists(data_path):
            with open(data_path, "r", encoding="utf-8") as data_file:
                self.state = json.load(data_file)
        else:
            self.state = self._get_empty_state()

    def get_data(self) -> Dict[str, Any]:
        """Return a shallow copy of this agent's state dict"""
        return self.state.copy()

    def save_data(self) -> None:
        """Save static agent data to disk, creating the data directory if needed"""
        data_dir = self.agent.get_data_dir()
        os.makedirs(data_dir, exist_ok=True)
        out_filename = os.path.join(data_dir, DATA_FILE)
        with open(out_filename, "w", encoding="utf-8") as data_file:
            json.dump(self.state, data_file)
        logger.info(f"SAVED_DATA_TO_DISC at {out_filename}")

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Process the incoming data packet, and handle updating the state.

        Static tasks have no live updates, so this always raises.
        """
        raise Exception("Static tasks should only have final act, but got live update")

    def update_submit(self, submission_data: Dict[str, Any]) -> None:
        """
        Move the submitted output to the local dict.

        Uploaded file entries are reduced to their ``filename`` values. A
        shallow copy of ``submission_data`` is stored, so the caller's dict is
        left unmodified. Also stamps ``task_end`` and saves to disk.
        """
        outputs = dict(submission_data)
        output_files = outputs.get("files")
        if output_files is not None:
            outputs["files"] = [f["filename"] for f in output_files]
        self.state["outputs"] = outputs
        times_dict = self.state["times"]
        assert isinstance(times_dict, dict)
        times_dict["task_end"] = time.time()
        self.save_data()

    def get_task_start(self) -> Optional[float]:
        """
        Extract out and return the start time recorded for this task.
        """
        stored_times = self.state["times"]
        assert stored_times is not None
        return stored_times["task_start"]

    def get_task_end(self) -> Optional[float]:
        """
        Extract out and return the end time recorded for this task.
        """
        stored_times = self.state["times"]
        assert stored_times is not None
        return stored_times["task_end"]

Agent state for static tasks.

Inherited Members
mephisto.abstractions.blueprints.abstract.static_task.static_agent_state.StaticAgentState
StaticAgentState
set_init_state
get_init_state
load_data
get_data
save_data
update_data
update_submit
get_task_start
get_task_end
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
get_parsed_data
#   class StaticBlueprint.TaskBuilderClass(mephisto.abstractions._subcomponents.task_builder.TaskBuilder):
View Source
class EmptyStaticTaskBuilder(TaskBuilder):
    """
    Placeholder task builder for the abstract static blueprint.

    It cannot build a frontend. Concrete static blueprints must supply
    their own TaskBuilder.
    """

    def build_in_dir(self, build_dir: str):
        """Always fail: subclasses of StaticBlueprint must provide a real builder."""
        message = (
            "Classes that extend the abstract StaticBlueprint must define a custom "
            "TaskBuilder class that pulls the correct frontend together. Examples "
            "can be seen in the static_react_task and static_html_task folders. "
            "Note that extra static content will be provided in `args.blueprint.extra_source_dir` "
        )
        raise AssertionError(message)

Abstract class for a task builder for static tasks

Inherited Members
mephisto.abstractions._subcomponents.task_builder.TaskBuilder
TaskBuilder
mephisto.abstractions.blueprints.abstract.static_task.empty_task_builder.EmptyStaticTaskBuilder
build_in_dir
#   class StaticBlueprint.TaskRunnerClass(mephisto.abstractions._subcomponents.task_runner.TaskRunner):
View Source
class StaticTaskRunner(TaskRunner):
    """
    Runner for static tasks.

    Every static assignment holds a single unit, since only one worker can
    work on it at a time; the runner therefore marks itself non-concurrent.
    """

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        super().__init__(task_run, args, shared_state)
        self.is_concurrent = False
        # Used as the wait limit for both onboarding and regular units below
        task_args = task_run.get_task_args()
        self.assignment_duration_in_seconds = task_args.assignment_duration_in_seconds

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Provide initialization data to an agent that already owns a unit.

        If the agent's state already holds init data (the reconnect case),
        that data is returned unchanged. Otherwise the shared portion of the
        assignment's data is stored on the agent's state and returned.
        """
        cached_init = agent.state.get_init_state()
        if cached_init is not None:
            return cached_init

        assignment = agent.get_unit().get_assignment()
        shared_data = self.get_data_for_assignment(assignment).shared
        agent.state.set_init_state(shared_data)
        return shared_data

    def run_onboarding(self, agent: "OnboardingAgent"):
        """
        Treat onboarding like any static unit: wait for the agent's submit,
        passing the assignment duration as the wait limit.
        """
        timeout = self.assignment_duration_in_seconds
        agent.await_submit(timeout)

    def cleanup_onboarding(self, agent: "OnboardingAgent"):
        """Static onboarding leaves nothing behind, so this is a no-op."""
        return None

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        Run one static unit by waiting for the agent to submit its work,
        passing the assignment duration as the wait limit.
        """
        timeout = self.assignment_duration_in_seconds
        agent.await_submit(timeout)

    def cleanup_unit(self, unit: "Unit") -> None:
        """Abandoning an incomplete static unit requires no cleanup."""
        return None

Task runner for a static task

Static tasks always assume single unit assignments, as only one person can work on them at a time

Inherited Members
mephisto.abstractions.blueprints.abstract.static_task.static_task_runner.StaticTaskRunner
StaticTaskRunner
get_init_data_for_agent
run_onboarding
cleanup_onboarding
run_unit
cleanup_unit
mephisto.abstractions._subcomponents.task_runner.TaskRunner
execute_onboarding
execute_unit
execute_assignment
get_data_for_assignment
filter_units_for_worker
shutdown
run_assignment
cleanup_assignment
View Source
@dataclass
class StaticBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Configuration arguments shared by every static blueprint. The class
    # must be a dataclass: without the decorator, each `field(...)` below
    # would be an inert class attribute rather than a field default.

    # Registry key for this blueprint ("abstract_static").
    _blueprint_type: str = BLUEPRINT_TYPE_STATIC
    _group: str = field(
        default="StaticBlueprint",
        metadata={
            "help": (
                "Abstract Static Blueprints should not be launched manually, but "
                "include all tasks with units containing just one input and output "
                "of arbitrary data, with no live component. "
            )
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
    # The fields below default to omegaconf's MISSING marker (rendered as
    # '???' in the generated signature) rather than to a concrete value.
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the HTML may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    data_json: str = field(
        default=MISSING, metadata={"help": "Path to JSON file containing task data"}
    )
    data_jsonl: str = field(
        default=MISSING, metadata={"help": "Path to JSON-L file containing task data"}
    )
    data_csv: str = field(
        default=MISSING, metadata={"help": "Path to csv file containing task data"}
    )

StaticBlueprintArgs(_blueprint_type: str = 'abstract_static', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'StaticBlueprint', units_per_assignment: int = 1, extra_source_dir: str = '???', data_json: str = '???', data_jsonl: str = '???', data_csv: str = '???')

View Source
@dataclass
class SharedStaticTaskState(OnboardingSharedState, SharedTaskState):
    # Shared state for static tasks: the onboarding and base shared-state
    # fields plus the task data to launch. Declared as a dataclass so that
    # `static_task_data` gets a fresh empty list per instance via
    # default_factory, instead of being a bare `Field` object.

    # The "type"/"default" metadata entries appear to be human-readable
    # descriptions for generated docs/help — NOTE(review): confirm consumer.
    static_task_data: Iterable[Any] = field(
        default_factory=list,
        metadata={
            "help": (
                "List or generator that returns dicts of task data. Generators can be "
                "used for tasks with lengths that aren't known at the start of a "
                "run, or are otherwise determined during the run. "
            ),
            "type": "Iterable[Dict[str, Any]]",
            "default": "[]",
        },
    )

SharedStaticTaskState(task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = <factory>, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, static_task_data: Iterable[Any] = <factory>)