Skip to content
Snippets Groups Projects
auth.py 5.18 KiB
Newer Older
jorge.leitemac's avatar
jorge.leitemac committed
import socket
from fastapi import FastAPI
import sys
import json
import binascii
import os
import base64
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import paramiko
from paramiko.common import MSG_DISCONNECT
import getpass
from paramiko.ssh_exception import SSHException
from pydantic import BaseModel

# Load the service configuration once, at import time. The path is relative,
# so it is resolved against the current working directory. The rest of this
# module reads key-file locations from config['keys'].
with open('config.json', 'r') as f:
    config = json.load(f)

# FastAPI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()

class AuthInfo(BaseModel):
    """Request body accepted by the ``/sign/`` endpoint (see ``sign_info``)."""
    # Signature algorithm name; handed to the private key's sign_ssh_data()
    # call and also embedded in the generated packet.
    algorithm: str
    # Base64 text; sign_info decodes it to raw bytes before use.
    session_id: str
    # Base64 text; sign_info decodes it to raw bytes before use.
    userauth_request: str
    # User name shown in the console prompt and embedded in the signed blob.
    username: str

class Item(BaseModel):
    """Request body accepted by the ``/sign2`` endpoint (see ``sign2``)."""
    # Hex-encoded payload; sign2 converts it to bytes with binascii.unhexlify
    # and signs the result.
    data: str

class bcolors:
    """ANSI escape sequences for colouring and styling console output.

    Wrap a message as ``bcolors.<NAME> + text + bcolors.ENDC`` so that the
    terminal attributes are reset after the text.
    """

    # Bright foreground colours.
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Reset all attributes.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

# Make modules placed in the "libs" directory of the current working
# directory importable, without adding a duplicate sys.path entry.
libs_path = os.path.join(os.getcwd(), 'libs')
if sys.path.count(libs_path) == 0:
    sys.path.append(libs_path)


# Location of the public key file, taken from the "keys" section of the
# configuration.
public_key_file = config['keys']['pubkey_python']

# Parse the file contents as an OpenSSH-format public key.
with open(public_key_file, 'rb') as key_file:
    public_key = serialization.load_ssh_public_key(key_file.read(), backend=default_backend())

# Serialise the key back to its OpenSSH text form.
# NOTE: despite the variable name, this is OpenSSH encoding, not PEM.
public_key_pem = public_key.public_bytes(
    encoding=serialization.Encoding.OpenSSH,
    format=serialization.PublicFormat.OpenSSH
)

# Second space-separated field of the OpenSSH text: the base64 key data.
# It is base64-decoded wherever the raw key blob is needed.
public_key_bits = public_key_pem.decode().split(' ')[1]

# print("Public key contents:\n", public_key_pem.decode())

def pack_string(s):
    """Return *s* preceded by its length as a 4-byte big-endian unsigned int.

    *s* must be a bytes-like object, since it is concatenated to the packed
    length prefix.
    """
    length_prefix = struct.pack(">I", len(s))
    return length_prefix + s

def pack_byte(b):
    """Encode the integer *b* as a single unsigned byte.

    Values outside 0..255 make ``struct`` raise ``struct.error``.
    """
    unsigned_byte = struct.Struct(">B")
    return unsigned_byte.pack(b)

def pack_boolean(b):
    """Encode the truth value of *b* as one byte: 0x01 if truthy, else 0x00."""
    if b:
        return b"\x01"
    return b"\x00"

def pack_bytes(bytes_data):
    """Return *bytes_data* as a ``bytes`` object with no length prefix.

    The data is packed with a fixed-width ``s`` format that is exactly as
    long as the input, so the content passes through unchanged.
    """
    fixed_width_format = ">%ds" % len(bytes_data)
    return struct.pack(fixed_width_format, bytes_data)

