mephisto.abstractions.architects.ec2.ec2_helpers

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Dict, Optional, Tuple, List, Any, TYPE_CHECKING
import boto3  # type: ignore
import botocore.exceptions  # type: ignore
import time
import os
import subprocess
import json
import getpass
import hashlib
from mephisto.abstractions.providers.mturk.mturk_utils import setup_aws_credentials
from mephisto.abstractions.architects.router import build_router

from botocore import client  # type: ignore
from botocore.exceptions import ClientError, ProfileNotFound  # type: ignore
from botocore.config import Config  # type: ignore
from mephisto.utils.logger_core import get_logger

logger = get_logger(name=__name__)


if TYPE_CHECKING:
    from omegaconf import DictConfig  # type: ignore

botoconfig = Config(
    region_name="us-east-2", retries={"max_attempts": 10, "mode": "standard"}
)

DEFAULT_AMI_ID = "ami-0f19d220602031aed"
AMI_DEFAULT_USER = "ec2-user"
DEFAULT_INSTANCE_TYPE = "m2.micro"
FALLBACK_INSTANCE_TYPE = "t2.nano"
MY_DIR = os.path.abspath(os.path.dirname(__file__))
DEFAULT_KEY_PAIR_DIRECTORY = os.path.join(MY_DIR, "keypairs")
DEFAULT_SERVER_DETAIL_LOCATION = os.path.join(MY_DIR, "servers")
SCRIPTS_DIRECTORY = os.path.join(MY_DIR, "run_scripts")
DEFAULT_FALLBACK_FILE = os.path.join(DEFAULT_SERVER_DETAIL_LOCATION, "fallback.json")
FALLBACK_SERVER_LOC = os.path.join(MY_DIR, "fallback_server")
KNOWN_HOST_PATH = os.path.expanduser("~/.ssh/known_hosts")
MAX_RETRIES = 10


def get_owner_tag() -> Dict[str, str]:
    """
    Creates a tag with the user's username
    as the owner for the given resource
    """
    return {"Key": "Owner", "Value": getpass.getuser()}


def check_aws_credentials(profile_name: str) -> bool:
    try:
        # Check existing credentials
        boto3.Session(profile_name=profile_name)
        return True
    except ProfileNotFound:
        return False


def setup_ec2_credentials(
    profile_name: str, register_args: Optional["DictConfig"] = None
) -> bool:
    return setup_aws_credentials(profile_name, register_args)


def get_domain_if_available(session: boto3.Session, domain_name: str) -> bool:
    """
    Attempt to register the given domain with Route53, return
    True if registration is successful, False otherwise.

    Details on valid domains can be found here:
    https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/registrar-tld-list.html

    Pricing is available on amazon
    """
    client = session.client("route53domains")
    avail_result = "PENDING"

    while avail_result == "PENDING":
        avail = client.check_domain_availabiliity(DomainName=domain_name)
        avail_result = avail["Availability"]
        time.sleep(0.3)

    # May extend to handle other available cases
    if avail_result not in ["AVAILABLE"]:
        print(
            f"Domain was not listed as available, instead "
            f"{avail_result}, visit route53 for more detail"
        )
        return False

    print("Automated domain registration isn't yet implemented")
    # Registration can be completed using client.register_domain
    # Details are available here:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/route53domains.html#Route53Domains.Client.register_domain

    return False


def find_hosted_zone(session: boto3.Session, domain_name: str) -> Optional[str]:
    """
    search for a hosted zone with the given name, return its id
    if found and None otherwise
    """
    client = session.client("route53")

    zones = client.list_hosted_zones_by_name()

    logger.debug(f"Found zones {zones}")
    for zone in zones["HostedZones"]:
        if zone["Name"] == f"{domain_name}.":
            return zone["Id"]

    return None


def create_hosted_zone(session: boto3.Session, domain_name: str) -> str:
    """
    Given a domain name, tries to create a hosted zone
    for that domain. Returns the hosted zone id
    """
    client = session.client("route53")

    zone_id = find_hosted_zone(session, domain_name)
    if zone_id is None:

        res = client.create_hosted_zone(
            Name=domain_name,
            CallerReference=str(time.time()),
            HostedZoneConfig={
                "Comment": "Mephisto hosted zone",
            },
        )
        nameservers = res["DelegationSet"]["NameServers"]
        BOLD_WHITE_ON_BLUE = "\x1b[1;37;44m"
        RESET = "\x1b[0m"
        print(
            f"{BOLD_WHITE_ON_BLUE}"
            "Registered new hosted zone! You should ensure your domain "
            "name is registered to delegate to the following nameservers: "
            f"\n{nameservers}"
            f"{RESET}"
        )
        zone_id = res["HostedZone"]["Id"]
    else:
        logger.debug(f"This hosted zone already exists! Returning {zone_id}")

    return zone_id


def find_certificate_arn(session: boto3.Session, domain_name: str) -> Optional[str]:
    """
    Finds the certificate for the given domain if it exists, and returns
    the certification arn.
    """
    client = session.client("acm")
    certs = client.list_certificates()
    logger.debug(f"Found existing certs: {certs}")
    for cert in certs["CertificateSummaryList"]:
        if cert["DomainName"] == domain_name:
            return cert["CertificateArn"]
    return None


def get_certificate(session: boto3.Session, domain_name: str) -> Dict[str, str]:
    """
    Gets the certificate for the given domain name, and returns
    the dns validation name and target and cert arn ('Name' and 'Value', 'arn')
    """
    client = session.client("acm")
    cert_domain_name = f"*.{domain_name}"
    certificate_arn = find_certificate_arn(session, cert_domain_name)
    if certificate_arn is None:  # cert not yet issued
        logger.debug("Requesting new certificate")
        response = client.request_certificate(
            DomainName=cert_domain_name,
            ValidationMethod="DNS",
            IdempotencyToken=f"{domain_name.split('.')[0]}request",
            Options={
                "CertificateTransparencyLoggingPreference": "ENABLED",
            },
        )
        certificate_arn = response["CertificateArn"]
    else:
        logger.debug(f"Using existing certificate {certificate_arn}")
    attempts = 0
    sleep_time = 2
    details = None
    while attempts < MAX_RETRIES:
        try:
            details = client.describe_certificate(
                CertificateArn=certificate_arn,
            )
            return_data = details["Certificate"]["DomainValidationOptions"][0][
                "ResourceRecord"
            ]
            return_data["arn"] = certificate_arn
            return return_data
        except KeyError:
            # Resource record not created yet, try again
            attempts += 1
            logger.info(f"Attempt {attempts} had no certification details, retrying")
            time.sleep(sleep_time)
            sleep_time *= 2
    raise Exception("Exceeded MAX_RETRIES waiting for certificate records")


def register_zone_records(
    session: boto3.Session,
    zone_id: str,
    domain_name: str,
    load_balancer_arn: str,
    acm_valid_name: str,
    acm_valid_target: str,
) -> int:
    """
    Creates the required zone records for this mephisto hosted zone. Requires
    the load balancer target, and the ACM certificate addresses

    Returns the change id
    """
    # Get details about the load balancer
    ec2_client = session.client("elbv2")
    balancer = ec2_client.describe_load_balancers(
        LoadBalancerArns=[load_balancer_arn],
    )["LoadBalancers"][0]
    load_balancer_dns = balancer["DNSName"]
    load_balancer_zone = balancer["CanonicalHostedZoneId"]

    # Create the records
    client = session.client("route53")
    response = client.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch={
            "Comment": "Creating records for Mephisto load balancer and DNS validations for certs",
            "Changes": [
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": f"*.{domain_name}",
                        "Type": "A",
                        "AliasTarget": {
                            "HostedZoneId": load_balancer_zone,
                            "DNSName": load_balancer_dns,
                            "EvaluateTargetHealth": True,
                        },
                    },
                },
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": f"{domain_name}",
                        "Type": "A",
                        "AliasTarget": {
                            "HostedZoneId": load_balancer_zone,
                            "DNSName": load_balancer_dns,
                            "EvaluateTargetHealth": True,
                        },
                    },
                },
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": acm_valid_name,
                        "Type": "CNAME",
                        "TTL": 300,
                        "ResourceRecords": [
                            {"Value": acm_valid_target},
                        ],
                    },
                },
            ],
        },
    )
    return response["ChangeInfo"]["Id"]


