Skip to content
Snippets Groups Projects
auth.py 9.82 KiB
Newer Older
import sys
import json
import os
import base64
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
jorge.leitemac's avatar
jorge.leitemac committed
import paramiko
from paramiko.common import MSG_DISCONNECT
import getpass
from paramiko.ssh_exception import SSHException
from pydantic import BaseModel

app = FastAPI()

class AuthInfo(BaseModel):
    algorithm: str
    session_id: str
    userauth_request: str
    username: str
jorge.leitemac's avatar
jorge.leitemac committed

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

# ajouter le chemin vers le répertoire libs à PYTHONPATH
libs_path = os.path.join(os.getcwd(), 'libs')
if libs_path not in sys.path:
    sys.path.append(libs_path)


jorge.leitemac's avatar
jorge.leitemac committed
public_key_file = 'ssh-keys/test4.pub'

with open(public_key_file, 'rb') as key_file:
    public_key = serialization.load_ssh_public_key(key_file.read(), backend=default_backend())

public_key_pem = public_key.public_bytes(
    encoding=serialization.Encoding.OpenSSH,
    format=serialization.PublicFormat.OpenSSH
)

public_key_bits = public_key_pem.decode().split(' ')[1]

jorge.leitemac's avatar
jorge.leitemac committed
# print("Contenu de la clé publique :\n", public_key_pem.decode())

def pack_string(s):
    return struct.pack(">I", len(s)) + s

def pack_byte(b):
    return struct.pack(">B", b)

def pack_boolean(b):
    return struct.pack(">?", b)

def pack_bytes(bytes_data):
    return struct.pack(f">{len(bytes_data)}s", bytes_data)

def pack_rsa_key(key):
    return struct.pack(f">{len(bytes(key.get_base64(), 'utf-8'))}s", key.get_bits())
jorge.leitemac's avatar
jorge.leitemac committed
def _get_key_type_and_bits_o(key):
    # print("_get_key_type_and_bits")
    """
    Given any key, return its type/algorithm & bits-to-sign.
jorge.leitemac's avatar
jorge.leitemac committed
    Intended for input to or verification of, key signatures.
    """
    # Use certificate contents, if available, plain pubkey otherwise
    if key.public_blob:
        return key.public_blob.key_type, key.public_blob.key_blob
    else:
        return key.get_name(), key
jorge.leitemac's avatar
jorge.leitemac committed
def _get_session_blob(key, service, username, algorithm, session_id, userauth_request):
    global public_key_bits
    
    m2 = b'\x00\x00\x00 '
    m2 += pack_bytes(session_id) # Assuming session_id is a string
    m2 += pack_bytes(userauth_request) # Assuming userauth_request is a byte
    m2 += pack_string(username.encode()) # Assuming username is a string
    m2 += pack_string(service.encode()) # Assuming service is a string¨
    m2 += pack_string("publickey".encode())
    m2 += pack_boolean(True) # Adding a boolean
    m2 += pack_string(algorithm.encode()) # Assuming algorithm is a string
jorge.leitemac's avatar
jorge.leitemac committed
    
    hex_sequence = b'\x00\x00\x01\x97' #je comprends pas d'où vient cette séquence ??
    m2 += hex_sequence
    rsa_key_bytes = base64.b64decode(public_key_bits)
jorge.leitemac's avatar
jorge.leitemac committed
    m2 += pack_bytes(rsa_key_bytes) # Assuming bits is a string
jorge.leitemac's avatar
jorge.leitemac committed
    return m2
