mephisto.abstractions.blueprints.remote_procedure.remote_procedure_blueprint
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""Blueprint for tasks that expose backend "remote procedure" calls to a
static frontend task, loading task data from csv/json/jsonl files or from
shared state.
"""

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state import (
    RemoteProcedureAgentState,
)
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_runner import (
    RemoteProcedureTaskRunner,
)
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_builder import (
    RemoteProcedureTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction
from omegaconf import DictConfig, MISSING

import os
import time
import csv
import sys
import json
import types
from importlib import import_module

from typing import (
    ClassVar,
    Callable,
    List,
    Type,
    Any,
    Dict,
    Iterable,
    Optional,
    Mapping,
    TYPE_CHECKING,
)

if TYPE_CHECKING:
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder
    from mephisto.data_model.unit import Unit

BLUEPRINT_TYPE_REMOTE_PROCEDURE = "remote_procedure"


@dataclass
class SharedRemoteProcedureTaskState(OnboardingSharedState, SharedTaskState):
    # Maps remote procedure names to handlers. Each handler receives
    # (request_id, parsed args dict, the calling agent's state) and returns
    # an optional response dict to send back to the frontend.
    function_registry: Optional[
        Mapping[
            str,
            Callable[
                [str, Dict[str, Any], "RemoteProcedureAgentState"],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    # Task data used when no data_csv/data_json/data_jsonl arg is supplied.
    # May be a list or a generator of per-assignment data.
    static_task_data: Iterable[Any] = field(default_factory=list)


@dataclass
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            "help": """
                Tasks launched from remote query blueprints need a
                source html file to display to workers, as well as a csv
                containing values that will be inserted into templates in
                the html.
            """
        },
    )
    # Path to the compiled frontend bundle served to workers.
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    # When True, symlink (rather than copy) task_source so watch-based
    # builds are picked up without restarting the server.
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can
                run a watch-based build for your task_source, allowing the UI
                code to update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )


@register_mephisto_abstraction()
class RemoteProcedureBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a static frontend task with access to backend
    remote procedure calls.

    (The previous docstring said "runs a parlai chat", which was a
    copy-paste from another blueprint.)
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = RemoteProcedureTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = RemoteProcedureTaskRunner
    ArgsClass = RemoteProcedureBlueprintArgs
    SharedStateClass = SharedRemoteProcedureTaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_REMOTE_PROCEDURE

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        """Load per-assignment task data from the first configured source:
        data_csv, data_json, data_jsonl, or shared_state.static_task_data.
        """
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            # utf-8-sig tolerates a BOM at the start of exported files
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            for jd in json_data:
                self._initialization_data_dicts.append(jd)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily
            # needing the data
            pass

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        assert isinstance(
            shared_state, SharedRemoteProcedureTaskState
        ), "Must use SharedTaskState with RemoteProcedureBlueprint"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            if isinstance(shared_state.static_task_data, types.GeneratorType):
                # TODO can we check something about this?
                # Generators cannot be length-checked without consuming them.
                pass
            else:
                assert (
                    len([x for x in shared_state.static_task_data]) > 0
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )
        assert shared_state.function_registry is not None, (
            "Must provide a valid function registry to use with the task, a mapping "
            "of function names to functions that take as input a string and an agent "
            "and return a string. "
        )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        # Each unit must get its OWN dict. The previous `[{}] * n` form
        # produced n references to one shared dict, so a mutation made for
        # one unit leaked into every other unit of the assignment.
        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield InitializationData(
                        shared=item,
                        unit_data=[
                            {} for _ in range(self.args.blueprint.units_per_assignment)
                        ],
                    )

            return data_generator()
        else:
            return [
                InitializationData(
                    shared=d,
                    unit_data=[
                        {} for _ in range(self.args.blueprint.units_per_assignment)
                    ],
                )
                for d in self._initialization_data_dicts
            ]
View Source
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Hydra-configurable arguments for a remote procedure task.
    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            "help": """
                Tasks launched from remote query blueprints need a
                source html file to display to workers, as well as a csv
                containing values that will be inserted into templates in
                the html.
            """
        },
    )
    # Path to the compiled frontend bundle served to workers (required).
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    # When True, symlink (rather than copy) task_source so watch-based
    # builds are picked up without restarting the server.
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can
                run a watch-based build for your task_source, allowing the UI
                code to update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    # Number of workers asked to complete each assignment.
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
RemoteProcedureBlueprintArgs(_blueprint_type: str = 'remote_procedure', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'RemoteProcedureBlueprintArgs', task_source: str = '???', link_task_source: bool = False, units_per_assignment: int = 1)
View Source
class RemoteProcedureBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a static frontend task with access to backend
    remote procedure calls.

    (The previous docstring said "runs a parlai chat", which was a
    copy-paste from another blueprint.)
    """

    AgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = RemoteProcedureAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = RemoteProcedureTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = RemoteProcedureTaskRunner
    ArgsClass = RemoteProcedureBlueprintArgs
    SharedStateClass = SharedRemoteProcedureTaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_REMOTE_PROCEDURE

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        """Load per-assignment task data from the first configured source:
        data_csv, data_json, data_jsonl, or shared_state.static_task_data.
        """
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            # utf-8-sig tolerates a BOM at the start of exported files
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            with open(json_file, "r", encoding="utf-8-sig") as json_fp:
                json_data = json.load(json_fp)
            for jd in json_data:
                self._initialization_data_dicts.append(jd)
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif shared_state.static_task_data is not None:
            self._initialization_data_dicts = shared_state.static_task_data
        else:
            # instantiating a version of the blueprint, but not necessarily
            # needing the data
            pass

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        assert isinstance(
            shared_state, SharedRemoteProcedureTaskState
        ), "Must use SharedTaskState with RemoteProcedureBlueprint"
        blue_args = args.blueprint
        if blue_args.get("data_csv", None) is not None:
            csv_file = os.path.expanduser(blue_args.data_csv)
            assert os.path.exists(
                csv_file
            ), f"Provided csv file {csv_file} doesn't exist"
        elif blue_args.get("data_json", None) is not None:
            json_file = os.path.expanduser(blue_args.data_json)
            assert os.path.exists(
                json_file
            ), f"Provided JSON file {json_file} doesn't exist"
        elif blue_args.get("data_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(blue_args.data_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Provided JSON-L file {jsonl_file} doesn't exist"
        elif shared_state.static_task_data is not None:
            if isinstance(shared_state.static_task_data, types.GeneratorType):
                # TODO can we check something about this?
                # Generators cannot be length-checked without consuming them.
                pass
            else:
                assert (
                    len([x for x in shared_state.static_task_data]) > 0
                ), "Length of data dict provided was 0"
        else:
            raise AssertionError(
                "Must provide one of a data csv, json, json-L, or a list of tasks"
            )
        assert shared_state.function_registry is not None, (
            "Must provide a valid function registry to use with the task, a mapping "
            "of function names to functions that take as input a string and an agent "
            "and return a string. "
        )

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        # Each unit must get its OWN dict. The previous `[{}] * n` form
        # produced n references to one shared dict, so a mutation made for
        # one unit leaked into every other unit of the assignment.
        if isinstance(self._initialization_data_dicts, types.GeneratorType):

            def data_generator() -> Iterable["InitializationData"]:
                for item in self._initialization_data_dicts:
                    yield InitializationData(
                        shared=item,
                        unit_data=[
                            {} for _ in range(self.args.blueprint.units_per_assignment)
                        ],
                    )

            return data_generator()
        else:
            return [
                InitializationData(
                    shared=d,
                    unit_data=[
                        {} for _ in range(self.args.blueprint.units_per_assignment)
                    ],
                )
                for d in self._initialization_data_dicts
            ]
Blueprint for a static frontend task with access to backend remote procedure calls
View Source
def __init__(
    self,
    task_run: "TaskRun",
    args: "DictConfig",
    shared_state: "SharedRemoteProcedureTaskState",
):
    """Populate per-assignment task data from the first available source,
    checked in order: data_csv, data_json, data_jsonl, then the shared
    state's static_task_data.
    """
    super().__init__(task_run, args, shared_state)
    self._initialization_data_dicts: Iterable[Dict[str, Any]] = []
    blueprint_args = args.blueprint
    if blueprint_args.get("data_csv", None) is not None:
        csv_path = os.path.expanduser(blueprint_args.data_csv)
        # utf-8-sig strips a BOM if the file was exported with one
        with open(csv_path, "r", encoding="utf-8-sig") as fp:
            reader = csv.reader(fp)
            headers = next(reader)
            for record in reader:
                self._initialization_data_dicts.append(
                    {headers[idx]: value for idx, value in enumerate(record)}
                )
    elif blueprint_args.get("data_json", None) is not None:
        json_path = os.path.expanduser(blueprint_args.data_json)
        with open(json_path, "r", encoding="utf-8-sig") as fp:
            for entry in json.load(fp):
                self._initialization_data_dicts.append(entry)
    elif blueprint_args.get("data_jsonl", None) is not None:
        jsonl_path = os.path.expanduser(blueprint_args.data_jsonl)
        with open(jsonl_path, "r", encoding="utf-8-sig") as fp:
            # one JSON document per line
            for raw_line in fp:
                self._initialization_data_dicts.append(json.loads(raw_line))
    elif shared_state.static_task_data is not None:
        self._initialization_data_dicts = shared_state.static_task_data
    else:
        # Blueprint can be instantiated without any data, e.g. just to
        # inspect its configuration
        pass
View Source
@classmethod
def assert_task_args(
    cls, args: "DictConfig", shared_state: "SharedTaskState"
) -> None:
    """Ensure that arguments are properly configured to launch this task"""
    assert isinstance(
        shared_state, SharedRemoteProcedureTaskState
    ), "Must use SharedTaskState with RemoteProcedureBlueprint"
    bp_args = args.blueprint
    # Exactly one data source is validated, in csv -> json -> jsonl ->
    # static order, mirroring how __init__ selects its source.
    if bp_args.get("data_csv", None) is not None:
        csv_file = os.path.expanduser(bp_args.data_csv)
        assert os.path.exists(csv_file), f"Provided csv file {csv_file} doesn't exist"
    elif bp_args.get("data_json", None) is not None:
        json_file = os.path.expanduser(bp_args.data_json)
        assert os.path.exists(json_file), f"Provided JSON file {json_file} doesn't exist"
    elif bp_args.get("data_jsonl", None) is not None:
        jsonl_file = os.path.expanduser(bp_args.data_jsonl)
        assert os.path.exists(
            jsonl_file
        ), f"Provided JSON-L file {jsonl_file} doesn't exist"
    elif shared_state.static_task_data is not None:
        if not isinstance(shared_state.static_task_data, types.GeneratorType):
            assert (
                len([x for x in shared_state.static_task_data]) > 0
            ), "Length of data dict provided was 0"
        # TODO can we check something about this?
        # A generator cannot be length-checked without consuming it.
    else:
        raise AssertionError(
            "Must provide one of a data csv, json, json-L, or a list of tasks"
        )
    assert shared_state.function_registry is not None, (
        "Must provide a valid function registry to use with the task, a mapping "
        "of function names to functions that take as input a string and an agent "
        "and return a string. "
    )
Ensure that arguments are properly configured to launch this task
View Source
def get_initialization_data(self) -> Iterable["InitializationData"]:
    """
    Return the InitializationData retrieved from the specified stream

    Yields lazily when the underlying task data is a generator; otherwise
    returns a fully materialized list.
    """
    # Each unit must get its OWN dict. The previous `[{}] * n` form
    # produced n references to one shared dict, so a mutation made for
    # one unit leaked into every other unit of the assignment.
    if isinstance(self._initialization_data_dicts, types.GeneratorType):

        def data_generator() -> Iterable["InitializationData"]:
            for item in self._initialization_data_dicts:
                yield InitializationData(
                    shared=item,
                    unit_data=[
                        {} for _ in range(self.args.blueprint.units_per_assignment)
                    ],
                )

        return data_generator()
    else:
        return [
            InitializationData(
                shared=d,
                unit_data=[
                    {} for _ in range(self.args.blueprint.units_per_assignment)
                ],
            )
            for d in self._initialization_data_dicts
        ]
Return the InitializationData retrieved from the specified stream
Inherited Members
View Source
class RemoteProcedureAgentState(AgentState):
    """
    Holds information about tasks with live interactions in a remote query model.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an agent state that keeps track of incoming actions from the frontend client
        Initialize with an existing file if it exists.
        """
        # weakref avoids a reference cycle between agent and its state
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.requests: Dict[str, RemoteRequest] = {}
            self.start_time = time.time()
            self.end_time = -1
            self.init_data: Optional[Dict[str, Any]] = None
            self.final_submission: Optional[Dict[str, Any]] = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        prev_requests = []
        if len(self.requests) > 0:
            requests = self.requests.values()
            # replay past requests in the order they happened
            sorted_requests = sorted(requests, key=lambda x: x.timestamp)
            prev_requests = [r.to_dict() for r in sorted_requests]
        return {"task_data": self.init_data, "previous_requests": prev_requests}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            # Rebuild RemoteRequest objects from their serialized dicts so
            # that get_data()/get_init_state() can call .to_dict() on them
            # again (assumes to_dict() round-trips the constructor kwargs
            # used in update_data — uuid, target, args_json, response_json,
            # timestamp).
            self.requests = {x["uuid"]: RemoteRequest(**x) for x in state["requests"]}
            self.init_data = state["init_data"]
            # BUGFIX: previously assigned to `self.outputs`, which nothing
            # reads — every consumer uses `self.final_submission`. Also
            # restore the timestamps that save_data() wrote, so get_data()
            # works after a reload instead of raising AttributeError.
            self.final_submission = state["final_submission"]
            self.start_time = state["start_time"]
            self.end_time = state["end_time"]

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "final_submission": self.final_submission,
            "init_data": self.init_data,
            "requests": [r.to_dict() for r in self.requests.values()],
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return the formatted content"""
        # TODO implement actually getting this data
        return self.get_data()

    def get_task_start(self) -> float:
        """
        Return the start time for this task
        """
        return self.start_time

    def get_task_end(self) -> float:
        """
        Return the end time for this task
        """
        return self.end_time

    def save_data(self) -> None:
        """Save all messages from this agent to disk"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as who it came from
        """
        if "handles" in live_update:
            # outgoing response from the backend to the frontend
            response_id = str(uuid4())
            response = RemoteRequest(
                uuid=response_id,
                target=live_update["handles"],
                args_json=None,
                response_json=live_update["response"],
                timestamp=time.time(),
            )
            self.requests[response_id] = response
        else:
            # incoming request from the frontend
            request = RemoteRequest(
                uuid=live_update["request_id"],
                target=live_update["target"],
                args_json=live_update["args"],
                response_json=None,
                timestamp=time.time(),
            )
            self.requests[live_update["request_id"]] = request

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()
Holds information about tasks with live interactions in a remote query model.
Inherited Members
- mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState
- RemoteProcedureAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- get_parsed_data
- get_task_start
- get_task_end
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
View Source
class RemoteProcedureAgentState(AgentState):
    """
    Holds information about tasks with live interactions in a remote query model.

    State is persisted as JSON in the agent's data directory and replayed
    to reconnecting clients via get_init_state().
    """

    def __init__(self, agent: "Agent"):
        """
        Create an agent state that keeps track of incoming actions from the frontend client
        Initialize with an existing file if it exists.
        """
        # weakref avoids a reference cycle between the agent and its state
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.requests: Dict[str, RemoteRequest] = {}
            self.start_time = time.time()
            self.end_time = -1
            self.init_data: Optional[Dict[str, Any]] = None
            self.final_submission: Optional[Dict[str, Any]] = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent; returns False if already set"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        prev_requests = []
        if len(self.requests) > 0:
            requests = self.requests.values()
            # replay past requests to a reconnecting client in time order
            sorted_requests = sorted(requests, key=lambda x: x.timestamp)
            prev_requests = [r.to_dict() for r in sorted_requests]
        return {"task_data": self.init_data, "previous_requests": prev_requests}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            # NOTE(review): values here are plain dicts, but get_data() and
            # get_init_state() call .to_dict() on request values — a state
            # reloaded from disk will break there; verify against
            # RemoteRequest's serialization.
            self.requests = {x["uuid"]: x for x in state["requests"]}
            self.init_data = state["init_data"]
            # NOTE(review): writes self.outputs, yet every reader uses
            # self.final_submission; start_time/end_time are also not
            # restored although get_data() reads them — likely bugs.
            self.outputs = state["final_submission"]

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "final_submission": self.final_submission,
            "init_data": self.init_data,
            "requests": [r.to_dict() for r in self.requests.values()],
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return the formatted content"""
        # TODO implement actually getting this data
        return self.get_data()

    def get_task_start(self) -> float:
        """
        Return the start time for this task
        """
        return self.start_time

    def get_task_end(self) -> float:
        """
        Return the end time for this task
        """
        return self.end_time

    def save_data(self) -> None:
        """Save all messages from this agent to disk"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as who it came from
        """
        if "handles" in live_update:
            # outgoing: a backend response being handed to the frontend
            response_id = str(uuid4())
            response = RemoteRequest(
                uuid=response_id,
                target=live_update["handles"],
                args_json=None,
                response_json=live_update["response"],
                timestamp=time.time(),
            )
            self.requests[response_id] = response
        else:
            # incoming: a frontend request for a backend procedure
            request = RemoteRequest(
                uuid=live_update["request_id"],
                target=live_update["target"],
                args_json=live_update["args"],
                response_json=None,
                timestamp=time.time(),
            )
            self.requests[live_update["request_id"]] = request

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()
Holds information about tasks with live interactions in a remote query model.
Inherited Members
- mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state.RemoteProcedureAgentState
- RemoteProcedureAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- get_parsed_data
- get_task_start
- get_task_end
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
View Source
class RemoteProcedureTaskBuilder(StaticReactTaskBuilder):
    """
    Builder for a "static task" that has access to remote queries.
    At the moment, simply a StaticReactTaskBuilder, as we will
    be using static react tasks in the same way.
    """

    # No behavior of its own yet; exists so the blueprint has a
    # dedicated builder type to extend later.
    pass
Builder for a "static task" that has access to remote queries. At the moment, simply a StaticReactTaskBuilder, as we will be using static react tasks in the same way.
Inherited Members
- mephisto.abstractions._subcomponents.task_builder.TaskBuilder
- TaskBuilder
View Source
class RemoteProcedureTaskRunner(TaskRunner):
    """
    Task runner for a task with live remote queries on the local machine
    # TODO this is pretty gross, and would be better as a series of worker
    # threads that handle commands, as the functions all have direct access
    # to the full worker state.
    """

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedRemoteProcedureTaskState",
    ):
        super().__init__(task_run, args, shared_state)
        # TODO load up the possible functions from the shared_state
        self.is_concurrent = False  # This task is 1 person w/ backend
        # Mapping of procedure name -> handler, dispatched per live update
        self.function_registry = shared_state.function_registry

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Return the data for an agent already assigned to a particular unit
        """
        init_state = agent.state.get_init_state()
        if init_state is not None:
            # reconnecting agent, give what we've got
            return init_state
        else:
            # first connection: seed the agent state from the assignment data
            assignment = agent.get_unit().get_assignment()
            assignment_data = self.get_data_for_assignment(assignment)
            agent.state.set_init_state(assignment_data.shared)
            new_state = agent.state.get_init_state()
            assert new_state is not None, "Recently initialized state still None"
            return new_state

    def _agent_in_onboarding_or_live(
        self, agent: Union["Agent", "OnboardingAgent"]
    ) -> bool:
        """Determine if an agent server should still be maintained"""
        return (
            agent.get_agent_id() in self.running_units
            or agent.get_agent_id() in self.running_onboardings
        )

    def _run_server_timestep_for_agent(self, agent: Union["Agent", "OnboardingAgent"]):
        """
        Both onboarding and regular tasks have access to the server for
        remote queries
        """
        live_update = agent.get_live_update()
        if live_update is not None and "request_id" in live_update:
            request_id = live_update["request_id"]
            # Execute commands that come in from the frontend
            # TODO extend scope to handle yield-style functions, and
            # move these to async tasks
            assert (
                self.function_registry is not None
                and live_update["target"] in self.function_registry
            ), f"Target function {live_update['target']} not found in registry: {self.function_registry}"
            state = agent.state
            assert isinstance(
                state, RemoteProcedureAgentState
            ), "Must use an agent with RemoteProcedureAgentState"
            # Invoke the registered handler with the raw request's parsed args
            res = self.function_registry[live_update["target"]](
                request_id, json.loads(live_update["args"]), state
            )
            # Echo the handler's result back to the frontend, keyed by request
            agent.observe(
                {
                    "handles": request_id,
                    "response": json.dumps(res),
                }
            )
        # sleep to avoid tight loop
        time.sleep(THREAD_SHORT_SLEEP)

    def run_onboarding(self, agent: "OnboardingAgent") -> None:
        """
        Running onboarding with access to remote queries
        """
        # Run the server while the task isn't submitted yet
        while (
            not agent.await_submit(timeout=None)
            and agent.get_agent_id() in self.running_onboardings
        ):
            self._run_server_timestep_for_agent(agent)
        # Once no longer live, just wait for the final submit to land
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_onboarding(self, agent: "OnboardingAgent") -> None:
        """Shutdown onboarding resources"""
        pass

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        Running a task with access to remote queries
        """
        # Serve remote procedure calls while the unit is live and unsubmitted
        while not agent.await_submit(timeout=None) and unit.db_id in self.running_units:
            self._run_server_timestep_for_agent(agent)
        # Then block until the final submission arrives
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_unit(self, unit: "Unit") -> None:
        """Handle cleanup for a specific unit"""
        pass
