mephisto.abstractions.architects.mock_architect
View Source
#!/usr/bin/env python3 # Copyright (c) Meta Platforms and its affiliates. # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import tornado from tornado.websocket import WebSocketHandler import os import threading import uuid import json import time from mephisto.abstractions.architect import Architect, ArchitectArgs from dataclasses import dataclass, field from mephisto.abstractions.blueprint import AgentState from mephisto.data_model.packet import ( PACKET_TYPE_ALIVE, PACKET_TYPE_SUBMIT_ONBOARDING, PACKET_TYPE_SUBMIT_UNIT, PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE, PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE, PACKET_TYPE_REGISTER_AGENT, PACKET_TYPE_AGENT_DETAILS, PACKET_TYPE_UPDATE_STATUS, PACKET_TYPE_REQUEST_STATUSES, PACKET_TYPE_RETURN_STATUSES, PACKET_TYPE_ERROR, ) from mephisto.operations.registry import register_mephisto_abstraction from mephisto.abstractions.architects.channels.websocket_channel import WebsocketChannel from typing import List, Dict, Any, Optional, TYPE_CHECKING, Callable if TYPE_CHECKING: from mephisto.abstractions._subcomponents.channel import Channel from mephisto.data_model.packet import Packet from mephisto.data_model.task_run import TaskRun from mephisto.abstractions.database import MephistoDB from argparse import _ArgumentGroup as ArgumentGroup from omegaconf import DictConfig from mephisto.abstractions.blueprint import SharedTaskState MOCK_DEPLOY_URL = "MOCK_DEPLOY_URL" ARCHITECT_TYPE = "mock" def get_rand_id(): return str(uuid.uuid4()) @dataclass class MockArchitectArgs(ArchitectArgs): """Additional arguments for configuring a mock architect""" _architect_type: str = ARCHITECT_TYPE should_run_server: bool = field( default=False, metadata={"help": "Addressible location of the server"} ) port: str = field(default="3000", metadata={"help": "Port to launch the server on"}) class SocketHandler(WebSocketHandler): def __init__(self, *args, **kwargs): self.subs: Dict[int, "SocketHandler"] = kwargs.pop("subs") self.app: "MockServer" = kwargs.pop("app") self.sid = get_rand_id() super().__init__(*args, **kwargs) def open(self): """ Opens a websocket and assigns a random UUID that is stored in the class-level `subs` variable. """ if self.sid not in self.subs.values(): self.subs[self.sid] = self def on_close(self): """ Runs when a socket is closed. """ del self.subs[self.sid] def on_message(self, message_text): """ Callback that runs when a new message is received from a client See the chat_service README for the resultant message structure. Args: message_text: A stringified JSON object with a text or attachment key. `text` should contain a string message and `attachment` is a dict. See `WebsocketAgent.put_data` for more information about the attachment dict structure. """ message = json.loads(message_text) if message["packet_type"] == PACKET_TYPE_ALIVE: self.app.last_alive_packet = message elif message["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] != PACKET_TYPE_REQUEST_STATUSES: self.app.last_packet = message def check_origin(self, origin): return True class AliveHandler(tornado.web.RequestHandler): """Simple handler for is_alive""" def get(self, eids): pass # Default behavior returns 200 class MockServer(tornado.web.Application): """ Tornado-based server that with hooks for sending specific messages through socket connections and such """ def __init__(self, port): self.subs = {} self.port = port self.running_instance = None self.last_alive_packet: Optional[Dict[str, Any]] = None self.actions_observed = 0 self.last_packet: Optional[Dict[str, Any]] = None tornado_settings = { "autoescape": None, "debug": "/dbg/" in __file__, "compiled_template_cache": False, "static_url_prefix": "/static/", "debug": True, } handlers = [ ("/socket", SocketHandler, {"subs": self.subs, "app": self}), ("/is_alive", AliveHandler, {}), ] super(MockServer, self).__init__(handlers, **tornado_settings) def __server_thread_fn(self): """ Main loop for the application """ self.running_instance = tornado.ioloop.IOLoop() http_server = tornado.httpserver.HTTPServer(self, max_buffer_size=1024**3) http_server.listen(self.port) self.running_instance.start() http_server.stop() def _get_sub(self): """Return the subscriber socket to write to""" return list(self.subs.values())[0] def _send_message(self, message): """Send the given message back to the mephisto client""" failed_attempts = 0 last_exception = None while failed_attempts < 5: try: socket = self._get_sub() message_json = json.dumps(message) socket.write_message(message_json) last_exception = None break except Exception as e: last_exception = e time.sleep(0.2) failed_attempts += 1 finally: time.sleep(0.1) if last_exception is not None: raise last_exception def send_agent_act(self, agent_id, act_content): """ Send a packet from the given agent with the given content """ self._send_message( { "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE, "subject_id": agent_id, "data": act_content, } ) def register_mock_agent(self, worker_name, agent_details): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_REGISTER_AGENT, "subject_id": "MockServer", "data": { "request_id": agent_details, "provider_data": { "worker_name": worker_name, "agent_registration_id": agent_details, }, }, } ) def submit_mock_unit(self, agent_id, submit_data): """ Send a packet asking to submit data. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_UNIT, "subject_id": agent_id, "data": submit_data, } ) def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING, "subject_id": agent_id, "data": { "request_id": "1234", "onboarding_data": onboard_data, }, } ) def disconnect_mock_agent(self, agent_id): """ Mark a mock agent as disconnected. """ self._send_message( { "packet_type": PACKET_TYPE_RETURN_STATUSES, "subject_id": "Mephisto", "data": {agent_id: AgentState.STATUS_DISCONNECT}, } ) def launch_mock(self): """ Start the primary loop for this application """ self.__server_thread = threading.Thread( target=self.__server_thread_fn, name="mock-server-thread" ) self.__server_thread.start() def shutdown_mock(self): """ Defined to shutown the tornado application. """ def stop_and_free(): self.running_instance.stop() self.running_instance.add_callback(stop_and_free) self.__server_thread.join() @register_mephisto_abstraction() class MockArchitect(Architect): """ The MockArchitect runs a mock server on the localhost so that we can send special packets and assert connections have been made """ ArgsClass = MockArchitectArgs ARCHITECT_TYPE = ARCHITECT_TYPE def __init__( self, db: "MephistoDB", args: "DictConfig", shared_state: "SharedTaskState", task_run: "TaskRun", build_dir_root: str, ): """Create an architect for use in testing""" self.task_run = task_run self.build_dir = build_dir_root self.task_run_id = task_run.db_id self.should_run_server = args.architect.should_run_server self.port = args.architect.port self.server: Optional["MockServer"] = None # TODO(#651) track state in parent class? self.prepared = False self.deployed = False self.cleaned = False self.did_shutdown = False def _get_socket_urls(self) -> List[str]: """Return the path to the local server socket""" assert self.port is not None, "No ports for socket" return [f"ws://localhost:{self.port}/socket"] def get_channels( self, on_channel_open: Callable[[str], None], on_catastrophic_disconnect: Callable[[str], None], on_message: Callable[[str, "Packet"], None], ) -> List["Channel"]: """ Return a list of all relevant channels that the ClientIOHandler will need to register to in order to function """ urls = self._get_socket_urls() return [ WebsocketChannel( f"mock_channel_{self.task_run_id}_{idx}", on_channel_open=on_channel_open, on_catastrophic_disconnect=on_catastrophic_disconnect, on_message=on_message, socket_url=url, ) for idx, url in enumerate(urls) ] def download_file(self, target_filename: str, save_dir: str) -> None: """ Mock architects can just pretend to write a file """ with open(os.path.join(save_dir, target_filename), "wb") as fp: fp.write(b"mock\n") def prepare(self) -> str: """Mark the preparation call""" self.prepared = True built_dir = os.path.join( self.build_dir, "mock_build_{}".format(self.task_run_id) ) os.makedirs(built_dir) return built_dir def deploy(self) -> str: """Mock a deploy or deploy a mock server, depending on settings""" self.deployed = True if not self.should_run_server: return MOCK_DEPLOY_URL else: self.server = MockServer(self.port) self.server.launch_mock() return f"http://localhost:{self.port}/" def cleanup(self) -> None: """Mark the cleanup call""" self.cleaned = True def shutdown(self) -> None: """Mark the shutdown call""" self.did_shutdown = True if self.should_run_server and self.server is not None: self.server.shutdown_mock()
View Source
def get_rand_id(): return str(uuid.uuid4())
View Source
class MockArchitectArgs(ArchitectArgs): """Additional arguments for configuring a mock architect""" _architect_type: str = ARCHITECT_TYPE should_run_server: bool = field( default=False, metadata={"help": "Addressible location of the server"} ) port: str = field(default="3000", metadata={"help": "Port to launch the server on"})
Additional arguments for configuring a mock architect
Inherited Members
View Source
class SocketHandler(WebSocketHandler): def __init__(self, *args, **kwargs): self.subs: Dict[int, "SocketHandler"] = kwargs.pop("subs") self.app: "MockServer" = kwargs.pop("app") self.sid = get_rand_id() super().__init__(*args, **kwargs) def open(self): """ Opens a websocket and assigns a random UUID that is stored in the class-level `subs` variable. """ if self.sid not in self.subs.values(): self.subs[self.sid] = self def on_close(self): """ Runs when a socket is closed. """ del self.subs[self.sid] def on_message(self, message_text): """ Callback that runs when a new message is received from a client See the chat_service README for the resultant message structure. Args: message_text: A stringified JSON object with a text or attachment key. `text` should contain a string message and `attachment` is a dict. See `WebsocketAgent.put_data` for more information about the attachment dict structure. """ message = json.loads(message_text) if message["packet_type"] == PACKET_TYPE_ALIVE: self.app.last_alive_packet = message elif message["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] != PACKET_TYPE_REQUEST_STATUSES: self.app.last_packet = message def check_origin(self, origin): return True
Subclass this class to create a basic WebSocket handler.
Override on_message
to handle incoming messages, and use
write_message
to send messages to the client. You can also
override open
and on_close
to handle opened and closed
connections.
Custom upgrade response headers can be sent by overriding
~tornado.web.RequestHandler.set_default_headers
or
~tornado.web.RequestHandler.prepare
.
See http://dev.w3.org/html5/websockets/ for details on the JavaScript interface. The protocol is specified at http://tools.ietf.org/html/rfc6455.
Here is an example WebSocket handler that echos back all received messages back to the client:
.. testcode::
class EchoWebSocket(tornado.websocket.WebSocketHandler): def open(self): print("WebSocket opened")
def on_message(self, message):
self.write_message(u"You said: " + message)
def on_close(self):
print("WebSocket closed")
.. testoutput:: :hide:
WebSockets are not standard HTTP connections. The "handshake" is
HTTP, but after the handshake, the protocol is
message-based. Consequently, most of the Tornado HTTP facilities
are not available in handlers of this type. The only communication
methods available to you are write_message()
, ping()
, and
close()
. Likewise, your request handler class should implement
open()
method rather than get()
or post()
.
If you map the handler above to /websocket
in your application, you can
invoke it in JavaScript with::
var ws = new WebSocket("ws://localhost:8888/websocket"); ws.onopen = function() { ws.send("Hello, world"); }; ws.onmessage = function (evt) { alert(evt.data); };
This script pops up an alert box that says "You said: Hello, world".
Web browsers allow any site to open a websocket connection to any other,
instead of using the same-origin policy that governs other network
access from javascript. This can be surprising and is a potential
security hole, so since Tornado 4.0 WebSocketHandler
requires
applications that wish to receive cross-origin websockets to opt in
by overriding the ~WebSocketHandler.check_origin
method (see that
method's docs for details). Failure to do so is the most likely
cause of 403 errors when making a websocket connection.
When using a secure websocket connection (wss://
) with a self-signed
certificate, the connection from a browser may fail because it wants
to show the "accept this certificate" dialog but has nowhere to show it.
You must first visit a regular HTML page using the same certificate
to accept it before the websocket connection will succeed.
If the application setting websocket_ping_interval
has a non-zero
value, a ping will be sent periodically, and the connection will be
closed if a response is not received before the websocket_ping_timeout
.
Messages larger than the websocket_max_message_size
application setting
(default 10MiB) will not be accepted.
.. versionchanged:: 4.5
Added websocket_ping_interval
, websocket_ping_timeout
, and
websocket_max_message_size
.
View Source
def __init__(self, *args, **kwargs): self.subs: Dict[int, "SocketHandler"] = kwargs.pop("subs") self.app: "MockServer" = kwargs.pop("app") self.sid = get_rand_id() super().__init__(*args, **kwargs)
View Source
def open(self): """ Opens a websocket and assigns a random UUID that is stored in the class-level `subs` variable. """ if self.sid not in self.subs.values(): self.subs[self.sid] = self
Opens a websocket and assigns a random UUID that is stored in the class-level
subs
variable.
View Source
def on_close(self): """ Runs when a socket is closed. """ del self.subs[self.sid]
Runs when a socket is closed.
View Source
def on_message(self, message_text): """ Callback that runs when a new message is received from a client See the chat_service README for the resultant message structure. Args: message_text: A stringified JSON object with a text or attachment key. `text` should contain a string message and `attachment` is a dict. See `WebsocketAgent.put_data` for more information about the attachment dict structure. """ message = json.loads(message_text) if message["packet_type"] == PACKET_TYPE_ALIVE: self.app.last_alive_packet = message elif message["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE: self.app.actions_observed += 1 elif message["packet_type"] != PACKET_TYPE_REQUEST_STATUSES: self.app.last_packet = message
Callback that runs when a new message is received from a client See the
chat_service README for the resultant message structure.
Args:
message_text: A stringified JSON object with a text or attachment key.
text
should contain a string message and attachment
is a dict.
See WebsocketAgent.put_data
for more information about the
attachment dict structure.
View Source
def check_origin(self, origin): return True
Override to enable support for allowing alternate origins.
The origin
argument is the value of the Origin
HTTP
header, the url responsible for initiating this request. This
method is not called for clients that do not send this header;
such requests are always allowed (because all browsers that
implement WebSockets support this header, and non-browser
clients do not have the same cross-site security concerns).
Should return True
to accept the request or False
to
reject it. By default, rejects all requests with an origin on
a host other than this one.
This is a security protection against cross site scripting attacks on browsers, since WebSockets are allowed to bypass the usual same-origin policies and don't use CORS headers.
.. warning::
This is an important security measure; don't disable it
without understanding the security implications. In
particular, if your authentication is cookie-based, you
must either restrict the origins allowed by
check_origin()
or implement your own XSRF-like
protection for websocket connections. See these
<https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>
_
articles
<https://devcenter.heroku.com/articles/websocket-security>
_
for more.
To accept all cross-origin traffic (which was the default prior to
Tornado 4.0), simply override this method to always return True
::
def check_origin(self, origin):
return True
To allow connections from any subdomain of your site, you might do something like::
def check_origin(self, origin):
parsed_origin = urllib.parse.urlparse(origin)
return parsed_origin.netloc.endswith(".mydomain.com")
.. versionadded:: 4.0
Inherited Members
- tornado.websocket.WebSocketHandler
- stream
- get
- ping_interval
- ping_timeout
- max_message_size
- write_message
- select_subprotocol
- selected_subprotocol
- get_compression_options
- ping
- on_pong
- on_ping
- close
- set_nodelay
- on_connection_close
- on_ws_connection_close
- send_error
- get_websocket_protocol
- tornado.web.RequestHandler
- SUPPORTED_METHODS
- path_args
- path_kwargs
- initialize
- settings
- head
- post
- delete
- patch
- put
- options
- prepare
- on_finish
- clear
- set_default_headers
- set_status
- get_status
- set_header
- add_header
- clear_header
- get_argument
- get_arguments
- get_body_argument
- get_body_arguments
- get_query_argument
- get_query_arguments
- decode_argument
- create_signed_value
- redirect
- write
- render
- render_linked_js
- render_embed_js
- render_linked_css
- render_embed_css
- render_string
- get_template_namespace
- create_template_loader
- flush
- finish
- detach
- write_error
- locale
- get_user_locale
- get_browser_locale
- current_user
- get_current_user
- get_login_url
- get_template_path
- xsrf_token
- xsrf_form_html
- static_url
- require_setting
- reverse_url
- compute_etag
- set_etag_header
- check_etag_header
- data_received
- log_exception
View Source
class AliveHandler(tornado.web.RequestHandler): """Simple handler for is_alive""" def get(self, eids): pass # Default behavior returns 200
Simple handler for is_alive
View Source
def get(self, eids): pass # Default behavior returns 200
Inherited Members
- tornado.web.RequestHandler
- RequestHandler
- SUPPORTED_METHODS
- path_args
- path_kwargs
- initialize
- settings
- head
- post
- delete
- patch
- put
- options
- prepare
- on_finish
- on_connection_close
- clear
- set_default_headers
- set_status
- get_status
- set_header
- add_header
- clear_header
- get_argument
- get_arguments
- get_body_argument
- get_body_arguments
- get_query_argument
- get_query_arguments
- decode_argument
- create_signed_value
- redirect
- write
- render
- render_linked_js
- render_embed_js
- render_linked_css
- render_embed_css
- render_string
- get_template_namespace
- create_template_loader
- flush
- finish
- detach
- send_error
- write_error
- locale
- get_user_locale
- get_browser_locale
- current_user
- get_current_user
- get_login_url
- get_template_path
- xsrf_token
- xsrf_form_html
- static_url
- require_setting
- reverse_url
- compute_etag
- set_etag_header
- check_etag_header
- data_received
- log_exception
View Source
class MockServer(tornado.web.Application): """ Tornado-based server that with hooks for sending specific messages through socket connections and such """ def __init__(self, port): self.subs = {} self.port = port self.running_instance = None self.last_alive_packet: Optional[Dict[str, Any]] = None self.actions_observed = 0 self.last_packet: Optional[Dict[str, Any]] = None tornado_settings = { "autoescape": None, "debug": "/dbg/" in __file__, "compiled_template_cache": False, "static_url_prefix": "/static/", "debug": True, } handlers = [ ("/socket", SocketHandler, {"subs": self.subs, "app": self}), ("/is_alive", AliveHandler, {}), ] super(MockServer, self).__init__(handlers, **tornado_settings) def __server_thread_fn(self): """ Main loop for the application """ self.running_instance = tornado.ioloop.IOLoop() http_server = tornado.httpserver.HTTPServer(self, max_buffer_size=1024**3) http_server.listen(self.port) self.running_instance.start() http_server.stop() def _get_sub(self): """Return the subscriber socket to write to""" return list(self.subs.values())[0] def _send_message(self, message): """Send the given message back to the mephisto client""" failed_attempts = 0 last_exception = None while failed_attempts < 5: try: socket = self._get_sub() message_json = json.dumps(message) socket.write_message(message_json) last_exception = None break except Exception as e: last_exception = e time.sleep(0.2) failed_attempts += 1 finally: time.sleep(0.1) if last_exception is not None: raise last_exception def send_agent_act(self, agent_id, act_content): """ Send a packet from the given agent with the given content """ self._send_message( { "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE, "subject_id": agent_id, "data": act_content, } ) def register_mock_agent(self, worker_name, agent_details): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_REGISTER_AGENT, "subject_id": "MockServer", "data": { "request_id": agent_details, "provider_data": { "worker_name": worker_name, "agent_registration_id": agent_details, }, }, } ) def submit_mock_unit(self, agent_id, submit_data): """ Send a packet asking to submit data. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_UNIT, "subject_id": agent_id, "data": submit_data, } ) def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING, "subject_id": agent_id, "data": { "request_id": "1234", "onboarding_data": onboard_data, }, } ) def disconnect_mock_agent(self, agent_id): """ Mark a mock agent as disconnected. """ self._send_message( { "packet_type": PACKET_TYPE_RETURN_STATUSES, "subject_id": "Mephisto", "data": {agent_id: AgentState.STATUS_DISCONNECT}, } ) def launch_mock(self): """ Start the primary loop for this application """ self.__server_thread = threading.Thread( target=self.__server_thread_fn, name="mock-server-thread" ) self.__server_thread.start() def shutdown_mock(self): """ Defined to shutown the tornado application. """ def stop_and_free(): self.running_instance.stop() self.running_instance.add_callback(stop_and_free) self.__server_thread.join()
Tornado-based server that with hooks for sending specific messages through socket connections and such
View Source
def __init__(self, port): self.subs = {} self.port = port self.running_instance = None self.last_alive_packet: Optional[Dict[str, Any]] = None self.actions_observed = 0 self.last_packet: Optional[Dict[str, Any]] = None tornado_settings = { "autoescape": None, "debug": "/dbg/" in __file__, "compiled_template_cache": False, "static_url_prefix": "/static/", "debug": True, } handlers = [ ("/socket", SocketHandler, {"subs": self.subs, "app": self}), ("/is_alive", AliveHandler, {}), ] super(MockServer, self).__init__(handlers, **tornado_settings)
View Source
def send_agent_act(self, agent_id, act_content): """ Send a packet from the given agent with the given content """ self._send_message( { "packet_type": PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE, "subject_id": agent_id, "data": act_content, } )
Send a packet from the given agent with the given content
View Source
def register_mock_agent(self, worker_name, agent_details): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_REGISTER_AGENT, "subject_id": "MockServer", "data": { "request_id": agent_details, "provider_data": { "worker_name": worker_name, "agent_registration_id": agent_details, }, }, } )
Send a packet asking to register a mock agent.
View Source
def submit_mock_unit(self, agent_id, submit_data): """ Send a packet asking to submit data. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_UNIT, "subject_id": agent_id, "data": submit_data, } )
Send a packet asking to submit data.
View Source
def register_mock_agent_after_onboarding(self, worker_id, agent_id, onboard_data): """ Send a packet asking to register a mock agent. """ self._send_message( { "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING, "subject_id": agent_id, "data": { "request_id": "1234", "onboarding_data": onboard_data, }, } )
Send a packet asking to register a mock agent.
View Source
def disconnect_mock_agent(self, agent_id): """ Mark a mock agent as disconnected. """ self._send_message( { "packet_type": PACKET_TYPE_RETURN_STATUSES, "subject_id": "Mephisto", "data": {agent_id: AgentState.STATUS_DISCONNECT}, } )
Mark a mock agent as disconnected.
View Source
def launch_mock(self): """ Start the primary loop for this application """ self.__server_thread = threading.Thread( target=self.__server_thread_fn, name="mock-server-thread" ) self.__server_thread.start()
Start the primary loop for this application
View Source
def shutdown_mock(self): """ Defined to shutown the tornado application. """ def stop_and_free(): self.running_instance.stop() self.running_instance.add_callback(stop_and_free) self.__server_thread.join()
Defined to shutown the tornado application.
Inherited Members
- tornado.web.Application
- listen
- add_handlers
- add_transform
- find_handler
- get_handler_delegate
- reverse_url
- log_request
- tornado.routing.Router
- start_request
- tornado.httputil.HTTPServerConnectionDelegate
- on_close
View Source
class MockArchitect(Architect): """ The MockArchitect runs a mock server on the localhost so that we can send special packets and assert connections have been made """ ArgsClass = MockArchitectArgs ARCHITECT_TYPE = ARCHITECT_TYPE def __init__( self, db: "MephistoDB", args: "DictConfig", shared_state: "SharedTaskState", task_run: "TaskRun", build_dir_root: str, ): """Create an architect for use in testing""" self.task_run = task_run self.build_dir = build_dir_root self.task_run_id = task_run.db_id self.should_run_server = args.architect.should_run_server self.port = args.architect.port self.server: Optional["MockServer"] = None # TODO(#651) track state in parent class? self.prepared = False self.deployed = False self.cleaned = False self.did_shutdown = False def _get_socket_urls(self) -> List[str]: """Return the path to the local server socket""" assert self.port is not None, "No ports for socket" return [f"ws://localhost:{self.port}/socket"] def get_channels( self, on_channel_open: Callable[[str], None], on_catastrophic_disconnect: Callable[[str], None], on_message: Callable[[str, "Packet"], None], ) -> List["Channel"]: """ Return a list of all relevant channels that the ClientIOHandler will need to register to in order to function """ urls = self._get_socket_urls() return [ WebsocketChannel( f"mock_channel_{self.task_run_id}_{idx}", on_channel_open=on_channel_open, on_catastrophic_disconnect=on_catastrophic_disconnect, on_message=on_message, socket_url=url, ) for idx, url in enumerate(urls) ] def download_file(self, target_filename: str, save_dir: str) -> None: """ Mock architects can just pretend to write a file """ with open(os.path.join(save_dir, target_filename), "wb") as fp: fp.write(b"mock\n") def prepare(self) -> str: """Mark the preparation call""" self.prepared = True built_dir = os.path.join( self.build_dir, "mock_build_{}".format(self.task_run_id) ) os.makedirs(built_dir) return built_dir def deploy(self) -> str: """Mock a deploy or deploy a mock server, depending on settings""" self.deployed = True if not self.should_run_server: return MOCK_DEPLOY_URL else: self.server = MockServer(self.port) self.server.launch_mock() return f"http://localhost:{self.port}/" def cleanup(self) -> None: """Mark the cleanup call""" self.cleaned = True def shutdown(self) -> None: """Mark the shutdown call""" self.did_shutdown = True if self.should_run_server and self.server is not None: self.server.shutdown_mock()
The MockArchitect runs a mock server on the localhost so that we can send special packets and assert connections have been made
View Source
def __init__( self, db: "MephistoDB", args: "DictConfig", shared_state: "SharedTaskState", task_run: "TaskRun", build_dir_root: str, ): """Create an architect for use in testing""" self.task_run = task_run self.build_dir = build_dir_root self.task_run_id = task_run.db_id self.should_run_server = args.architect.should_run_server self.port = args.architect.port self.server: Optional["MockServer"] = None # TODO(#651) track state in parent class? self.prepared = False self.deployed = False self.cleaned = False self.did_shutdown = False
Create an architect for use in testing
View Source
def get_channels( self, on_channel_open: Callable[[str], None], on_catastrophic_disconnect: Callable[[str], None], on_message: Callable[[str, "Packet"], None], ) -> List["Channel"]: """ Return a list of all relevant channels that the ClientIOHandler will need to register to in order to function """ urls = self._get_socket_urls() return [ WebsocketChannel( f"mock_channel_{self.task_run_id}_{idx}", on_channel_open=on_channel_open, on_catastrophic_disconnect=on_catastrophic_disconnect, on_message=on_message, socket_url=url, ) for idx, url in enumerate(urls) ]
Return a list of all relevant channels that the ClientIOHandler will need to register to in order to function
View Source
def download_file(self, target_filename: str, save_dir: str) -> None: """ Mock architects can just pretend to write a file """ with open(os.path.join(save_dir, target_filename), "wb") as fp: fp.write(b"mock\n")
Mock architects can just pretend to write a file
View Source
def prepare(self) -> str: """Mark the preparation call""" self.prepared = True built_dir = os.path.join( self.build_dir, "mock_build_{}".format(self.task_run_id) ) os.makedirs(built_dir) return built_dir
Mark the preparation call
View Source
def deploy(self) -> str: """Mock a deploy or deploy a mock server, depending on settings""" self.deployed = True if not self.should_run_server: return MOCK_DEPLOY_URL else: self.server = MockServer(self.port) self.server.launch_mock() return f"http://localhost:{self.port}/"
Mock a deploy or deploy a mock server, depending on settings
View Source
def cleanup(self) -> None: """Mark the cleanup call""" self.cleaned = True
Mark the cleanup call
View Source
def shutdown(self) -> None: """Mark the shutdown call""" self.did_shutdown = True if self.should_run_server and self.server is not None: self.server.shutdown_mock()
Mark the shutdown call
Inherited Members
View Source
class MockArchitectArgs(ArchitectArgs): """Additional arguments for configuring a mock architect""" _architect_type: str = ARCHITECT_TYPE should_run_server: bool = field( default=False, metadata={"help": "Addressible location of the server"} ) port: str = field(default="3000", metadata={"help": "Port to launch the server on"})
Additional arguments for configuring a mock architect