def create_mephisto_vpc(session: boto3.Session) -> Dict[str, str]:
    """
    Create the required vpc with two subnets, an associated
    internet gateway, and routing tables.

    Currently sets up using US-east for both subnets
    """
    client = session.client("ec2")

    # Create VPC
    vpc_response = client.create_vpc(
        CidrBlock="10.0.0.0/16",
        TagSpecifications=[
            {
                "ResourceType": "vpc",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-core-vpc"},
                    get_owner_tag(),
                ],
            }
        ],
    )
    vpc_id = vpc_response["Vpc"]["VpcId"]

    # Create internet gateway
    gateway_response = client.create_internet_gateway(
        TagSpecifications=[
            {
                "ResourceType": "internet-gateway",
                "Tags": [{"Key": "Name", "Value": "mephisto-gateway"}, get_owner_tag()],
            }
        ],
    )
    gateway_id = gateway_response["InternetGateway"]["InternetGatewayId"]
    client.attach_internet_gateway(
        InternetGatewayId=gateway_id,
        VpcId=vpc_id,
    )

    # Create subnets
    subnet_1_response = client.create_subnet(
        TagSpecifications=[
            {
                "ResourceType": "subnet",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-subnet-1"},
                    get_owner_tag(),
                ],
            }
        ],
        CidrBlock="10.0.0.0/24",
        AvailabilityZone="us-east-2a",
        VpcId=vpc_id,
    )
    subnet_1_id = subnet_1_response["Subnet"]["SubnetId"]

    subnet_2_response = client.create_subnet(
        TagSpecifications=[
            {
                "ResourceType": "subnet",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-subnet-2"},
                    get_owner_tag(),
                ],
            }
        ],
        CidrBlock="10.0.1.0/24",
        AvailabilityZone="us-east-2b",
        VpcId=vpc_id,
    )
    subnet_2_id = subnet_2_response["Subnet"]["SubnetId"]

    # Create routing tables
    table_1_response = client.create_route_table(
        TagSpecifications=[
            {
                "ResourceType": "route-table",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-routes-1"},
                    get_owner_tag(),
                ],
            }
        ],
        VpcId=vpc_id,
    )
    route_table_1_id = table_1_response["RouteTable"]["RouteTableId"]

    table_2_response = client.create_route_table(
        TagSpecifications=[
            {
                "ResourceType": "route-table",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-routes-2"},
                    get_owner_tag(),
                ],
            }
        ],
        VpcId=vpc_id,
    )
    route_table_2_id = table_2_response["RouteTable"]["RouteTableId"]

    # Add routes in tables to gateway
    client.create_route(
        DestinationCidrBlock="0.0.0.0/0",
        GatewayId=gateway_id,
        RouteTableId=route_table_1_id,
    )
    client.create_route(
        DestinationCidrBlock="0.0.0.0/0",
        GatewayId=gateway_id,
        RouteTableId=route_table_2_id,
    )

    # Associate routing tables
    client.associate_route_table(
        RouteTableId=route_table_1_id,
        SubnetId=subnet_1_id,
    )
    client.associate_route_table(
        RouteTableId=route_table_2_id,
        SubnetId=subnet_2_id,
    )

    return {
        "vpc_id": vpc_id,
        "gateway_id": gateway_id,
        "subnet_1_id": subnet_1_id,
        "subnet_2_id": subnet_2_id,
        "route_1_id": route_table_1_id,
        "route_2_id": route_table_2_id,
    }


def create_security_group(session: boto3.Session, vpc_id: str, ssh_ip: str) -> str:
    """
    Create a security group with public access
    for 80 and 443, but only access from ssh_ip (comma-separated) for 22
    """
    client = session.client("ec2")

    create_response = client.create_security_group(
        Description="Security group used for Mephisto host servers",
        GroupName="mephisto-server-security-group",
        VpcId=vpc_id,
        TagSpecifications=[
            {
                "ResourceType": "security-group",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-server-security-group"},
                    get_owner_tag(),
                ],
            }
        ],
    )
    group_id = create_response["GroupId"]
    ssh_perms = [
        {
            "FromPort": 22,
            "ToPort": 22,
            "IpProtocol": "tcp",
            "IpRanges": [
                {
                    "CidrIp": one_ip,
                    "Description": "SSH from allowed ip",
                }
            ],
        }
        for one_ip in ssh_ip.split(",")
    ]

    response = client.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=[
            {
                "FromPort": 80,
                "ToPort": 80,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Public insecure http access",
                    }
                ],
            },
            {
                "FromPort": 80,
                "ToPort": 80,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Public insecure http access",
                    }
                ],
            },
            {
                "FromPort": 5000,
                "ToPort": 5000,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Internal router access",
                    }
                ],
            },
            {
                "FromPort": 5000,
                "ToPort": 5000,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Internal router access",
                    }
                ],
            },
            {
                "FromPort": 443,
                "ToPort": 443,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Public secure http access",
                    }
                ],
            },
            {
                "FromPort": 443,
                "ToPort": 443,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Public secure http access",
                    }
                ],
            },
        ]
        + ssh_perms,
    )

    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    return group_id


def create_key_pair(
    session: boto3.Session,
    key_name: str,
    key_pair_dir: str = DEFAULT_KEY_PAIR_DIRECTORY,
) -> str:
    """
    creates a key pair by the given name, and writes it to file
    """
    target_keypair_filename = os.path.join(key_pair_dir, f"{key_name}.pem")
    if os.path.exists(target_keypair_filename):
        logger.warning(f"Keypair already exists! {target_keypair_filename}")
        return target_keypair_filename
    client = session.client("ec2")

    response = client.create_key_pair(
        KeyName=key_name,
        TagSpecifications=[
            {
                "ResourceType": "key-pair",
                "Tags": [{"Key": "Name", "Value": key_name}, get_owner_tag()],
            }
        ],
    )
    with open(target_keypair_filename, "w+") as keypair_file:
        keypair_file.write(response["KeyMaterial"])
        subprocess.check_call(["chmod", "400", target_keypair_filename])

    return target_keypair_filename


def create_instance(
    session: boto3.Session,
    key_pair_name: str,
    security_group_id: str,
    subnet_id: str,
    instance_name: str,
    volume_size: int = 8,
    instance_type: str = DEFAULT_INSTANCE_TYPE,
) -> str:
    """
    Create an instance, return the instance id, allocation id, and association id
    """
    client = session.client("ec2")
    instance_response = client.run_instances(
        BlockDeviceMappings=[
            {
                "DeviceName": "/dev/xvda",
                "Ebs": {
                    "DeleteOnTermination": True,
                    "VolumeSize": volume_size,
                    "VolumeType": "gp2",
                    "Encrypted": True,
                },
            }
        ],
        ImageId=DEFAULT_AMI_ID,
        InstanceType=instance_type,
        KeyName=key_pair_name,
        MaxCount=1,
        MinCount=1,
        Monitoring={
            "Enabled": False,
        },  # standard monitoring is enough
        Placement={
            "Tenancy": "default",
        },
        SecurityGroupIds=[security_group_id],
        SubnetId=subnet_id,
        DisableApiTermination=False,  # we need to allow shutdown from botocore
        # IamInstanceProfile={ # Maybe we can move the iam role to do rest of registration?
        #     'Arn': 'string',
        #     'Name': 'string'
        # },
        InstanceInitiatedShutdownBehavior="stop",
        TagSpecifications=[
            {
                "ResourceType": "instance",
                "Tags": [
                    {"Key": "Name", "Value": instance_name},
                    get_owner_tag(),
                ],
            },
        ],
        HibernationOptions={"Configured": False},
        MetadataOptions={
            "HttpTokens": "optional",
            "HttpEndpoint": "enabled",
        },
        EnclaveOptions={"Enabled": False},
    )
    instance_id = instance_response["Instances"][0]["InstanceId"]

    logger.debug(f"Waiting for instance {instance_id} to come up before continuing")
    waiter = client.get_waiter("instance_running")
    waiter.wait(
        InstanceIds=[instance_id],
    )

    return instance_id