@app.post("/sign/")
def sign_info(auth_info: AuthInfo):
    # Le contenu de votre while True: ici
    private_key_file = 'ssh-keys/test4'
    # Conversion des données JSON en variables
    algorithm = auth_info.algorithm
    session_id = base64.b64decode(auth_info.session_id)
    userauth_request = base64.b64decode(auth_info.userauth_request)
    username = auth_info.username
    print("Demande de signature recue pour : " + username )
    # print("Session ID : " + str(session_id))
    # print("Userauth request : " + str(userauth_request))
    user_validation = input("Voulez vous valider la signature ? (yes/no): ")
    if user_validation.lower() == 'yes' or user_validation.lower() == 'y':
        # Sending the signature back to the client
        m3 = b''
        m3 += pack_bytes(userauth_request) # Assuming userauth_request is a byte
        m3 += pack_string(username.encode()) # Assuming username is a string
        m3 += pack_string("ssh-connection".encode()) # Assuming service is a string¨
        m3 += pack_string("publickey".encode())
        m3 += pack_boolean(True) # Adding a boolean
        
        # Lire la clé privée au format OpenSSH
        # privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
        password = None
        try:
            # Essayez de lire la clé privée sans mot de passe
            privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
        except SSHException:
            # Si une exception est levée, cela signifie que la clé privée est protégée par un mot de passe
            # Vous pouvez alors demander à l'utilisateur de saisir le mot de passe
            password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
            privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)

        sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)

        m3 += pack_string(algorithm.encode()) # Assuming algorithm is a string

        hex_sequence = b'\x00\x00\x01\x97' #je comprends pas d'où vient cette séquence ??
        m3 += hex_sequence
        rsa_key_bytes = base64.b64decode(public_key_bits)

        m3 += pack_bytes(rsa_key_bytes) # Assuming bits is a string
        sig = privateKey.sign_ssh_data(sessionblob, algorithm)

        hex_sequence2 = b'\x00\x00\x01\x94' #je comprends pas d'où vient cette séquence ??
        m3 += hex_sequence2
        m3 += pack_bytes(sig.asbytes()) # Assuming bits is a string

        # print(m3)
        encoded_data = base64.b64encode(m3).decode('utf-8')
        # print("encoded: ", encoded_data)

        # # Pour récupérer le binaire
        # decoded_data = base64.b64decode(encoded_data)
        # print("decoded: ", decoded_data)
        
        print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)
        return {"message":encoded_data}
    else:
        print(bcolors.FAIL + "Signature refusée. Fermeture de la connection..." + bcolors.ENDC)
        # Sending a disconnect message back to the client
        return {"reject": base64.b64encode(bytes(MSG_DISCONNECT)).decode('utf-8')}
# # Variables pour le serveur de signature
# hostname = 'localhost'
# port = 12349
# # Variables pour la clé privée
# private_key_file = 'ssh-keys/test4'
# # password = None
# # Démarrer le serveur de signatu
# signer_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# signer_server.bind((hostname, port))
# signer_server.listen(1)
# while True:
#     print("En attente d'une connexion...")
#     client, address = signer_server.accept()
#     # print("Connexion recue !")
#     info_dict = json.loads(data.decode())

#     # Vous pouvez maintenant accéder aux informations dans info_dict

#     algorithm = info_dict['algorithm']
#     session_id = base64.b64decode(info_dict['session_id'])
#     userauth_request = base64.b64decode(info_dict['userauth_request'])
#     username = info_dict['username']

    
#     # ... all your code before try: ...

#     # ask the user to validate
#     print(bcolors.OKCYAN + "Signature request received for username : " + username + " and algorithm : " + bcolors.ENDC)
#     user_validation = input("Do you want to validate the signature? (yes/no): ")

#     if user_validation.lower() == 'yes' or user_validation.lower() == 'y':
#         try:
#             # Sending the signature back to the client
#             m3 = b''
#             m3 += pack_bytes(userauth_request) # Assuming userauth_request is a byte
#             m3 += pack_string(username.encode()) # Assuming username is a string
#             m3 += pack_string("ssh-connection".encode()) # Assuming service is a string¨
#             m3 += pack_string("publickey".encode())
#             m3 += pack_boolean(True) # Adding a boolean
#             # Lire la clé privée au format OpenSSH
#             # privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
#             password = None
#             try:
#                 # Essayez de lire la clé privée sans mot de passe
#                 privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
#             except SSHException:
#                 # Si une exception est levée, cela signifie que la clé privée est protégée par un mot de passe
#                 # Vous pouvez alors demander à l'utilisateur de saisir le mot de passe
#                 password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
#                 privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)
#             sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)
#             m3 += pack_string(algorithm.encode()) # Assuming algorithm is a string
#             hex_sequence = b'\x00\x00\x01\x97' #je comprends pas d'où vient cette séquence ??
#             m3 += hex_sequence
#             rsa_key_bytes = base64.b64decode(public_key_bits)
#             m3 += pack_bytes(rsa_key_bytes) # Assuming bits is a string
#             sig = privateKey.sign_ssh_data(sessionblob, algorithm)
#             hex_sequence2 = b'\x00\x00\x01\x94' #je comprends pas d'où vient cette séquence ??
#             m3 += hex_sequence2
#             m3 += pack_bytes(sig.asbytes()) # Assuming bits is a string
#             print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)

#             client.sendall(m3)
#         finally:
#             client.close()
#     else:
#         print(bcolors.FAIL + "User did not validate the signature. Closing the connection..." + bcolors.ENDC)
#         # Sending a disconnect message back to the client
#         client.sendall(pack_bytes(bytes(MSG_DISCONNECT)))
#         client.close()