mephisto.abstractions.blueprints.remote_procedure.remote_procedure_blueprint

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state import (
    RemoteProcedureAgentState,
)
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_runner import (
    RemoteProcedureTaskRunner,
)
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_builder import (
    RemoteProcedureTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction
from omegaconf import DictConfig, MISSING

import os
import time
import csv
import sys
import json
import types

from importlib import import_module

from typing import (
    ClassVar,
    Callable,
    List,
    Type,
    Any,
    Dict,
    Iterable,
    Optional,
    Mapping,
    TYPE_CHECKING,
)

if TYPE_CHECKING:
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder
    from mephisto.data_model.unit import Unit

BLUEPRINT_TYPE_REMOTE_PROCEDURE = "remote_procedure"


@dataclass
class SharedRemoteProcedureTaskState(OnboardingSharedState, SharedTaskState):
    """
    Shared state for remote procedure tasks: carries the registry of backend
    functions the task frontend may invoke, plus optional static task data.
    """

    # Maps a remote procedure name to its handler. Per the declared type a
    # handler receives (str, Dict[str, Any], RemoteProcedureAgentState) and
    # returns an optional response dict — presumably the str is the request
    # payload/id and the dict its arguments; confirm against the task runner.
    # Must be non-None at launch: RemoteProcedureBlueprint.assert_task_args
    # rejects a None registry.
    function_registry: Optional[
        Mapping[
            str,
            Callable[
                [str, Dict[str, Any], "RemoteProcedureAgentState"],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    # Task data used when no data_csv/data_json/data_jsonl blueprint arg is
    # configured; may be a list or a generator of per-assignment dicts.
    static_task_data: Iterable[Any] = field(default_factory=list)


@dataclass
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    """Hydra-configurable arguments for RemoteProcedureBlueprint."""

    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            # FIX: the previous help text was copied from the static html/csv
            # blueprints and described HTML templating this blueprint doesn't
            # do — it takes a built javascript bundle (task_source) and gets
            # its task data from a csv/json/jsonl file or the shared state.
            "help": """
                Tasks launched from remote procedure blueprints need a
                javascript bundle (task_source) to display to workers, and
                take their task data from a csv, json, or jsonl file, or
                directly from the shared task state.
            """
        },
    )
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can run
                a watch-based build for your task_source, allowing the UI code to
                update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )


@register_mephisto_abstraction()
class RemoteProcedureBlueprint(OnboardingRequired, Blueprint):
    """
    Blueprint for a task whose frontend can invoke named backend functions
    (remote procedure calls) registered in the shared state's
    ``function_registry`` while a worker is working on a unit.

    (The previous docstring, "runs a parlai chat", was a copy-paste error
    from the ParlAI chat blueprint.)
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = RemoteProcedureTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = RemoteProcedureTaskRunner
    ArgsClass = RemoteProcedureBlueprintArgs
    SharedStateClass = SharedRemoteProcedureTaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_REMOTE_PROCEDURE

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        """
        Load per-assignment task data from the first configured source, in
        priority order: data_csv, data_json, data_jsonl, then the shared
        state's static_task_data.
        """
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    # Pair each cell with its column header. zip also guards
                    # against over-long rows (the old headers[i] indexing
                    # raised IndexError when a row had more cells than the
                    # header).
                    self._initialization_data_dicts.append(dict(zip(headers, row)))
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            # Expects the file to contain a JSON array of task-data dicts
            for jd in json_data:
                self._initialization_data_dicts.append(jd)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                for line in jsonl_fp:
                    line = line.strip()
                    if not line:
                        # FIX: skip blank lines (e.g. a trailing newline),
                        # which previously crashed json.loads
                        continue
                    self._initialization_data_dicts.append(json.loads(line))
        elif shared_state.static_task_data is not None:
            # NOTE: static_task_data defaults to [], so this branch (not the
            # else below) is also what runs when no data source was given.
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        # FIX: message previously named the wrong class (SharedTaskState)
        assert isinstance(
            shared_state, SharedRemoteProcedureTaskState
        ), "Must use SharedRemoteProcedureTaskState with RemoteProcedureBlueprint"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            if isinstance(shared_state.static_task_data, types.GeneratorType):
                # TODO can we check something about this?
                # (Generators can't be inspected without consuming them.)
                pass
            else:
                assert (
                    len(list(shared_state.static_task_data)) > 0
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )
        # FIX: message previously described a stale handler signature
        # ("a string and an agent ... return a string"), contradicting the
        # declared Callable type on SharedRemoteProcedureTaskState.
        assert shared_state.function_registry is not None, (
            "Must provide a valid function registry to use with the task: a mapping "
            "of function names to functions that take a request string, an args "
            "dict, and the remote procedure agent state, and return an optional "
            "response dict."
        )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        if isinstance(self._initialization_data_dicts, types.GeneratorType):
            # Preserve laziness: wrap the source generator rather than
            # materializing it. NOTE(review): only GeneratorType is detected;
            # other one-shot iterators fall through to the list branch.
            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield InitializationData(
                        shared=item,
                        unit_data=[{}] * self.args.blueprint.units_per_assignment,
                    )

            return data_generator()
        else:
            return [
                InitializationData(
                    shared=d, unit_data=[{}] * self.args.blueprint.units_per_assignment
                )
                for d in self._initialization_data_dicts
            ]
View Source
class SharedRemoteProcedureTaskState(OnboardingSharedState, SharedTaskState):
    function_registry: Optional[
        Mapping[
            str,
            Callable[
                [str, Dict[str, Any], "RemoteProcedureAgentState"],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    static_task_data: Iterable[Any] = field(default_factory=list)

SharedRemoteProcedureTaskState(task_config: Dict[str, Any] = &lt;factory&gt;, qualifications: List[Any] = &lt;factory&gt;, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = &lt;factory&gt;, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = &lt;factory&gt;, onboarding_data: Dict[str, Any] = &lt;factory&gt;, validate_onboarding: Callable[[Any], bool] = &lt;factory&gt;, function_registry: Union[Mapping[str, Callable[[str, Dict[str, Any], ForwardRef('RemoteProcedureAgentState')], Union[Dict[str, Any], NoneType]]], NoneType] = None, static_task_data: Iterable[Any] = &lt;factory&gt;)

#   SharedRemoteProcedureTaskState( task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: collections.abc.Callable[mephisto.data_model.worker.Worker, mephisto.data_model.unit.Unit, bool] = <factory>, on_unit_submitted: collections.abc.Callable[mephisto.data_model.unit.Unit, NoneType] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, function_registry: Union[collections.abc.Mapping[str, collections.abc.Callable[str, Dict[str, Any], mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState, Union[Dict[str, Any], NoneType]]], NoneType] = None, static_task_data: Iterable[Any] = <factory> )
#   function_registry: Union[collections.abc.Mapping[str, collections.abc.Callable[str, Dict[str, Any], mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState, Union[Dict[str, Any], NoneType]]], NoneType] = None
View Source
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            "help": """
                Tasks launched from remote query blueprints need a
                source html file to display to workers, as well as a csv
                containing values that will be inserted into templates in
                the html.
            """
        },
    )
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can run
                a watch-based build for your task_source, allowing the UI code to
                update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )

RemoteProcedureBlueprintArgs(_blueprint_type: str = 'remote_procedure', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'RemoteProcedureBlueprintArgs', task_source: str = '???', link_task_source: bool = False, units_per_assignment: int = 1)

#   RemoteProcedureBlueprintArgs( _blueprint_type: str = 'remote_procedure', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'RemoteProcedureBlueprintArgs', task_source: str = '???', link_task_source: bool = False, units_per_assignment: int = 1 )
#   task_source: str = '???'
#   units_per_assignment: int = 1
View Source
class RemoteProcedureBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a task that runs a parlai chat"""

    AgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = RemoteProcedureTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = RemoteProcedureTaskRunner
    ArgsClass = RemoteProcedureBlueprintArgs
    SharedStateClass = SharedRemoteProcedureTaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_REMOTE_PROCEDURE

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            for jd in json_data:
                self._initialization_data_dicts.append(jd)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        assert isinstance(
            shared_state, SharedRemoteProcedureTaskState
        ), "Must use SharedTaskState with RemoteProcedureBlueprint"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            if isinstance(shared_state.static_task_data, types.GeneratorType):
                # TODO can we check something about this?
                pass
            else:
                assert (
                    len([x for x in shared_state.static_task_data]) > 0
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )
        assert shared_state.function_registry is not None, (
            "Must provide a valid function registry to use with the task, a mapping "
            "of function names to functions that take as input a string and an agent "
            "and return a string. "
        )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield InitializationData(
                        shared=item,
                        unit_data=[{}] * self.args.blueprint.units_per_assignment,
                    )

            return data_generator()
        else:
            return [
                InitializationData(
                    shared=d, unit_data=[{}] * self.args.blueprint.units_per_assignment
                )
                for d in self._initialization_data_dicts
            ]

Blueprint for a task whose frontend can make remote procedure calls to backend functions registered in the shared state's function registry.

View Source
    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            for jd in json_data:
                self._initialization_data_dicts.append(jd)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily needing the data
            pass
#   BLUEPRINT_TYPE: str = 'remote_procedure'
#  
@classmethod
def assert_task_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ) -> None:
View Source
    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        assert isinstance(
            shared_state, SharedRemoteProcedureTaskState
        ), "Must use SharedTaskState with RemoteProcedureBlueprint"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            if isinstance(shared_state.static_task_data, types.GeneratorType):
                # TODO can we check something about this?
                pass
            else:
                assert (
                    len([x for x in shared_state.static_task_data]) > 0
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )
        assert shared_state.function_registry is not None, (
            "Must provide a valid function registry to use with the task, a mapping "
            "of function names to functions that take as input a string and an agent "
            "and return a string. "
        )

