summaryrefslogtreecommitdiffstats
path: root/tozt/ssh.py
blob: 51111c869a99cee40aa9617a8a4495300085f681 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import binascii
import os
import subprocess
import tempfile
from typing import Any, Optional, Tuple

import pulumi


class SshKeyProvider(pulumi.dynamic.ResourceProvider):
    def create(self, inputs: Any):
        (private_key, public_key) = ssh_keygen()
        return pulumi.dynamic.CreateResult(
            id_=str(binascii.b2a_hex(os.urandom(16))),
            outs={"private_key": private_key, "public_key": public_key},
        )


class SshKey(pulumi.dynamic.Resource):
    private_key: pulumi.Output[str]
    public_key: pulumi.Output[str]

    def __init__(
        self, name: str, opts: Optional[pulumi.ResourceOptions] = None
    ):
        super().__init__(
            SshKeyProvider(),
            name,
            {"private_key": None, "public_key": None},
            opts,
        )


def ssh_keygen() -> Tuple[str, str]:
    with tempfile.TemporaryDirectory() as dir:
        key = f"{dir}/id_rsa"
        close = []
        try:
            (priv_r, priv_w) = os.pipe()
            close.extend([priv_r, priv_w])
            (pub_r, pub_w) = os.pipe()
            close.extend([pub_r, pub_w])
            (yes_r, yes_w) = os.pipe()
            close.extend([yes_r, yes_w])
            os.symlink(f"/proc/self/fd/{priv_w}", f"{key}")
            os.symlink(f"/proc/self/fd/{pub_w}", f"{key}.pub")
            os.write(yes_w, b"y\n")
            os.close(yes_w)
            subprocess.check_call(
                ["ssh-keygen", "-q", "-N", "", "-f", key],
                pass_fds=(pub_w, priv_w),
                stdin=yes_r,
                stdout=subprocess.DEVNULL,
            )
            os.close(priv_w)
            os.close(pub_w)
            os.close(yes_r)
            with open(priv_r) as f:
                privkey = f.read()
            with open(pub_r) as f:
                pubkey = f.read()
        except Exception:
            for fd in close:
                try:
                    os.close(fd)
                except Exception:
                    pass
            raise
        return (privkey, pubkey)