# Signing service: a FastAPI application exposing POST /sign2 and POST /sign/,
# which sign SSH authentication data with the RSA keys named in config.json.
import socket
from fastapi import FastAPI
import sys
import json
import binascii
import os
import base64
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import paramiko
from paramiko.common import MSG_DISCONNECT
import getpass
from paramiko.ssh_exception import SSHException
from pydantic import BaseModel
# Load the service configuration once, at import time. The rest of the module
# reads key file locations from config['keys'].
with open('config.json') as config_file:
    config = json.load(config_file)

# Application object on which the signing endpoints are registered.
app = FastAPI()
# Request body of POST /sign/ (consumed by sign_info).
class AuthInfo(BaseModel):
    # Signature algorithm name; passed to sign_ssh_data() and embedded in the reply.
    algorithm: str
    # Base64 text; sign_info decodes it and uses it as the session identifier
    # at the start of the signed blob.
    session_id: str
    # Base64 text; sign_info decodes it and copies the bytes verbatim into both
    # the signed blob and the reply.
    userauth_request: str
    # User name being authenticated; also shown in the operator prompt.
    username: str
# Request body of POST /sign2 (consumed by sign2).
class Item(BaseModel):
    # Hex text of the payload to sign; sign2 decodes it with binascii.unhexlify.
    data: str
class bcolors:
    """Escape sequences used to colour and style console messages."""

    # Colours.
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Reset: append after a styled message to return to the default style.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
# Make the "libs" directory under the current working directory importable,
# without adding a duplicate entry if it is already on the search path.
libs_path = os.path.join(os.getcwd(), 'libs')
if sys.path.count(libs_path) == 0:
    sys.path.append(libs_path)
# Read the public key whose path is given by the configuration.
public_key_file = config['keys']['pubkey_python']
with open(public_key_file, 'rb') as key_file:
    public_key = serialization.load_ssh_public_key(
        key_file.read(),
        backend=default_backend(),
    )

# Despite the "_pem" name, this is the key re-serialised in OpenSSH encoding
# and format, not PEM.
public_key_pem = public_key.public_bytes(
    serialization.Encoding.OpenSSH,
    serialization.PublicFormat.OpenSSH,
)

# Second space-separated field of the OpenSSH text; the signing code
# base64-decodes it to obtain the public key blob.
public_key_bits = public_key_pem.decode().split(' ')[1]
# print("Contenu de la clé publique :\n", public_key_pem.decode())
def pack_string(s):
    """Encode *s* as a length-prefixed string.

    The result is a 4-byte big-endian unsigned length followed by the payload
    bytes.

    Args:
        s: The payload. ``bytes``, ``bytearray`` and ``memoryview`` values are
            used as they are; a ``str`` is encoded as UTF-8 first, so callers
            no longer have to call ``.encode()`` themselves.

    Returns:
        bytes: The length prefix followed by the payload.

    Raises:
        TypeError: If *s* is neither text nor bytes-like.
        struct.error: If the payload is too long for a 32-bit length.
    """
    if isinstance(s, str):
        payload = s.encode("utf-8")
    elif isinstance(s, (bytes, bytearray, memoryview)):
        # Normalise to bytes so the length counts bytes, not memoryview items.
        payload = bytes(s)
    else:
        raise TypeError(
            f"pack_string() expects str or bytes-like, got {type(s).__name__}"
        )
    return struct.pack(">I", len(payload)) + payload
def pack_byte(b):
    """Encode the integer *b* as one unsigned byte.

    Raises:
        struct.error: If *b* does not fit in the range 0..255.
    """
    unsigned_byte_format = ">B"
    return struct.pack(unsigned_byte_format, b)
def pack_boolean(b):
    """Encode the truthiness of *b* as one byte: 0x01 if true, 0x00 if false."""
    return b"\x01" if b else b"\x00"
def pack_bytes(bytes_data):
    """Return *bytes_data* as raw bytes, with no length prefix.

    The struct format is sized to the input, so the output has exactly
    ``len(bytes_data)`` bytes.
    """
    size = len(bytes_data)
    layout = ">%ds" % size
    return struct.pack(layout, bytes_data)
