Newer
Older
from fastapi import FastAPI
import sys
import json
import os
import base64
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import paramiko
from paramiko.common import MSG_DISCONNECT
import getpass
from paramiko.ssh_exception import SSHException
from pydantic import BaseModel
# FastAPI application object; the "/sign/" route handler in this file is registered on it.
app = FastAPI()
class AuthInfo(BaseModel):
    """Request body of the ``/sign/`` endpoint."""

    # Signature algorithm name; forwarded to the key's ``sign_ssh_data``.
    algorithm: str
    # Base64 text; ``sign_info`` decodes it to raw bytes.
    session_id: str
    # Base64 text; ``sign_info`` decodes it to raw bytes.
    userauth_request: str
    # Name of the user the signature is requested for.
    username: str
class bcolors:
    """Terminal escape sequences used to colour the console messages."""

    # Colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # Attribute reset, appended after a coloured message
    ENDC = '\033[0m'

    # Text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
# Add the local "libs" directory (relative to the current working directory)
# to the module search path.
libs_path = os.path.join(os.getcwd(), 'libs')
if libs_path not in sys.path:
    sys.path.append(libs_path)

# Path of the OpenSSH public key read below.
# NOTE(review): this name is used by the open() call but was never assigned
# anywhere in the file, which raises NameError at import time. The value
# assumes the usual ssh-keygen layout ("<private key>.pub" next to the
# private key 'ssh-keys/test4' used by the /sign/ handler) -- confirm.
public_key_file = 'ssh-keys/test4.pub'

with open(public_key_file, 'rb') as key_file:
    public_key = serialization.load_ssh_public_key(key_file.read(), backend=default_backend())

# Despite its name, this holds the OpenSSH one-line encoding
# (space-separated fields), not PEM.
public_key_pem = public_key.public_bytes(
    encoding=serialization.Encoding.OpenSSH,
    format=serialization.PublicFormat.OpenSSH
)
# Second space-separated field: the base64 text of the public key blob.
public_key_bits = public_key_pem.decode().split(' ')[1]
# print("Public key contents:\n", public_key_pem.decode())
def pack_string(s):
    """
    Return ``s`` prefixed with its length as a 4-byte big-endian unsigned int.

    ``s`` must be a bytes object, since it is concatenated to the packed prefix.
    """
    length_prefix = struct.pack("!I", len(s))
    return length_prefix + s
def pack_byte(b):
    """Return the integer ``b`` (0-255) encoded as a single unsigned byte."""
    single_byte = struct.Struct("!B")
    return single_byte.pack(b)
def pack_boolean(b):
    """Return ``b`` encoded as one byte: ``\\x01`` when truthy, ``\\x00`` otherwise."""
    flag = struct.Struct("!?")
    return flag.pack(b)
def pack_bytes(bytes_data):
    """
    Return ``bytes_data`` packed as a fixed-width field of its own length.

    Because the field width equals ``len(bytes_data)``, the result is the same
    bytes with no length prefix and no padding.
    """
    width = len(bytes_data)
    return struct.pack("!%ds" % width, bytes_data)
def pack_rsa_key(key):
    """
    Pack ``key.get_bits()`` into a field as wide as the key's base64 text.

    The field width is the UTF-8 byte length of ``key.get_base64()``; the
    value returned by ``key.get_bits()`` is padded with NUL bytes or truncated
    to that width by the ``s`` struct format.
    """
    # NOTE(review): if `key` is a paramiko PKey, get_bits() presumably returns
    # an integer bit count, which the "s" format rejects -- confirm the
    # intended input. No call to this helper is visible in this file.
    base64_text = bytes(key.get_base64(), 'utf-8')
    field_format = ">%ds" % len(base64_text)
    return struct.pack(field_format, key.get_bits())
def _get_key_type_and_bits_o(key):
    """
    Given any key, return its type/algorithm & bits-to-sign.

    Intended for input to or verification of, key signatures.

    Returns a ``(key_type, bits)`` tuple: the certificate's type and blob when
    ``key.public_blob`` is set, otherwise ``key.get_name()`` and the key itself.
    """
    # Use certificate contents, if available, plain pubkey otherwise
    if key.public_blob:
        return key.public_blob.key_type, key.public_blob.key_blob
    # The plain-key branch had no body (a syntax error); restored to match the
    # paramiko AuthHandler helper this function is modelled on.
    return key.get_name(), key