def create_target_group(
    session: boto3.Session,
    vpc_id: str,
    instance_id: str,
    group_name="mephisto-fallback",
) -> str:
    """
    Create a target group for the given instance
    """
    client = session.client("elbv2")
    group_name_hash = hashlib.md5(group_name.encode("utf-8")).hexdigest()
    anti_collision_group_name = f"{group_name_hash[:8]}-{group_name}"
    final_group_name = f"{anti_collision_group_name[:28]}-tg"
    create_target_response = client.create_target_group(
        Name=final_group_name[:32],
        Protocol="HTTP",
        ProtocolVersion="HTTP1",
        Port=5000,
        VpcId=vpc_id,
        Matcher={
            "HttpCode": "200-299",
        },
        TargetType="instance",
        Tags=[
            {"Key": "string", "Value": "string"},
        ],
    )
    target_group_arn = create_target_response["TargetGroups"][0]["TargetGroupArn"]

    client.register_targets(
        TargetGroupArn=target_group_arn,
        Targets=[
            {
                "Id": instance_id,
            }
        ],
    )

    return target_group_arn


def rule_is_new(
    session: boto3.Session,
    subdomain: str,
    listener_arn: str,
) -> bool:
    """
    Check to see if a rule already exists with the given subdomain
    """
    client = session.client("elbv2")
    find_rule_response = client.describe_rules(
        ListenerArn=listener_arn,
    )
    rules = find_rule_response["Rules"]
    for rule in rules:
        if len(rule["Conditions"]) == 0:
            continue  # base rule
        host_condition = rule["Conditions"][0]
        values = host_condition.get("Values")
        if values is None or len(values) == 0:
            values = host_condition["HostHeaderConfig"]["Values"]
        existing = values[0]
        if existing.startswith(f"{subdomain}."):
            return False

    return True


def register_instance_to_listener(
    session: boto3.Session,
    instance_id: str,
    vpc_id: str,
    listener_arn: str,
    domain: str,
) -> Tuple[str, str]:
    """
    Creates a rule for this specific redirect case,
    and returns the target group id and rule arn
    """
    subdomain_root = domain.split(".")[0]
    target_group_arn = create_target_group(session, vpc_id, instance_id, subdomain_root)
    client = session.client("elbv2")

    find_rule_response = client.describe_rules(
        ListenerArn=listener_arn,
    )

    # Get the next available priority
    priorities = set([r["Priority"] for r in find_rule_response["Rules"]])
    priority = 1
    while str(priority) in priorities:
        priority += 1

    rule_response = client.create_rule(
        ListenerArn=listener_arn,
        Conditions=[
            {
                "Field": "host-header",
                "HostHeaderConfig": {
                    "Values": [
                        domain,
                        f"*.{domain}",
                    ],
                },
            },
        ],
        Priority=priority,
        Actions=[
            {
                "Type": "forward",
                "TargetGroupArn": target_group_arn,
            },
        ],
    )
    rule_arn = rule_response["Rules"][0]["RuleArn"]

    return target_group_arn, rule_arn


def create_load_balancer(
    session: boto3.Session,
    subnet_ids: List[str],
    security_group_id: str,
    vpc_id: str,
) -> str:
    """
    Creates a load balancer and returns the balancer's arn
    """
    client = session.client("elbv2")

    create_response = client.create_load_balancer(
        Name="mephisto-hosts-balancer",
        Subnets=subnet_ids,
        SecurityGroups=[security_group_id],
        Scheme="internet-facing",
        Type="application",
        IpAddressType="ipv4",
    )
    balancer_arn = create_response["LoadBalancers"][0]["LoadBalancerArn"]
    return balancer_arn


def configure_base_balancer(
    session: boto3.Session,
    balancer_arn: str,
    certificate_arn: str,
    target_group_arn: str,
) -> str:
    """
    Configure the default rules for this load balancer. Return the id
    of the listener to add rules to for redirecting to specified target groups
    """

    client = session.client("elbv2")

    _redirect_response = client.create_listener(
        LoadBalancerArn=balancer_arn,
        Protocol="HTTP",
        Port=80,
        DefaultActions=[
            {
                "Type": "redirect",
                "RedirectConfig": {
                    "Protocol": "HTTPS",
                    "Port": "443",
                    "Host": "#{host}",
                    "Path": "/#{path}",
                    "Query": "#{query}",
                    "StatusCode": "HTTP_301",
                },
            }
        ],
    )

    forward_response = client.create_listener(
        LoadBalancerArn=balancer_arn,
        Protocol="HTTPS",
        Port=443,
        SslPolicy="ELBSecurityPolicy-2016-08",
        Certificates=[
            {
                "CertificateArn": certificate_arn,
            }
        ],
        DefaultActions=[
            {
                "Type": "forward",
                "TargetGroupArn": target_group_arn,
            }
        ],
    )
    listener_arn = forward_response["Listeners"][0]["ListenerArn"]
    return listener_arn


def get_instance_address(
    session: boto3.Session,
    instance_id: str,
) -> Tuple[str, str, str]:
    """
    Create a temporary publicly accessible IP for the given instance.
    Return the IP address, the allocation id, and the association id.
    """
    client = session.client("ec2")

    allocation_response = client.allocate_address(
        Domain="vpc",
        TagSpecifications=[
            {
                "ResourceType": "elastic-ip",
                "Tags": [
                    {
                        "Key": "Name",
                        "Value": f"{instance_id}-ip-address",
                    },
                    get_owner_tag(),
                ],
            }
        ],
    )
    ip_address = allocation_response["PublicIp"]
    allocation_id = allocation_response["AllocationId"]

    associate_response = client.associate_address(
        AllocationId=allocation_id,
        InstanceId=instance_id,
        AllowReassociation=False,
    )
    association_id = associate_response["AssociationId"]

    # Remove this IP from known hosts in case it's there,
    # as it's definitely not the old host anymore
    subprocess.check_call(
        [
            "ssh-keygen",
            "-f",
            f"{KNOWN_HOST_PATH}",
            "-R",
            f'"{ip_address}"',
        ]
    )

    return ip_address, allocation_id, association_id


def detete_instance_address(
    session: boto3.Session,
    allocation_id: str,
    association_id: str,
) -> None:
    """
    Removes the public ip described by the given allocation and association ids
    """
    client = session.client("ec2")
    client.disassociate_address(
        AssociationId=association_id,
    )

    client.release_address(
        AllocationId=allocation_id,
    )


def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
    """
    Try to execute the server push provided in subprocess args
    """
    while retries > 0:
        try:
            subprocess.check_call(
                subprocess_args, env=dict(os.environ, SSH_AUTH_SOCK="")
            )
            return
        except subprocess.CalledProcessError:
            retries -= 1
            sleep_time *= 1.5
            logger.info(
                f"Timed out trying to push to server. Retries remaining: {retries}"
            )
            time.sleep(sleep_time)
    raise Exception(
        "Could not successfully push to the ec2 instance. See log for errors."
    )


def deploy_fallback_server(
    session: boto3.Session,
    instance_id: str,
    key_pair: str,
    log_access_pass: str,
) -> bool:
    """
    Deploy the fallback server to the given instance,
    return True if successful
    """
    client = session.client("ec2")
    server_host, allocation_id, association_id = get_instance_address(
        session, instance_id
    )
    try:
        keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
        password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
        with open(password_file_name, "w+") as password_file:
            password_file.write(log_access_pass)

        remote_server = f"{AMI_DEFAULT_USER}@{server_host}"

        dest = f"{remote_server}:/home/ec2-user/"
        try_server_push(
            [
                "scp",
                "-o",
                "StrictHostKeyChecking=no",
                "-i",
                keypair_file,
                "-r",
                f"{FALLBACK_SERVER_LOC}",
                dest,
            ]
        )
        os.unlink(password_file_name)
        subprocess.check_call(
            [
                "ssh",
                "-i",
                keypair_file,
                remote_server,
                "bash",
                "/home/ec2-user/fallback_server/scripts/first_setup.sh",
            ],
            env=dict(os.environ, SSH_AUTH_SOCK=""),
        )
        detete_instance_address(session, allocation_id, association_id)
    except Exception as e:
        detete_instance_address(session, allocation_id, association_id)
        raise e

    return True


