mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state import (
    ParlAIChatAgentState,
)
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_runner import (
    ParlAIChatTaskRunner,
)
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_builder import (
    ParlAIChatTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction
from omegaconf import DictConfig, MISSING

import os
import time
import csv
import sys
import json
from importlib import import_module

from typing import ClassVar, List, Type, Any, Dict, Iterable, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder
    from mephisto.data_model.assignment import Assignment
    from mephisto.data_model.unit import Unit

BLUEPRINT_TYPE_PARLAI_CHAT = "parlai_chat"

# Fallback HTML shown when no task description / preview source is configured
# and the frontend components were not overridden.
MISSING_SOMETHING_TEXT = (
    "<h1>"
    "You didn't specify a task_description_file and also didn't override the "
    "frontend `TaskPreviewView` (if this is a preview) or the `TaskDescription` "
    "component (if this is in-task)."
    "</h1>"
)


@dataclass
class SharedParlAITaskState(OnboardingSharedState, SharedTaskState):
    # Options forwarded verbatim to the frontend via get_frontend_args
    frontend_task_opts: Dict[str, Any] = field(default_factory=dict)
    # Options passed through to the ParlAI world on creation
    world_opt: Dict[str, Any] = field(default_factory=dict)
    # Options passed through to the onboarding world on creation
    onboarding_world_opt: Dict[str, Any] = field(default_factory=dict)
    # Pre-imported world module; when None the blueprint imports world_file itself
    world_module: Optional[Any] = None


@dataclass
class ParlAIChatBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_PARLAI_CHAT
    _group: str = field(
        default="ParlAIChatBlueprint",
        metadata={
            "help": """
                Tasks launched from ParlAI blueprints require the number of
                conversations (either an int or task data for each convo), as
                well as a world to initialize for connecting workers.
            """
        },
    )
    world_file: str = field(
        default=MISSING,
        metadata={"help": "Path to file containing ParlAI world", "required": True},
    )
    preview_source: str = field(
        default=MISSING,
        metadata={"help": "Optional path to source HTML file to preview the task"},
    )
    task_description_file: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Path to file for the extended description of the task. "
                "Required if not providing a custom source bundle."
            )
        },
    )
    custom_source_bundle: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a fully custom frontend bundle"},
    )
    custom_source_dir: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a directory containing custom js code"},
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the frontend may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    context_csv: str = field(
        default=MISSING,
        metadata={"help": "Optional path to csv containing task context"},
    )
    context_jsonl: str = field(
        default=MISSING,
        metadata={"help": "Optional path to jsonl file containing task context"},
    )
    num_conversations: int = field(
        default=MISSING,
        metadata={
            "help": "Optional count of conversations to have if no context provided"
        },
    )