def _get_session_blob(key, service, username, algorithm, session_id, userauth_request):
    """Build the byte string that gets signed for "publickey" authentication.

    The fields are concatenated in the order RFC 4252 (section 7) prescribes
    for the signed data:

        string   session identifier
        byte(s)  userauth_request (appended raw, without a length prefix)
        string   user name
        string   service name
        string   "publickey"
        boolean  TRUE
        string   public key algorithm name
        string   public key blob

    Every ``string`` field carries a 4-byte big-endian length prefix computed
    from the actual data. Earlier revisions hard-coded the prefixes of the
    session identifier (32) and of the key blob (407), which is only correct
    for a 32-byte session id and one particular RSA key size.

    Args:
        key: Unused; kept so existing callers keep working. The key blob is
            taken from the module-level ``public_key_bits``.
        service: Service name (str), e.g. "ssh-connection".
        username: User name (str).
        algorithm: Public key algorithm name (str).
        session_id: Raw session identifier bytes.
        userauth_request: Raw bytes inserted right after the session
            identifier (expected to be the SSH_MSG_USERAUTH_REQUEST message
            byte -- confirm against the caller).

    Returns:
        bytes: The blob to pass to the private key's signing routine.
    """
    # public_key_bits is the base64 text of the OpenSSH key; decode it to the
    # binary key blob that goes on the wire.
    rsa_key_bytes = base64.b64decode(public_key_bits)

    return b"".join((
        pack_string(session_id),
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string(service.encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        pack_string(rsa_key_bytes),
    ))


# Endpoint used by the ssh-agent (C implementation).
# Only the payload supplied by the agent is signed; nothing is added around it.
@app.post("/sign2")
def sign2(item: Item):
    """Sign the hex-encoded payload in *item* with the ``privkey_c`` key.

    The payload is decoded from hex, signed with the fixed algorithm
    "rsa-sha2-512", and the signature message bytes are returned
    base64-encoded under the "signed_data" key. Both the payload and the
    signature are echoed to the console.
    """
    signing_key = paramiko.RSAKey.from_private_key_file(config['keys']['privkey_c'])

    print("Please sign the following data : " + item.data)
    payload = binascii.unhexlify(item.data)

    signature_bytes = signing_key.sign_ssh_data(payload, "rsa-sha2-512").asbytes()
    print("Signature : " + signature_bytes.hex())

    response_bytes = pack_bytes(signature_bytes)
    return {"signed_data": base64.b64encode(response_bytes).decode('utf-8')}

# Endpoint used by the custom paramiko client.
# Builds the complete authentication packet, signature included.
@app.post("/sign/")
def sign_info(auth_info: AuthInfo):
    """Ask the operator for approval, then build a signed "publickey" packet.

    The operator is prompted on the console. On approval the private key
    configured as ``privkey_python`` is loaded (asking for its passphrase if
    paramiko cannot load it without one), the session blob is signed, and a
    packet with the following layout is produced (RFC 4252, section 7):

        byte(s)  userauth_request (raw, no length prefix)
        string   user name
        string   "ssh-connection"
        string   "publickey"
        boolean  TRUE
        string   public key algorithm name
        string   public key blob
        string   signature

    The length prefixes of the key blob and of the signature are computed
    from the data. Earlier revisions hard-coded them (407 and 404), which
    only fits one RSA key size and a 12-character algorithm name.

    Args:
        auth_info: Request body; ``session_id`` and ``userauth_request`` are
            base64 text, ``algorithm`` and ``username`` are plain strings.

    Returns:
        dict: ``{"message": <base64 packet>}`` when the operator approves,
        otherwise ``{"reject": <base64 bytes>}``.
    """
    private_key_file = config['keys']['privkey_python']

    # Convert the JSON fields into the values used below.
    algorithm = auth_info.algorithm
    session_id = base64.b64decode(auth_info.session_id)
    userauth_request = base64.b64decode(auth_info.userauth_request)
    username = auth_info.username

    print("Demande de signature recue pour : " + username )
    user_validation = input("Voulez vous valider la signature ? (yes/no): ")

    if user_validation.lower() not in ('yes', 'y'):
        print(bcolors.FAIL + "User did not validate the signature. Closing the connection..." + bcolors.ENDC)
        # NOTE(review): if MSG_DISCONNECT is an int (as in paramiko.common),
        # bytes(MSG_DISCONNECT) yields that many zero bytes rather than the
        # message-type byte itself. Left unchanged because the client may
        # rely on the current value -- confirm and switch to
        # pack_byte(MSG_DISCONNECT) if the byte value was intended.
        return {"reject": base64.b64encode(bytes(MSG_DISCONNECT)).decode('utf-8')}

    try:
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
    except SSHException:
        # Loading without a passphrase failed; ask for one and retry.
        password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)

    sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)
    sig = privateKey.sign_ssh_data(sessionblob, algorithm)

    # Binary public key blob, decoded from the base64 text of the OpenSSH key.
    rsa_key_bytes = base64.b64decode(public_key_bits)

    m3 = b"".join((
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string("ssh-connection".encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        pack_string(rsa_key_bytes),
        pack_string(sig.asbytes()),
    ))

    print(m3)
    encoded_data = base64.b64encode(m3).decode('utf-8')
    print("encoded: ", encoded_data)

    print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)
    return {"message":encoded_data}