def deploy_to_routing_server(
    session: boto3.Session,
    instance_id: str,
    key_pair: str,
    push_directory: str,
) -> bool:
    client = session.client("ec2")
    server_host, allocation_id, association_id = get_instance_address(
        session, instance_id
    )
    keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")

    print("Uploading files to server, then attempting to run")
    try:
        remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
        dest = f"{remote_server}:/home/ec2-user/"
        try_server_push(
            [
                "scp",
                "-o",
                "StrictHostKeyChecking=no",
                "-i",
                keypair_file,
                "-r",
                f"{push_directory}",
                dest,
            ]
        )

        subprocess.check_call(
            [
                "ssh",
                "-i",
                keypair_file,
                remote_server,
                "bash",
                "/home/ec2-user/routing_server/setup/init_server.sh",
            ],
            env=dict(os.environ, SSH_AUTH_SOCK=""),
        )
        detete_instance_address(session, allocation_id, association_id)
        print("Server setup complete!")
    except Exception as e:
        detete_instance_address(session, allocation_id, association_id)
        raise e

    return True


def delete_rule(
    session: boto3.Session,
    rule_arn: str,
    target_group_arn: str,
) -> None:
    """
    Remove the given rule and the target group for this rule
    """
    client = session.client("elbv2")
    client.delete_rule(
        RuleArn=rule_arn,
    )

    client.delete_target_group(
        TargetGroupArn=target_group_arn,
    )


def delete_instance(
    session: boto3.Session,
    instance_id: str,
) -> None:
    """
    Remove the given instance and the associated elastic ip
    """
    client = session.client("ec2")
    client.terminate_instances(InstanceIds=[instance_id])


def remove_instance_and_cleanup(
    session: boto3.Session,
    server_name: str,
) -> None:
    """
    Cleanup for a launched server, removing the redirect rule
    clearing the target group, and then shutting down the instance.
    """
    server_detail_path = os.path.join(
        DEFAULT_SERVER_DETAIL_LOCATION, f"{server_name}.json"
    )

    with open(server_detail_path, "r") as detail_file:
        details = json.load(detail_file)

    delete_rule(session, details["balancer_rule_arn"], details["target_group_arn"])
    delete_instance(
        session,
        details["instance_id"],
    )
    os.unlink(server_detail_path)
    return None


def delete_listener(
    session: boto3.Session,
    listener_arn: str,
) -> None:
    client = session.client("elbv2")
    client.delete_listener(
        ListenerArn=listener_arn,
    )


def cleanup_fallback_server(
    iam_profile: str,
    delete_hosted_zone: bool = False,
    server_details_file: str = DEFAULT_FALLBACK_FILE,
) -> None:
    """
    Cleans up all of the resources for the given iam profile,
    assuming that the details are stored in the given
    server_details_file.

    Optionally includes deleting the hosted zone, which remains
    an option due to the DNS changes required
    """
    session = boto3.Session(profile_name=iam_profile, region_name="us-east-2")

    elb_client = session.client("elbv2")
    ec2_client = session.client("ec2")

    server_details_file = (
        DEFAULT_FALLBACK_FILE if server_details_file is None else server_details_file
    )
    with open(server_details_file, "r") as details_file:
        details = json.load(details_file)

    listener_arn = details.get("listener_arn")
    if listener_arn is not None:
        print(f"Deleting listener {listener_arn}...")
        find_rule_response = elb_client.describe_rules(
            ListenerArn=listener_arn,
        )
        rules = find_rule_response["Rules"]
        if len(rules) > 1:
            confirm = input(
                "There are still existing rules on the router, which would imply that active jobs are running right now. Are you SURE you want to DELETE ALL?[yes/no]"
            )
            if confirm != "yes":
                return
        elb_client.delete_listener(
            ListenerArn=listener_arn,
        )

    target_group_arn = details.get("target_group_arn")
    if target_group_arn is not None:
        print(f"Deleting target group {target_group_arn}...")
        elb_client.delete_target_group(
            TargetGroupArn=target_group_arn,
        )

    balancer_arn = details.get("balancer_arn")
    if balancer_arn is not None:
        print(f"Deleting balancer {balancer_arn}...")
        elb_client.delete_load_balancer(
            LoadBalancerArn=balancer_arn,
        )

    instance_id = details.get("instance_id")
    if instance_id is not None:
        print(f"Deleting instance {instance_id}...")
        delete_instance(session, instance_id)

    vpc_details = details.get("vpc_details")
    if vpc_details is not None:
        print(f"Deleting vpc {vpc_details['vpc_id']} and related resources...")
        ec2_client.delete_subnet(SubnetId=vpc_details["subnet_1_id"])
        ec2_client.delete_subnet(SubnetId=vpc_details["subnet_2_id"])
        ec2_client.delete_route_table(RouteTableId=vpc_details["route_1_id"])
        ec2_client.delete_route_table(RouteTableId=vpc_details["route_2_id"])
        table_response = ec2_client.describe_route_tables(
            Filters=[
                {
                    "Name": "vpc-id",
                    "Values": [vpc_details["vpc_id"]],
                }
            ]
        )
        tables = table_response["RouteTables"]
        for table in tables:
            ec2_client.delete_route_table(RouteTableId=table["RouteTableId"])

        ec2_client.delete_internet_gateway(InternetGatewayId=vpc_details["gateway_id"])

        security_group_id = details.get("security_group_id")
        if security_group_id is not None:
            print("Deleting security group {security_group_id}...")
            ec2_client.delete_security_group(
                GroupId=security_group_id,
            )

        ec2_client.delete_vpc(VpcId=vpc_details["vpc_id"])

    if delete_hosted_zone:
        hosted_zone_id = details.get("hosted_zone_id")
        if hosted_zone_id is not None:
            route53_client = session.client("route53")
            print(
                "Deleting hosted zones not yet implemented, "
                "navigate to the AWS Route53 console to complete "
                f"this step, deleting {hosted_zone_id}"
            )
            # To delete a hosted zone, we need to query it
            # for the list of records, than remove all
            # that aren't SOA/NS

    os.unlink(server_details_file)
    return None
#   def get_owner_tag() -> Dict[str, str]:
View Source
def get_owner_tag() -> Dict[str, str]:
    """
    Creates a tag with the user's username
    as the owner for the given resource
    """
    return {"Key": "Owner", "Value": getpass.getuser()}

Creates a tag with the user's username as the owner for the given resource

#   def check_aws_credentials(profile_name: str) -> bool:
View Source
def check_aws_credentials(profile_name: str) -> bool:
    try:
        # Check existing credentials
        boto3.Session(profile_name=profile_name)
        return True
    except ProfileNotFound:
        return False
#   def setup_ec2_credentials( profile_name: str, register_args: Union[omegaconf.dictconfig.DictConfig, NoneType] = None ) -> bool:
View Source
def setup_ec2_credentials(
    profile_name: str, register_args: Optional["DictConfig"] = None
) -> bool:
    return setup_aws_credentials(profile_name, register_args)
#   def get_domain_if_available(session: boto3.session.Session, domain_name: str) -> bool:
View Source
def get_domain_if_available(session: boto3.Session, domain_name: str) -> bool:
    """
    Attempt to register the given domain with Route53, return
    True if registration is successful, False otherwise.

    Details on valid domains can be found here:
    https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/registrar-tld-list.html

    Pricing is available on amazon
    """
    client = session.client("route53domains")
    avail_result = "PENDING"

    while avail_result == "PENDING":
        avail = client.check_domain_availabiliity(DomainName=domain_name)
        avail_result = avail["Availability"]
        time.sleep(0.3)

    # May extend to handle other available cases
    if avail_result not in ["AVAILABLE"]:
        print(
            f"Domain was not listed as available, instead "
            f"{avail_result}, visit route53 for more detail"
        )
        return False

    print("Automated domain registration isn't yet implemented")
    # Registration can be completed using client.register_domain
    # Details are available here:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/route53domains.html#Route53Domains.Client.register_domain

    return False

Attempt to register the given domain with Route53, return True if registration is successful, False otherwise.