def _get_session_blob(key, service, username, algorithm, session_id, userauth_request):
    """
    Build the byte string that gets signed for SSH "publickey" authentication.

    Field order (RFC 4252, section 7): session identifier, request byte,
    user name, service name, the literal "publickey", boolean TRUE, algorithm
    name, public key blob. Every field except the request byte and the boolean
    is a length-prefixed string.

    Args:
        key: Unused; kept so existing callers keep working.
        service: Service name (str).
        username: User name (str).
        algorithm: Public key algorithm name (str).
        session_id: Raw session identifier bytes.
        userauth_request: Raw bytes inserted as-is after the session id
            (presumably the single message-type byte -- confirm with caller).

    Returns:
        The assembled blob as bytes. Previously nothing was returned, so
        callers received ``None``.
    """
    # The module-level public key is stored as base64 text; sign over its raw blob.
    key_blob = base64.b64decode(public_key_bits)
    return b"".join((
        # Length prefixes are computed instead of hard-coded: the former
        # b'\x00\x00\x00 ' and b'\x00\x00\x01\x97' literals were the lengths
        # of a 32-byte session id and a 407-byte key blob only.
        pack_string(session_id),
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string(service.encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        pack_string(key_blob),
    ))
@app.post("/sign/")
def sign_info(auth_info: AuthInfo):
    """
    Ask the operator on the console to approve a signing request, then sign it.

    The request's ``session_id`` and ``userauth_request`` are base64-decoded.
    On approval the RSA private key is loaded (prompting for its password if
    the first load fails), the blob built by ``_get_session_blob`` is signed,
    and the reply message is assembled.

    Returns:
        ``{"message": <base64>}`` on approval. The decoded bytes are:
        userauth_request, user name, "ssh-connection", "publickey", TRUE,
        algorithm name, public key blob, signature (all strings
        length-prefixed).
        ``{"reject": <base64>}`` holding the single MSG_DISCONNECT byte when
        the operator declines.
    """
    private_key_file = 'ssh-keys/test4'

    # Convert the request fields into local variables.
    algorithm = auth_info.algorithm
    session_id = base64.b64decode(auth_info.session_id)
    userauth_request = base64.b64decode(auth_info.userauth_request)
    username = auth_info.username

    print("Demande de signature recue pour : " + username )
    # NOTE: input() blocks this request handler until the operator answers.
    user_validation = input("Voulez vous valider la signature ? (yes/no): ")
    if user_validation.lower() not in ('yes', 'y'):
        print(bcolors.FAIL + "Signature refusée. Fermeture de la connection..." + bcolors.ENDC)
        # Send the disconnect message byte back to the client.
        # bytes(MSG_DISCONNECT) would build MSG_DISCONNECT *zero* bytes rather
        # than the byte whose value is MSG_DISCONNECT, so pack it explicitly.
        return {"reject": base64.b64encode(pack_byte(MSG_DISCONNECT)).decode('utf-8')}

    try:
        # First try to read the private key without a password.
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
    except SSHException:
        # Treated as "the key is password protected": ask the operator for
        # the password and retry. A second failure propagates to the caller.
        password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)

    sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)
    sig = privateKey.sign_ssh_data(sessionblob, algorithm)

    rsa_key_bytes = base64.b64decode(public_key_bits)
    m3 = b"".join((
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string("ssh-connection".encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        # The 4-byte prefixes of the key blob and of the signature are their
        # lengths. They were hard-coded as 0x197 and 0x194, which only fits
        # one key size / algorithm name; pack_string computes them.
        pack_string(rsa_key_bytes),
        pack_string(sig.asbytes()),
    ))
    encoded_data = base64.b64encode(m3).decode('utf-8')

    print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)
    return {"message":encoded_data}
# # Variables pour le serveur de signature
# hostname = 'localhost'
# port = 12349
# # Variables pour la clé privée
# private_key_file = 'ssh-keys/test4'
# # password = None
# # Démarrer le serveur de signature
# signer_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# signer_server.bind((hostname, port))
# signer_server.listen(1)
# while True:
# print("En attente d'une connexion...")
# client, address = signer_server.accept()
# # print("Connexion recue !")
# data = client.recv(1024)
# info_dict = json.loads(data.decode())
# # Vous pouvez maintenant accéder aux informations dans info_dict
# algorithm = info_dict['algorithm']
# session_id = base64.b64decode(info_dict['session_id'])
# userauth_request = base64.b64decode(info_dict['userauth_request'])
# username = info_dict['username']
# # ... all your code before try: ...
# # ask the user to validate
# print(bcolors.OKCYAN + "Signature request received for username : " + username + " and algorithm : " + bcolors.ENDC)
# user_validation = input("Do you want to validate the signature? (yes/no): ")
# if user_validation.lower() == 'yes' or user_validation.lower() == 'y':
# try:
# # Sending the signature back to the client
# m3 = b''
# m3 += pack_bytes(userauth_request) # Assuming userauth_request is a byte
# m3 += pack_string(username.encode()) # Assuming username is a string
# m3 += pack_string("ssh-connection".encode()) # Assuming service is a string¨
# m3 += pack_string("publickey".encode())
# m3 += pack_boolean(True) # Adding a boolean
# # Lire la clé privée au format OpenSSH
# # privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
# password = None
# try:
# # Essayez de lire la clé privée sans mot de passe
# privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
# except SSHException:
# # Si une exception est levée, cela signifie que la clé privée est protégée par un mot de passe
# # Vous pouvez alors demander à l'utilisateur de saisir le mot de passe
# password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
# privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)
# sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)
# m3 += pack_string(algorithm.encode()) # Assuming algorithm is a string
# hex_sequence = b'\x00\x00\x01\x97' #je comprends pas d'où vient cette séquence ??
# m3 += hex_sequence
# rsa_key_bytes = base64.b64decode(public_key_bits)
# m3 += pack_bytes(rsa_key_bytes) # Assuming bits is a string
# sig = privateKey.sign_ssh_data(sessionblob, algorithm)
# hex_sequence2 = b'\x00\x00\x01\x94' #je comprends pas d'où vient cette séquence ??
# m3 += hex_sequence2
# m3 += pack_bytes(sig.asbytes()) # Assuming bits is a string
# print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)
# client.sendall(m3)
# finally:
# client.close()
# else:
# print(bcolors.FAIL + "User did not validate the signature. Closing the connection..." + bcolors.ENDC)
# # Sending a disconnect message back to the client
# client.sendall(pack_bytes(bytes(MSG_DISCONNECT)))
# client.close()