mephisto.abstractions.architects.mock_architect

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


import tornado
from tornado.websocket import WebSocketHandler
import os
import threading
import uuid
import json
import time

from mephisto.abstractions.architect import Architect, ArchitectArgs
from dataclasses import dataclass, field
from mephisto.abstractions.blueprint import AgentState
from mephisto.data_model.packet import (
    PACKET_TYPE_ALIVE,
    PACKET_TYPE_SUBMIT_ONBOARDING,
    PACKET_TYPE_SUBMIT_UNIT,
    PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE,
    PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
    PACKET_TYPE_REGISTER_AGENT,
    PACKET_TYPE_AGENT_DETAILS,
    PACKET_TYPE_UPDATE_STATUS,
    PACKET_TYPE_REQUEST_STATUSES,
    PACKET_TYPE_RETURN_STATUSES,
    PACKET_TYPE_ERROR,
)
from mephisto.operations.registry import register_mephisto_abstraction
from mephisto.abstractions.architects.channels.websocket_channel import WebsocketChannel
from typing import List, Dict, Any, Optional, TYPE_CHECKING, Callable

if TYPE_CHECKING:
    from mephisto.abstractions._subcomponents.channel import Channel
    from mephisto.data_model.packet import Packet
    from mephisto.data_model.task_run import TaskRun
    from mephisto.abstractions.database import MephistoDB
    from argparse import _ArgumentGroup as ArgumentGroup
    from omegaconf import DictConfig
    from mephisto.abstractions.blueprint import SharedTaskState

MOCK_DEPLOY_URL = "MOCK_DEPLOY_URL"
ARCHITECT_TYPE = "mock"


def get_rand_id():
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)


@dataclass
class MockArchitectArgs(ArchitectArgs):
    """Additional arguments for configuring a mock architect"""

    _architect_type: str = ARCHITECT_TYPE
    # When False, `MockArchitect.deploy` returns a placeholder URL without
    # starting any server.
    should_run_server: bool = field(
        default=False,
        metadata={"help": "Whether to launch a local mock server on deploy"},
    )
    port: str = field(default="3000", metadata={"help": "Port to launch the server on"})


class SocketHandler(WebSocketHandler):
    """
    Websocket handler used by the mock server.

    Each handler registers itself in the shared `subs` mapping under a random
    id while it is open, and records incoming packets on the owning `app`.
    """

    def __init__(self, *args, **kwargs):
        # `subs` and `app` arrive through the route kwargs; they are popped
        # before the remaining kwargs are forwarded to the superclass.
        self.subs: Dict[str, "SocketHandler"] = kwargs.pop("subs")
        self.app: "MockServer" = kwargs.pop("app")
        self.sid = get_rand_id()
        super().__init__(*args, **kwargs)

    def open(self):
        """
        Register this websocket in the shared `subs` mapping, keyed by the
        random UUID assigned at construction.
        """
        # Membership is tested against the keys (the ids); the values are
        # handler objects and would never compare equal to an id string.
        if self.sid not in self.subs:
            self.subs[self.sid] = self

    def on_close(self):
        """
        Runs when a socket is closed; removes it from the `subs` mapping.
        """
        # pop() with a default tolerates a socket that was never registered.
        self.subs.pop(self.sid, None)

    def on_message(self, message_text):
        """
        Callback that runs when a new message is received from a client.

        Args:
            message_text: A stringified JSON object with a `packet_type` key.

        Alive packets are stored in `app.last_alive_packet`, live-update
        packets (in either direction) increment `app.actions_observed`,
        status requests are ignored, and any other packet is stored in
        `app.last_packet`.
        """
        message = json.loads(message_text)
        packet_type = message["packet_type"]
        if packet_type == PACKET_TYPE_ALIVE:
            self.app.last_alive_packet = message
        elif packet_type in (
            PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE,
            PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
        ):
            self.app.actions_observed += 1
        elif packet_type != PACKET_TYPE_REQUEST_STATUSES:
            self.app.last_packet = message

    def check_origin(self, origin):
        """Accept websocket connections from any origin."""
        return True


class AliveHandler(tornado.web.RequestHandler):
    """Simple handler for is_alive"""

    def get(self, eids=None):
        """
        Answer a liveness check with an empty response.

        `eids` is unused. It defaults to None because the `/is_alive` route
        captures no path argument, so the handler is invoked without one.
        """
        pass  # Default behavior returns 200