Details on valid domains can be found here: https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/registrar-tld-list.html

Pricing is available on amazon

#   def find_hosted_zone( session: boto3.session.Session, domain_name: str ) -> Union[str, NoneType]:
View Source
def find_hosted_zone(session: boto3.Session, domain_name: str) -> Optional[str]:
    """
    search for a hosted zone with the given name, return its id
    if found and None otherwise
    """
    client = session.client("route53")

    zones = client.list_hosted_zones_by_name()

    logger.debug(f"Found zones {zones}")
    for zone in zones["HostedZones"]:
        if zone["Name"] == f"{domain_name}.":
            return zone["Id"]

    return None

search for a hosted zone with the given name, return its id if found and None otherwise

#   def create_hosted_zone(session: boto3.session.Session, domain_name: str) -> str:
View Source
def create_hosted_zone(session: boto3.Session, domain_name: str) -> str:
    """
    Given a domain name, tries to create a hosted zone
    for that domain. Returns the hosted zone id
    """
    client = session.client("route53")

    zone_id = find_hosted_zone(session, domain_name)
    if zone_id is None:

        res = client.create_hosted_zone(
            Name=domain_name,
            CallerReference=str(time.time()),
            HostedZoneConfig={
                "Comment": "Mephisto hosted zone",
            },
        )
        nameservers = res["DelegationSet"]["NameServers"]
        BOLD_WHITE_ON_BLUE = "\x1b[1;37;44m"
        RESET = "\x1b[0m"
        print(
            f"{BOLD_WHITE_ON_BLUE}"
            "Registered new hosted zone! You should ensure your domain "
            "name is registered to delegate to the following nameservers: "
            f"\n{nameservers}"
            f"{RESET}"
        )
        zone_id = res["HostedZone"]["Id"]
    else:
        logger.debug(f"This hosted zone already exists! Returning {zone_id}")

    return zone_id

Given a domain name, tries to create a hosted zone for that domain. Returns the hosted zone id

#   def find_certificate_arn( session: boto3.session.Session, domain_name: str ) -> Union[str, NoneType]:
View Source
def find_certificate_arn(session: boto3.Session, domain_name: str) -> Optional[str]:
    """
    Finds the certificate for the given domain if it exists, and returns
    the certification arn.
    """
    client = session.client("acm")
    certs = client.list_certificates()
    logger.debug(f"Found existing certs: {certs}")
    for cert in certs["CertificateSummaryList"]:
        if cert["DomainName"] == domain_name:
            return cert["CertificateArn"]
    return None

Finds the certificate for the given domain if it exists, and returns the certification arn.

#   def get_certificate(session: boto3.session.Session, domain_name: str) -> Dict[str, str]:
View Source
def get_certificate(session: boto3.Session, domain_name: str) -> Dict[str, str]:
    """
    Gets the certificate for the given domain name, and returns
    the dns validation name and target and cert arn ('Name' and 'Value', 'arn')
    """
    client = session.client("acm")
    cert_domain_name = f"*.{domain_name}"
    certificate_arn = find_certificate_arn(session, cert_domain_name)
    if certificate_arn is None:  # cert not yet issued
        logger.debug("Requesting new certificate")
        response = client.request_certificate(
            DomainName=cert_domain_name,
            ValidationMethod="DNS",
            IdempotencyToken=f"{domain_name.split('.')[0]}request",
            Options={
                "CertificateTransparencyLoggingPreference": "ENABLED",
            },
        )
        certificate_arn = response["CertificateArn"]
    else:
        logger.debug(f"Using existing certificate {certificate_arn}")
    attempts = 0
    sleep_time = 2
    details = None
    while attempts < MAX_RETRIES:
        try:
            details = client.describe_certificate(
                CertificateArn=certificate_arn,
            )
            return_data = details["Certificate"]["DomainValidationOptions"][0][
                "ResourceRecord"
            ]
            return_data["arn"] = certificate_arn
            return return_data
        except KeyError:
            # Resource record not created yet, try again
            attempts += 1
            logger.info(f"Attempt {attempts} had no certification details, retrying")
            time.sleep(sleep_time)
            sleep_time *= 2
    raise Exception("Exceeded MAX_RETRIES waiting for certificate records")

Gets the certificate for the given domain name, and returns the dns validation name and target and cert arn ('Name' and 'Value', 'arn')

#   def register_zone_records( session: boto3.session.Session, zone_id: str, domain_name: str, load_balancer_arn: str, acm_valid_name: str, acm_valid_target: str ) -> int:
View Source
def register_zone_records(
    session: boto3.Session,
    zone_id: str,
    domain_name: str,
    load_balancer_arn: str,
    acm_valid_name: str,
    acm_valid_target: str,
) -> int:
    """
    Creates the required zone records for this mephisto hosted zone. Requires
    the load balancer target, and the ACM certificate addresses

    Returns the change id
    """
    # Get details about the load balancer
    ec2_client = session.client("elbv2")
    balancer = ec2_client.describe_load_balancers(
        LoadBalancerArns=[load_balancer_arn],
    )["LoadBalancers"][0]
    load_balancer_dns = balancer["DNSName"]
    load_balancer_zone = balancer["CanonicalHostedZoneId"]

    # Create the records
    client = session.client("route53")
    response = client.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch={
            "Comment": "Creating records for Mephisto load balancer and DNS validations for certs",
            "Changes": [
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": f"*.{domain_name}",
                        "Type": "A",
                        "AliasTarget": {
                            "HostedZoneId": load_balancer_zone,
                            "DNSName": load_balancer_dns,
                            "EvaluateTargetHealth": True,
                        },
                    },
                },
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": f"{domain_name}",
                        "Type": "A",
                        "AliasTarget": {
                            "HostedZoneId": load_balancer_zone,
                            "DNSName": load_balancer_dns,
                            "EvaluateTargetHealth": True,
                        },
                    },
                },
                {
                    "Action": "CREATE",
                    "ResourceRecordSet": {
                        "Name": acm_valid_name,
                        "Type": "CNAME",
                        "TTL": 300,
                        "ResourceRecords": [
                            {"Value": acm_valid_target},
                        ],
                    },
                },
            ],
        },
    )
    return response["ChangeInfo"]["Id"]

Creates the required zone records for this mephisto hosted zone. Requires the load balancer target, and the ACM certificate addresses

Returns the change id

#   def create_mephisto_vpc(session: boto3.session.Session) -> Dict[str, str]:
View Source
def create_mephisto_vpc(session: boto3.Session) -> Dict[str, str]:
    """
    Create the required vpc with two subnets, an associated
    internet gateway, and routing tables.

    Currently sets up using US-east for both subnets
    """
    client = session.client("ec2")

    # Create VPC
    vpc_response = client.create_vpc(
        CidrBlock="10.0.0.0/16",
        TagSpecifications=[
            {
                "ResourceType": "vpc",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-core-vpc"},
                    get_owner_tag(),
                ],
            }
        ],
    )
    vpc_id = vpc_response["Vpc"]["VpcId"]

    # Create internet gateway
    gateway_response = client.create_internet_gateway(
        TagSpecifications=[
            {
                "ResourceType": "internet-gateway",
                "Tags": [{"Key": "Name", "Value": "mephisto-gateway"}, get_owner_tag()],
            }
        ],
    )
    gateway_id = gateway_response["InternetGateway"]["InternetGatewayId"]
    client.attach_internet_gateway(
        InternetGatewayId=gateway_id,
        VpcId=vpc_id,
    )

    # Create subnets
    subnet_1_response = client.create_subnet(
        TagSpecifications=[
            {
                "ResourceType": "subnet",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-subnet-1"},
                    get_owner_tag(),
                ],
            }
        ],
        CidrBlock="10.0.0.0/24",
        AvailabilityZone="us-east-2a",
        VpcId=vpc_id,
    )
    subnet_1_id = subnet_1_response["Subnet"]["SubnetId"]

    subnet_2_response = client.create_subnet(
        TagSpecifications=[
            {
                "ResourceType": "subnet",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-subnet-2"},
                    get_owner_tag(),
                ],
            }
        ],
        CidrBlock="10.0.1.0/24",
        AvailabilityZone="us-east-2b",
        VpcId=vpc_id,
    )
    subnet_2_id = subnet_2_response["Subnet"]["SubnetId"]

    # Create routing tables
    table_1_response = client.create_route_table(
        TagSpecifications=[
            {
                "ResourceType": "route-table",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-routes-1"},
                    get_owner_tag(),
                ],
            }
        ],
        VpcId=vpc_id,
    )
    route_table_1_id = table_1_response["RouteTable"]["RouteTableId"]

    table_2_response = client.create_route_table(
        TagSpecifications=[
            {
                "ResourceType": "route-table",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-routes-2"},
                    get_owner_tag(),
                ],
            }
        ],
        VpcId=vpc_id,
    )
    route_table_2_id = table_2_response["RouteTable"]["RouteTableId"]

    # Add routes in tables to gateway
    client.create_route(
        DestinationCidrBlock="0.0.0.0/0",
        GatewayId=gateway_id,
        RouteTableId=route_table_1_id,
    )
    client.create_route(
        DestinationCidrBlock="0.0.0.0/0",
        GatewayId=gateway_id,
        RouteTableId=route_table_2_id,
    )

    # Associate routing tables
    client.associate_route_table(
        RouteTableId=route_table_1_id,
        SubnetId=subnet_1_id,
    )
    client.associate_route_table(
        RouteTableId=route_table_2_id,
        SubnetId=subnet_2_id,
    )

    return {
        "vpc_id": vpc_id,
        "gateway_id": gateway_id,
        "subnet_1_id": subnet_1_id,
        "subnet_2_id": subnet_2_id,
        "route_1_id": route_table_1_id,
        "route_2_id": route_table_2_id,
    }