def _get_session_blob(key, service, username, algorithm, session_id, userauth_request):
    """Build the byte string that is signed for public-key authentication.

    Fields, in order: length-prefixed session id, the request bytes verbatim,
    length-prefixed user name, service name and the literal "publickey", a
    TRUE boolean, the length-prefixed algorithm name, and the length-prefixed
    public key blob decoded from the module-level ``public_key_bits``.

    The length prefixes of the session id and of the key blob are computed
    from the data. They were previously hard-coded to 32 and 407 bytes, which
    produced a malformed blob for any other session-id or key size; the output
    is unchanged for inputs of exactly those sizes.

    Args:
        key: Unused; kept so existing callers keep working.
        service: Service name (text).
        username: User name (text).
        algorithm: Signature algorithm name (text).
        session_id: Session identifier bytes.
        userauth_request: Bytes inserted without a length prefix after the
            session id. NOTE(review): presumably the one-byte
            SSH_MSG_USERAUTH_REQUEST message code - confirm against the client.

    Returns:
        bytes: The assembled blob to pass to the signing function.
    """
    key_blob = base64.b64decode(public_key_bits)
    return b"".join((
        pack_string(session_id),
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string(service.encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        pack_string(key_blob),
    ))
# Function to sign data from ssh-agent (c)
# Here we only sign the payload given by the agent
@app.post("/sign2")
def sign2(item: Item):
    # Signs the hex-encoded payload in item.data with the RSA private key at
    # config['keys']['privkey_c'] using rsa-sha2-512, and returns the signature
    # bytes base64-encoded under "signed_data".
    signing_key = paramiko.RSAKey.from_private_key_file(config['keys']['privkey_c'])

    print("Please sign the following data : " + item.data)
    payload = binascii.unhexlify(item.data)

    raw_signature = signing_key.sign_ssh_data(payload, "rsa-sha2-512").asbytes()
    print("Signature : " + str(raw_signature.hex()))

    # Only the signature is returned; nothing else is wrapped around it.
    response_bytes = b'' + pack_bytes(raw_signature)
    return {"signed_data": base64.b64encode(response_bytes).decode('utf-8')}
# Function to sign data from the custom paramiko client
# Here we create the whole package of data including the signature
@app.post("/sign/")
def sign_info(auth_info: AuthInfo):
    # Builds and signs a complete public-key authentication message for the
    # custom paramiko client, after asking the operator on the console.
    #
    # Returns {"message": <base64 of the assembled message>} when the operator
    # answers "yes"/"y" (case-insensitive), otherwise {"reject": <base64>}.
    #
    # The handler blocks on input() (and on getpass() when the key is
    # encrypted) until someone answers on the server's terminal.
    #
    # The length prefixes of the public key blob and of the signature are
    # computed from the data. They were previously hard-coded to 407 and 404
    # bytes, which is only right for one key size and algorithm-name length;
    # the output is unchanged for inputs of exactly those sizes.
    private_key_file = config['keys']['privkey_python']

    # Convert the JSON fields into working values.
    algorithm = auth_info.algorithm
    session_id = base64.b64decode(auth_info.session_id)
    userauth_request = base64.b64decode(auth_info.userauth_request)
    username = auth_info.username

    print("Demande de signature recue pour : " + username )
    user_validation = input("Voulez vous valider la signature ? (yes/no): ")
    if user_validation.lower() not in ('yes', 'y'):
        print(bcolors.FAIL + "User did not validate the signature. Closing the connection..." + bcolors.ENDC)
        # NOTE(review): bytes(n) for an integer n yields n zero bytes, not the
        # single byte with value n. If MSG_DISCONNECT is an integer message
        # code, this does not send that code. Left unchanged because the
        # client's handling of "reject" is not visible here.
        return {"reject": base64.b64encode(bytes(MSG_DISCONNECT)).decode('utf-8')}

    # Try the key without a passphrase first; ask for one only if that fails.
    try:
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file)
    except SSHException:
        password = getpass.getpass('Veuillez entrer le mot de passe de la clé privée : ')
        privateKey = paramiko.RSAKey.from_private_key_file(private_key_file, password=password)

    sessionblob = _get_session_blob(privateKey, "ssh-connection", username, algorithm, session_id, userauth_request)
    sig = privateKey.sign_ssh_data(sessionblob, algorithm)
    rsa_key_bytes = base64.b64decode(public_key_bits)

    # Same fields as the signed blob minus the session id, followed by the
    # length-prefixed signature.
    m3 = b''.join((
        pack_bytes(userauth_request),
        pack_string(username.encode()),
        pack_string("ssh-connection".encode()),
        pack_string("publickey".encode()),
        pack_boolean(True),
        pack_string(algorithm.encode()),
        pack_string(rsa_key_bytes),
        pack_string(sig.asbytes()),
    ))
    print(m3)

    encoded_data = base64.b64encode(m3).decode('utf-8')
    print("encoded: ", encoded_data)
    print(bcolors.OKGREEN + "Transaction signée avec succes, la connexion sur l'appareil principal est ouverte" + bcolors.ENDC)
    return {"message":encoded_data}