@register_mephisto_abstraction()
class ParlAIChatBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a task that runs a parlai chat"""

    AgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = ParlAIChatTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = ParlAIChatTaskRunner
    ArgsClass = ParlAIChatBlueprintArgs
    SharedStateClass = SharedParlAITaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_PARLAI_CHAT

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedParlAITaskState",
    ):
        """
        Load per-conversation context (csv / jsonl / blank dicts), resolve the
        ParlAI world module, and read the description/preview HTML files.
        """
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: List[Dict[str, Any]] = []
        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data: Dict[str, Any] = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif args.blueprint.get("num_conversations", None) is not None:
            # BUGFIX: `[{}] * n` would alias a single shared dict across all
            # conversations, so mutating one would mutate them all. Build a
            # distinct dict per conversation instead.
            self._initialization_data_dicts = [
                {} for _ in range(args.blueprint.num_conversations)
            ]
        else:
            raise NotImplementedError(
                "Parsing parlai tasks directly from dicts or JSON is not supported yet"
            )

        if shared_state.world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            sys.path.append(world_module_dir)
            # Strip the trailing ".py" to get an importable module name
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        else:
            world_module = shared_state.world_module
        self.world_module = world_module
        assert hasattr(world_module, "make_world")
        assert hasattr(world_module, "get_world_params")
        self.agent_count = world_module.get_world_params()[  # type: ignore
            "agent_count"
        ]

        self.full_task_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("task_description_file", None) is not None:
            full_path = os.path.expanduser(args.blueprint.task_description_file)
            assert os.path.exists(
                full_path
            ), f"Target task description path {full_path} doesn't exist"
            with open(full_path, "r") as description_fp:
                self.full_task_description = description_fp.read()

        self.full_preview_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Target preview source path {preview_source_file} doesn't exist"
            with open(preview_source_file, "r") as description_fp:
                self.full_preview_description = description_fp.read()

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        # Find world module
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        world_module = shared_state.world_module
        if world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            assert os.path.exists(
                world_file_path
            ), f"Provided world path {world_file_path} doesn't exist"
            sys.path.append(world_module_dir)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        # assert world file is valid
        assert hasattr(
            world_module, "make_world"
        ), "Provided world file has no `make_world` method"
        assert hasattr(
            world_module, "get_world_params"
        ), "Provided world file has no `get_world_params` method"

        # assert some method for determining quantity of conversations
        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            assert os.path.exists(
                csv_file
            ), f"Target context_csv path {csv_file} doesn't exist"
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Target context_jsonl path {jsonl_file} doesn't exist"
        elif args.blueprint.get("num_conversations", None) is not None:
            assert (
                args.blueprint.num_conversations > 0
            ), "Must have at least one conversation"
        else:
            raise AssertionError(
                "Must specify one of --context-csv, --context-jsonl or --num-conversations"
            )

        if args.blueprint.get("custom_source_bundle", None) is not None:
            custom_source_file_path = os.path.expanduser(
                args.blueprint.custom_source_bundle
            )
            assert os.path.exists(
                custom_source_file_path
            ), f"Provided custom bundle doesn't exist at {custom_source_file_path}"

        if args.blueprint.get("custom_source_dir", None) is not None:
            custom_source_dir_path = os.path.expanduser(
                args.blueprint.custom_source_dir
            )
            assert os.path.exists(
                custom_source_dir_path
            ), f"Provided custom source dir doesn't exist at {custom_source_dir_path}"

        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Provided preview source doesn't exist at {preview_source_file}"

        if args.blueprint.get("extra_source_dir", None) is not None:
            extra_source_dir = os.path.expanduser(args.blueprint.extra_source_dir)
            assert os.path.exists(
                extra_source_dir
            ), f"Provided extra resource dir doesn't exist at {extra_source_dir}"

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Specifies what options within a task_config should be forwarded
        to the client for use by the task's frontend
        """
        # Start with standard task configuration arguments
        frontend_task_config = super().get_frontend_args()
        shared_state = self.shared_state
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        # Add ParlAI standards
        frontend_task_config.update(
            {
                "task_description": self.full_task_description,
                "preview_html": self.full_preview_description,
                "frame_height": 650,
                "chat_title": self.args.task.task_title,
                "has_preview": self.args.blueprint.get("preview_source", None)
                is not None,
                "block_mobile": True,
                "frontend_task_opts": shared_state.frontend_task_opts,
            }
        )
        # Use overrides provided downstream
        frontend_task_config.update(self.frontend_task_config)
        return frontend_task_config

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        # BUGFIX: `[{}] * agent_count` would hand every unit the same dict
        # object; build an independent dict per unit instead.
        return [
            InitializationData(
                shared=d, unit_data=[{} for _ in range(self.agent_count)]
            )
            for d in self._initialization_data_dicts
        ]

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Delegate onboarding validation to the world module when it provides a
        `validate_onboarding` hook; otherwise pass everyone.
        """
        if hasattr(self.world_module, "validate_onboarding"):
            return self.world_module.validate_onboarding(  # type: ignore
                onboarding_agent.state.get_data()
            )
        return True
View Source
class ParlAIChatBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    # Hydra-configurable arguments for ParlAI chat tasks. Fields default to
    # MISSING ('???') and are resolved from the launch configuration.
    _blueprint_type: str = BLUEPRINT_TYPE_PARLAI_CHAT
    _group: str = field(
        default="ParlAIChatBlueprint",
        metadata={
            "help": """
                Tasks launched from ParlAI blueprints require the number of
                conversations (either an int or task data for each convo), as
                well as a world to initialize for connecting workers.
            """
        },
    )
    # Path to the python file defining the ParlAI world (required)
    world_file: str = field(
        default=MISSING,
        metadata={"help": "Path to file containing ParlAI world", "required": True},
    )
    preview_source: str = field(
        default=MISSING,
        metadata={"help": "Optional path to source HTML file to preview the task"},
    )
    task_description_file: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Path to file for the extended description of the task. "
                "Required if not providing a custom source bundle."
            )
        },
    )
    custom_source_bundle: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a fully custom frontend bundle"},
    )
    custom_source_dir: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a directory containing custom js code"},
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the frontend may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    # Exactly one of context_csv / context_jsonl / num_conversations must be
    # provided; see ParlAIChatBlueprint.assert_task_args.
    context_csv: str = field(
        default=MISSING,
        metadata={"help": "Optional path to csv containing task context"},
    )
    context_jsonl: str = field(
        default=MISSING,
        metadata={"help": "Optional path to jsonl file containing task context"},
    )
    num_conversations: int = field(
        default=MISSING,
        metadata={
            "help": "Optional count of conversations to have if no context provided"
        },
    )
ParlAIChatBlueprintArgs(_blueprint_type: str = 'parlai_chat', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'ParlAIChatBlueprint', world_file: str = '???', preview_source: str = '???', task_description_file: str = '???', custom_source_bundle: str = '???', custom_source_dir: str = '???', extra_source_dir: str = '???', context_csv: str = '???', context_jsonl: str = '???', num_conversations: int = '???')
View Source
class ParlAIChatBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a task that runs a parlai chat"""

    AgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = ParlAIChatTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = ParlAIChatTaskRunner
    ArgsClass = ParlAIChatBlueprintArgs
    SharedStateClass = SharedParlAITaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_PARLAI_CHAT

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedParlAITaskState",
    ):
        """
        Load per-conversation context (csv / jsonl / blank dicts), resolve the
        ParlAI world module, and read the description/preview HTML files.
        """
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: List[Dict[str, Any]] = []
        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data: Dict[str, Any] = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif args.blueprint.get("num_conversations", None) is not None:
            # BUGFIX: `[{}] * n` would alias a single shared dict across all
            # conversations; build a distinct dict per conversation instead.
            self._initialization_data_dicts = [
                {} for _ in range(args.blueprint.num_conversations)
            ]
        else:
            raise NotImplementedError(
                "Parsing parlai tasks directly from dicts or JSON is not supported yet"
            )

        if shared_state.world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            sys.path.append(world_module_dir)
            # Strip the trailing ".py" to get an importable module name
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        else:
            world_module = shared_state.world_module
        self.world_module = world_module
        assert hasattr(world_module, "make_world")
        assert hasattr(world_module, "get_world_params")
        self.agent_count = world_module.get_world_params()[  # type: ignore
            "agent_count"
        ]

        self.full_task_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("task_description_file", None) is not None:
            full_path = os.path.expanduser(args.blueprint.task_description_file)
            assert os.path.exists(
                full_path
            ), f"Target task description path {full_path} doesn't exist"
            with open(full_path, "r") as description_fp:
                self.full_task_description = description_fp.read()

        self.full_preview_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Target preview source path {preview_source_file} doesn't exist"
            with open(preview_source_file, "r") as description_fp:
                self.full_preview_description = description_fp.read()

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        # Find world module
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        world_module = shared_state.world_module
        if world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            assert os.path.exists(
                world_file_path
            ), f"Provided world path {world_file_path} doesn't exist"
            sys.path.append(world_module_dir)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        # assert world file is valid
        assert hasattr(
            world_module, "make_world"
        ), "Provided world file has no `make_world` method"
        assert hasattr(
            world_module, "get_world_params"
        ), "Provided world file has no `get_world_params` method"

        # assert some method for determining quantity of conversations
        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            assert os.path.exists(
                csv_file
            ), f"Target context_csv path {csv_file} doesn't exist"
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Target context_jsonl path {jsonl_file} doesn't exist"
        elif args.blueprint.get("num_conversations", None) is not None:
            assert (
                args.blueprint.num_conversations > 0
            ), "Must have at least one conversation"
        else:
            raise AssertionError(
                "Must specify one of --context-csv, --context-jsonl or --num-conversations"
            )

        if args.blueprint.get("custom_source_bundle", None) is not None:
            custom_source_file_path = os.path.expanduser(
                args.blueprint.custom_source_bundle
            )
            assert os.path.exists(
                custom_source_file_path
            ), f"Provided custom bundle doesn't exist at {custom_source_file_path}"

        if args.blueprint.get("custom_source_dir", None) is not None:
            custom_source_dir_path = os.path.expanduser(
                args.blueprint.custom_source_dir
            )
            assert os.path.exists(
                custom_source_dir_path
            ), f"Provided custom source dir doesn't exist at {custom_source_dir_path}"

        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Provided preview source doesn't exist at {preview_source_file}"

        if args.blueprint.get("extra_source_dir", None) is not None:
            extra_source_dir = os.path.expanduser(args.blueprint.extra_source_dir)
            assert os.path.exists(
                extra_source_dir
            ), f"Provided extra resource dir doesn't exist at {extra_source_dir}"

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Specifies what options within a task_config should be forwarded
        to the client for use by the task's frontend
        """
        # Start with standard task configuration arguments
        frontend_task_config = super().get_frontend_args()
        shared_state = self.shared_state
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        # Add ParlAI standards
        frontend_task_config.update(
            {
                "task_description": self.full_task_description,
                "preview_html": self.full_preview_description,
                "frame_height": 650,
                "chat_title": self.args.task.task_title,
                "has_preview": self.args.blueprint.get("preview_source", None)
                is not None,
                "block_mobile": True,
                "frontend_task_opts": shared_state.frontend_task_opts,
            }
        )
        # Use overrides provided downstream
        frontend_task_config.update(self.frontend_task_config)
        return frontend_task_config

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        # BUGFIX: `[{}] * agent_count` would hand every unit the same dict
        # object; build an independent dict per unit instead.
        return [
            InitializationData(
                shared=d, unit_data=[{} for _ in range(self.agent_count)]
            )
            for d in self._initialization_data_dicts
        ]

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        """
        Delegate onboarding validation to the world module when it provides a
        `validate_onboarding` hook; otherwise pass everyone.
        """
        if hasattr(self.world_module, "validate_onboarding"):
            return self.world_module.validate_onboarding(  # type: ignore
                onboarding_agent.state.get_data()
            )
        return True
Blueprint for a task that runs a parlai chat
View Source
def __init__(
    self,
    task_run: "TaskRun",
    args: "DictConfig",
    shared_state: "SharedParlAITaskState",
):
    """
    Load per-conversation context (csv rows, jsonl records, or blank dicts),
    resolve the ParlAI world module, and read the task description / preview
    HTML files into memory.
    """
    super().__init__(task_run, args, shared_state)
    self._initialization_data_dicts: List[Dict[str, Any]] = []
    if args.blueprint.get("context_csv", None) is not None:
        csv_file = os.path.expanduser(args.blueprint.context_csv)
        with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
            csv_reader = csv.reader(csv_fp)
            headers = next(csv_reader)
            for row in csv_reader:
                row_data: Dict[str, Any] = {}
                for i, col in enumerate(row):
                    row_data[headers[i]] = col
                self._initialization_data_dicts.append(row_data)
    elif args.blueprint.get("context_jsonl", None) is not None:
        jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
        with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
            line = jsonl_fp.readline()
            while line:
                j = json.loads(line)
                self._initialization_data_dicts.append(j)
                line = jsonl_fp.readline()
    elif args.blueprint.get("num_conversations", None) is not None:
        # BUGFIX: `[{}] * n` would alias a single shared dict across all
        # conversations, so mutating one would mutate them all. Build a
        # distinct dict per conversation instead.
        self._initialization_data_dicts = [
            {} for _ in range(args.blueprint.num_conversations)
        ]
    else:
        raise NotImplementedError(
            "Parsing parlai tasks directly from dicts or JSON is not supported yet"
        )

    if shared_state.world_module is None:
        world_file_path = os.path.expanduser(args.blueprint.world_file)
        world_module_dir = os.path.dirname(world_file_path)
        sys.path.append(world_module_dir)
        # Strip the trailing ".py" to get an importable module name
        world_module_name = os.path.basename(world_file_path)[:-3]
        world_module = import_module(world_module_name)
    else:
        world_module = shared_state.world_module
    self.world_module = world_module
    assert hasattr(world_module, "make_world")
    assert hasattr(world_module, "get_world_params")
    self.agent_count = world_module.get_world_params()[  # type: ignore
        "agent_count"
    ]

    self.full_task_description = MISSING_SOMETHING_TEXT
    if args.blueprint.get("task_description_file", None) is not None:
        full_path = os.path.expanduser(args.blueprint.task_description_file)
        assert os.path.exists(
            full_path
        ), f"Target task description path {full_path} doesn't exist"
        with open(full_path, "r") as description_fp:
            self.full_task_description = description_fp.read()

    self.full_preview_description = MISSING_SOMETHING_TEXT
    if args.blueprint.get("preview_source", None) is not None:
        preview_source_file = os.path.expanduser(args.blueprint.preview_source)
        assert os.path.exists(
            preview_source_file
        ), f"Target preview source path {preview_source_file} doesn't exist"
        with open(preview_source_file, "r") as description_fp:
            self.full_preview_description = description_fp.read()
View Source
@classmethod
def assert_task_args(
    cls, args: "DictConfig", shared_state: "SharedTaskState"
) -> None:
    """
    Ensure that arguments are properly configured to launch this task.

    Validates, in order: the shared-state type, the world module (importing
    it from world_file when not supplied), the presence of exactly one
    conversation source (csv / jsonl / count), and the existence of every
    optional frontend resource path that was provided. Raises AssertionError
    on the first violation.
    """
    # Find world module
    assert isinstance(
        shared_state, SharedParlAITaskState
    ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
    world_module = shared_state.world_module
    if world_module is None:
        world_file_path = os.path.expanduser(args.blueprint.world_file)
        world_module_dir = os.path.dirname(world_file_path)
        assert os.path.exists(
            world_file_path
        ), f"Provided world path {world_file_path} doesn't exist"
        # NOTE: appends to sys.path as a side effect so the import resolves
        sys.path.append(world_module_dir)
        # Strip the trailing ".py" to get an importable module name
        world_module_name = os.path.basename(world_file_path)[:-3]
        world_module = import_module(world_module_name)
    # assert world file is valid
    assert hasattr(
        world_module, "make_world"
    ), "Provided world file has no `make_world` method"
    assert hasattr(
        world_module, "get_world_params"
    ), "Provided world file has no `get_world_params` method"

    # assert some method for determining quantity of conversations
    if args.blueprint.get("context_csv", None) is not None:
        csv_file = os.path.expanduser(args.blueprint.context_csv)
        assert os.path.exists(
            csv_file
        ), f"Target context_csv path {csv_file} doesn't exist"
    elif args.blueprint.get("context_jsonl", None) is not None:
        jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
        assert os.path.exists(
            jsonl_file
        ), f"Target context_jsonl path {jsonl_file} doesn't exist"
    elif args.blueprint.get("num_conversations", None) is not None:
        assert (
            args.blueprint.num_conversations > 0
        ), "Must have at least one conversation"
    else:
        raise AssertionError(
            "Must specify one of --context-csv, --context-jsonl or --num-conversations"
        )

    # Optional frontend resources: each must exist if configured
    if args.blueprint.get("custom_source_bundle", None) is not None:
        custom_source_file_path = os.path.expanduser(
            args.blueprint.custom_source_bundle
        )
        assert os.path.exists(
            custom_source_file_path
        ), f"Provided custom bundle doesn't exist at {custom_source_file_path}"

    if args.blueprint.get("custom_source_dir", None) is not None:
        custom_source_dir_path = os.path.expanduser(
            args.blueprint.custom_source_dir
        )
        assert os.path.exists(
            custom_source_dir_path
        ), f"Provided custom source dir doesn't exist at {custom_source_dir_path}"

    if args.blueprint.get("preview_source", None) is not None:
        preview_source_file = os.path.expanduser(args.blueprint.preview_source)
        assert os.path.exists(
            preview_source_file
        ), f"Provided preview source doesn't exist at {preview_source_file}"

    if args.blueprint.get("extra_source_dir", None) is not None:
        extra_source_dir = os.path.expanduser(args.blueprint.extra_source_dir)
        assert os.path.exists(
            extra_source_dir
        ), f"Provided extra resource dir doesn't exist at {extra_source_dir}"
Ensure that arguments are properly configured to launch this task
View Source
def get_frontend_args(self) -> Dict[str, Any]:
    """
    Specifies what options within a task_config should be forwarded to the
    client for use by the task's frontend.
    """
    shared_state = self.shared_state
    assert isinstance(
        shared_state, SharedParlAITaskState
    ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"

    # Base config comes from the standard task configuration arguments
    config = super().get_frontend_args()

    # Layer the ParlAI chat standards on top of the base config
    parlai_defaults = {
        "task_description": self.full_task_description,
        "preview_html": self.full_preview_description,
        "frame_height": 650,
        "chat_title": self.args.task.task_title,
        "has_preview": self.args.blueprint.get("preview_source", None) is not None,
        "block_mobile": True,
        "frontend_task_opts": shared_state.frontend_task_opts,
    }
    config.update(parlai_defaults)

    # Overrides provided downstream always win
    config.update(self.frontend_task_config)
    return config
Specifies what options within a task_config should be forwarded to the client for use by the task's frontend
View Source
def get_initialization_data(self) -> Iterable["InitializationData"]:
    """
    Return the InitializationData retrieved from the specified stream.

    One InitializationData is produced per loaded context dict, each with
    `agent_count` per-unit data dicts.
    """
    # BUGFIX: the previous `unit_data=[{}] * self.agent_count` multiplied a
    # single dict object, so all units of an assignment shared (and could
    # cross-mutate) one dict. Build an independent dict per unit.
    return [
        InitializationData(
            shared=d, unit_data=[{} for _ in range(self.agent_count)]
        )
        for d in self._initialization_data_dicts
    ]
Return the InitializationData retrieved from the specified stream
View Source
def validate_onboarding(
    self, worker: "Worker", onboarding_agent: "OnboardingAgent"
) -> bool:
    """
    Delegate onboarding validation to the world module's
    `validate_onboarding` hook when one is defined; workers pass by
    default when the world provides no hook.
    """
    world_validator = getattr(self.world_module, "validate_onboarding", None)
    if world_validator is None:
        return True
    return world_validator(onboarding_agent.state.get_data())  # type: ignore
Check the incoming onboarding data and evaluate if the worker has passed the qualification or not. Return True if the worker has qualified.
By default we use the validate_onboarding provided in a run_task, and all onboarding tasks should allow run_task to specify additional or entirely override what's provided in a blueprint.
Inherited Members
View Source
class ParlAIChatAgentState(AgentState):
    """
    Holds information about ParlAI-style chat. Data is stored in json files
    containing every act from the ParlAI world.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an AgentState to track the state of an agent's work on a Unit

        Initialize with an existing file if it exists.
        """
        # weakref avoids a reference cycle between agent and its state
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.messages: List[Dict[str, Any]] = []
            self.final_submission: Optional[Dict[str, Any]] = None
            self.init_data = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        return {"task_data": self.init_data, "past_live_updates": self.messages}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            self.messages = state["outputs"]["messages"]
            self.init_data = state["inputs"]
            # .get(): older state files may predate final_submission
            self.final_submission = state["outputs"].get("final_submission")

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "outputs": {
                "messages": self.messages,
                "final_submission": self.final_submission,
            },
            "inputs": self.init_data,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return properly parsed data from this task"""
        init_data = self.init_data
        save_data = None
        agent_name = None
        # First message carrying an agent_display_name names this agent
        for m in self.messages:
            if "agent_display_name" in m["task_data"]:
                agent_name = m["task_data"]["agent_display_name"]
                break
        messages = self.messages
        if len(messages) > 0:
            # A trailing WORLD_DATA message holds world save data, not chat
            if "WORLD_DATA" in messages[-1]:
                save_data = messages[-1]["WORLD_DATA"]
                messages = messages[:-1]
        return {
            "agent_name": agent_name,
            "initial_data": init_data,
            "messages": messages,
            "save_data": save_data,
            "final_submission": self.final_submission,
        }

    def get_task_start(self) -> float:
        """
        Return the start time for this task, the timestamp of the very first message.
        """
        return self.messages[0]["timestamp"]

    def get_task_end(self) -> float:
        """
        Return the end time for this task, the timestamp of the very final message.
        """
        return self.messages[-1]["timestamp"]

    def save_data(self) -> None:
        """Save all messages from this agent to the agent's state file on disk"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as its arrival time
        """
        live_update["timestamp"] = time.time()
        self.messages.append(live_update)
        self.save_data()

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()
Holds information about ParlAI-style chat. Data is stored in json files containing every act from the ParlAI world.
Inherited Members
- mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state.ParlAIChatAgentState
- ParlAIChatAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- get_parsed_data
- get_task_start
- get_task_end
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
View Source
class ParlAIChatAgentState(AgentState):
    """
    Holds information about ParlAI-style chat. Data is stored in json files
    containing every act from the ParlAI world.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an AgentState to track the state of an agent's work on a Unit

        Initialize with an existing file if it exists.
        """
        # Proxy avoids keeping a strong reference cycle between the Agent
        # and its state object.
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.messages: List[Dict[str, Any]] = []
            self.final_submission: Optional[Dict[str, Any]] = None
            self.init_data = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent; returns False if already set"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        # Past live updates are included so a reconnecting frontend can replay.
        return {"task_data": self.init_data, "past_live_updates": self.messages}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            self.messages = state["outputs"]["messages"]
            self.init_data = state["inputs"]
            self.final_submission = state["outputs"].get("final_submission")

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "outputs": {
                "messages": self.messages,
                "final_submission": self.final_submission,
            },
            "inputs": self.init_data,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return properly parsed data from this task"""
        init_data = self.init_data
        save_data = None
        agent_name = None
        for m in self.messages:
            # FIX: guard the "task_data" lookup. Messages appended through
            # update_data() are arbitrary live-update dicts and are not
            # guaranteed to carry a "task_data" key; the previous direct
            # m["task_data"] index raised KeyError on such messages.
            if "agent_display_name" in m.get("task_data", {}):
                agent_name = m["task_data"]["agent_display_name"]
                break
        messages = self.messages
        if len(messages) > 0:
            # The final message may be the world's save-data packet rather than
            # a chat act; strip it from the transcript and surface it separately.
            if "WORLD_DATA" in messages[-1]:
                save_data = messages[-1]["WORLD_DATA"]
                messages = messages[:-1]
        return {
            "agent_name": agent_name,
            "initial_data": init_data,
            "messages": messages,
            "save_data": save_data,
            "final_submission": self.final_submission,
        }

    def get_task_start(self) -> float:
        """
        Return the start time for this task, the timestamp of the very first message.
        """
        return self.messages[0]["timestamp"]

    def get_task_end(self) -> float:
        """
        Return the end time for this task, the timestamp of the very final message.
        """
        return self.messages[-1]["timestamp"]

    def save_data(self) -> None:
        """Save all messages from this agent to the expected agent data file"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as its arrival time
        """
        live_update["timestamp"] = time.time()
        self.messages.append(live_update)
        self.save_data()

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()
Holds information about ParlAI-style chat. Data is stored in json files containing every act from the ParlAI world.
Inherited Members
- mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state.ParlAIChatAgentState
- ParlAIChatAgentState
- set_init_state
- get_init_state
- load_data
- get_data
- get_parsed_data
- get_task_start
- get_task_end
- save_data
- update_data
- update_submit
- mephisto.abstractions._subcomponents.agent_state.AgentState
- STATUS_NONE
- STATUS_ACCEPTED
- STATUS_ONBOARDING
- STATUS_WAITING
- STATUS_IN_TASK
- STATUS_COMPLETED
- STATUS_DISCONNECT
- STATUS_TIMEOUT
- STATUS_PARTNER_DISCONNECT
- STATUS_EXPIRED
- STATUS_RETURNED
- STATUS_APPROVED
- STATUS_SOFT_REJECTED
- STATUS_REJECTED
- complete
- valid
View Source
class ParlAIChatTaskBuilder(TaskBuilder):
    """
    Builder for a parlai chat task, pulls the appropriate html,
    builds the frontend (if a build doesn't already exist),
    then puts the file into the server directory
    """

    BUILT_FILE = BUILT_FILE
    BUILT_MESSAGE = "built!"

    def rebuild_core(self):
        """Rebuild the frontend for this task"""
        return_dir = os.getcwd()
        os.chdir(FRONTEND_SOURCE_DIR)
        try:
            if os.path.exists(FRONTEND_BUILD_DIR):
                shutil.rmtree(FRONTEND_BUILD_DIR)
            packages_installed = subprocess.call(["npm", "install"])
            if packages_installed != 0:
                raise Exception(
                    "please make sure npm is installed, otherwise view "
                    "the above error for more info."
                )
            webpack_complete = subprocess.call(["npm", "run", "dev"])
            if webpack_complete != 0:
                raise Exception(
                    "Webpack appears to have failed to build your "
                    "frontend. See the above error for more information."
                )
        finally:
            # FIX: restore the working directory even on failure; previously a
            # failed npm/webpack step left the process chdir'd into
            # FRONTEND_SOURCE_DIR.
            os.chdir(return_dir)

    def build_custom_bundle(self, custom_src_dir):
        """Locate all of the custom files used for a custom build, create
        a prebuild directory containing all of them, then build the
        custom source.

        Check dates to only go through this build process when files have changes
        """
        TARGET_BUILD_FILES = {
            "main.js": "src/main.js",
            "package.json": "package.json",
            "style.css": "src/style.css",
        }
        TARGET_BUILD_FOLDERS = {"components": "src/components"}

        prebuild_path = os.path.join(custom_src_dir, CUSTOM_BUILD_DIRNAME)
        build_path = os.path.join(prebuild_path, "build", "bundle.js")

        # see if we need to rebuild
        if os.path.exists(build_path):
            created_date = os.path.getmtime(build_path)
            up_to_date = True

            # Any tracked top-level file newer than the bundle forces a rebuild
            for fn in TARGET_BUILD_FILES.keys():
                possible_conflict = os.path.join(custom_src_dir, fn)
                if os.path.exists(possible_conflict):
                    if os.path.getmtime(possible_conflict) > created_date:
                        up_to_date = False
                        break

            # Likewise for any file nested inside the tracked folders
            for fn in TARGET_BUILD_FOLDERS.keys():
                if not up_to_date:
                    break
                possible_conflict_dir = os.path.join(custom_src_dir, fn)
                for root, dirs, files in os.walk(possible_conflict_dir):
                    if not up_to_date:
                        break
                    for fname in files:
                        path = os.path.join(root, fname)
                        if os.path.getmtime(path) > created_date:
                            up_to_date = False
                            break
                # FIX: removed a stale re-check of `possible_conflict` that was
                # left over here from the file loop above — it referenced a
                # variable from the previous loop and could only repeat a
                # comparison that loop had already made.

            if up_to_date:
                return build_path

        # build anew
        REQUIRED_SOURCE_FILES = [
            ".babelrc",
            ".eslintrc",
            "package.json",
            "webpack.config.js",
        ]
        REQUIRED_SOURCE_DIRS = ["src"]

        if not os.path.exists(os.path.join(prebuild_path, "build")):
            os.makedirs(os.path.join(prebuild_path, "build"), exist_ok=True)

        # Copy default files
        for src_dir in REQUIRED_SOURCE_DIRS:
            src_path = os.path.join(FRONTEND_SOURCE_DIR, src_dir)
            dst_path = os.path.join(prebuild_path, src_dir)
            if os.path.exists(dst_path):
                shutil.rmtree(dst_path)
            shutil.copytree(src_path, dst_path)
        for src_file in REQUIRED_SOURCE_FILES:
            src_path = os.path.join(FRONTEND_SOURCE_DIR, src_file)
            dst_path = os.path.join(prebuild_path, src_file)
            shutil.copy2(src_path, dst_path)

        # copy custom files, overriding the defaults where present
        for src_file in TARGET_BUILD_FILES.keys():
            src_path = os.path.join(custom_src_dir, src_file)
            if os.path.exists(src_path):
                dst_path = os.path.join(prebuild_path, TARGET_BUILD_FILES[src_file])
                shutil.copy2(src_path, dst_path)
        for src_dir in TARGET_BUILD_FOLDERS.keys():
            src_path = os.path.join(custom_src_dir, src_dir)
            dst_path = os.path.join(prebuild_path, TARGET_BUILD_FOLDERS[src_dir])
            if os.path.exists(src_path):
                if os.path.exists(dst_path):
                    shutil.rmtree(dst_path)
                shutil.copytree(src_path, dst_path)

        # navigate and build
        return_dir = os.getcwd()
        os.chdir(prebuild_path)
        try:
            packages_installed = subprocess.call(["npm", "install"])
            if packages_installed != 0:
                raise Exception(
                    "please make sure npm is installed, otherwise view "
                    "the above error for more info."
                )
            webpack_complete = subprocess.call(["npm", "run", "dev"])
            if webpack_complete != 0:
                raise Exception(
                    "Webpack appears to have failed to build your "
                    "frontend. See the above error for more information."
                )
        finally:
            # cleanup and return; FIX: cwd is restored even when the build fails
            os.chdir(return_dir)
        return build_path

    def build_in_dir(self, build_dir: str):
        """Build the frontend if it doesn't exist, then copy into the server directory"""
        # Only build this task if it hasn't already been built
        if not os.path.exists(FRONTEND_BUILD_DIR):
            self.rebuild_core()

        custom_source_dir = self.args.blueprint.get("custom_source_dir", None)
        build_bundle = None
        if custom_source_dir is not None:
            custom_source_dir = os.path.expanduser(custom_source_dir)
            build_bundle = self.build_custom_bundle(custom_source_dir)

        # Copy over the preview file as preview.html, use the default if none specified
        target_resource_dir = os.path.join(build_dir, "static")
        preview_file = self.args.blueprint.get("preview_source", None)
        if preview_file is not None:
            use_preview_file = os.path.expanduser(preview_file)
            target_path = os.path.join(target_resource_dir, "preview.html")
            shutil.copy2(use_preview_file, target_path)

        # If any additional task files are required via a source_dir, copy those as well
        extra_dir_path = self.args.blueprint.get("extra_source_dir", None)
        if extra_dir_path is not None:
            extra_dir_path = os.path.expanduser(extra_dir_path)
            copy_tree(extra_dir_path, target_resource_dir)

        # Pick the bundle: explicit override > freshly built custom > core build
        bundle_js_file = self.args.blueprint.get("custom_source_bundle", None)
        if bundle_js_file is None:
            if build_bundle is not None:
                bundle_js_file = build_bundle
            else:
                bundle_js_file = os.path.join(FRONTEND_BUILD_DIR, "bundle.js")
        target_path = os.path.join(target_resource_dir, "bundle.js")
        shutil.copy2(bundle_js_file, target_path)

        # Copy over the static files for this task:
        for fin_file in ["index.html", "notif.mp3"]:
            copied_static_file = os.path.join(
                FRONTEND_SOURCE_DIR, "src", "static", fin_file
            )
            target_path = os.path.join(target_resource_dir, fin_file)
            shutil.copy2(copied_static_file, target_path)

        # Write a built file confirmation
        with open(os.path.join(build_dir, self.BUILT_FILE), "w+") as built_file:
            built_file.write(self.BUILT_MESSAGE)
Builder for a parlai chat task, pulls the appropriate html, builds the frontend (if a build doesn't already exist), then puts the file into the server directory
Inherited Members
- mephisto.abstractions._subcomponents.task_builder.TaskBuilder
- TaskBuilder
View Source
class ParlAIChatTaskRunner(TaskRunner):
    """
    Task runner for a parlai chat task
    """

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        super().__init__(task_run, args, shared_state)
        # Local import avoids a circular import with the blueprint module
        from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
            SharedParlAITaskState,
        )

        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState for parlai blueprints"
        if shared_state.world_module is None:
            # No preloaded module: import the world file from disk, putting its
            # directory on sys.path so import_module can resolve it.
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            sys.path.append(world_module_dir)
            # Strips the last 3 chars of the basename — presumably the ".py"
            # extension; a non-.py world_file would break here (NOTE: review)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        else:
            world_module = shared_state.world_module
        self.parlai_world_module = world_module
        world_params = world_module.get_world_params()  # type: ignore
        # More than one agent per world means units run concurrently
        self.is_concurrent = world_params["agent_count"] > 1
        # Maps world ids (see get_world_id) to live world instances
        self.id_to_worlds: Dict[str, Any] = {}

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Return the data for an agent already assigned to a particular unit
        """
        init_state = agent.state.get_init_state()
        if init_state is not None:
            # reconnecting agent, give what we've got
            return init_state
        else:
            assignment = agent.get_unit().get_assignment()
            assignment_data = self.get_data_for_assignment(assignment)
            agent.state.set_init_state(assignment_data.shared)
            new_state = agent.state.get_init_state()
            assert new_state is not None, "Recently initialized state still None"
            return new_state

    def get_world_id(self, world_type: str, extra_id: str) -> str:
        """Get a world id specific to the given world type"""
        return f"{world_type}-{extra_id}"

    def run_onboarding(self, agent: "OnboardingAgent") -> None:
        """
        ParlAI Onboarding will initialize an onboarding world, then run it to completion if possible
        """
        shared_state = self.shared_state
        # Local import avoids a circular import with the blueprint module
        from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
            SharedParlAITaskState,
        )

        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState for parlai blueprints"
        opt: Dict[str, Any] = shared_state.onboarding_world_opt
        parlai_agent = MephistoAgentWrapper(agent)
        try:
            world = self.parlai_world_module.make_onboarding_world(  # type: ignore
                opt,
                parlai_agent,
                initialization_data=shared_state.onboarding_data,
            )
        except TypeError:
            # make_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_onboarding_world(opt, parlai_agent)  # type: ignore
        world_id = self.get_world_id("onboard", agent.get_agent_id())
        self.id_to_worlds[world_id] = world
        # Step the world until the episode ends or this onboarding is cancelled
        while (
            not world.episode_done() and agent.get_agent_id() in self.running_onboardings
        ):
            world.parley()

        # Ensure agent can submit after onboarding
        agent.update_status(AgentState.STATUS_WAITING)

        world.shutdown()
        agent.state.update_data(
            {
                "id": "SUBMIT_WORLD_DATA",
                "WORLD_DATA": world.prep_save_data([parlai_agent]),
                "text": "",
            }
        )

        # Mark the agent as done, then wait for the incoming submit action
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_onboarding(self, agent: "OnboardingAgent") -> None:
        """Shutdown the world"""
        onboarding_id = agent.get_agent_id()
        world_id = self.get_world_id("onboard", onboarding_id)
        # Only shut down world if it was actually started
        if world_id in self.id_to_worlds:
            self.id_to_worlds[world_id].shutdown()
            del self.id_to_worlds[world_id]

    def run_assignment(self, assignment: "Assignment", agents: List["Agent"]) -> None:
        """
        ParlAI runners will initialize a task world, then run them to completion if possible
        """
        for agent in agents:
            assert agent is not None, "task was not fully assigned"
        opt: Dict[str, Any] = cast("SharedParlAITaskState", self.shared_state).world_opt
        parlai_agents = [MephistoAgentWrapper(a) for a in agents]
        try:
            world = self.parlai_world_module.make_world(  # type: ignore
                opt, parlai_agents, initialization_data=assignment.get_assignment_data()
            )
        except TypeError:
            # make_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_world(opt, parlai_agents)  # type: ignore
        world_id = self.get_world_id("assignment", assignment.db_id)
        self.id_to_worlds[world_id] = world
        # Step the world until the episode ends or the assignment is stopped
        while not world.episode_done() and assignment.db_id in self.running_assignments:
            world.parley()

        # Ensure agents can submit after completion
        for idx in range(len(parlai_agents)):
            agents[idx].observe({"task_data": {"task_done": True}})

        # TODO(WISH) it would be nice to have individual agents be able to submit their
        # final things without needing to wait for their partner, such
        # as if one needs to rate and the other doesn't

        world.shutdown()
        # Persist each agent's view of the world's save data into its state
        for idx in range(len(parlai_agents)):
            agents[idx].state.update_data(
                {
                    "id": "SUBMIT_WORLD_DATA",
                    "WORLD_DATA": world.prep_save_data([parlai_agents[idx]]),
                    "text": "",
                }
            )

    def cleanup_assignment(self, assignment: "Assignment") -> None:
        """Handle cleanup for a specific assignment"""
        world_id = self.get_world_id("assignment", assignment.db_id)
        self.id_to_worlds[world_id].shutdown()
        del self.id_to_worlds[world_id]

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        ParlAI runners will initialize a task world, then run them to completion if possible
        """
        agents = [agent]
        opt: Dict[str, Any] = cast("SharedParlAITaskState", self.shared_state).world_opt
        parlai_agents = [MephistoAgentWrapper(a) for a in agents]
        try:
            world = self.parlai_world_module.make_world(  # type: ignore
                opt, parlai_agents, initialization_data=unit.get_assignment_data()
            )
        except TypeError:
            # make_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_world(opt, parlai_agents)  # type: ignore
        world_id = self.get_world_id("unit", unit.db_id)
        self.id_to_worlds[world_id] = world
        # Step the world until the episode ends or the unit is stopped
        while not world.episode_done() and unit.db_id in self.running_units:
            world.parley()

        # Ensure agent can submit after completion
        agent.observe({"task_data": {"task_done": True}})

        world.shutdown()
        # Unlike run_assignment, save data is optional here and sent via observe
        if hasattr(world, "prep_save_data"):
            agent.observe(
                {
                    "id": "SUBMIT_WORLD_DATA",
                    "WORLD_DATA": world.prep_save_data(parlai_agents),
                    "text": "",
                }
            )

    def cleanup_unit(self, unit: "Unit") -> None:
        """Handle cleanup for a specific unit"""
        world_id = self.get_world_id("unit", unit.db_id)
        self.id_to_worlds[world_id].shutdown()
        del self.id_to_worlds[world_id]
Task runner for a parlai chat task
Inherited Members
- mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_runner.ParlAIChatTaskRunner
- ParlAIChatTaskRunner
- get_init_data_for_agent
- get_world_id
- run_onboarding
- cleanup_onboarding
- run_assignment
- cleanup_assignment
- run_unit
- cleanup_unit
- mephisto.abstractions._subcomponents.task_runner.TaskRunner
- execute_onboarding
- execute_unit
- execute_assignment
- get_data_for_assignment
- filter_units_for_worker
- shutdown
View Source
class ParlAIChatBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs): _blueprint_type: str = BLUEPRINT_TYPE_PARLAI_CHAT _group: str = field( default="ParlAIChatBlueprint", metadata={ "help": """ Tasks launched from ParlAI blueprints require the number of conversations (either an int or task data for each convo), as well as a world to initialize for connecting workers. """ }, ) world_file: str = field( default=MISSING, metadata={"help": "Path to file containing ParlAI world", "required": True}, ) preview_source: str = field( default=MISSING, metadata={"help": "Optional path to source HTML file to preview the task"}, ) task_description_file: str = field( default=MISSING, metadata={ "help": ( "Path to file for the extended description of the task. " "Required if not providing a custom source bundle." ) }, ) custom_source_bundle: str = field( default=MISSING, metadata={"help": "Optional path to a fully custom frontend bundle"}, ) custom_source_dir: str = field( default=MISSING, metadata={"help": "Optional path to a directory containing custom js code"}, ) extra_source_dir: str = field( default=MISSING, metadata={ "help": ( "Optional path to sources that the frontend may " "refer to (such as images/video/css/scripts)" ) }, ) context_csv: str = field( default=MISSING, metadata={"help": "Optional path to csv containing task context"}, ) context_jsonl: str = field( default=MISSING, metadata={"help": "Optional path to jsonl file containing task context"}, ) num_conversations: int = field( default=MISSING, metadata={ "help": "Optional count of conversations to have if no context provided" }, )
ParlAIChatBlueprintArgs(_blueprint_type: str = 'parlai_chat', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'ParlAIChatBlueprint', world_file: str = '???', preview_source: str = '???', task_description_file: str = '???', custom_source_bundle: str = '???', custom_source_dir: str = '???', extra_source_dir: str = '???', context_csv: str = '???', context_jsonl: str = '???', num_conversations: int = '???')