Create the required vpc with two subnets, an associated internet gateway, and routing tables.

Currently sets up using US-east for both subnets

#   def create_security_group(session: boto3.session.Session, vpc_id: str, ssh_ip: str) -> str:
View Source
def create_security_group(session: boto3.Session, vpc_id: str, ssh_ip: str) -> str:
    """
    Create a security group with public access
    for 80 and 443, but only access from ssh_ip (comma-separated) for 22
    """
    client = session.client("ec2")

    create_response = client.create_security_group(
        Description="Security group used for Mephisto host servers",
        GroupName="mephisto-server-security-group",
        VpcId=vpc_id,
        TagSpecifications=[
            {
                "ResourceType": "security-group",
                "Tags": [
                    {"Key": "Name", "Value": "mephisto-server-security-group"},
                    get_owner_tag(),
                ],
            }
        ],
    )
    group_id = create_response["GroupId"]
    ssh_perms = [
        {
            "FromPort": 22,
            "ToPort": 22,
            "IpProtocol": "tcp",
            "IpRanges": [
                {
                    "CidrIp": one_ip,
                    "Description": "SSH from allowed ip",
                }
            ],
        }
        for one_ip in ssh_ip.split(",")
    ]

    response = client.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=[
            {
                "FromPort": 80,
                "ToPort": 80,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Public insecure http access",
                    }
                ],
            },
            {
                "FromPort": 80,
                "ToPort": 80,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Public insecure http access",
                    }
                ],
            },
            {
                "FromPort": 5000,
                "ToPort": 5000,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Internal router access",
                    }
                ],
            },
            {
                "FromPort": 5000,
                "ToPort": 5000,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Internal router access",
                    }
                ],
            },
            {
                "FromPort": 443,
                "ToPort": 443,
                "IpProtocol": "tcp",
                "IpRanges": [
                    {
                        "CidrIp": "0.0.0.0/0",
                        "Description": "Public secure http access",
                    }
                ],
            },
            {
                "FromPort": 443,
                "ToPort": 443,
                "IpProtocol": "tcp",
                "Ipv6Ranges": [
                    {
                        "CidrIpv6": "::/0",
                        "Description": "Public secure http access",
                    }
                ],
            },
        ]
        + ssh_perms,
    )

    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    return group_id

Create a security group with public access for 80 and 443, but only access from ssh_ip (comma-separated) for 22

#   def create_key_pair( session: boto3.session.Session, key_name: str, key_pair_dir: str = '/Users/jju/mephisto/mephisto/abstractions/architects/ec2/keypairs' ) -> str:
View Source
def create_key_pair(
    session: boto3.Session,
    key_name: str,
    key_pair_dir: str = DEFAULT_KEY_PAIR_DIRECTORY,
) -> str:
    """
    creates a key pair by the given name, and writes it to file
    """
    target_keypair_filename = os.path.join(key_pair_dir, f"{key_name}.pem")
    if os.path.exists(target_keypair_filename):
        logger.warning(f"Keypair already exists! {target_keypair_filename}")
        return target_keypair_filename
    client = session.client("ec2")

    response = client.create_key_pair(
        KeyName=key_name,
        TagSpecifications=[
            {
                "ResourceType": "key-pair",
                "Tags": [{"Key": "Name", "Value": key_name}, get_owner_tag()],
            }
        ],
    )
    with open(target_keypair_filename, "w+") as keypair_file:
        keypair_file.write(response["KeyMaterial"])
        subprocess.check_call(["chmod", "400", target_keypair_filename])

    return target_keypair_filename

creates a key pair by the given name, and writes it to file

#   def create_instance( session: boto3.session.Session, key_pair_name: str, security_group_id: str, subnet_id: str, instance_name: str, volume_size: int = 8, instance_type: str = 'm2.micro' ) -> str:
View Source
def create_instance(
    session: boto3.Session,
    key_pair_name: str,
    security_group_id: str,
    subnet_id: str,
    instance_name: str,
    volume_size: int = 8,
    instance_type: str = DEFAULT_INSTANCE_TYPE,
) -> str:
    """
    Create an instance, return the instance id, allocation id, and association id
    """
    client = session.client("ec2")
    instance_response = client.run_instances(
        BlockDeviceMappings=[
            {
                "DeviceName": "/dev/xvda",
                "Ebs": {
                    "DeleteOnTermination": True,
                    "VolumeSize": volume_size,
                    "VolumeType": "gp2",
                    "Encrypted": True,
                },
            }
        ],
        ImageId=DEFAULT_AMI_ID,
        InstanceType=instance_type,
        KeyName=key_pair_name,
        MaxCount=1,
        MinCount=1,
        Monitoring={
            "Enabled": False,
        },  # standard monitoring is enough
        Placement={
            "Tenancy": "default",
        },
        SecurityGroupIds=[security_group_id],
        SubnetId=subnet_id,
        DisableApiTermination=False,  # we need to allow shutdown from botocore
        # IamInstanceProfile={ # Maybe we can move the iam role to do rest of registration?
        #     'Arn': 'string',
        #     'Name': 'string'
        # },
        InstanceInitiatedShutdownBehavior="stop",
        TagSpecifications=[
            {
                "ResourceType": "instance",
                "Tags": [
                    {"Key": "Name", "Value": instance_name},
                    get_owner_tag(),
                ],
            },
        ],
        HibernationOptions={"Configured": False},
        MetadataOptions={
            "HttpTokens": "optional",
            "HttpEndpoint": "enabled",
        },
        EnclaveOptions={"Enabled": False},
    )
    instance_id = instance_response["Instances"][0]["InstanceId"]

    logger.debug(f"Waiting for instance {instance_id} to come up before continuing")
    waiter = client.get_waiter("instance_running")
    waiter.wait(
        InstanceIds=[instance_id],
    )

    return instance_id

Create an instance, return the instance id, allocation id, and association id

#   def create_target_group( session: boto3.session.Session, vpc_id: str, instance_id: str, group_name='mephisto-fallback' ) -> str:
View Source
def create_target_group(
    session: boto3.Session,
    vpc_id: str,
    instance_id: str,
    group_name="mephisto-fallback",
) -> str:
    """
    Create a target group for the given instance
    """
    client = session.client("elbv2")
    group_name_hash = hashlib.md5(group_name.encode("utf-8")).hexdigest()
    anti_collision_group_name = f"{group_name_hash[:8]}-{group_name}"
    final_group_name = f"{anti_collision_group_name[:28]}-tg"
    create_target_response = client.create_target_group(
        Name=final_group_name[:32],
        Protocol="HTTP",
        ProtocolVersion="HTTP1",
        Port=5000,
        VpcId=vpc_id,
        Matcher={
            "HttpCode": "200-299",
        },
        TargetType="instance",
        Tags=[
            {"Key": "string", "Value": "string"},
        ],
    )
    target_group_arn = create_target_response["TargetGroups"][0]["TargetGroupArn"]

    client.register_targets(
        TargetGroupArn=target_group_arn,
        Targets=[
            {
                "Id": instance_id,
            }
        ],
    )

    return target_group_arn