Task runner for a task with live remote queries on the local machine
TODO this is pretty gross, and would be better as a series of worker
threads that handle commands, as the functions all have direct access
to the full worker state.
Inherited Members
- mephisto.abstractions.blueprints.remote_procedure.remote_procedure_task_runner.RemoteProcedureTaskRunner
- RemoteProcedureTaskRunner
- get_init_data_for_agent
- run_onboarding
- cleanup_onboarding
- run_unit
- cleanup_unit
- mephisto.abstractions._subcomponents.task_runner.TaskRunner
- execute_onboarding
- execute_unit
- execute_assignment
- get_data_for_assignment
- filter_units_for_worker
- shutdown
- run_assignment
- cleanup_assignment
View Source
class RemoteProcedureBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Hydra-configurable arguments for a remote procedure task.
    _blueprint_type: str = BLUEPRINT_TYPE_REMOTE_PROCEDURE
    _group: str = field(
        default="RemoteProcedureBlueprintArgs",
        metadata={
            "help": """
                Tasks launched from remote query blueprints need a
                source html file to display to workers, as well as a csv
                containing values that will be inserted into templates in
                the html.
            """
        },
    )
    # Path to the compiled frontend bundle served to workers (required).
    task_source: str = field(
        default=MISSING,
        metadata={
            "help": "Path to file containing javascript bundle for the task",
            "required": True,
        },
    )
    # When True, symlink (rather than copy) task_source so watch-based
    # builds are picked up without restarting the server.
    link_task_source: bool = field(
        default=False,
        metadata={
            "help": """
                Symlinks the task_source file in your development folder to the
                one used for the server. Useful for local development so you can
                run a watch-based build for your task_source, allowing the UI
                code to update without having to restart the server each time.
            """,
            "required": False,
        },
    )
    # Number of workers asked to complete each assignment.
    units_per_assignment: int = field(
        default=1, metadata={"help": "How many workers you want to do each assignment"}
    )
RemoteProcedureBlueprintArgs(_blueprint_type: str = 'remote_procedure', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'RemoteProcedureBlueprintArgs', task_source: str = '???', link_task_source: bool = False, units_per_assignment: int = 1)