class MockServer(tornado.web.Application):
    """
    Tornado-based server with hooks for sending specific
    messages through socket connections and such
    """

    def __init__(self, port):
        """
        Set up bookkeeping state and register the socket and is_alive routes.

        Args:
            port: port the HTTP server listens on once launched.
        """
        # Shared with every SocketHandler, which registers itself here.
        self.subs = {}
        self.port = port
        # IOLoop created by the server thread; None until that thread runs.
        self.running_instance = None
        # The three fields below are updated by SocketHandler.on_message.
        self.last_alive_packet: Optional[Dict[str, Any]] = None
        self.actions_observed = 0
        self.last_packet: Optional[Dict[str, Any]] = None
        tornado_settings = {
            "autoescape": None,
            # Debug mode is always on for the mock server.
            "debug": True,
            "compiled_template_cache": False,
            "static_url_prefix": "/static/",
        }
        handlers = [
            ("/socket", SocketHandler, {"subs": self.subs, "app": self}),
            ("/is_alive", AliveHandler, {}),
        ]
        super().__init__(handlers, **tornado_settings)

    def __server_thread_fn(self):
        """
        Main loop for the application: create an IOLoop, listen on
        `self.port`, run the loop until it is stopped, then stop the
        HTTP server.
        """
        self.running_instance = tornado.ioloop.IOLoop()
        http_server = tornado.httpserver.HTTPServer(self, max_buffer_size=1024**3)
        http_server.listen(self.port)
        self.running_instance.start()
        http_server.stop()

    def _get_sub(self):
        """
        Return the subscriber socket to write to (the first registered one).

        Raises:
            IndexError: if no socket is currently registered.
        """
        return list(self.subs.values())[0]

    def _send_message(self, message):
        """
        Send the given message back to the mephisto client.

        The message is JSON-encoded once, then written to the first
        registered socket. Up to five attempts are made, and the last
        exception is re-raised if every attempt fails.
        """
        # Encoding does not depend on the socket, so do it once up front.
        message_json = json.dumps(message)
        last_exception = None
        for _ in range(5):
            try:
                # NOTE(review): this writes to the socket from the caller's
                # thread while the IOLoop runs on the server thread; confirm
                # this cross-thread use is acceptable for the mock.
                self._get_sub().write_message(message_json)
                last_exception = None
                break
            except Exception as e:
                last_exception = e
                time.sleep(0.2)
            finally:
                # Pause after every attempt, successful or not.
                time.sleep(0.1)
        if last_exception is not None:
            raise last_exception

    def send_agent_act(self, agent_id, act_content):
        """
        Send a packet from the given agent with
        the given content
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
                "subject_id": agent_id,
                "data": act_content,
            }
        )

    def register_mock_agent(self, worker_name, agent_details):
        """
        Send a packet asking to register a mock agent.

        `agent_details` is used both as the request id and as the
        agent registration id.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_REGISTER_AGENT,
                "subject_id": "MockServer",
                "data": {
                    "request_id": agent_details,
                    "provider_data": {
                        "worker_name": worker_name,
                        "agent_registration_id": agent_details,
                    },
                },
            }
        )

    def submit_mock_unit(self, agent_id, submit_data):
        """
        Send a packet asking to submit data.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_SUBMIT_UNIT,
                "subject_id": agent_id,
                "data": submit_data,
            }
        )

    def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data):
        """
        Send a packet submitting onboarding data for the given agent.

        `worker_id` is accepted but not included in the packet.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING,
                "subject_id": agent_id,
                "data": {
                    "request_id": "1234",
                    "onboarding_data": onboard_data,
                },
            }
        )

    def disconnect_mock_agent(self, agent_id):
        """
        Mark a mock agent as disconnected.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_RETURN_STATUSES,
                "subject_id": "Mephisto",
                "data": {agent_id: AgentState.STATUS_DISCONNECT},
            }
        )

    def launch_mock(self):
        """
        Start the primary loop for this application on a background thread.
        """
        self.__server_thread = threading.Thread(
            target=self.__server_thread_fn, name="mock-server-thread"
        )
        self.__server_thread.start()

    def shutdown_mock(self):
        """
        Shut down the tornado application and wait for its thread to exit.
        """
        # The loop runs on the server thread, so the stop request is handed
        # to it through add_callback rather than called directly.
        self.running_instance.add_callback(self.running_instance.stop)
        self.__server_thread.join()


@register_mephisto_abstraction()
class MockArchitect(Architect):
    """
    Architect for tests: it can run a mock server on the localhost so that
    special packets can be sent and connections asserted, and it records
    which lifecycle calls have been made.
    """

    ArgsClass = MockArchitectArgs
    ARCHITECT_TYPE = ARCHITECT_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        task_run: "TaskRun",
        build_dir_root: str,
    ):
        """Create an architect for use in testing"""
        # Lifecycle flags, flipped by the corresponding methods below.
        # TODO(#651) track state in parent class?
        self.prepared = False
        self.deployed = False
        self.cleaned = False
        self.did_shutdown = False

        self.task_run = task_run
        self.task_run_id = task_run.db_id
        self.build_dir = build_dir_root

        architect_args = args.architect
        self.should_run_server = architect_args.should_run_server
        self.port = architect_args.port
        # Populated by deploy() only when a server is actually launched.
        self.server: Optional["MockServer"] = None

    def _get_socket_urls(self) -> List[str]:
        """Return the websocket URL(s) of the local mock server"""
        assert self.port is not None, "No ports for socket"
        socket_url = f"ws://localhost:{self.port}/socket"
        return [socket_url]

    def get_channels(
        self,
        on_channel_open: Callable[[str], None],
        on_catastrophic_disconnect: Callable[[str], None],
        on_message: Callable[[str, "Packet"], None],
    ) -> List["Channel"]:
        """
        Build one WebsocketChannel per socket URL, for the ClientIOHandler
        to register with
        """
        channels: List["Channel"] = []
        for idx, url in enumerate(self._get_socket_urls()):
            channel = WebsocketChannel(
                f"mock_channel_{self.task_run_id}_{idx}",
                on_channel_open=on_channel_open,
                on_catastrophic_disconnect=on_catastrophic_disconnect,
                on_message=on_message,
                socket_url=url,
            )
            channels.append(channel)
        return channels

    def download_file(self, target_filename: str, save_dir: str) -> None:
        """
        Pretend to download by writing a small placeholder file
        """
        destination = os.path.join(save_dir, target_filename)
        with open(destination, "wb") as mock_file:
            mock_file.write(b"mock\n")

    def prepare(self) -> str:
        """Record the prepare call and create the mock build directory"""
        self.prepared = True
        build_name = "mock_build_{}".format(self.task_run_id)
        built_dir = os.path.join(self.build_dir, build_name)
        os.makedirs(built_dir)
        return built_dir

    def deploy(self) -> str:
        """Mock a deploy or deploy a mock server, depending on settings"""
        self.deployed = True
        if self.should_run_server:
            self.server = MockServer(self.port)
            self.server.launch_mock()
            return f"http://localhost:{self.port}/"
        return MOCK_DEPLOY_URL

    def cleanup(self) -> None:
        """Record that cleanup was called"""
        self.cleaned = True

    def shutdown(self) -> None:
        """Record the shutdown call and stop the mock server if one is running"""
        self.did_shutdown = True
        if not self.should_run_server or self.server is None:
            return
        self.server.shutdown_mock()
#   def get_rand_id():
View Source
def get_rand_id():
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
View Source
class MockArchitectArgs(ArchitectArgs):
    """Additional arguments for configuring a mock architect"""

    _architect_type: str = ARCHITECT_TYPE
    # When False, `MockArchitect.deploy` returns a placeholder URL without
    # starting any server.
    should_run_server: bool = field(
        default=False,
        metadata={"help": "Whether to launch a local mock server on deploy"},
    )
    port: str = field(default="3000", metadata={"help": "Port to launch the server on"})

Additional arguments for configuring a mock architect

#   MockArchitectArgs( _architect_type: str = 'mock', server_type: str = 'node', server_source_path: str = '???', should_run_server: bool = False, port: str = '3000' )
#   should_run_server: bool = False
#   port: str = '3000'
#   class SocketHandler(tornado.websocket.WebSocketHandler):
View Source
class SocketHandler(WebSocketHandler):
    """
    Websocket handler used by the mock server.

    Each handler registers itself in the shared `subs` mapping under a random
    id while it is open, and records incoming packets on the owning `app`.
    """

    def __init__(self, *args, **kwargs):
        # `subs` and `app` arrive through the route kwargs; they are popped
        # before the remaining kwargs are forwarded to the superclass.
        self.subs: Dict[str, "SocketHandler"] = kwargs.pop("subs")
        self.app: "MockServer" = kwargs.pop("app")
        self.sid = get_rand_id()
        super().__init__(*args, **kwargs)

    def open(self):
        """
        Register this websocket in the shared `subs` mapping, keyed by the
        random UUID assigned at construction.
        """
        # Membership is tested against the keys (the ids); the values are
        # handler objects and would never compare equal to an id string.
        if self.sid not in self.subs:
            self.subs[self.sid] = self

    def on_close(self):
        """
        Runs when a socket is closed; removes it from the `subs` mapping.
        """
        # pop() with a default tolerates a socket that was never registered.
        self.subs.pop(self.sid, None)

    def on_message(self, message_text):
        """
        Callback that runs when a new message is received from a client.

        Args:
            message_text: A stringified JSON object with a `packet_type` key.

        Alive packets are stored in `app.last_alive_packet`, live-update
        packets (in either direction) increment `app.actions_observed`,
        status requests are ignored, and any other packet is stored in
        `app.last_packet`.
        """
        message = json.loads(message_text)
        packet_type = message["packet_type"]
        if packet_type == PACKET_TYPE_ALIVE:
            self.app.last_alive_packet = message
        elif packet_type in (
            PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE,
            PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
        ):
            self.app.actions_observed += 1
        elif packet_type != PACKET_TYPE_REQUEST_STATUSES:
            self.app.last_packet = message

    def check_origin(self, origin):
        """Accept websocket connections from any origin."""
        return True

Subclass this class to create a basic WebSocket handler.

Override on_message to handle incoming messages, and use write_message to send messages to the client. You can also override open and on_close to handle opened and closed connections.

Custom upgrade response headers can be sent by overriding ~tornado.web.RequestHandler.set_default_headers or ~tornado.web.RequestHandler.prepare.

See http://dev.w3.org/html5/websockets/ for details on the JavaScript interface. The protocol is specified at http://tools.ietf.org/html/rfc6455.

Here is an example WebSocket handler that echoes all received messages back to the client:

.. testcode::

class EchoWebSocket(tornado.websocket.WebSocketHandler):
  def open(self):
      print("WebSocket opened")

  def on_message(self, message):
      self.write_message(u"You said: " + message)

  def on_close(self):
      print("WebSocket closed")

.. testoutput:: :hide:

WebSockets are not standard HTTP connections. The "handshake" is HTTP, but after the handshake, the protocol is message-based. Consequently, most of the Tornado HTTP facilities are not available in handlers of this type. The only communication methods available to you are write_message(), ping(), and close(). Likewise, your request handler class should implement open() method rather than get() or post().

If you map the handler above to /websocket in your application, you can invoke it in JavaScript with::

var ws = new WebSocket("ws://localhost:8888/websocket");
ws.onopen = function() {
    ws.send("Hello, world");
};
ws.onmessage = function (evt) {
    alert(evt.data);
};

This script pops up an alert box that says "You said: Hello, world".

Web browsers allow any site to open a websocket connection to any other, instead of using the same-origin policy that governs other network access from javascript. This can be surprising and is a potential security hole, so since Tornado 4.0 WebSocketHandler requires applications that wish to receive cross-origin websockets to opt in by overriding the ~WebSocketHandler.check_origin method (see that method's docs for details). Failure to do so is the most likely cause of 403 errors when making a websocket connection.

When using a secure websocket connection (wss://) with a self-signed certificate, the connection from a browser may fail because it wants to show the "accept this certificate" dialog but has nowhere to show it. You must first visit a regular HTML page using the same certificate to accept it before the websocket connection will succeed.

If the application setting websocket_ping_interval has a non-zero value, a ping will be sent periodically, and the connection will be closed if a response is not received before the websocket_ping_timeout.

Messages larger than the websocket_max_message_size application setting (default 10MiB) will not be accepted.

.. versionchanged:: 4.5 Added websocket_ping_interval, websocket_ping_timeout, and websocket_max_message_size.

#   SocketHandler(*args, **kwargs)
View Source
    def __init__(self, *args, **kwargs):
        """
        Pull the mock-specific `subs` and `app` kwargs off before handing
        the rest to the superclass, and assign this socket a random id.
        """
        # Keys of `subs` are the string ids produced by get_rand_id().
        self.subs: Dict[str, "SocketHandler"] = kwargs.pop("subs")
        self.app: "MockServer" = kwargs.pop("app")
        self.sid = get_rand_id()
        super().__init__(*args, **kwargs)
#   def open(self):
View Source
    def open(self):
        """
        Register this websocket in the shared `subs` mapping, keyed by the
        random UUID assigned at construction.
        """
        # Membership is tested against the keys (the ids); the values are
        # handler objects and would never compare equal to an id string.
        if self.sid not in self.subs:
            self.subs[self.sid] = self

Opens a websocket and registers it, under its randomly assigned UUID, in the shared subs mapping.

#   def on_close(self):
View Source
    def on_close(self):
        """
        Runs when a socket is closed; removes it from the `subs` mapping.
        """
        # pop() with a default tolerates a socket that was never registered.
        self.subs.pop(self.sid, None)

Runs when a socket is closed.

#   def on_message(self, message_text):
View Source
    def on_message(self, message_text):
        """
        Callback that runs when a new message is received from a client.

        Args:
            message_text: A stringified JSON object with a `packet_type` key.

        Alive packets are stored in `app.last_alive_packet`, live-update
        packets (in either direction) increment `app.actions_observed`,
        status requests are ignored, and any other packet is stored in
        `app.last_packet`.
        """
        packet = json.loads(message_text)
        kind = packet["packet_type"]
        if kind == PACKET_TYPE_ALIVE:
            self.app.last_alive_packet = packet
            return
        if kind in (
            PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE,
            PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
        ):
            self.app.actions_observed += 1
            return
        if kind == PACKET_TYPE_REQUEST_STATUSES:
            return
        self.app.last_packet = packet

Callback that runs when a new message is received from a client. Args: message_text: a stringified JSON object with a packet_type key. Alive packets are stored as the last alive packet, live-update packets are counted as observed actions, status requests are ignored, and any other packet is stored as the last packet.

#   def check_origin(self, origin):
View Source
    def check_origin(self, origin):
        """Accept websocket connections from any origin."""
        return True

Override to enable support for allowing alternate origins.

The origin argument is the value of the Origin HTTP header, the url responsible for initiating this request. This method is not called for clients that do not send this header; such requests are always allowed (because all browsers that implement WebSockets support this header, and non-browser clients do not have the same cross-site security concerns).

Should return True to accept the request or False to reject it. By default, rejects all requests with an origin on a host other than this one.

This is a security protection against cross site scripting attacks on browsers, since WebSockets are allowed to bypass the usual same-origin policies and don't use CORS headers.

.. warning::

This is an important security measure; don't disable it without understanding the security implications. In particular, if your authentication is cookie-based, you must either restrict the origins allowed by check_origin() or implement your own XSRF-like protection for websocket connections. See these articles for more: https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html and https://devcenter.heroku.com/articles/websocket-security

To accept all cross-origin traffic (which was the default prior to Tornado 4.0), simply override this method to always return True::

def check_origin(self, origin):
    return True

To allow connections from any subdomain of your site, you might do something like::

def check_origin(self, origin):
    parsed_origin = urllib.parse.urlparse(origin)
    return parsed_origin.netloc.endswith(".mydomain.com")

.. versionadded:: 4.0

Inherited Members
tornado.websocket.WebSocketHandler
stream
get
ping_interval
ping_timeout
max_message_size
write_message
select_subprotocol
selected_subprotocol
get_compression_options
ping
on_pong
on_ping
close
set_nodelay
on_connection_close
on_ws_connection_close
send_error
get_websocket_protocol
tornado.web.RequestHandler
SUPPORTED_METHODS
path_args
path_kwargs
initialize
settings
head
post
delete
patch
put
options
prepare
on_finish
clear
set_default_headers
set_status
get_status
set_header
add_header
clear_header
get_argument
get_arguments
get_body_argument
get_body_arguments
get_query_argument
get_query_arguments
decode_argument
cookies
clear_all_cookies
create_signed_value
redirect
write
render
render_linked_js
render_embed_js
render_linked_css
render_embed_css
render_string
get_template_namespace
create_template_loader
flush
finish
detach
write_error
locale
get_user_locale
get_browser_locale
current_user
get_current_user
get_login_url
get_template_path
xsrf_token
xsrf_form_html
static_url
require_setting
reverse_url
compute_etag
set_etag_header
check_etag_header
data_received
log_exception
#   class AliveHandler(tornado.web.RequestHandler):
View Source
class AliveHandler(tornado.web.RequestHandler):
    """Simple handler for is_alive"""

    def get(self, eids=None):
        """
        Answer a liveness check with an empty response.

        `eids` is unused. It defaults to None because the `/is_alive` route
        captures no path argument, so the handler is invoked without one.
        """
        pass  # Default behavior returns 200

Simple handler for is_alive

#   def get(self, eids):
View Source
    def get(self, eids=None):
        """
        Answer a liveness check with an empty response.

        `eids` is unused. It defaults to None because the `/is_alive` route
        captures no path argument, so the handler is invoked without one.
        """
        pass  # Default behavior returns 200
Inherited Members
tornado.web.RequestHandler
RequestHandler
SUPPORTED_METHODS
path_args
path_kwargs
initialize
settings
head
post
delete
patch
put
options
prepare
on_finish
on_connection_close
clear
set_default_headers
set_status
get_status
set_header
add_header
clear_header
get_argument
get_arguments
get_body_argument
get_body_arguments
get_query_argument
get_query_arguments
decode_argument
cookies
clear_all_cookies
create_signed_value
redirect
write
render
render_linked_js
render_embed_js
render_linked_css
render_embed_css
render_string
get_template_namespace
create_template_loader
flush
finish
detach
send_error
write_error
locale
get_user_locale
get_browser_locale
current_user
get_current_user
get_login_url
get_template_path
xsrf_token
xsrf_form_html
static_url
require_setting
reverse_url
compute_etag
set_etag_header
check_etag_header
data_received
log_exception
#   class MockServer(tornado.web.Application):
View Source
class MockServer(tornado.web.Application):
    """
    Tornado-based server with hooks for sending specific
    messages through socket connections and such
    """

    def __init__(self, port):
        """
        Set up bookkeeping state and register the socket and is_alive routes.

        Args:
            port: port the HTTP server listens on once launched.
        """
        # Shared with every SocketHandler, which registers itself here.
        self.subs = {}
        self.port = port
        # IOLoop created by the server thread; None until that thread runs.
        self.running_instance = None
        # The three fields below are updated by SocketHandler.on_message.
        self.last_alive_packet: Optional[Dict[str, Any]] = None
        self.actions_observed = 0
        self.last_packet: Optional[Dict[str, Any]] = None
        tornado_settings = {
            "autoescape": None,
            # Debug mode is always on for the mock server.
            "debug": True,
            "compiled_template_cache": False,
            "static_url_prefix": "/static/",
        }
        handlers = [
            ("/socket", SocketHandler, {"subs": self.subs, "app": self}),
            ("/is_alive", AliveHandler, {}),
        ]
        super().__init__(handlers, **tornado_settings)

    def __server_thread_fn(self):
        """
        Main loop for the application: create an IOLoop, listen on
        `self.port`, run the loop until it is stopped, then stop the
        HTTP server.
        """
        self.running_instance = tornado.ioloop.IOLoop()
        http_server = tornado.httpserver.HTTPServer(self, max_buffer_size=1024**3)
        http_server.listen(self.port)
        self.running_instance.start()
        http_server.stop()

    def _get_sub(self):
        """
        Return the subscriber socket to write to (the first registered one).

        Raises:
            IndexError: if no socket is currently registered.
        """
        return list(self.subs.values())[0]

    def _send_message(self, message):
        """
        Send the given message back to the mephisto client.

        The message is JSON-encoded once, then written to the first
        registered socket. Up to five attempts are made, and the last
        exception is re-raised if every attempt fails.
        """
        # Encoding does not depend on the socket, so do it once up front.
        message_json = json.dumps(message)
        last_exception = None
        for _ in range(5):
            try:
                # NOTE(review): this writes to the socket from the caller's
                # thread while the IOLoop runs on the server thread; confirm
                # this cross-thread use is acceptable for the mock.
                self._get_sub().write_message(message_json)
                last_exception = None
                break
            except Exception as e:
                last_exception = e
                time.sleep(0.2)
            finally:
                # Pause after every attempt, successful or not.
                time.sleep(0.1)
        if last_exception is not None:
            raise last_exception

    def send_agent_act(self, agent_id, act_content):
        """
        Send a packet from the given agent with
        the given content
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
                "subject_id": agent_id,
                "data": act_content,
            }
        )

    def register_mock_agent(self, worker_name, agent_details):
        """
        Send a packet asking to register a mock agent.

        `agent_details` is used both as the request id and as the
        agent registration id.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_REGISTER_AGENT,
                "subject_id": "MockServer",
                "data": {
                    "request_id": agent_details,
                    "provider_data": {
                        "worker_name": worker_name,
                        "agent_registration_id": agent_details,
                    },
                },
            }
        )

    def submit_mock_unit(self, agent_id, submit_data):
        """
        Send a packet asking to submit data.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_SUBMIT_UNIT,
                "subject_id": agent_id,
                "data": submit_data,
            }
        )

    def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data):
        """
        Send a packet submitting onboarding data for the given agent.

        `worker_id` is accepted but not included in the packet.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING,
                "subject_id": agent_id,
                "data": {
                    "request_id": "1234",
                    "onboarding_data": onboard_data,
                },
            }
        )

    def disconnect_mock_agent(self, agent_id):
        """
        Mark a mock agent as disconnected.
        """
        self._send_message(
            {
                "packet_type": PACKET_TYPE_RETURN_STATUSES,
                "subject_id": "Mephisto",
                "data": {agent_id: AgentState.STATUS_DISCONNECT},
            }
        )

    def launch_mock(self):
        """
        Start the primary loop for this application on a background thread.
        """
        self.__server_thread = threading.Thread(
            target=self.__server_thread_fn, name="mock-server-thread"
        )
        self.__server_thread.start()

    def shutdown_mock(self):
        """
        Shut down the tornado application and wait for its thread to exit.
        """
        # The loop runs on the server thread, so the stop request is handed
        # to it through add_callback rather than called directly.
        self.running_instance.add_callback(self.running_instance.stop)
        self.__server_thread.join()

Tornado-based server with hooks for sending specific messages through socket connections and such

#   MockServer(port)
View Source
    def __init__(self, port):
        """
        Set up bookkeeping state and register the socket and is_alive routes.

        Args:
            port: port stored on the instance for the HTTP server to use.
        """
        # Shared with every SocketHandler through its route kwargs.
        self.subs = {}
        self.port = port
        self.running_instance = None
        self.last_alive_packet: Optional[Dict[str, Any]] = None
        self.actions_observed = 0
        self.last_packet: Optional[Dict[str, Any]] = None
        tornado_settings = {
            "autoescape": None,
            # Debug mode is always on for the mock server.
            "debug": True,
            "compiled_template_cache": False,
            "static_url_prefix": "/static/",
        }
        handlers = [
            ("/socket", SocketHandler, {"subs": self.subs, "app": self}),
            ("/is_alive", AliveHandler, {}),
        ]
        super(MockServer, self).__init__(handlers, **tornado_settings)
#   def send_agent_act(self, agent_id, act_content):
View Source
    def send_agent_act(self, agent_id, act_content):
        """
        Emit a mephisto-bound live update packet whose subject is the
        given agent and whose data is the given content
        """
        act_packet = {
            "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE,
            "subject_id": agent_id,
            "data": act_content,
        }
        self._send_message(act_packet)

Send a packet from the given agent with the given content

#   def register_mock_agent(self, worker_name, agent_details):
View Source
    def register_mock_agent(self, worker_name, agent_details):
        """
        Emit a register-agent packet for a mock agent; `agent_details`
        serves as both the request id and the agent registration id.
        """
        provider_data = {
            "worker_name": worker_name,
            "agent_registration_id": agent_details,
        }
        registration_packet = {
            "packet_type": PACKET_TYPE_REGISTER_AGENT,
            "subject_id": "MockServer",
            "data": {
                "request_id": agent_details,
                "provider_data": provider_data,
            },
        }
        self._send_message(registration_packet)

Send a packet asking to register a mock agent.

#   def submit_mock_unit(self, agent_id, submit_data):
View Source
    def submit_mock_unit(self, agent_id, submit_data):
        """
        Emit a submit-unit packet carrying `submit_data` for the given agent.
        """
        submit_packet = {
            "packet_type": PACKET_TYPE_SUBMIT_UNIT,
            "subject_id": agent_id,
            "data": submit_data,
        }
        self._send_message(submit_packet)

Send a packet asking to submit data.

#   def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data):
View Source
    def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data):
        """
        Emit a submit-onboarding packet carrying `onboard_data` for the
        given agent. `worker_id` is accepted but not placed in the packet.
        """
        onboarding_payload = {
            "request_id": "1234",
            "onboarding_data": onboard_data,
        }
        onboarding_packet = {
            "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING,
            "subject_id": agent_id,
            "data": onboarding_payload,
        }
        self._send_message(onboarding_packet)

Send a packet submitting onboarding data for a mock agent.

#   def disconnect_mock_agent(self, agent_id):
View Source
    def disconnect_mock_agent(self, agent_id):
        """
        Emit a return-statuses packet reporting the given agent as
        disconnected.
        """
        agent_statuses = {agent_id: AgentState.STATUS_DISCONNECT}
        status_packet = {
            "packet_type": PACKET_TYPE_RETURN_STATUSES,
            "subject_id": "Mephisto",
            "data": agent_statuses,
        }
        self._send_message(status_packet)

Mark a mock agent as disconnected.

#   def launch_mock(self):
View Source
    def launch_mock(self):
        """
        Spawn the thread that runs this application's primary loop
        """
        server_thread = threading.Thread(
            name="mock-server-thread",
            target=self.__server_thread_fn,
        )
        # Kept on the instance so shutdown_mock can join it later.
        self.__server_thread = server_thread
        server_thread.start()

Start the primary loop for this application

#   def shutdown_mock(self):
View Source
    def shutdown_mock(self):
        """
        Shut down the tornado application and wait for its thread to finish.
        """
        # The stop request is queued on the loop with add_callback; the
        # lambda looks up running_instance when the callback actually runs.
        self.running_instance.add_callback(lambda: self.running_instance.stop())
        self.__server_thread.join()

Shuts down the tornado application.

Inherited Members
tornado.web.Application
listen
add_handlers
add_transform
find_handler
get_handler_delegate
reverse_url
log_request
tornado.routing.Router
start_request
tornado.httputil.HTTPServerConnectionDelegate
on_close
View Source
class MockArchitect(Architect):
    """
    The MockArchitect runs a mock server on the localhost so that
    we can send special packets and assert connections have been made.

    It also records which lifecycle methods were invoked (`prepared`,
    `deployed`, `cleaned`, `did_shutdown`) so callers can check them.
    """

    ArgsClass = MockArchitectArgs
    ARCHITECT_TYPE = ARCHITECT_TYPE

    def __init__(
        self,
        db: "MephistoDB",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        task_run: "TaskRun",
        build_dir_root: str,
    ):
        """
        Create an architect for use in testing.

        Reads `should_run_server` and `port` from `args.architect` and the
        run id from `task_run.db_id`. `db` and `shared_state` are not used
        by this constructor.
        """
        self.task_run = task_run
        self.build_dir = build_dir_root
        self.task_run_id = task_run.db_id
        self.should_run_server = args.architect.should_run_server
        self.port = args.architect.port
        # Only populated by deploy() when a server is actually launched.
        self.server: Optional["MockServer"] = None
        # TODO(#651) track state in parent class?
        self.prepared = False
        self.deployed = False
        self.cleaned = False
        self.did_shutdown = False

    def _get_socket_urls(self) -> List[str]:
        """Return the path to the local server socket"""
        assert self.port is not None, "No ports for socket"
        return [f"ws://localhost:{self.port}/socket"]

    def get_channels(
        self,
        on_channel_open: Callable[[str], None],
        on_catastrophic_disconnect: Callable[[str], None],
        on_message: Callable[[str, "Packet"], None],
    ) -> List["Channel"]:
        """
        Return a list of all relevant channels that the ClientIOHandler
        will need to register to in order to function.

        One WebsocketChannel is created per socket url, named
        `mock_channel_<task_run_id>_<index>`, and each receives the three
        given callbacks unchanged.
        """
        urls = self._get_socket_urls()
        return [
            WebsocketChannel(
                f"mock_channel_{self.task_run_id}_{idx}",
                on_channel_open=on_channel_open,
                on_catastrophic_disconnect=on_catastrophic_disconnect,
                on_message=on_message,
                socket_url=url,
            )
            for idx, url in enumerate(urls)
        ]

    def download_file(self, target_filename: str, save_dir: str) -> None:
        """
        Mock architects can just pretend to write a file.

        Writes the bytes b"mock\\n" to `save_dir/target_filename`.
        """
        with open(os.path.join(save_dir, target_filename), "wb") as fp:
            fp.write(b"mock\n")

    def prepare(self) -> str:
        """
        Mark the preparation call and create the mock build directory.

        Returns the path `<build_dir>/mock_build_<task_run_id>`. The
        directory is created when missing; an existing directory is reused
        instead of raising FileExistsError, so repeated calls are safe.
        """
        self.prepared = True
        built_dir = os.path.join(self.build_dir, f"mock_build_{self.task_run_id}")
        # exist_ok: a second prepare() (or a leftover directory from an
        # earlier run with the same id) must not crash the mock.
        os.makedirs(built_dir, exist_ok=True)
        return built_dir

    def deploy(self) -> str:
        """
        Mock a deploy or deploy a mock server, depending on settings.

        Returns MOCK_DEPLOY_URL when no server is requested; otherwise
        launches a MockServer on `self.port` and returns its localhost url.
        """
        self.deployed = True
        if not self.should_run_server:
            return MOCK_DEPLOY_URL
        self.server = MockServer(self.port)
        self.server.launch_mock()
        return f"http://localhost:{self.port}/"

    def cleanup(self) -> None:
        """Mark the cleanup call"""
        self.cleaned = True

    def shutdown(self) -> None:
        """
        Mark the shutdown call, and stop the mock server if one was launched.
        """
        self.did_shutdown = True
        if self.should_run_server and self.server is not None:
            self.server.shutdown_mock()

The MockArchitect runs a mock server on the localhost so that we can send special packets and assert connections have been made

#   MockArchitect( db: mephisto.abstractions.database.MephistoDB, args: omegaconf.dictconfig.DictConfig, shared_state: mephisto.abstractions.blueprint.SharedTaskState, task_run: mephisto.data_model.task_run.TaskRun, build_dir_root: str )
View Source
    def __init__(
        self,
        db: "MephistoDB",
        args: "DictConfig",
        shared_state: "SharedTaskState",
        task_run: "TaskRun",
        build_dir_root: str,
    ):
        """Create an architect for use in testing"""
        self.task_run = task_run
        self.build_dir = build_dir_root
        self.task_run_id = task_run.db_id
        self.should_run_server = args.architect.should_run_server
        self.port = args.architect.port
        self.server: Optional["MockServer"] = None
        # TODO(#651) track state in parent class?
        self.prepared = False
        self.deployed = False
        self.cleaned = False
        self.did_shutdown = False

Create an architect for use in testing

#   ARCHITECT_TYPE: str = 'mock'
#   def get_channels( self, on_channel_open: Callable[[str], NoneType], on_catastrophic_disconnect: Callable[[str], NoneType], on_message: collections.abc.Callable[str, mephisto.data_model.packet.Packet, NoneType] ) -> list[mephisto.abstractions._subcomponents.channel.Channel]:
View Source
    def get_channels(
        self,
        on_channel_open: Callable[[str], None],
        on_catastrophic_disconnect: Callable[[str], None],
        on_message: Callable[[str, "Packet"], None],
    ) -> List["Channel"]:
        """
        Build the channels that the ClientIOHandler must register with.

        One WebsocketChannel is created per socket url, named
        `mock_channel_<task_run_id>_<index>`, and each one is handed the
        three callbacks unchanged.
        """
        channels = []
        for position, socket_url in enumerate(self._get_socket_urls()):
            channel_name = f"mock_channel_{self.task_run_id}_{position}"
            channels.append(
                WebsocketChannel(
                    channel_name,
                    on_channel_open=on_channel_open,
                    on_catastrophic_disconnect=on_catastrophic_disconnect,
                    on_message=on_message,
                    socket_url=socket_url,
                )
            )
        return channels

Return a list of all relevant channels that the ClientIOHandler will need to register to in order to function

#   def download_file(self, target_filename: str, save_dir: str) -> None:
View Source
    def download_file(self, target_filename: str, save_dir: str) -> None:
        """
        Pretend to download a file by writing a small placeholder instead.

        The bytes b"mock\\n" are written to `save_dir/target_filename`.
        """
        destination = os.path.join(save_dir, target_filename)
        with open(destination, "wb") as mock_file:
            mock_file.write(b"mock\n")

Mock architects can just pretend to write a file

#   def prepare(self) -> str:
View Source
    def prepare(self) -> str:
        """
        Mark the preparation call and create the mock build directory.

        Returns the path `<build_dir>/mock_build_<task_run_id>`. The
        directory is created when missing; an existing directory is reused
        instead of raising FileExistsError, so repeated calls are safe.
        """
        self.prepared = True
        built_dir = os.path.join(self.build_dir, f"mock_build_{self.task_run_id}")
        # exist_ok: a second prepare() (or a leftover directory from an
        # earlier run with the same id) must not crash the mock.
        os.makedirs(built_dir, exist_ok=True)
        return built_dir

Mark the preparation call

#   def deploy(self) -> str:
View Source
    def deploy(self) -> str:
        """
        Record the deploy call and, if configured, start a mock server.

        Returns the placeholder MOCK_DEPLOY_URL when no server is requested;
        otherwise launches a MockServer on `self.port` and returns its
        localhost url.
        """
        self.deployed = True
        if self.should_run_server:
            self.server = MockServer(self.port)
            self.server.launch_mock()
            return f"http://localhost:{self.port}/"
        return MOCK_DEPLOY_URL

Mock a deploy or deploy a mock server, depending on settings

#   def cleanup(self) -> None:
View Source
    def cleanup(self) -> None:
        """
        Record that cleanup was requested by setting the `cleaned` flag.
        """
        self.cleaned = True

Mark the cleanup call

#   def shutdown(self) -> None:
View Source
    def shutdown(self) -> None:
        """
        Record the shutdown call and stop the mock server if one is running.

        The server is only stopped when this architect was configured to run
        one and a server instance has actually been created.
        """
        self.did_shutdown = True
        if not self.should_run_server:
            return
        if self.server is None:
            return
        self.server.shutdown_mock()

Mark the shutdown call

#   class MockArchitect.ArgsClass(mephisto.abstractions.architect.ArchitectArgs):
View Source
class MockArchitectArgs(ArchitectArgs):
    """Additional arguments for configuring a mock architect"""

    # Identifies this argument set as belonging to the mock architect.
    _architect_type: str = ARCHITECT_TYPE

    # NOTE(review): the help text reads like it describes an address, but
    # this boolean is what MockArchitect checks to decide whether to launch
    # a MockServer - confirm the intended wording.
    should_run_server: bool = field(
        metadata={"help": "Addressible location of the server"},
        default=False,
    )

    # Kept as a string; MockArchitect interpolates it into its urls.
    port: str = field(
        metadata={"help": "Port to launch the server on"},
        default="3000",
    )

Additional arguments for configuring a mock architect