Create a target group for the given instance

#   def rule_is_new( session: boto3.session.Session, subdomain: str, listener_arn: str ) -> bool:
View Source
def rule_is_new(
    session: boto3.Session,
    subdomain: str,
    listener_arn: str,
) -> bool:
    """
    Check to see if a rule already exists with the given subdomain
    """
    client = session.client("elbv2")
    find_rule_response = client.describe_rules(
        ListenerArn=listener_arn,
    )
    rules = find_rule_response["Rules"]
    for rule in rules:
        if len(rule["Conditions"]) == 0:
            continue  # base rule
        host_condition = rule["Conditions"][0]
        values = host_condition.get("Values")
        if values is None or len(values) == 0:
            values = host_condition["HostHeaderConfig"]["Values"]
        existing = values[0]
        if existing.startswith(f"{subdomain}."):
            return False

    return True

Check to see if a rule already exists with the given subdomain

#   def register_instance_to_listener( session: boto3.session.Session, instance_id: str, vpc_id: str, listener_arn: str, domain: str ) -> Tuple[str, str]:
View Source
def register_instance_to_listener(
    session: boto3.Session,
    instance_id: str,
    vpc_id: str,
    listener_arn: str,
    domain: str,
) -> Tuple[str, str]:
    """
    Creates a rule for this specific redirect case,
    and returns the target group id and rule arn
    """
    subdomain_root = domain.split(".")[0]
    target_group_arn = create_target_group(session, vpc_id, instance_id, subdomain_root)
    client = session.client("elbv2")

    find_rule_response = client.describe_rules(
        ListenerArn=listener_arn,
    )

    # Get the next available priority
    priorities = set([r["Priority"] for r in find_rule_response["Rules"]])
    priority = 1
    while str(priority) in priorities:
        priority += 1

    rule_response = client.create_rule(
        ListenerArn=listener_arn,
        Conditions=[
            {
                "Field": "host-header",
                "HostHeaderConfig": {
                    "Values": [
                        domain,
                        f"*.{domain}",
                    ],
                },
            },
        ],
        Priority=priority,
        Actions=[
            {
                "Type": "forward",
                "TargetGroupArn": target_group_arn,
            },
        ],
    )
    rule_arn = rule_response["Rules"][0]["RuleArn"]

    return target_group_arn, rule_arn

Creates a rule for this specific redirect case, and returns the target group id and rule arn

#   def create_load_balancer( session: boto3.session.Session, subnet_ids: List[str], security_group_id: str, vpc_id: str ) -> str:
View Source
def create_load_balancer(
    session: boto3.Session,
    subnet_ids: List[str],
    security_group_id: str,
    vpc_id: str,
) -> str:
    """
    Creates a load balancer and returns the balancer's arn
    """
    client = session.client("elbv2")

    create_response = client.create_load_balancer(
        Name="mephisto-hosts-balancer",
        Subnets=subnet_ids,
        SecurityGroups=[security_group_id],
        Scheme="internet-facing",
        Type="application",
        IpAddressType="ipv4",
    )
    balancer_arn = create_response["LoadBalancers"][0]["LoadBalancerArn"]
    return balancer_arn

Creates a load balancer and returns the balancer's arn

#   def configure_base_balancer( session: boto3.session.Session, balancer_arn: str, certificate_arn: str, target_group_arn: str ) -> str:
View Source
def configure_base_balancer(
    session: boto3.Session,
    balancer_arn: str,
    certificate_arn: str,
    target_group_arn: str,
) -> str:
    """
    Configure the default rules for this load balancer. Return the id
    of the listener to add rules to for redirecting to specified target groups
    """

    client = session.client("elbv2")

    _redirect_response = client.create_listener(
        LoadBalancerArn=balancer_arn,
        Protocol="HTTP",
        Port=80,
        DefaultActions=[
            {
                "Type": "redirect",
                "RedirectConfig": {
                    "Protocol": "HTTPS",
                    "Port": "443",
                    "Host": "#{host}",
                    "Path": "/#{path}",
                    "Query": "#{query}",
                    "StatusCode": "HTTP_301",
                },
            }
        ],
    )

    forward_response = client.create_listener(
        LoadBalancerArn=balancer_arn,
        Protocol="HTTPS",
        Port=443,
        SslPolicy="ELBSecurityPolicy-2016-08",
        Certificates=[
            {
                "CertificateArn": certificate_arn,
            }
        ],
        DefaultActions=[
            {
                "Type": "forward",
                "TargetGroupArn": target_group_arn,
            }
        ],
    )
    listener_arn = forward_response["Listeners"][0]["ListenerArn"]
    return listener_arn

Configure the default rules for this load balancer. Return the id of the listener to add rules to for redirecting to specified target groups

#   def get_instance_address( session: boto3.session.Session, instance_id: str ) -> Tuple[str, str, str]:
View Source
def get_instance_address(
    session: boto3.Session,
    instance_id: str,
) -> Tuple[str, str, str]:
    """
    Create a temporary publicly accessible IP for the given instance.
    Return the IP address, the allocation id, and the association id.
    """
    client = session.client("ec2")

    allocation_response = client.allocate_address(
        Domain="vpc",
        TagSpecifications=[
            {
                "ResourceType": "elastic-ip",
                "Tags": [
                    {
                        "Key": "Name",
                        "Value": f"{instance_id}-ip-address",
                    },
                    get_owner_tag(),
                ],
            }
        ],
    )
    ip_address = allocation_response["PublicIp"]
    allocation_id = allocation_response["AllocationId"]

    associate_response = client.associate_address(
        AllocationId=allocation_id,
        InstanceId=instance_id,
        AllowReassociation=False,
    )
    association_id = associate_response["AssociationId"]

    # Remove this IP from known hosts in case it's there,
    # as it's definitely not the old host anymore
    subprocess.check_call(
        [
            "ssh-keygen",
            "-f",
            f"{KNOWN_HOST_PATH}",
            "-R",
            f'"{ip_address}"',
        ]
    )

    return ip_address, allocation_id, association_id

Create a temporary publicly accessible IP for the given instance. Return the IP address, the allocation id, and the association id.

#   def detete_instance_address( session: boto3.session.Session, allocation_id: str, association_id: str ) -> None:
View Source
def detete_instance_address(
    session: boto3.Session,
    allocation_id: str,
    association_id: str,
) -> None:
    """
    Removes the public ip described by the given allocation and association ids
    """
    client = session.client("ec2")
    client.disassociate_address(
        AssociationId=association_id,
    )

    client.release_address(
        AllocationId=allocation_id,
    )

Removes the public ip described by the given allocation and association ids

#   def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
View Source
def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
    """
    Try to execute the server push provided in subprocess args
    """
    while retries > 0:
        try:
            subprocess.check_call(
                subprocess_args, env=dict(os.environ, SSH_AUTH_SOCK="")
            )
            return
        except subprocess.CalledProcessError:
            retries -= 1
            sleep_time *= 1.5
            logger.info(
                f"Timed out trying to push to server. Retries remaining: {retries}"
            )
            time.sleep(sleep_time)
    raise Exception(
        "Could not successfully push to the ec2 instance. See log for errors."
    )

Try to execute the server push provided in subprocess args

