mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint

#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.blueprint import (
    Blueprint,
    BlueprintArgs,
    SharedTaskState,
)
from mephisto.abstractions.blueprints.mixins.onboarding_required import (
    OnboardingRequired,
    OnboardingSharedState,
    OnboardingRequiredArgs,
)
from dataclasses import dataclass, field
from mephisto.data_model.assignment import InitializationData
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state import (
    ParlAIChatAgentState,
)
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_runner import (
    ParlAIChatTaskRunner,
)
from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_builder import (
    ParlAIChatTaskBuilder,
)
from mephisto.operations.registry import register_mephisto_abstraction
from omegaconf import DictConfig, MISSING

import os
import time
import csv
import sys
import json

from importlib import import_module

from typing import ClassVar, List, Type, Any, Dict, Iterable, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from mephisto.data_model.worker import Worker
    from mephisto.data_model.agent import Agent, OnboardingAgent
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.blueprint import AgentState, TaskRunner, TaskBuilder
    from mephisto.data_model.assignment import Assignment
    from mephisto.data_model.unit import Unit

BLUEPRINT_TYPE_PARLAI_CHAT = "parlai_chat"


MISSING_SOMETHING_TEXT = (
    "<h1>"
    "You didn't specify a task_description_file and also didn't override the "
    "frontend `TaskPreviewView` (if this is a preview) or the `TaskDescription` "
    "component (if this is in-task)."
    "</h1>"
)


@dataclass
class SharedParlAITaskState(OnboardingSharedState, SharedTaskState):
    frontend_task_opts: Dict[str, Any] = field(default_factory=dict)
    world_opt: Dict[str, Any] = field(default_factory=dict)
    onboarding_world_opt: Dict[str, Any] = field(default_factory=dict)
    world_module: Optional[Any] = None


@dataclass
class ParlAIChatBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_PARLAI_CHAT
    _group: str = field(
        default="ParlAIChatBlueprint",
        metadata={
            "help": """
                Tasks launched from ParlAI blueprints require the number of
                conversations (either an int or task data for each convo), as
                well as a world to initialize for connecting workers.
            """
        },
    )
    world_file: str = field(
        default=MISSING,
        metadata={"help": "Path to file containing ParlAI world", "required": True},
    )
    preview_source: str = field(
        default=MISSING,
        metadata={"help": "Optional path to source HTML file to preview the task"},
    )
    task_description_file: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Path to file for the extended description of the task. "
                "Required if not providing a custom source bundle."
            )
        },
    )
    custom_source_bundle: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a fully custom frontend bundle"},
    )
    custom_source_dir: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a directory containing custom js code"},
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the frontend may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    context_csv: str = field(
        default=MISSING,
        metadata={"help": "Optional path to csv containing task context"},
    )
    context_jsonl: str = field(
        default=MISSING,
        metadata={"help": "Optional path to jsonl file containing task context"},
    )
    num_conversations: int = field(
        default=MISSING,
        metadata={
            "help": "Optional count of conversations to have if no context provided"
        },
    )


@register_mephisto_abstraction()
class ParlAIChatBlueprint(OnboardingRequired, Blueprint):
    """Blueprint for a task that runs a parlai chat"""

    AgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    OnboardingAgentStateClass: ClassVar[Type["AgentState"]] = ParlAIChatAgentState
    TaskBuilderClass: ClassVar[Type["TaskBuilder"]] = ParlAIChatTaskBuilder
    TaskRunnerClass: ClassVar[Type["TaskRunner"]] = ParlAIChatTaskRunner
    ArgsClass = ParlAIChatBlueprintArgs
    SharedStateClass = SharedParlAITaskState
    BLUEPRINT_TYPE = BLUEPRINT_TYPE_PARLAI_CHAT

    def __init__(
        self,
        task_run: "TaskRun",
        args: "DictConfig",
        shared_state: "SharedParlAITaskState",
    ):
        super().__init__(task_run, args, shared_state)
        self._initialization_data_dicts: List[Dict[str, Any]] = []

        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            with open(csv_file, "r", encoding="utf-8-sig") as csv_fp:
                csv_reader = csv.reader(csv_fp)
                headers = next(csv_reader)
                for row in csv_reader:
                    row_data: Dict[str, Any] = {}
                    for i, col in enumerate(row):
                        row_data[headers[i]] = col
                    self._initialization_data_dicts.append(row_data)
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp:
                line = jsonl_fp.readline()
                while line:
                    j = json.loads(line)
                    self._initialization_data_dicts.append(j)
                    line = jsonl_fp.readline()
        elif args.blueprint.get("num_conversations", None) is not None:
            self._initialization_data_dicts = [{}] * args.blueprint.num_conversations
        else:
            raise NotImplementedError(
                "Parsing parlai tasks directly from dicts or JSON is not supported yet"
            )

        if shared_state.world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            sys.path.append(world_module_dir)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        else:
            world_module = shared_state.world_module
        self.world_module = world_module
        assert hasattr(world_module, "make_world")
        assert hasattr(world_module, "get_world_params")
        self.agent_count = world_module.get_world_params()[  # type: ignore
            "agent_count"
        ]

        self.full_task_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("task_description_file", None) is not None:
            full_path = os.path.expanduser(args.blueprint.task_description_file)
            assert os.path.exists(
                full_path
            ), f"Target task description path {full_path} doesn't exist"
            with open(full_path, "r") as description_fp:
                self.full_task_description = description_fp.read()
        self.full_preview_description = MISSING_SOMETHING_TEXT
        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Target preview source path {preview_source_file} doesn't exist"
            with open(preview_source_file, "r") as description_fp:
                self.full_preview_description = description_fp.read()

    @classmethod
    def assert_task_args(
        cls, args: "DictConfig", shared_state: "SharedTaskState"
    ) -> None:
        """Ensure that arguments are properly configured to launch this task"""
        # Find world module
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        world_module = shared_state.world_module
        if world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            assert os.path.exists(
                world_file_path
            ), f"Provided world path {world_file_path} doesn't exist"
            sys.path.append(world_module_dir)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        # assert world file is valid
        assert hasattr(
            world_module, "make_world"
        ), "Provided world file has no `make_world` method"
        assert hasattr(
            world_module, "get_world_params"
        ), "Provided world file has no `get_world_params` method"

        # assert some method for determining quantity of conversations
        if args.blueprint.get("context_csv", None) is not None:
            csv_file = os.path.expanduser(args.blueprint.context_csv)
            assert os.path.exists(
                csv_file
            ), f"Target context_csv path {csv_file} doesn't exist"
        elif args.blueprint.get("context_jsonl", None) is not None:
            jsonl_file = os.path.expanduser(args.blueprint.context_jsonl)
            assert os.path.exists(
                jsonl_file
            ), f"Target context_jsonl path {jsonl_file} doesn't exist"
        elif args.blueprint.get("num_conversations", None) is not None:
            assert (
                args.blueprint.num_conversations > 0
            ), "Must have at least one conversation"
        else:
            raise AssertionError(
                "Must specify one of --context-csv, --context-jsonl or --num-conversations"
            )

        if args.blueprint.get("custom_source_bundle", None) is not None:
            custom_source_file_path = os.path.expanduser(
                args.blueprint.custom_source_bundle
            )
            assert os.path.exists(
                custom_source_file_path
            ), f"Provided custom bundle doesn't exist at {custom_source_file_path}"

        if args.blueprint.get("custom_source_dir", None) is not None:
            custom_source_dir_path = os.path.expanduser(
                args.blueprint.custom_source_dir
            )
            assert os.path.exists(
                custom_source_dir_path
            ), f"Provided custom source dir doesn't exist at {custom_source_dir_path}"

        if args.blueprint.get("preview_source", None) is not None:
            preview_source_file = os.path.expanduser(args.blueprint.preview_source)
            assert os.path.exists(
                preview_source_file
            ), f"Provided preview source doesn't exist at {preview_source_file}"

        if args.blueprint.get("extra_source_dir", None) is not None:
            extra_source_dir = os.path.expanduser(args.blueprint.extra_source_dir)
            assert os.path.exists(
                extra_source_dir
            ), f"Provided extra resource dir doesn't exist at {extra_source_dir}"

    def get_frontend_args(self) -> Dict[str, Any]:
        """
        Specifies what options within a task_config should be forwarded
        to the client for use by the task's frontend
        """
        # Start with standard task configuration arguments
        frontend_task_config = super().get_frontend_args()
        shared_state = self.shared_state
        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState with ParlAIChatBlueprint"
        # Add ParlAI standards
        frontend_task_config.update(
            {
                "task_description": self.full_task_description,
                "preview_html": self.full_preview_description,
                "frame_height": 650,
                "chat_title": self.args.task.task_title,
                "has_preview": self.args.blueprint.get("preview_source", None)
                is not None,
                "block_mobile": True,
                "frontend_task_opts": shared_state.frontend_task_opts,
            }
        )
        # Use overrides provided downstream
        frontend_task_config.update(self.frontend_task_config)
        return frontend_task_config

    def get_initialization_data(self) -> Iterable["InitializationData"]:
        """
        Return the InitializationData retrieved from the specified stream
        """
        return [
            InitializationData(shared=d, unit_data=[{}] * self.agent_count)
            for d in self._initialization_data_dicts
        ]

    def validate_onboarding(
        self, worker: "Worker", onboarding_agent: "OnboardingAgent"
    ) -> bool:
        if hasattr(self.world_module, "validate_onboarding"):
            return self.world_module.validate_onboarding(  # type: ignore
                onboarding_agent.state.get_data()
            )
        return True
#   SharedParlAITaskState( task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[['Worker', 'Unit'], bool] = <factory>, on_unit_submitted: Callable[['Unit'], None] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, frontend_task_opts: Dict[str, Any] = <factory>, world_opt: Dict[str, Any] = <factory>, onboarding_world_opt: Dict[str, Any] = <factory>, world_module: Optional[Any] = None )
#   world_module: Optional[Any] = None
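
A hedged sketch of constructing this shared state for a run. Only the field names come from the dataclass above; the world/onboarding option keys and the frontend option shown are hypothetical examples.

from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
    SharedParlAITaskState,
)

shared_state = SharedParlAITaskState(
    world_opt={"num_turns": 3},             # handed to make_world by the task runner
    onboarding_world_opt={"num_turns": 1},  # handed to make_onboarding_world
    frontend_task_opts={"send_task_data": True},  # surfaced to the frontend via get_frontend_args
)
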
#   ParlAIChatBlueprintArgs( _blueprint_type: str = 'parlai_chat', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'ParlAIChatBlueprint', world_file: str = '???', preview_source: str = '???', task_description_file: str = '???', custom_source_bundle: str = '???', custom_source_dir: str = '???', extra_source_dir: str = '???', context_csv: str = '???', context_jsonl: str = '???', num_conversations: int = '???' )
#   world_file: str = '???'
#   preview_source: str = '???'
#   task_description_file: str = '???'
#   custom_source_bundle: str = '???'
#   custom_source_dir: str = '???'
#   extra_source_dir: str = '???'
#   context_csv: str = '???'
#   context_jsonl: str = '???'
#   num_conversations: int = '???'
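
A hedged sketch of these arguments assembled into the args.blueprint config that the blueprint reads. All paths and counts below are hypothetical; only the option names come from the dataclass above.

from omegaconf import OmegaConf

args = OmegaConf.create(
    {
        "blueprint": {
            "world_file": "~/my_task/demo_worlds.py",  # must define make_world / get_world_params
            "task_description_file": "~/my_task/task_description.html",
            "num_conversations": 5,  # or supply context_csv / context_jsonl instead
        },
        "task": {"task_title": "Chat demo"},
    }
)
# args.blueprint.get("num_conversations", None) is how the blueprint reads these values.
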
Blueprint for a task that runs a parlai chat
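
The constructor below imports the module named by world_file and requires it to define make_world and get_world_params (the task runner later calls them). A minimal, hypothetical sketch of such a world file, where MyChatWorld stands in for a ParlAI world class:

def get_world_params():
    # agent_count tells the blueprint how many workers each conversation needs
    return {"agent_count": 2}


def make_world(opt, agents):
    # opt is SharedParlAITaskState.world_opt; MyChatWorld is a hypothetical ParlAI world
    return MyChatWorld(opt, agents)


def validate_onboarding(onboarding_data):
    # optional hook, used by ParlAIChatBlueprint.validate_onboarding below
    return True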

#   ParlAIChatBlueprint( task_run: mephisto.data_model.task_run.TaskRun, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint.SharedParlAITaskState )
#   BLUEPRINT_TYPE: str = 'parlai_chat'

#   @classmethod
#   def assert_task_args( cls, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState ) -> None:
Ensure that arguments are properly configured to launch this task
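
A hedged usage sketch: calling this before a launch fails fast on missing files or an invalid world module. The paths mirror the hypothetical config sketched earlier; SharedParlAITaskState() relies on its default factories.

from omegaconf import OmegaConf

from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
    ParlAIChatBlueprint,
    SharedParlAITaskState,
)

args = OmegaConf.create(
    {
        "blueprint": {
            "world_file": "~/my_task/demo_worlds.py",
            "task_description_file": "~/my_task/task_description.html",
            "num_conversations": 5,
        }
    }
)
ParlAIChatBlueprint.assert_task_args(args, SharedParlAITaskState())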

#   def get_frontend_args(self) -> Dict[str, Any]:
Specifies what options within a task_config should be forwarded to the client for use by the task's frontend
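
A hedged sketch of the ParlAI-specific keys this method adds on top of the standard task configuration; the values shown are illustrative, and `blueprint` is a hypothetical already-constructed instance.

frontend_task_config = blueprint.get_frontend_args()  # blueprint: a constructed ParlAIChatBlueprint (hypothetical)
# {
#     "task_description": "<h1>...</h1>",
#     "preview_html": "<h1>...</h1>",
#     "frame_height": 650,
#     "chat_title": "Chat demo",
#     "has_preview": False,
#     "block_mobile": True,
#     "frontend_task_opts": {"send_task_data": True},
#     ...  # plus standard keys from Blueprint.get_frontend_args and downstream overrides
# }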

#   def get_initialization_data( self ) -> collections.abc.Iterable[mephisto.data_model.assignment.InitializationData]:
Return the InitializationData retrieved from the specified stream
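
A hedged sketch of the relationship to context_csv: each CSV row (headers as keys) becomes the shared data for one conversation, and unit_data holds one empty dict per agent. The CSV contents and the `blueprint` instance are hypothetical.

# context.csv:
#   persona,topic
#   cheerful,travel
data = list(blueprint.get_initialization_data())  # blueprint: a constructed ParlAIChatBlueprint (hypothetical)
# data[0].shared == {"persona": "cheerful", "topic": "travel"}
# len(data[0].unit_data) == blueprint.agent_count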

#   def validate_onboarding( self, worker: mephisto.data_model.worker.Worker, onboarding_agent: mephisto.data_model.agent.OnboardingAgent ) -> bool:
Check the incoming onboarding data and evaluate if the worker has passed the qualification or not. Return True if the worker has qualified.

By default we use the validate_onboarding provided in a run_task, and all onboarding tasks should allow run_task to specify additional checks or to entirely override what's provided in a blueprint.
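
A hedged sketch of such a hook defined in the world file. The data it receives is the onboarding agent's saved state (the inputs/outputs dict produced by ParlAIChatAgentState.get_data below); the "success" field inside final_submission is hypothetical.

def validate_onboarding(onboarding_data):
    submission = onboarding_data["outputs"].get("final_submission") or {}
    return bool(submission.get("success", False))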

#   class ParlAIChatBlueprint.AgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
class ParlAIChatAgentState(AgentState):
    """
    Holds information about ParlAI-style chat. Data is stored in json files
    containing every act from the ParlAI world.
    """

    def __init__(self, agent: "Agent"):
        """
        Create an AgentState to track the state of an agent's work on a Unit

        Initialize with an existing file if it exists.
        """
        self.agent = weakref.proxy(agent)
        data_file = self._get_expected_data_file()
        if os.path.exists(data_file):
            self.load_data()
        else:
            self.messages: List[Dict[str, Any]] = []
            self.final_submission: Optional[Dict[str, Any]] = None
            self.init_data = None
            self.save_data()

    def set_init_state(self, data: Any) -> bool:
        """Set the initial state for this agent"""
        if self.init_data is not None:
            # Initial state is already set
            return False
        else:
            self.init_data = data
            self.save_data()
            return True

    def get_init_state(self) -> Optional[Dict[str, Any]]:
        """
        Return the initial state for this agent,
        None if no such state exists
        """
        if self.init_data is None:
            return None
        return {"task_data": self.init_data, "past_live_updates": self.messages}

    def _get_expected_data_file(self) -> str:
        """Return the place we would expect to find data for this agent state"""
        agent_dir = self.agent.get_data_dir()
        os.makedirs(agent_dir, exist_ok=True)
        return os.path.join(agent_dir, "state.json")

    def load_data(self) -> None:
        """Load stored data from a file to this object"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "r") as state_json:
            state = json.load(state_json)
            self.messages = state["outputs"]["messages"]
            self.init_data = state["inputs"]
            self.final_submission = state["outputs"].get("final_submission")

    def get_data(self) -> Dict[str, Any]:
        """Return dict with the messages of this agent"""
        return {
            "outputs": {
                "messages": self.messages,
                "final_submission": self.final_submission,
            },
            "inputs": self.init_data,
        }

    def get_parsed_data(self) -> Dict[str, Any]:
        """Return properly parsed data from this task"""
        init_data = self.init_data
        save_data = None

        agent_name = None
        for m in self.messages:
            if "agent_display_name" in m["task_data"]:
                agent_name = m["task_data"]["agent_display_name"]
                break

        messages = self.messages
        if len(messages) > 0:
            if "WORLD_DATA" in messages[-1]:
                save_data = messages[-1]["WORLD_DATA"]
                messages = messages[:-1]

        return {
            "agent_name": agent_name,
            "initial_data": init_data,
            "messages": messages,
            "save_data": save_data,
            "final_submission": self.final_submission,
        }

    def get_task_start(self) -> float:
        """
        Return the start time for this task, the timestamp of the very first message.
        """
        return self.messages[0]["timestamp"]

    def get_task_end(self) -> float:
        """
        Return the end time for this task, the timestamp of the very final message.
        """
        return self.messages[-1]["timestamp"]

    def save_data(self) -> None:
        """Save all messages from this agent to"""
        agent_file = self._get_expected_data_file()
        with open(agent_file, "w+") as state_json:
            json.dump(self.get_data(), state_json)

    def update_data(self, live_update: Dict[str, Any]) -> None:
        """
        Append the incoming packet as well as its arrival time
        """
        live_update["timestamp"] = time.time()
        self.messages.append(live_update)
        self.save_data()

    def update_submit(self, submitted_data: Dict[str, Any]) -> None:
        """Append any final submission to this state"""
        self.final_submission = submitted_data
        self.save_data()

Holds information about ParlAI-style chat. Data is stored in json files containing every act from the ParlAI world.
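
A hedged sketch of the state.json file that save_data writes for one agent; the top-level keys match get_data above, and the message contents are illustrative.

state = {
    "inputs": {"persona": "cheerful"},  # set via set_init_state
    "outputs": {
        "messages": [
            {"text": "Hi there!", "task_data": {}, "timestamp": 1650000000.0},
        ],
        "final_submission": {"rating": 4},
    },
}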

Inherited Members
mephisto.abstractions.blueprints.parlai_chat.parlai_chat_agent_state.ParlAIChatAgentState
ParlAIChatAgentState
set_init_state
get_init_state
load_data
get_data
get_parsed_data
get_task_start
get_task_end
save_data
update_data
update_submit
mephisto.abstractions._subcomponents.agent_state.AgentState
STATUS_NONE
STATUS_ACCEPTED
STATUS_ONBOARDING
STATUS_WAITING
STATUS_IN_TASK
STATUS_COMPLETED
STATUS_DISCONNECT
STATUS_TIMEOUT
STATUS_PARTNER_DISCONNECT
STATUS_EXPIRED
STATUS_RETURNED
STATUS_APPROVED
STATUS_SOFT_REJECTED
STATUS_REJECTED
complete
valid
#   class ParlAIChatBlueprint.OnboardingAgentStateClass(mephisto.abstractions._subcomponents.agent_state.AgentState):
Holds information about ParlAI-style chat (the same ParlAIChatAgentState class used for AgentStateClass above). Data is stored in json files containing every act from the ParlAI world.

#   class ParlAIChatBlueprint.TaskBuilderClass(mephisto.abstractions._subcomponents.task_builder.TaskBuilder):
class ParlAIChatTaskBuilder(TaskBuilder):
    """
    Builder for a parlai chat task, pulls the appropriate html,
    builds the frontend (if a build doesn't already exist),
    then puts the file into the server directory
    """

    BUILT_FILE = BUILT_FILE
    BUILT_MESSAGE = "built!"

    def rebuild_core(self):
        """Rebuild the frontend for this task"""
        return_dir = os.getcwd()
        os.chdir(FRONTEND_SOURCE_DIR)
        if os.path.exists(FRONTEND_BUILD_DIR):
            shutil.rmtree(FRONTEND_BUILD_DIR)
        packages_installed = subprocess.call(["npm", "install"])
        if packages_installed != 0:
            raise Exception(
                "please make sure npm is installed, otherwise view "
                "the above error for more info."
            )

        webpack_complete = subprocess.call(["npm", "run", "dev"])
        if webpack_complete != 0:
            raise Exception(
                "Webpack appears to have failed to build your "
                "frontend. See the above error for more information."
            )
        os.chdir(return_dir)

    def build_custom_bundle(self, custom_src_dir):
        """Locate all of the custom files used for a custom build, create
        a prebuild directory containing all of them, then build the
        custom source.

        Check dates to only go through this build process when files have changed
        """
        TARGET_BUILD_FILES = {
            "main.js": "src/main.js",
            "package.json": "package.json",
            "style.css": "src/style.css",
        }
        TARGET_BUILD_FOLDERS = {"components": "src/components"}

        prebuild_path = os.path.join(custom_src_dir, CUSTOM_BUILD_DIRNAME)
        build_path = os.path.join(prebuild_path, "build", "bundle.js")

        # see if we need to rebuild
        if os.path.exists(build_path):
            created_date = os.path.getmtime(build_path)
            up_to_date = True
            for fn in TARGET_BUILD_FILES.keys():
                possible_conflict = os.path.join(custom_src_dir, fn)
                if os.path.exists(possible_conflict):
                    if os.path.getmtime(possible_conflict) > created_date:
                        up_to_date = False
                        break
            for fn in TARGET_BUILD_FOLDERS.keys():
                if not up_to_date:
                    break
                possible_conflict_dir = os.path.join(custom_src_dir, fn)
                for root, dirs, files in os.walk(possible_conflict_dir):
                    if not up_to_date:
                        break
                    for fname in files:
                        path = os.path.join(root, fname)
                        if os.path.getmtime(path) > created_date:
                            up_to_date = False
                            break
                if os.path.exists(possible_conflict):
                    if os.path.getmtime(possible_conflict) > created_date:
                        up_to_date = False
                        break
            if up_to_date:
                return build_path

        # build anew
        REQUIRED_SOURCE_FILES = [
            ".babelrc",
            ".eslintrc",
            "package.json",
            "webpack.config.js",
        ]
        REQUIRED_SOURCE_DIRS = ["src"]
        if not os.path.exists(os.path.join(prebuild_path, "build")):
            os.makedirs(os.path.join(prebuild_path, "build"), exist_ok=True)

        # Copy default files
        for src_dir in REQUIRED_SOURCE_DIRS:
            src_path = os.path.join(FRONTEND_SOURCE_DIR, src_dir)
            dst_path = os.path.join(prebuild_path, src_dir)
            if os.path.exists(dst_path):
                shutil.rmtree(dst_path)
            shutil.copytree(src_path, dst_path)
        for src_file in REQUIRED_SOURCE_FILES:
            src_path = os.path.join(FRONTEND_SOURCE_DIR, src_file)
            dst_path = os.path.join(prebuild_path, src_file)
            shutil.copy2(src_path, dst_path)

        # copy custom files
        for src_file in TARGET_BUILD_FILES.keys():
            src_path = os.path.join(custom_src_dir, src_file)
            if os.path.exists(src_path):
                dst_path = os.path.join(prebuild_path, TARGET_BUILD_FILES[src_file])
                shutil.copy2(src_path, dst_path)
        for src_dir in TARGET_BUILD_FOLDERS.keys():
            src_path = os.path.join(custom_src_dir, src_dir)
            dst_path = os.path.join(prebuild_path, TARGET_BUILD_FOLDERS[src_dir])
            if os.path.exists(src_path):
                if os.path.exists(dst_path):
                    shutil.rmtree(dst_path)
                shutil.copytree(src_path, dst_path)

        # navigate and build
        return_dir = os.getcwd()
        os.chdir(prebuild_path)
        packages_installed = subprocess.call(["npm", "install"])
        if packages_installed != 0:
            raise Exception(
                "please make sure npm is installed, otherwise view "
                "the above error for more info."
            )

        webpack_complete = subprocess.call(["npm", "run", "dev"])
        if webpack_complete != 0:
            raise Exception(
                "Webpack appears to have failed to build your "
                "frontend. See the above error for more information."
            )

        # cleanup and return
        os.chdir(return_dir)
        return build_path

    def build_in_dir(self, build_dir: str):
        """Build the frontend if it doesn't exist, then copy into the server directory"""
        # Only build this task if it hasn't already been built
        if not os.path.exists(FRONTEND_BUILD_DIR):
            self.rebuild_core()

        custom_source_dir = self.args.blueprint.get("custom_source_dir", None)
        build_bundle = None
        if custom_source_dir is not None:
            custom_source_dir = os.path.expanduser(custom_source_dir)
            build_bundle = self.build_custom_bundle(custom_source_dir)

        # Copy over the preview file as preview.html, use the default if none specified
        target_resource_dir = os.path.join(build_dir, "static")
        preview_file = self.args.blueprint.get("preview_source", None)
        if preview_file is not None:
            use_preview_file = os.path.expanduser(preview_file)
            target_path = os.path.join(target_resource_dir, "preview.html")
            shutil.copy2(use_preview_file, target_path)

        # If any additional task files are required via a source_dir, copy those as well
        extra_dir_path = self.args.blueprint.get("extra_source_dir", None)
        if extra_dir_path is not None:
            extra_dir_path = os.path.expanduser(extra_dir_path)
            copy_tree(extra_dir_path, target_resource_dir)

        bundle_js_file = self.args.blueprint.get("custom_source_bundle", None)
        if bundle_js_file is None:
            if build_bundle is not None:
                bundle_js_file = build_bundle
            else:
                bundle_js_file = os.path.join(FRONTEND_BUILD_DIR, "bundle.js")
        target_path = os.path.join(target_resource_dir, "bundle.js")
        shutil.copy2(bundle_js_file, target_path)

        # Copy over the static files for this task:
        for fin_file in ["index.html", "notif.mp3"]:
            copied_static_file = os.path.join(
                FRONTEND_SOURCE_DIR, "src", "static", fin_file
            )
            target_path = os.path.join(target_resource_dir, fin_file)
            shutil.copy2(copied_static_file, target_path)

        # Write a built file confirmation
        with open(os.path.join(build_dir, self.BUILT_FILE), "w+") as built_file:
            built_file.write(self.BUILT_MESSAGE)

Builder for a parlai chat task, pulls the appropriate html, builds the frontend (if a build doesn't already exist), then puts the file into the server directory
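
A hedged sketch of the layout build_custom_bundle expects inside custom_source_dir (per TARGET_BUILD_FILES / TARGET_BUILD_FOLDERS above); only files that actually exist are copied over the default frontend sources before npm install and webpack run. The directory itself is hypothetical.

custom_source_dir = "~/my_task/custom_frontend"
# main.js       -> copied to src/main.js in the prebuild directory
# package.json  -> copied to package.json
# style.css     -> copied to src/style.css
# components/   -> copied to src/components/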

#   class ParlAIChatBlueprint.TaskRunnerClass(mephisto.abstractions._subcomponents.task_runner.TaskRunner):
class ParlAIChatTaskRunner(TaskRunner):
    """
    Task runner for a parlai chat task
    """

    def __init__(
        self, task_run: "TaskRun", args: "DictConfig", shared_state: "SharedTaskState"
    ):
        super().__init__(task_run, args, shared_state)
        from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
            SharedParlAITaskState,
        )

        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState for parlai blueprints"
        if shared_state.world_module is None:
            world_file_path = os.path.expanduser(args.blueprint.world_file)
            world_module_dir = os.path.dirname(world_file_path)
            sys.path.append(world_module_dir)
            world_module_name = os.path.basename(world_file_path)[:-3]
            world_module = import_module(world_module_name)
        else:
            world_module = shared_state.world_module
        self.parlai_world_module = world_module
        world_params = world_module.get_world_params()  # type: ignore
        self.is_concurrent = world_params["agent_count"] > 1
        self.id_to_worlds: Dict[str, Any] = {}

    def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
        """
        Return the data for an agent already assigned to a particular unit
        """
        init_state = agent.state.get_init_state()
        if init_state is not None:
            # reconnecting agent, give what we've got
            return init_state
        else:
            assignment = agent.get_unit().get_assignment()
            assignment_data = self.get_data_for_assignment(assignment)
            agent.state.set_init_state(assignment_data.shared)
            new_state = agent.state.get_init_state()
            assert new_state is not None, "Recently initialized state still None"
            return new_state

    def get_world_id(self, world_type: str, extra_id: str) -> str:
        """Get a world id specific to the given world type"""
        return f"{world_type}-{extra_id}"

    def run_onboarding(self, agent: "OnboardingAgent") -> None:
        """
        ParlAI Onboarding will initialize an onboarding
        world, then run it to completion if possible
        """
        shared_state = self.shared_state
        from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
            SharedParlAITaskState,
        )

        assert isinstance(
            shared_state, SharedParlAITaskState
        ), "Must use SharedParlAITaskState for parlai blueprints"
        opt: Dict[str, Any] = shared_state.onboarding_world_opt
        parlai_agent = MephistoAgentWrapper(agent)
        try:
            world = self.parlai_world_module.make_onboarding_world(  # type: ignore
                opt,
                parlai_agent,
                initialization_data=shared_state.onboarding_data,
            )
        except TypeError:
            # make_onboarding_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_onboarding_world(opt, parlai_agent)  # type: ignore

        world_id = self.get_world_id("onboard", agent.get_agent_id())
        self.id_to_worlds[world_id] = world
        while (
            not world.episode_done()
            and agent.get_agent_id() in self.running_onboardings
        ):
            world.parley()

        # Ensure agent can submit after onboarding
        agent.update_status(AgentState.STATUS_WAITING)

        world.shutdown()
        agent.state.update_data(
            {
                "id": "SUBMIT_WORLD_DATA",
                "WORLD_DATA": world.prep_save_data([parlai_agent]),
                "text": "",
            }
        )

        # Mark the agent as done, then wait for the incoming submit action
        while not agent.await_submit(timeout=None):
            time.sleep(0.3)

    def cleanup_onboarding(self, agent: "OnboardingAgent") -> None:
        """Shutdown the world"""
        onboarding_id = agent.get_agent_id()
        world_id = self.get_world_id("onboard", onboarding_id)
        # Only shut down world if it was actually started
        if world_id in self.id_to_worlds:
            self.id_to_worlds[world_id].shutdown()
            del self.id_to_worlds[world_id]

    def run_assignment(self, assignment: "Assignment", agents: List["Agent"]) -> None:
        """
        ParlAI runners will initialize a task world, then run them to completion
        if possible
        """
        for agent in agents:
            assert agent is not None, "task was not fully assigned"
        opt: Dict[str, Any] = cast("SharedParlAITaskState", self.shared_state).world_opt
        parlai_agents = [MephistoAgentWrapper(a) for a in agents]
        try:
            world = self.parlai_world_module.make_world(  # type: ignore
                opt, parlai_agents, initialization_data=assignment.get_assignment_data()
            )
        except TypeError:
            # make_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_world(opt, parlai_agents)  # type: ignore

        world_id = self.get_world_id("assignment", assignment.db_id)
        self.id_to_worlds[world_id] = world
        while not world.episode_done() and assignment.db_id in self.running_assignments:
            world.parley()

        # Ensure agents can submit after completion
        for idx in range(len(parlai_agents)):
            agents[idx].observe({"task_data": {"task_done": True}})

        # TODO(WISH) it would be nice to have individual agents be able to submit their
        # final things without needing to wait for their partner, such
        # as if one needs to rate and the other doesn't

        world.shutdown()
        for idx in range(len(parlai_agents)):
            agents[idx].state.update_data(
                {
                    "id": "SUBMIT_WORLD_DATA",
                    "WORLD_DATA": world.prep_save_data([parlai_agents[idx]]),
                    "text": "",
                }
            )

    def cleanup_assignment(self, assignment: "Assignment") -> None:
        """Handle cleanup for a specific assignment"""
        world_id = self.get_world_id("assignment", assignment.db_id)
        self.id_to_worlds[world_id].shutdown()
        del self.id_to_worlds[world_id]

    def run_unit(self, unit: "Unit", agent: "Agent") -> None:
        """
        ParlAI runners will initialize a task world, then run them to completion
        if possible
        """
        agents = [agent]
        opt: Dict[str, Any] = cast("SharedParlAITaskState", self.shared_state).world_opt
        parlai_agents = [MephistoAgentWrapper(a) for a in agents]
        try:
            world = self.parlai_world_module.make_world(  # type: ignore
                opt, parlai_agents, initialization_data=unit.get_assignment_data()
            )
        except TypeError:
            # make_world doesn't ask for initialization_data
            world = self.parlai_world_module.make_world(opt, parlai_agents)  # type: ignore

        world_id = self.get_world_id("unit", unit.db_id)
        self.id_to_worlds[world_id] = world
        while not world.episode_done() and unit.db_id in self.running_units:
            world.parley()

        # Ensure agent can submit after completion
        agent.observe({"task_data": {"task_done": True}})

        world.shutdown()
        if hasattr(world, "prep_save_data"):
            agent.observe(
                {
                    "id": "SUBMIT_WORLD_DATA",
                    "WORLD_DATA": world.prep_save_data(parlai_agents),
                    "text": "",
                }
            )

    def cleanup_unit(self, unit: "Unit") -> None:
        """Handle cleanup for a specific unit"""
        world_id = self.get_world_id("unit", unit.db_id)
        self.id_to_worlds[world_id].shutdown()
        del self.id_to_worlds[world_id]

Task runner for a parlai chat task
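The runner above only assumes a handful of hooks from the configured world module: a module-level get_world_params() (whose agent_count decides whether the task is concurrent), make_world(opt, agents[, initialization_data]), optionally make_onboarding_world, and world objects exposing parley(), episode_done(), shutdown(), and prep_save_data(). A minimal, purely illustrative module satisfying those hooks might look like the following; class and key names are assumptions, not part of Mephisto's API.

from typing import Any, Dict, List, Optional


class EchoWorld:
    """Toy single-turn world: asks each agent for one act, then finishes."""

    def __init__(self, opt: Dict[str, Any], agents: List[Any]):
        self.opt = opt
        self.agents = agents
        self.done = False

    def parley(self) -> None:
        # One act per agent; the runner loops on parley() until episode_done()
        for agent in self.agents:
            agent.observe({"id": "System", "text": "Say something!"})
            agent.act()
        self.done = True

    def episode_done(self) -> bool:
        return self.done

    def shutdown(self) -> None:
        pass

    def prep_save_data(self, agents: List[Any]) -> Dict[str, Any]:
        # Whatever is returned here is sent back as WORLD_DATA for these agents
        return {"example_key": "example_value"}


def get_world_params() -> Dict[str, Any]:
    # agent_count > 1 makes the runner treat the task as concurrent
    return {"agent_count": 1}


def make_world(
    opt: Dict[str, Any],
    agents: List[Any],
    initialization_data: Optional[Any] = None,
) -> EchoWorld:
    return EchoWorld(opt, agents)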

Inherited Members
mephisto.abstractions.blueprints.parlai_chat.parlai_chat_task_runner.ParlAIChatTaskRunner
ParlAIChatTaskRunner
get_init_data_for_agent
get_world_id
run_onboarding
cleanup_onboarding
run_assignment
cleanup_assignment
run_unit
cleanup_unit
mephisto.abstractions._subcomponents.task_runner.TaskRunner
execute_onboarding
execute_unit
execute_assignment
get_data_for_assignment
filter_units_for_worker
shutdown
View Source
class ParlAIChatBlueprintArgs(OnboardingRequiredArgs, BlueprintArgs):
    _blueprint_type: str = BLUEPRINT_TYPE_PARLAI_CHAT
    _group: str = field(
        default="ParlAIChatBlueprint",
        metadata={
            "help": """
                Tasks launched from ParlAI blueprints require the number of
                conversations (either an int or task data for each convo), as
                well as a world to initialize for connecting workers.
            """
        },
    )
    world_file: str = field(
        default=MISSING,
        metadata={"help": "Path to file containing ParlAI world", "required": True},
    )
    preview_source: str = field(
        default=MISSING,
        metadata={"help": "Optional path to source HTML file to preview the task"},
    )
    task_description_file: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Path to file for the extended description of the task. "
                "Required if not providing a custom source bundle."
            )
        },
    )
    custom_source_bundle: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a fully custom frontend bundle"},
    )
    custom_source_dir: str = field(
        default=MISSING,
        metadata={"help": "Optional path to a directory containing custom js code"},
    )
    extra_source_dir: str = field(
        default=MISSING,
        metadata={
            "help": (
                "Optional path to sources that the frontend may "
                "refer to (such as images/video/css/scripts)"
            )
        },
    )
    context_csv: str = field(
        default=MISSING,
        metadata={"help": "Optional path to csv containing task context"},
    )
    context_jsonl: str = field(
        default=MISSING,
        metadata={"help": "Optional path to jsonl file containing task context"},
    )
    num_conversations: int = field(
        default=MISSING,
        metadata={
            "help": "Optional count of conversations to have if no context provided"
        },
    )

ParlAIChatBlueprintArgs(_blueprint_type: str = 'parlai_chat', block_qualification: str = '???', onboarding_qualification: str = '???', _group: str = 'ParlAIChatBlueprint', world_file: str = '???', preview_source: str = '???', task_description_file: str = '???', custom_source_bundle: str = '???', custom_source_dir: str = '???', extra_source_dir: str = '???', context_csv: str = '???', context_jsonl: str = '???', num_conversations: int = '???')
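In practice these fields are usually populated from a Hydra/OmegaConf config rather than instantiated by hand; the following is only a sketch of setting the required world_file plus a couple of optional fields programmatically, with hypothetical paths.

from omegaconf import OmegaConf

from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
    ParlAIChatBlueprintArgs,
)

args = ParlAIChatBlueprintArgs(
    world_file="~/my_task/demo_worlds.py",  # required
    task_description_file="~/my_task/task_description.html",
    num_conversations=5,  # used when no context_csv/context_jsonl is given
)

# Structured configs like this are normally merged into the full Hydra config
# that Mephisto hands to the blueprint as args.blueprint; unset fields print
# as '???' (omegaconf MISSING).
cfg = OmegaConf.structured(args)
print(OmegaConf.to_yaml(cfg))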

View Source
class SharedParlAITaskState(OnboardingSharedState, SharedTaskState):
    frontend_task_opts: Dict[str, Any] = field(default_factory=dict)
    world_opt: Dict[str, Any] = field(default_factory=dict)
    onboarding_world_opt: Dict[str, Any] = field(default_factory=dict)
    world_module: Optional[Any] = None

SharedParlAITaskState(task_config: Dict[str, Any] = <factory>, qualifications: List[Any] = <factory>, worker_can_do_unit: Callable[[ForwardRef('Worker'), ForwardRef('Unit')], bool] = <factory>, on_unit_submitted: Callable[[ForwardRef('Unit')], NoneType] = <factory>, onboarding_data: Dict[str, Any] = <factory>, validate_onboarding: Callable[[Any], bool] = <factory>, frontend_task_opts: Dict[str, Any] = <factory>, world_opt: Dict[str, Any] = <factory>, onboarding_world_opt: Dict[str, Any] = <factory>, world_module: Union[Any, NoneType] = None)
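When launching a run, this shared state is typically built in the run script and handed to the operator alongside the task config. A minimal sketch follows; the module name and all option keys are illustrative and are consumed only by your own world and frontend code.

from mephisto.abstractions.blueprints.parlai_chat.parlai_chat_blueprint import (
    SharedParlAITaskState,
)

import my_worlds  # hypothetical module defining get_world_params/make_world

shared_state = SharedParlAITaskState(
    # Supplying world_module lets the runner skip importing world_file from disk
    world_module=my_worlds,
    # Passed as `opt` into make_world for the main task world
    world_opt={"example_task_opt": True},
    # Passed as `opt` into make_onboarding_world, if onboarding is configured
    onboarding_world_opt={"example_onboarding_opt": True},
    # Forwarded to the frontend as part of the task config
    frontend_task_opts={"example_frontend_opt": "value"},
)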