Ensure that arguments are properly configured to launch this task

#   def get_initialization_data( self ) -> collections.abc.Iterable[mephisto.data_model.assignment.InitializationData]:
View Source
    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield InitializationData(
                        shared=item,
                        unit_data=[{}] * self.args.blueprint.units_per_assignment,
                    )

            return data_generator()
        else:
            return [
                InitializationData(
                    shared=d, unit_data=[{}] * self.args.blueprint.units_per_assignment
                )
                for d in self._initialization_data_dicts
            ]

Return the InitializationData retrieved from the specified stream

#   class RemoteProcedureBlueprint.AgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class RemoteProcedureAgentState(AgentState):
    """
    Holds information about tasks with live interactions in a remote query model.

    State is persisted as JSON in the agent's data dir (see save_data) and
    restored on construction when that file already exists.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an agent state that keeps track of incoming actions from the frontend client
        Initialize with an existing file if it exists.
        """
        # weakref proxy avoids a reference cycle between agent and its state
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.requests: Dict[str, RemoteRequest] = {}
            self.start_time = time.time()
            self.end_time = -1  # -1 marks "not yet ended"
            self.init_data: Optional[Dict[str, Any]] = None
            self.final_submission: Optional[Dict[str, Any]] = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent; False if already set."""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        prev_requests = []
        if len(self.requests) > 0:
            # Replay prior requests in chronological order so a reconnecting
            # frontend can restore its view
            requests = self.requests.values()
            sorted_requests = sorted(requests, key=lambda x: x.timestamp)
            prev_requests = [r.to_dict() for r in sorted_requests]
        return {"task_data": self.init_data, "previous_requests": prev_requests}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            # FIX: rebuild RemoteRequest objects from their serialized form.
            # Previously raw dicts were stored, which broke get_data's
            # r.to_dict() calls after a reload. Assumes to_dict() round-trips
            # the constructor kwargs (it is what save_data serialized) —
            # TODO confirm against RemoteRequest's definition.
            self.requests = {x["uuid"]: RemoteRequest(**x) for x in state["requests"]}
            self.init_data = state["init_data"]
            # FIX: previously assigned to self.outputs only, leaving
            # final_submission unset after a reload; keep the old attribute
            # too in case external code reads it.
            self.final_submission = state["final_submission"]
            self.outputs = state["final_submission"]
            # FIX: restore timing fields (previously never reloaded, so
            # get_data/save_data crashed on restored states). .get defaults
            # tolerate files written before these keys existed.
            self.start_time = state.get("start_time", time.time())
            self.end_time = state.get("end_time", -1)

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "final_submission": self.final_submission,
            "init_data": self.init_data,
            "requests": [r.to_dict() for r in self.requests.values()],
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return the formatted content"""
        # TODO implement actually getting this data
        return self.get_data()

    def get_task_start(self) -> float:
        """
        Return the start time for this task
        """
        return self.start_time

    def get_task_end(self) -> float:
        """
        Return the end time for this task (-1 if the task has not ended)
        """
        return self.end_time

    def save_data(self) -> None:
        """Save all state for this agent to its expected data file as JSON"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as who it came from
        """
        if "handles" in live_update:
            # outgoing: a backend response being sent to the frontend
            response_id = str(uuid4())
            response = RemoteRequest(
                uuid=response_id,
                target=live_update["handles"],
                args_json=None,
                response_json=live_update["response"],
                timestamp=time.time(),
            )
            self.requests[response_id] = response
        else:
            # incoming: a frontend request for a registered function
            request = RemoteRequest(
                uuid=live_update["request_id"],
                target=live_update["target"],
                args_json=live_update["args"],
                response_json=None,
                timestamp=time.time(),
            )
            self.requests[live_update["request_id"]] = request

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()