#   def deploy_fallback_server( session: boto3.session.Session, instance_id: str, key_pair: str, log_access_pass: str ) -> bool:
View Source
def deploy_fallback_server(
    session: boto3.Session,
    instance_id: str,
    key_pair: str,
    log_access_pass: str,
) -> bool:
    """
    Deploy the fallback server to the given instance,
    return True if successful
    """
    client = session.client("ec2")
    server_host, allocation_id, association_id = get_instance_address(
        session, instance_id
    )
    try:
        keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
        password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
        with open(password_file_name, "w+") as password_file:
            password_file.write(log_access_pass)

        remote_server = f"{AMI_DEFAULT_USER}@{server_host}"

        dest = f"{remote_server}:/home/ec2-user/"
        try_server_push(
            [
                "scp",
                "-o",
                "StrictHostKeyChecking=no",
                "-i",
                keypair_file,
                "-r",
                f"{FALLBACK_SERVER_LOC}",
                dest,
            ]
        )
        os.unlink(password_file_name)
        subprocess.check_call(
            [
                "ssh",
                "-i",
                keypair_file,
                remote_server,
                "bash",
                "/home/ec2-user/fallback_server/scripts/first_setup.sh",
            ],
            env=dict(os.environ, SSH_AUTH_SOCK=""),
        )
        detete_instance_address(session, allocation_id, association_id)
    except Exception as e:
        detete_instance_address(session, allocation_id, association_id)
        raise e

    return True

Deploy the fallback server to the given instance, return True if successful

#   def deploy_to_routing_server( session: boto3.session.Session, instance_id: str, key_pair: str, push_directory: str ) -> bool:
View Source
def deploy_to_routing_server(
    session: boto3.Session,
    instance_id: str,
    key_pair: str,
    push_directory: str,
) -> bool:
    client = session.client("ec2")
    server_host, allocation_id, association_id = get_instance_address(
        session, instance_id
    )
    keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")

    print("Uploading files to server, then attempting to run")
    try:
        remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
        dest = f"{remote_server}:/home/ec2-user/"
        try_server_push(
            [
                "scp",
                "-o",
                "StrictHostKeyChecking=no",
                "-i",
                keypair_file,
                "-r",
                f"{push_directory}",
                dest,
            ]
        )

        subprocess.check_call(
            [
                "ssh",
                "-i",
                keypair_file,
                remote_server,
                "bash",
                "/home/ec2-user/routing_server/setup/init_server.sh",
            ],
            env=dict(os.environ, SSH_AUTH_SOCK=""),
        )
        detete_instance_address(session, allocation_id, association_id)
        print("Server setup complete!")
    except Exception as e:
        detete_instance_address(session, allocation_id, association_id)
        raise e

    return True
#   def delete_rule( session: boto3.session.Session, rule_arn: str, target_group_arn: str ) -> None:
View Source
def delete_rule(
    session: boto3.Session,
    rule_arn: str,
    target_group_arn: str,
) -> None:
    """
    Remove the given rule and the target group for this rule
    """
    client = session.client("elbv2")
    client.delete_rule(
        RuleArn=rule_arn,
    )

    client.delete_target_group(
        TargetGroupArn=target_group_arn,
    )

Remove the given rule and the target group for this rule

#   def delete_instance(session: boto3.session.Session, instance_id: str) -> None:
View Source
def delete_instance(
    session: boto3.Session,
    instance_id: str,
) -> None:
    """
    Remove the given instance and the associated elastic ip
    """
    client = session.client("ec2")
    client.terminate_instances(InstanceIds=[instance_id])

Remove the given instance and the associated elastic ip

#   def remove_instance_and_cleanup(session: boto3.session.Session, server_name: str) -> None:
View Source
def remove_instance_and_cleanup(
    session: boto3.Session,
    server_name: str,
) -> None:
    """
    Cleanup for a launched server, removing the redirect rule
    clearing the target group, and then shutting down the instance.
    """
    server_detail_path = os.path.join(
        DEFAULT_SERVER_DETAIL_LOCATION, f"{server_name}.json"
    )

    with open(server_detail_path, "r") as detail_file:
        details = json.load(detail_file)

    delete_rule(session, details["balancer_rule_arn"], details["target_group_arn"])
    delete_instance(
        session,
        details["instance_id"],
    )
    os.unlink(server_detail_path)
    return None

Cleanup for a launched server, removing the redirect rule clearing the target group, and then shutting down the instance.

#   def delete_listener(session: boto3.session.Session, listener_arn: str) -> None:
View Source
def delete_listener(
    session: boto3.Session,
    listener_arn: str,
) -> None:
    client = session.client("elbv2")
    client.delete_listener(
        ListenerArn=listener_arn,
    )
#   def cleanup_fallback_server( iam_profile: str, delete_hosted_zone: bool = False, server_details_file: str = '/Users/jju/mephisto/mephisto/abstractions/architects/ec2/servers/fallback.json' ) -> None:
View Source
def cleanup_fallback_server(
    iam_profile: str,
    delete_hosted_zone: bool = False,
    server_details_file: str = DEFAULT_FALLBACK_FILE,
) -> None:
    """
    Cleans up all of the resources for the given iam profile,
    assuming that the details are stored in the given
    server_details_file.

    Optionally includes deleting the hosted zone, which remains
    an option due to the DNS changes required
    """
    session = boto3.Session(profile_name=iam_profile, region_name="us-east-2")

    elb_client = session.client("elbv2")
    ec2_client = session.client("ec2")

    server_details_file = (
        DEFAULT_FALLBACK_FILE if server_details_file is None else server_details_file
    )
    with open(server_details_file, "r") as details_file:
        details = json.load(details_file)

    listener_arn = details.get("listener_arn")
    if listener_arn is not None:
        print(f"Deleting listener {listener_arn}...")
        find_rule_response = elb_client.describe_rules(
            ListenerArn=listener_arn,
        )
        rules = find_rule_response["Rules"]
        if len(rules) > 1:
            confirm = input(
                "There are still existing rules on the router, which would imply that active jobs are running right now. Are you SURE you want to DELETE ALL?[yes/no]"
            )
            if confirm != "yes":
                return
        elb_client.delete_listener(
            ListenerArn=listener_arn,
        )

    target_group_arn = details.get("target_group_arn")
    if target_group_arn is not None:
        print(f"Deleting target group {target_group_arn}...")
        elb_client.delete_target_group(
            TargetGroupArn=target_group_arn,
        )

    balancer_arn = details.get("balancer_arn")
    if balancer_arn is not None:
        print(f"Deleting balancer {balancer_arn}...")
        elb_client.delete_load_balancer(
            LoadBalancerArn=balancer_arn,
        )

    instance_id = details.get("instance_id")
    if instance_id is not None:
        print(f"Deleting instance {instance_id}...")
        delete_instance(session, instance_id)

    vpc_details = details.get("vpc_details")
    if vpc_details is not None:
        print(f"Deleting vpc {vpc_details['vpc_id']} and related resources...")
        ec2_client.delete_subnet(SubnetId=vpc_details["subnet_1_id"])
        ec2_client.delete_subnet(SubnetId=vpc_details["subnet_2_id"])
        ec2_client.delete_route_table(RouteTableId=vpc_details["route_1_id"])
        ec2_client.delete_route_table(RouteTableId=vpc_details["route_2_id"])
        table_response = ec2_client.describe_route_tables(
            Filters=[
                {
                    "Name": "vpc-id",
                    "Values": [vpc_details["vpc_id"]],
                }
            ]
        )
        tables = table_response["RouteTables"]
        for table in tables:
            ec2_client.delete_route_table(RouteTableId=table["RouteTableId"])

        ec2_client.delete_internet_gateway(InternetGatewayId=vpc_details["gateway_id"])

        security_group_id = details.get("security_group_id")
        if security_group_id is not None:
            print("Deleting security group {security_group_id}...")
            ec2_client.delete_security_group(
                GroupId=security_group_id,
            )

        ec2_client.delete_vpc(VpcId=vpc_details["vpc_id"])

    if delete_hosted_zone:
        hosted_zone_id = details.get("hosted_zone_id")
        if hosted_zone_id is not None:
            route53_client = session.client("route53")
            print(
                "Deleting hosted zones not yet implemented, "
                "navigate to the AWS Route53 console to complete "
                f"this step, deleting {hosted_zone_id}"
            )
            # To delete a hosted zone, we need to query it
            # for the list of records, than remove all
            # that aren't SOA/NS

    os.unlink(server_details_file)
    return None

Cleans up all of the resources for the given iam profile, assuming that the details are stored in the given server_details_file.

Optionally includes deleting the hosted zone, which remains an option due to the DNS changes required