Holds information about tasks with live interactions in a remote query model.

Inherited Members
mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState
RemoteProcedureAgentState
set_init_state
get_init_state
load_data
get_data
get_parsed_data
get_task_start
get_task_end
save_data
update_data
update_submit
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
#   class RemoteProcedureBlueprint.OnboardingAgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
View Source
class RemoteProcedureAgentState(AgentState):
    """
    Holds information about tasks with live interactions in a remote query model.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an agent state that keeps track of incoming actions from the frontend client
        Initialize with an existing file if it exists.
        """
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.requests: Dict[str, RemoteRequest] = {}
            self.start_time = time.time()
            self.end_time = -1
            self.init_data: Optional[Dict[str, Any]] = None
            self.final_submission: Optional[Dict[str, Any]] = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        prev_requests = []
        if len(self.requests) > 0:
            requests = self.requests.values()
            sorted_requests = sorted(requests, key=lambda x: x.timestamp)
            prev_requests = [r.to_dict() for r in sorted_requests]
        return {"task_data": self.init_data, "previous_requests": prev_requests}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            self.requests = {x["uuid"]: x for x in state["requests"]}
            self.init_data = state["init_data"]
            self.outputs = state["final_submission"]

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "final_submission": self.final_submission,
            "init_data": self.init_data,
            "requests": [r.to_dict() for r in self.requests.values()],
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return the formatted content"""
        # TODO implement actually getting this data
        return self.get_data()

    def get_task_start(self) -> float:
        """
        Return the start time for this task
        """
        return self.start_time

    def get_task_end(self) -> float:
        """
        Return the end time for this task
        """
        return self.end_time

    def save_data(self) -> None:
        """Save all messages from this agent to"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as who it came from
        """
        if "handles" in live_update:
            # outgoing
            response_id = str(uuid4())
            response = RemoteRequest(
                uuid=response_id,
                target=live_update["handles"],
                args_json=None,
                response_json=live_update["response"],
                timestamp=time.time(),
            )
            self.requests[response_id] = response
        else:
            # incoming
            request = RemoteRequest(
                uuid=live_update["request_id"],
                target=live_update["target"],
                args_json=live_update["args"],
                response_json=None,
                timestamp=time.time(),
            )
            self.requests[live_update["request_id"]] = request

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()

Holds information about tasks with live interactions in a remote query model.

Inherited Members
mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState
RemoteProcedureAgentState
set_init_state
get_init_state
load_data
get_data
get_parsed_data
get_task_start
get_task_end
save_data
update_data
update_submit
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
View Source
class RemoteProcedureTaskBuilder(StaticReactTaskBuilder):
    """
    Builder for a "static task" that can issue remote queries.

    Currently identical to ``StaticReactTaskBuilder``: remote procedure
    tasks are bundled and served exactly like static react tasks, so no
    additional build behavior is required.
    """

Builder for a "static task" that has access to remote queries. At the moment, simply a StaticReactTaskBuilder, as we will be using static react tasks in the same way.

Inherited Members
mephisto.abstractions._subcomponents.task_builder.TaskBuilder
TaskBuilder
mephisto.abstractions.blueprints.static_react_task.static_react_task_builder.StaticReactTaskBuilder
BUILT_FILE
BUILT_MESSAGE
build_in_dir
#   class RemoteProcedureBlueprint.TaskRunnerClass(mephisto.abstractions._subcomponents.task_runner.TaskRunner):
View Source
class RemoteProcedureTaskRunner(TaskRunner):
    """
    Task runner for a task with live remote queries on the local machine.

    Each running agent (onboarding or live) is polled for live updates;
    updates that name a function in the shared ``function_registry`` are
    executed on the backend and their JSON-encoded result is sent back to
    the agent's frontend.

    # TODO this is pretty gross, and would be better as a series of worker
    # threads that handle commands, as the functions all have direct access
    # to the full worker state.
    """

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        super().__init__(task_run, args, shared_state)
        # TODO load up the possible functions from the shared_state
        self.is_concurrent = False  # This task is 1 person w/ backend
        # Mapping of procedure name -> callable, supplied by the task author
        self.function_registry = shared_state.function_registry

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Return the data for an agent already assigned to a particular unit
        """
        init_state = agent.state.get_init_state()
        if init_state is not None:
            # reconnecting agent, give what we've got
            return init_state
        else:
            # First connection: copy the assignment's shared data into this
            # agent's state so reconnects take the branch above.
            assignment = agent.get_unit().get_assignment()
            assignment_data = self.get_data_for_assignment(assignment)
            agent.state.set_init_state(assignment_data.shared)
            new_state = agent.state.get_init_state()
            assert new_state is not None, "Recently initialized state still None"
            return new_state

    def _agent_in_onboarding_or_live(
        self, agent: Union["Agent", "OnboardingAgent"]
    ) -> bool:
        """Determine if an agent server should still be maintained"""
        return (
            agent.get_agent_id() in self.running_units
            or agent.get_agent_id() in self.running_onboardings
        )

    def _run_server_timestep_for_agent(self, agent: Union["Agent", "OnboardingAgent"]):
        """
        Both onboarding and regular tasks have access to the server for remote
        queries. Runs one poll-dispatch-respond cycle for the given agent.
        """
        live_update = agent.get_live_update()
        if live_update is not None and "request_id" in live_update:
            request_id = live_update["request_id"]
            # Execute commands that come in from the frontend
            # TODO extend scope to handle yield-style functions, and
            # move these to async tasks
            assert (
                self.function_registry is not None
                and live_update["target"] in self.function_registry
            ), f"Target function {live_update['target']} not found in registry: {self.function_registry}"
            state = agent.state
            assert isinstance(
                state, RemoteProcedureAgentState
            ), "Must use an agent with RemoteProcedureAgentState"
            # Run the registered procedure synchronously; the frontend sends
            # its arguments as a JSON string.
            res = self.function_registry[live_update["target"]](
                request_id, json.loads(live_update["args"]), state
            )

            # Reply tagged with the request id it "handles" so the frontend
            # can resolve the matching pending call.
            agent.observe(
                {
                    "handles": request_id,
                    "response": json.dumps(res),
                }
            )

        # sleep to avoid tight loop
        time.sleep(THREAD_SHORT_SLEEP)

    def run_onboarding(self, agent: "OnboardingAgent") -> None:
        """
        Running onboarding with access to remote queries
        """
        # Run the server while the task isn't submitted yet
        while (
            not agent.await_submit(timeout=None)
            and agent.get_agent_id() in self.running_onboardings
        ):
            self._run_server_timestep_for_agent(agent)

        # Onboarding left the running set before submitting; keep waiting
        # (without servicing queries) until the final submit arrives.
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_onboarding(self, agent: "OnboardingAgent") -> None:
        """Shutdown onboarding resources"""
        # No per-onboarding resources are held by this runner
        pass

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        Running a task with access to remote queries
        """
        # Service remote queries until the worker submits or the unit is
        # removed from the running set (e.g. on shutdown).
        while not agent.await_submit(timeout=None) and unit.db_id in self.running_units:
            self._run_server_timestep_for_agent(agent)

        # Unit left the running set before submitting; block (without
        # servicing queries) until the final submit arrives.
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_unit(self, unit: "Unit") -> None:
        """Handle cleanup for a specific unit"""
        # No per-unit resources are held by this runner
        pass

Task runner for a task with live remote queries on the local machine

TODO this is pretty gross, and would be better as a series of worker threads that handle commands, as the functions all have direct access to the full worker state.

Inherited Members
mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_runner.RemoteProcedureTaskRunner
RemoteProcedureTaskRunner
get_init_data_for_agent
run_onboarding
cleanup_onboarding
run_unit
cleanup_unit
mephisto.abstractions._subcomponents.task_runner.TaskRunner
execute_onboarding
execute_unit
execute_assignment
get_data_for_assignment
filter_units_for_worker
shutdown
run_assignment
cleanup_assignment
View Source
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    """Hydra arguments for configuring a remote procedure blueprint task."""

    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            # Fixed: previous help text was stale copy from the static-HTML
            # blueprint (html source file + csv); this blueprint takes a
            # javascript bundle and a backend function registry instead.
            "help": """
                Tasks launched from remote procedure blueprints need a
                task_source javascript bundle to display to workers, and
                a function_registry (provided via the shared task state)
                of backend procedures the frontend may invoke.
            """
        },
    )
    # Path to the compiled frontend bundle; required, no default (MISSING)
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    # Development convenience: symlink rather than copy the bundle so a
    # watch build updates the served UI without a server restart
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can run
                a watch-based build for your task_source, allowing the UI code to
                update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
RemoteProcedureBlueprintArgs(_blueprint_type: str = 'remote_procedure', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'RemoteProcedureBlueprintArgs', task_source: str = '???', link_task_source: bool = False, units_per_assignment: int = 1)

View Source
class SharedRemoteProcedureTaskState(OnboardingSharedState, SharedTaskState):
    # Maps remote procedure names to backend handlers. Each handler is
    # invoked as handler(request_id, args, agent_state) and may return a
    # JSON-serializable dict for the frontend, or None.
    function_registry: Optional[
        Mapping[
            str,
            Callable[
                [str, Dict[str, Any], "RemoteProcedureAgentState"],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    # Task data to launch units from; presumably one entry per assignment —
    # TODO confirm against the blueprint's initialization-data logic.
    static_task_data: Iterable[Any] = field(default_factory=list)

SharedRemoteProcedureTaskState(task_config: Dict[str, Any] = &lt;factory&gt;, qualifications: List[Any] = &lt;factory&gt;, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = &lt;factory&gt;, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = &lt;factory&gt;, onboarding_data: Dict[str, Any] = &lt;factory&gt;, validate_onboarding: Callable[[Any], bool] = &lt;factory&gt;, function_registry: Union[Mapping[str, Callable[[str, Dict[str, Any], ForwardRef('RemoteProcedureAgentState')], Union[Dict[str, Any], NoneType]]], NoneType] = None, static_task_data: Iterable[Any] = &lt;factory&gt;)