Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • xavier.perret/perso-distributed-systems
1 result
Select Git revision
Show changes
Commits on Source (60)
Showing
with 1156 additions and 718 deletions
*.pdf
# Distributed Systems
## Lab 1
### Developing and deploying a broadcast algorithm for a blockchain management system on a Cloud infrastructure
- Student : Xavier Perret
- Email : xavier.perret@etu.hesge.ch
- Group : 1
- <ssh://git@ssh.hesge.ch:10572/xavier.perret/perso-distributed-systems.git>
- <https://gitedu.hesge.ch/xavier.perret/perso-distributed-systems.git>
### Client
To launch the client you need to run the following command:
```bash
go main.go --config=client_config.yaml --client
```
Please note that the `client_config.yaml` can be a `neighbor-x.yaml` file if you want to create a transaction and spread
it to neighbours instead on manually entering an ip.
### Server
#### Root Server
To lauch the server to which you wish to send commands using the client you need to do
```bash
go main.go --config=neighbor-0.yaml --server --root
```
#### Normal Server
To launch the other server you need to run the following command:
```bash
go main.go --config=server_config.yaml --server
```
### Functionalities
- Able to create a transaction from the client and then request to root server to broadcast by wave to all the other servers
- Able to send a rate request from client to root server then broadcast by wave to all the other servers (buggy at the moment)
- Not able to fake a transaction yet
- Able to print all local transaction from any server or client
\ No newline at end of file
module node
go 1.19
require gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package main
import (
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"math/rand"
"net"
"os"
"strconv"
"time"
)
/**
* @file
* @brief Distributed Systems - Blockchain
* @author Xavier Perret
* @date 28/09/2022 - 05/10/2022
*/
// Message - MessageBody can be of any type but will be use here for Transaction, AckTransaction
type Message struct {
MessageType string `json:"messageType"`
MessageBody interface{} `json:"messageBody"`
}
// Transaction - represents a transaction in the blockchain
type Transaction struct {
Id string `json:"id"`
Sender string `json:"sender"`
Receiver string `json:"receiver"`
Amount string `json:"amount"`
}
// AckTransaction - represents an acknowledgement of a transaction @see vote function
type AckTransaction struct {
Id string `json:"id"`
AmountOfCorrectNode int `json:"amountOfCorrectNode"`
TotalNodes int `json:"totalNodes"`
}
// Config - is the configuration of a node, it's id, address, port and neighbours
type Config struct {
ID int `yaml:"id"`
Address string `yaml:"address"`
Port int `yaml:"port"`
Neighbours []struct {
ID int `yaml:"id"`
Address string `yaml:"address"`
Port int `yaml:"port"`
} `yaml:"neighbours"`
}
// ServerReady is true if the server can listen to new connection attempt, used for initialization
var ServerReady = false
// DB in memory database
var DB []Transaction
// ConnectionType is the type of connection used by the server
var ConnectionType = "tcp"
// a node generates a transaction “trans”, stores it in its object-based storage
// and sends it to all nodes of the network. This function relies on the broadcast by wave distributed
// algorithm.
// trans -
func createTransaction(trans Transaction) {
}
func isTransactionValid(trans Transaction) bool {
return false
}
// A node sends a request to all nodes to check if the transaction trans, stored locally,
// is the same as the ones stored in the other nodes. The result (rate) is a percentage representing the
// transactions which are the same as the one stored locally. This function relies on the broadcast by wave
// with ACK distributed algorithm. However, some adjustments are needed to implement this function.
func vote(server net.Listener, trans Transaction, config Config, parentAddress string, stimulatedByClient bool) AckTransaction {
var ack AckTransaction
ack.Id = ""
ack.TotalNodes = 0
ack.AmountOfCorrectNode = 0
var transactionToRate Transaction
for _, transactionInDatabase := range DB {
if transactionInDatabase.Id == trans.Id {
ack.Id = trans.Id
transactionToRate = transactionInDatabase
}
}
// todo add stuff
if ack.Id != "" {
if transactionToRate.Receiver != trans.Receiver {
ack.AmountOfCorrectNode = 0
ack.TotalNodes = 1
}
if transactionToRate.Sender != trans.Sender {
ack.AmountOfCorrectNode = 0
ack.TotalNodes = 1
}
if transactionToRate.Amount != trans.Amount {
ack.AmountOfCorrectNode = 0
ack.TotalNodes = 1
}
}
if ack.TotalNodes == 0 {
ack.AmountOfCorrectNode = 1
ack.TotalNodes = 1
}
var newAck AckTransaction
reach := false
count := 0
limit := len(config.Neighbours)
if stimulatedByClient {
reach = true
go sendVoteToAllNeighbours(config, trans, parentAddress)
}
var mess Message
for count < limit {
if count != 0 || reach {
fmt.Println("Count is ", count, " to ", limit)
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
err = nil
continue
}
fmt.Println("*****************************************")
fmt.Println("Processing client request number ", count)
fmt.Println("The connection is ", connection)
fmt.Println("The remote address is ", connection.RemoteAddr())
fmt.Println("The local address is ", connection.LocalAddr())
fmt.Println("*****************************************")
err = json.NewDecoder(connection).Decode(&mess)
if err != nil {
fmt.Println("Error while decoding the transactionInDatabase", err)
err = nil
continue
}
if mess.MessageType == "AckResponse" {
var body map[string]interface{} = mess.MessageBody.(map[string]interface{})
newAck.Id = body["id"].(string)
newAck.AmountOfCorrectNode = body["amountOfCorrectNode"].(int)
newAck.TotalNodes = body["totalNodes"].(int)
if ack.Id == newAck.Id {
ack.TotalNodes += newAck.TotalNodes
ack.AmountOfCorrectNode += newAck.AmountOfCorrectNode
}
fmt.Println("Node correct : ", ack.AmountOfCorrectNode, "out of ", ack.TotalNodes)
}
}
count++ // received message
if !reach {
reach = true
go sendVoteToAllNeighbours(config, trans, parentAddress)
}
}
if stimulatedByClient {
fmt.Println("***********************************")
fmt.Println("Transaction has been rated")
fmt.Println("Node correct : ", ack.AmountOfCorrectNode, "out of ", ack.TotalNodes)
fmt.Println("***********************************")
go sendAckToNeighbour(config, ack, parentAddress)
}
return ack
}
func sendVoteToAllNeighbours(config Config, trans Transaction, parentIp string) {
for neighbour := range config.Neighbours {
ip := config.Neighbours[neighbour].Address
port := strconv.Itoa(config.Neighbours[neighbour].Port)
address := ip + ":" + port
if address != parentIp {
go sendVoteToNeighbour(config, trans, address)
}
}
}
func fake(authenticTrans Transaction, fakeTrans Transaction) {
}
func printTransaction(trans Transaction) {
fmt.Println("--------------------")
fmt.Println("Printing transaction")
fmt.Println("The id is ", trans.Id)
fmt.Println("The sender is ", trans.Sender)
fmt.Println("The receiver is ", trans.Receiver)
fmt.Println("The amount is ", trans.Amount)
fmt.Println("--------------------")
}
func listAllTransactions() {
for i, transaction := range DB {
fmt.Println("***********INDEX N=,", i, "****************")
printTransaction(transaction)
fmt.Println("***********************************")
}
}
// userCreatedTransaction is a function to interactively create a transaction using the terminal
func userCreatedTransaction(config Config) Transaction {
fmt.Println("--------------------STARTING USER CREATED TRANSACTION--------------------")
var trans Transaction
now := time.Now().String()
randomString := strconv.Itoa(rand.Int())
trans.Id = now + randomString
fmt.Println()
var receiver string
fmt.Print("\nPlease enter the receiver")
_, err := fmt.Scanln(&receiver)
if err != nil {
return Transaction{}
}
trans.Receiver = receiver
var sender string
fmt.Print("\nPlease enter the sender")
_, err = fmt.Scanln(&sender)
if err != nil {
return Transaction{}
}
trans.Sender = sender
fmt.Println("Creating a transaction with sender id:", trans.Sender)
fmt.Println("transaction id is ", trans.Id)
fmt.Print("\nEnter amount :")
_, err = fmt.Scanln(&trans.Amount)
if err != nil {
fmt.Println("error")
os.Exit(1)
}
for neighbour := range config.Neighbours {
fmt.Println("Neighbour ", config.Neighbours[neighbour].ID)
fmt.Println("address ", config.Neighbours[neighbour].Address)
fmt.Println("port", config.Neighbours[neighbour].Port)
}
fmt.Println("--------------------FINISHED USER CREATED TRANSACTION--------------------")
return trans
}
// sendTransactionToNeighbour is a function to send a transaction to a neighbour
func sendTransactionToNeighbour(config Config, trans Transaction, destinationIp string, destinationPort string) {
fmt.Println()
ipAddr := destinationIp + ":" + destinationPort
fmt.Println("Trying to connect to ", destinationIp, ":", destinationPort)
conn, err := net.Dial(ConnectionType, ipAddr)
if err != nil {
fmt.Println("Error while connecting to the neighbour", err)
os.Exit(1)
}
mess := Message{
MessageType: "transaction",
MessageBody: trans,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err = encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
fmt.Println("MessageBody successfully sent to neighbour")
}
func sendTransactionToAllNeighbours(config Config, trans Transaction) {
for neighbour := range config.Neighbours {
fmt.Println("Sending transaction to neighbour ", config.Neighbours[neighbour].ID)
ip := config.Neighbours[neighbour].Address
port := strconv.Itoa(config.Neighbours[neighbour].Port)
sendTransactionToNeighbour(config, trans, ip, port)
}
}
func printAllNeighbours(config Config) {
fmt.Println("Listing neighbours :")
for neighbour := range config.Neighbours {
fmt.Println("Neighbour ", config.Neighbours[neighbour].ID)
}
}
func processClient(connection net.Conn, server net.Listener, config Config, stimulatedByClient bool) {
fmt.Println("Processing client")
fmt.Println("The connection is ", connection)
fmt.Println("The remote address is ", connection.RemoteAddr())
fmt.Println("The local address is ", connection.LocalAddr())
var mess Message
jsonDecoder := json.NewDecoder(connection)
err := jsonDecoder.Decode(&mess)
if err != nil {
fmt.Println("Error while decoding the message", err)
}
fmt.Println("The message is ", mess)
fmt.Println("A client has connected")
if mess.MessageType == "error" {
fmt.Println("Error while decoding the message", err)
return
} else if mess.MessageType == "transaction" {
fmt.Println("Received a transaction")
var trans Transaction
var body map[string]interface{} = mess.MessageBody.(map[string]interface{})
trans.Id = body["id"].(string)
trans.Receiver = body["receiver"].(string)
trans.Sender = body["sender"].(string)
trans.Amount = body["amount"].(string)
for _, transaction := range DB {
if transaction.Id == trans.Id {
return
}
}
processTransaction(server, config, trans, stimulatedByClient)
} else if mess.MessageType == "rate" {
fmt.Println("Received a rate")
var trans Transaction
var body map[string]interface{} = mess.MessageBody.(map[string]interface{})
trans.Id = body["id"].(string)
trans.Receiver = body["receiver"].(string)
trans.Sender = body["sender"].(string)
trans.Amount = body["amount"].(string)
printTransaction(trans)
// todo change this for cloud
vote(server, trans, config, connection.LocalAddr().String(), stimulatedByClient)
} else if mess.MessageType == "fake" {
fmt.Println("Received a fake")
fmt.Println("Not yet implemented")
} else if mess.MessageType == "list" {
fmt.Println("Received a list all transactions order")
listAllTransactions()
} else {
fmt.Println("Unknown message type")
}
}
func processTransaction(server net.Listener, config Config, trans Transaction, stimulatedByClient bool) {
fmt.Println("Processing transaction")
printTransaction(trans)
fmt.Println("The transaction has been added")
reach := false
count := 0
limit := len(config.Neighbours)
if stimulatedByClient {
reach = true
addTransactionToDb(trans)
go sendTransactionToAllNeighbours(config, trans)
}
for count < limit {
if count != 0 || reach {
fmt.Println("Count is ", count, " to ", limit)
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
err = nil
continue
}
fmt.Println("*****************************************")
fmt.Println("Processing client request number ", count)
fmt.Println("The connection is ", connection)
fmt.Println("The remote address is ", connection.RemoteAddr())
fmt.Println("The local address is ", connection.LocalAddr())
fmt.Println("*****************************************")
err = json.NewDecoder(connection).Decode(&trans)
if err != nil {
fmt.Println("Error while decoding the transaction", err)
err = nil
continue
}
fmt.Println("Received back a transaction")
printTransaction(trans)
}
count++ // received message
if !reach {
reach = true
addTransactionToDb(trans)
go sendTransactionToAllNeighbours(config, trans)
}
}
fmt.Println("***********************************")
fmt.Println("All transactions have been received")
fmt.Println("***********************************")
}
func addTransactionToDb(trans Transaction) {
for _, transaction := range DB {
if transaction.Id == trans.Id {
return
}
}
DB = append(DB, trans)
}
func serverLoop(config Config, stimulatedByClient bool) {
addr := "0.0.0.0"
port := strconv.FormatInt(int64(config.Port), 10)
server, err := net.Listen("tcp", addr+":"+port)
if err != nil {
fmt.Println("Error while creating the server", err)
os.Exit(1)
}
fmt.Println("Server is ready to accept connections")
fmt.Println("listening on ", addr, ":", port)
ServerReady = true
for {
// Listening for connections
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
} else {
processClient(connection, server, config, stimulatedByClient)
}
}
}
func userInputLoop(config Config, isAlsoServer bool) {
for true {
var operation string
if !ServerReady {
continue
}
fmt.Println()
fmt.Println()
fmt.Println("Please enter the operation you want to do")
fmt.Println("1. Create a transaction")
fmt.Println("2. Rate a transaction (from the client)")
fmt.Println("3. Fabricate a fake transaction")
fmt.Println("4. Print all transactions")
fmt.Println("8. Exit")
fmt.Print("Your choice: ")
_, err := fmt.Scanln(&operation)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
switch operation {
case "1":
fmt.Println("You chose to create a transaction")
if isAlsoServer {
fmt.Println("Not yet implemented!")
//newTrans := userCreatedTransaction(config)
//createTransaction(newTrans)
} else {
newTrans := userCreatedTransaction(config)
addTransactionToDb(newTrans)
printAllNeighbours(config)
fmt.Println("TRANSACTION READY TO BE SENT")
fmt.Println("Please enter the ID of the neighbour you want to send the transaction to")
var neighbourID string
_, err := fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
sendTransactionToNeighbour(config, newTrans, config.Neighbours[neighbourIDInt].Address, strconv.Itoa(config.Neighbours[neighbourIDInt].Port))
}
break
case "2":
fmt.Println("You chose to rate a transaction")
listAllTransactions()
fmt.Print("\nPlease enter the index of the transaction you want to rate:")
var transID string
_, err := fmt.Scanln(&transID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
transIDInt, err := strconv.ParseInt(transID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
printAllNeighbours(config)
fmt.Println("Please enter the ID of the neighbour you want to send the transaction to")
var neighbourID string
_, err = fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
address := config.Neighbours[neighbourIDInt].Address + ":" + strconv.Itoa(config.Neighbours[neighbourIDInt].Port)
fmt.Println("Sending rate demand to ", address)
sendVoteToNeighbour(config, DB[transIDInt], address)
break
case "3":
fmt.Println("You chose to fabricate a fake transaction")
break
case "4":
fmt.Println("You chose to print all transactions")
listAllTransactions()
break
case "5":
break
case "6":
break
case "7":
break
case "8":
fmt.Println("You chose to exit")
os.Exit(0)
default:
fmt.Println("You chose an invalid option")
break
}
}
}
func sendVoteToNeighbour(config Config, trans Transaction, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn, err := net.Dial(ConnectionType, address)
if err != nil {
fmt.Println("Error while connecting to the neighbour", err)
os.Exit(1)
}
mess := Message{
MessageType: "rate",
MessageBody: trans,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err = encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
fmt.Println("MessageBody successfully sent to neighbour")
}
func sendAckToNeighbour(config Config, ack AckTransaction, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn, err := net.Dial(ConnectionType, address)
if err != nil {
fmt.Println("Error while connecting to the neighbour", err)
os.Exit(1)
}
mess := Message{
MessageType: "AckResponse",
MessageBody: ack,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err = encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
fmt.Println("MessageBody successfully sent to neighbour")
}
func main() {
if len(os.Args) <= 2 {
fmt.Println("First argument should be the path of the config file '--path=<config.yaml>'")
fmt.Println("Second argument should be either '--server' or '--client'")
os.Exit(1)
}
// INIT CONFIG
args := os.Args[1:]
fmt.Println("Program started with arguments", args)
fmt.Println("config file path is ", args[0][7:])
buf, err := ioutil.ReadFile(args[0][7:])
if err != nil {
fmt.Println("Error while reading the config file", err)
fmt.Println("Exiting...")
os.Exit(1)
}
var c Config
err = yaml.Unmarshal(buf, &c)
if err != nil {
os.Exit(1)
}
fmt.Println("The config is ", c)
fmt.Println("The ID is ", c.ID)
fmt.Println("The port is ", c.Port)
fmt.Println("The neighbours are :")
for neighbour := range c.Neighbours {
fmt.Println("\tThe neighbour is ", c.Neighbours[neighbour].ID)
fmt.Println("\tThe neighbour address is ", c.Neighbours[neighbour].Address)
fmt.Println("\tThe neighbour port is ", c.Neighbours[neighbour].Port)
fmt.Println()
}
fmt.Println()
// INIT SERVER AND/OR CLIENT LOOP
isStimulatedByClient := false
if len(os.Args) == 4 {
if os.Args[3] == "--root" {
isStimulatedByClient = true
}
}
if os.Args[2] == "--server" {
fmt.Println("Starting server")
go serverLoop(c, isStimulatedByClient)
userInputLoop(c, true)
}
if os.Args[2] == "--client" {
fmt.Println("Starting client")
go serverLoop(c, isStimulatedByClient)
userInputLoop(c, false)
}
}
# Distributed Systems
## Lab 1 - Deliverable 1.3
### Developing and deploying a broadcast algorithm for a blockchain management system on a Cloud infrastructure
- Student : Xavier Perret
- Email : xavier.perret@etu.hesge.ch
- Group : 1
- <ssh://git@ssh.hesge.ch:10572/xavier.perret/perso-distributed-systems.git>
- <https://gitedu.hesge.ch/xavier.perret/perso-distributed-systems.git>
### Client
To launch the client you need to run the following command:
```bash
go client.go --config=client_config.yaml
```
Please note that the `client_config.yaml` can be a `neighbor-x.yaml` file if you want to create a transaction and spread
it to neighbours instead on manually entering an ip.
### Server
#### Root Server
To lauch the server to which you wish to send commands using the client you need to do
```bash
go server.go --config=neighbor-x.yaml --root
```
#### Normal Server
To launch the other server you need to run the following command:
```bash
go server.go --config=neighbor-x.yaml
```
### Functionalities
- Able to create a transaction from the client and then request to root server to broadcast by wave to all the other servers
- Able to send a rate request from client to root server then broadcast by wave to all the other servers
- Able to fake a transaction on a node by using its command line
- Able to print all local transaction from any server or client on a node by using its command line
### Object Storage Module
#### Requirements
- Go 1.14
- Azure CLI
- Azure Storage Account configured (like <https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-go>)
- For testing you need to change the "hepiadistributedsystems" by the storage account name
```go
azureCreds := InitAzureCredentials("hepiadistributedsystems")
```
- Should not be necessary _Azure Storage Blob Sdk (The right version is already in `go.mod` dependencies)_
##### Project Description
- `test.go` : Contains the main function to launch the tests
- `object-storage/object-storage.go` : Contains the functions to interact with the object storage and the test function
- `types/datastructures.go` : Contains the datastructure useful for this lab (transactions, object storage, ...)
##### Methods Description
```go
func addTransactionToBlobStorage(transaction Transaction, database Database, os Blob) Database
```
Add transaction to given database and upload it to the blob storage. The database is then returned.
```go
func fakeTransaction(transactionToFake Transaction, fakeTransaction Transaction, database Database) Database
```
Find the given transaction to replace and replace it with the fake transaction. The database is then returned.
```go
func readDatabaseFromBlobStorage(blob Blob) Database
```
Read (Download) the database from the blob storage and return it.
```go
func readGivenBlobFromContainer(blob Blob, data any) any
```
Read given blob that is of type data from the container and return it.
```go
func writeDatabaseToBlobStorage(database Database, blob Blob)
```
Write given database to given blob/file.
```go
func writeGivenDataToBlob(blob Blob, data any)
```
Write given data to given blob/file.
```go
func InitAzureCredentials(storageAccountName string) AzureCredentials
```
Initialize the Azure credentials with the given storage account name.
```go
func InitializeBlobFromObjectStorageCreds(blobName string, azureCreds AzureCredentials) Blob
```
Initialize a container and then a blob/file (for upload/download) with the given blobName and returns it.
```go
func InitializeContainer(containerName string, azureCreds AzureCredentials) azblob.ContainerClient
```
Initialize a container with the given containerName and returns it (to create/delete/manage blob in it).
```go
func InitializeBlob(blobName string, azureCreds AzureCredentials, containerName string, containerClient azblob.ContainerClient) azblob.BlockBlobClient
```
Initialize a blob/file (for upload/download) with the given blobName and returns an object to interact with it (write/read data).
```go
func ListBlobs(blob Blob)
```
List blobs in container
```go
func PrintingDatabaseToConsole(database Database)
```
Print the database to the console.
```go
func TestObjectStorage()
```
Provide tests for the object storage.
package main
import (
"fmt"
"gopkg.in/yaml.v3"
command_line "node/command-line"
. "node/types"
"node/utilities"
"os"
)
/*
* @file
* @brief Distributed Systems - Blockchain
* @author Xavier Perret
* @date 28/09/2022 - 05/10/2022
*/
func main() {
argsLen := len(os.Args)
isThereEnoughArgs := argsLen <= 1
if isThereEnoughArgs {
fmt.Println("First argument should be the path of the config file '--path=<config.yaml>'")
os.Exit(1)
}
// init configuration
args := os.Args[1:]
fmt.Println("Program started with arguments", args)
fmt.Println("config file path is ", args[0][7:])
buf, err := os.ReadFile(args[0][7:])
if err != nil {
fmt.Println("Error while reading the config file", err)
fmt.Println("Exiting...")
os.Exit(1)
}
var clientConfig Config
err = yaml.Unmarshal(buf, &clientConfig)
if err != nil {
os.Exit(1)
}
utilities.PrintConfig(clientConfig)
command_line.ClientUserInputLoop(clientConfig, false)
}
id: -1
address: "127.0.0.1"
port: 4000
neighbours:
neighbors:
- id: 0
address: "127.0.0.1"
port: 5001
......
package command_line
import (
"fmt"
ObjectStorageAPI "node/object-storage"
Sender "node/sender"
. "node/types"
"node/utilities"
"os"
"strconv"
)
// userCreatedTransaction is a function to interactively create a transaction using the terminal
func userCreatedTransaction(config Config) Transaction {
fmt.Println("--------------------STARTING USER CREATED TRANSACTION--------------------")
var trans Transaction
trans.Id = utilities.RandomString()
fmt.Println()
var receiver string
fmt.Print("\nPlease enter the receiver")
_, err := fmt.Scanln(&receiver)
if err != nil {
return Transaction{}
}
trans.Receiver = receiver
var sender string
fmt.Print("\nPlease enter the sender")
_, err = fmt.Scanln(&sender)
if err != nil {
return Transaction{}
}
trans.Sender = sender
fmt.Println("Creating a transaction with sender id:", trans.Sender)
fmt.Println("transaction id is ", trans.Id)
fmt.Print("\nEnter amount :")
_, err = fmt.Scanln(&trans.Amount)
if err != nil {
fmt.Println("error")
os.Exit(1)
}
for neighbour := range config.Neighbours {
fmt.Println("Neighbour ", config.Neighbours[neighbour].ID)
fmt.Println("address ", config.Neighbours[neighbour].Address)
fmt.Println("port", config.Neighbours[neighbour].Port)
}
fmt.Println("--------------------FINISHED USER CREATED TRANSACTION--------------------")
return trans
}
func ServerUserInputLoop(config Config, isAlsoServer bool, objectStorage Blob) {
var database Database
for true {
var operation string
fmt.Println()
fmt.Println()
fmt.Println("Please enter the operation you want to do")
fmt.Println("1. Fabricate a fake transaction")
fmt.Println("2. Print all transactions")
fmt.Println("3. Exit")
fmt.Print("Your choice: ")
_, err := fmt.Scanln(&operation)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
switch operation {
case "1":
fmt.Println("You chose to fabricate a fake transaction")
fmt.Println("by giving us the id or index of the transaction you wish to modify (=fake)")
fmt.Println("we will first look for a transaction with the given id then try the index")
database = ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
utilities.PrintingDatabaseToConsole(database)
fmt.Print("\nPlease enter the index or id of the transaction you want to overwrite by faking:")
database = ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
var transID string
_, err := fmt.Scanln(&transID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
trans := Transaction{
Id: transID,
Sender: "",
Receiver: "",
Amount: "",
}
trans = ObjectStorageAPI.GetTransactionInDatabaseById(objectStorage, trans)
if trans.Id == "" {
transIDInt, err := strconv.ParseInt(transID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
trans = database[transIDInt]
}
tmpFakeTrans := userCreatedTransaction(config)
database = ObjectStorageAPI.FakeTransaction(trans, tmpFakeTrans, database)
ObjectStorageAPI.WriteDatabaseToBlobStorage(database, objectStorage)
database = ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
fmt.Println("Database after faking:")
utilities.PrintingDatabaseToConsole(database)
case "2":
fmt.Println("You chose to print all transactions")
database = ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
utilities.PrintingDatabaseToConsole(database)
break
case "3":
fmt.Println("You chose to exit")
return
default:
fmt.Println("You chose an invalid option")
break
}
}
}
func ClientUserInputLoop(clientConfig Config, isAlsoServer bool) {
for true {
var operation string
fmt.Println()
fmt.Println()
fmt.Println("Please enter the operation you want to do")
fmt.Println("1. Create a transaction and ask node to spread it")
fmt.Println("2. Ask node to rate a transaction")
fmt.Println("3. Ask node to fake a transaction")
fmt.Println("4. Ask node to list all transactions")
fmt.Println("5. Exit")
fmt.Print("Your choice: ")
_, err := fmt.Scanln(&operation)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
switch operation {
case "1":
fmt.Println("You chose to create a transaction")
if !isAlsoServer {
newTrans := userCreatedTransaction(clientConfig)
utilities.PrintNeighbors(clientConfig.Neighbours)
fmt.Println("TRANSACTION READY TO BE SENT")
fmt.Println("Please enter the ID of the neighbour you want to send the transaction to")
var neighbourID string
_, err := fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
Sender.SendTransactionToNeighbour(clientConfig, newTrans, clientConfig.Neighbours[neighbourIDInt].Address, strconv.Itoa(clientConfig.Neighbours[neighbourIDInt].Port))
}
break
case "2":
fmt.Println("You chose to rate a transaction")
fmt.Println("We will ask you for an id of a transaction to rate")
fmt.Println("The given node will send a request to all its neighbours to rate the transaction")
fmt.Println("The node will then print the result of the rating")
fmt.Print("\nPlease enter the id of the transaction you want to rate:")
var transID string
_, err := fmt.Scanln(&transID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
transIDInt, err := strconv.ParseInt(transID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
utilities.PrintNeighbors(clientConfig.Neighbours)
fmt.Println("Please enter the ID of the neighbour you want to send the rate request transaction to")
var neighbourID string
_, err = fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
address := clientConfig.Neighbours[neighbourIDInt].Address + ":" + strconv.Itoa(clientConfig.Neighbours[neighbourIDInt].Port)
fmt.Println("Sending rate demand to ", address)
Sender.SendVoteRequestToNode(clientConfig, transIDInt, address)
break
case "3":
fmt.Println("You chose to fake a transaction")
fmt.Println("We will ask you for the id of a transaction to fake")
fmt.Println("and the node you want to ask to fake it")
fmt.Println("The node will receive the request and change the transaction with a fake one")
fmt.Print("\nPlease enter the id of the transaction you want to fake:")
var transID string
_, err := fmt.Scanln(&transID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
transIDInt, err := strconv.ParseInt(transID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
utilities.PrintNeighbors(clientConfig.Neighbours)
fmt.Println("Please enter the ID of the neighbour you want to send the rate request transaction to")
var neighbourID string
_, err = fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
address := clientConfig.Neighbours[neighbourIDInt].Address + ":" + strconv.Itoa(clientConfig.Neighbours[neighbourIDInt].Port)
fmt.Println("Sending rate demand to ", address)
Sender.SendFakeRequestToNode(clientConfig, transIDInt, address)
break
case "4":
fmt.Println("You chose to ask for all transactions of a given node")
utilities.PrintNeighbors(clientConfig.Neighbours)
fmt.Println("We will ask you for the ID of the neighbour you want to ask for the transactions")
fmt.Println("The node will then print locally its transactions")
fmt.Println("Please enter the ID of the neighbour you want to send the print request transaction to")
var neighbourID string
_, err = fmt.Scanln(&neighbourID)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
neighbourIDInt, err := strconv.ParseInt(neighbourID, 10, 64)
if err != nil {
fmt.Println("error :", err.Error())
os.Exit(1)
}
address := clientConfig.Neighbours[neighbourIDInt].Address + ":" + strconv.Itoa(clientConfig.Neighbours[neighbourIDInt].Port)
fmt.Println("Sending rate demand to ", address)
Sender.SendListTransactionsRequestToNode(clientConfig, address)
break
case "5":
fmt.Println("You chose to exit")
return
default:
fmt.Println("You chose an invalid option")
break
}
}
}
module node
go 1.19
require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/text v0.3.7 // indirect
)
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 h1:KQgdWmEOmaJKxaUUZwHAYh12t+b+ZJf8q3friycK1kA=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0/go.mod h1:ZPW/Z0kLCTdDZaDbYTetxc9Cxl/2lNqxYHYNOF2bti0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 h1:VBvHGLJbaY0+c66NZHdS9cgjHVYSH6DDa0XJMyrblsI=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0/go.mod h1:GJzjM4SR9T0KyX5gKCVyz1ytD8FeWeUPCwtFCt1AyfE=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 h1:BUYIbDf/mMZ8945v3QkG3OuqGVyS4Iek0AOLwdRAYoc=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0 h1:62Ew5xXg5UCGIXDOM7+y4IL5/6mQJq1nenhBCJAeGX8=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0/go.mod h1:eHWhQKXc1Gv1DvWH//UzgWjWFEo0Pp4pH2vBzjBw8Fc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package manage_connection
import (
"fmt"
"net"
)
func CreateConnection(destinationAddress string) net.Conn {
fmt.Println("Trying to connect to ", destinationAddress)
// connect to ip with tcp and add the ip of the sender to the connection
conn, err := net.Dial("tcp", destinationAddress)
if err != nil {
fmt.Println("Error while connecting to the neighbour", err)
}
return conn
}
func CreateConnectionWithSpecifiedLocalAddress(senderAddress string, destinationAddress string) net.Conn {
dialer := net.Dialer{LocalAddr: &net.TCPAddr{IP: net.ParseIP(senderAddress)}}
conn, err := dialer.Dial("tcp", destinationAddress)
if err != nil {
fmt.Println("Error while connecting to the neighbour", err)
}
return conn
}
\ No newline at end of file
id: 0
address: "127.0.0.1"
port: 5001
neighbours:
neighbors:
- id: 2
address: "127.0.0.3"
port: 5003
......
id: 1
address: "127.0.0.2"
port: 5002
neighbours:
neighbors:
- id: 2
address: "127.0.0.3"
port: 5003
\ No newline at end of file
id: 2
address: "127.0.0.3"
port: 5003
neighbours:
neighbors:
- id: 0
address: "127.0.0.1"
port: 5001
......
id: 3
address: "127.0.0.4"
port: 5004
neighbours:
neighbors:
- id: 0
address: "127.0.0.1"
port: 5001
\ No newline at end of file
id: 4
address: "127.0.0.5"
port: 5005
neighbours:
neighbors:
- id: 2
address: "127.0.0.3"
port: 5003
\ No newline at end of file
id: 5
address: "127.0.0.6"
port: 5006
neighbours:
neighbors:
- id: 2
address: "127.0.0.3"
port: 5003
\ No newline at end of file
package object_storage
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"log"
"math/rand"
. "node/types"
. "node/utilities"
"os"
"strconv"
"time"
)
// AddTransactionToBlobStorage Add transaction to blob storage if it isn't in the database already and returns the database
func AddTransactionToBlobStorage(transaction Transaction, database Database, os Blob) Database {
for _, transactionInDatabase := range database {
if transactionInDatabase.Id == transaction.Id {
return database
}
}
database = append(database, transaction)
WriteDatabaseToBlobStorage(database, os)
return database
}
func FakeTransaction(transactionToFake Transaction, fakeTransaction Transaction, database Database) Database {
for i, trans := range database {
if trans.Id == transactionToFake.Id {
fakeTransaction.Id = transactionToFake.Id
database[i] = fakeTransaction
break
}
}
return database
}
func ReadDatabaseFromBlobStorage(blob Blob) Database {
ctx := context.Background()
var database Database
// Download the blob
get, err := blob.BlockBlobClient.Download(ctx, nil)
if err != nil {
log.Println(err)
return database
}
downloadedData := &bytes.Buffer{}
reader := get.Body(azblob.RetryReaderOptions{})
_, err = downloadedData.ReadFrom(reader)
if err != nil {
log.Println(err)
return database
}
err = reader.Close()
if err != nil {
log.Println(err)
return database
}
err = json.Unmarshal(downloadedData.Bytes(), &database)
if err != nil {
log.Println(err)
return database
}
fmt.Println(downloadedData.String())
return database
}
func readGivenBlobFromContainer(blob Blob, data any) any {
// Download the blob
ctx := context.Background()
get, err := blob.BlockBlobClient.Download(ctx, nil)
if err != nil {
log.Println(err)
}
downloadedData := &bytes.Buffer{}
reader := get.Body(azblob.RetryReaderOptions{})
_, err = downloadedData.ReadFrom(reader)
if err != nil {
log.Println(err)
}
err = reader.Close()
if err != nil {
log.Println(err)
}
err = json.Unmarshal(downloadedData.Bytes(), &data)
if err != nil {
log.Println(err)
}
fmt.Println(downloadedData.String())
return data
}
func WriteDatabaseToBlobStorage(database Database, blob Blob) {
ctx := context.Background()
data, err := json.Marshal(database)
// Upload to data to blob storage
_, err = blob.BlockBlobClient.UploadBufferToBlockBlob(ctx, data, azblob.HighLevelUploadToBlockBlobOption{})
if err != nil {
log.Printf("Failure to upload to blob: %+v", err)
}
}
func writeGivenDataToBlob(blob Blob, data any) {
ctx := context.Background()
dataBytes, err := json.Marshal(data)
// Upload to data to blob storage
_, err = blob.BlockBlobClient.UploadBufferToBlockBlob(ctx, dataBytes, azblob.HighLevelUploadToBlockBlobOption{})
if err != nil {
log.Printf("Failure to upload to blob: %+v", err)
}
}
func randomString() string {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return strconv.Itoa(r.Int())
}
func InitAzureCredentials(storageAccountName string) AzureCredentials {
fmt.Printf("Azure Blob storage initialization\n")
//replace <StorageAccountName> with your Azure storage account name
// hepiadistributedsystems
url := "https://" + storageAccountName + ".blob.core.windows.net/"
// Create a default request pipeline using your storage account name and account key.
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Println("Invalid credentials with error: " + err.Error())
}
serviceClient, err := azblob.NewServiceClient(url, credential, nil)
if err != nil {
log.Println("Invalid credentials with error: " + err.Error())
}
newAzureCredentials := AzureCredentials{
Url: url,
Credential: credential,
ServiceClient: serviceClient,
}
return newAzureCredentials
}
func InitializeContainer(containerName string, azureCreds AzureCredentials) azblob.ContainerClient {
ctx := context.Background()
containerClient := azureCreds.ServiceClient.NewContainerClient(containerName)
_, err := containerClient.Create(ctx, nil)
if err != nil {
log.Fatal(err)
}
return containerClient
}
func InitializeBlob(blobName string, azureCreds AzureCredentials, containerName string, containerClient azblob.ContainerClient) azblob.BlockBlobClient {
blobClient, err := azblob.NewBlockBlobClient(azureCreds.Url+containerName+"/"+blobName, azureCreds.Credential, nil)
if err != nil {
log.Fatal(err)
}
return blobClient
}
func InitializeBlobFromObjectStorageCreds(blobName string, azureCreds AzureCredentials) Blob {
ctx := context.Background()
// Create the container
containerName := fmt.Sprintf("bc-container-%s", randomString())
fmt.Printf("Creating a container named %s\n", containerName)
containerClient := azureCreds.ServiceClient.NewContainerClient(containerName)
_, err := containerClient.Create(ctx, nil)
if err != nil {
log.Fatal(err)
}
blobClient, err := azblob.NewBlockBlobClient(azureCreds.Url+containerName+"/"+blobName, azureCreds.Credential, nil)
if err != nil {
log.Fatal(err)
}
return Blob{
BlobName: blobName,
ContainerName: containerName,
ContainerClient: containerClient,
BlockBlobClient: blobClient,
}
}
func ListBlobs(blob Blob) {
// List the blobs in the container
fmt.Println("Listing the blobs in the container:")
ctx := context.Background()
pager := blob.ContainerClient.ListBlobsFlat(nil)
for pager.NextPage(ctx) {
resp := pager.PageResponse()
for _, v := range resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems {
fmt.Println(*v.Name)
}
}
if err := pager.Err(); err != nil {
log.Printf("Failure to list blobs: %+v", err)
}
}
func TestObjectStorage() {
ctx := context.Background()
// url := "https://hepiadistributedsystems.blob.core.windows.net/" //replace <StorageAccountName> with your Azure storage account name
azureCreds := InitAzureCredentials("hepiadistributedsystems")
blobName := "database" + "-" + randomString()
objectStorage := InitializeBlobFromObjectStorageCreds(blobName, azureCreds)
var db Database
t1 := Transaction{
Id: "1",
Sender: "0",
Receiver: "1",
Amount: "1000",
}
t2 := Transaction{
Id: "2",
Sender: "0",
Receiver: "1",
Amount: "1000",
}
db = AddTransactionToBlobStorage(t1, db, objectStorage)
db = AddTransactionToBlobStorage(t2, db, objectStorage)
fmt.Println("Listing files in container")
db1 := ReadDatabaseFromBlobStorage(objectStorage)
PrintingDatabaseToConsole(db1)
fmt.Println("Press enter to continue")
bufio.NewReader(os.Stdin).ReadBytes('\n')
fmt.Println("Going to fake a transaction (replacing the first with a new one)")
t3 := Transaction{
Id: "0",
Sender: "0",
Receiver: "1",
Amount: "1000000",
}
db = FakeTransaction(t1, t3, db)
WriteDatabaseToBlobStorage(db, objectStorage)
fmt.Println("Listing files in container")
db2 := ReadDatabaseFromBlobStorage(objectStorage)
PrintingDatabaseToConsole(db2)
fmt.Println("Press enter to continue")
bufio.NewReader(os.Stdin).ReadBytes('\n')
fmt.Println("Press enter key to delete the blob fils, example container, and exit the application.")
bufio.NewReader(os.Stdin).ReadBytes('\n')
fmt.Println("Cleaning up.")
// Delete the blob
fmt.Println("Deleting the blob " + blobName)
_, err := objectStorage.BlockBlobClient.Delete(ctx, nil)
if err != nil {
log.Fatalf("Failure: %+v", err)
}
// Delete the container
fmt.Println("Deleting the blob " + objectStorage.BlobName)
_, err = objectStorage.ContainerClient.Delete(ctx, nil)
if err != nil {
log.Fatalf("Failure: %+v", err)
}
}
func GetTransactionInDatabaseById(objectStorage Blob, trans Transaction) Transaction {
db := ReadDatabaseFromBlobStorage(objectStorage)
for _, t := range db {
if t.Id == trans.Id {
return trans
}
}
return Transaction{
Id: "",
Sender: "",
Receiver: "",
Amount: "",
}
}
func IsTransactionInDatabase(objectStorage Blob, trans Transaction) bool {
return GetTransactionInDatabaseById(objectStorage, trans).Id == trans.Id
}
package ProcessConnection
import (
"encoding/json"
"fmt"
"net"
ObjectStorageAPI "node/object-storage"
Sender "node/sender"
. "node/types"
"node/utilities"
"os"
"strconv"
"strings"
"sync"
)
func listAllTransactionsToClient(conn net.Conn, objectStorage Blob) {
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
mess := Message{
MessageType: "list",
MessageBody: database,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
err = conn.Close()
if err != nil {
fmt.Println("Error closing connection", err)
}
}
func processTransaction(serverListener net.Listener, serverConfig Config, objectStorage Blob, mess Message, amIRoot bool) {
// Convert mess to transaction
fmt.Println("Processing transaction")
trans := utilities.TranslateMessageToTransaction(mess)
utilities.PrintTransaction(trans)
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
fmt.Println("The database before adding the transaction is ")
utilities.PrintingDatabaseToConsole(database)
reach := false
count := 0
limit := len(serverConfig.Neighbours)
if amIRoot {
reach = true
database = ObjectStorageAPI.AddTransactionToBlobStorage(trans, database, objectStorage)
fmt.Println("The database after adding the transaction is ")
utilities.PrintingDatabaseToConsole(database)
go Sender.SendTransactionToAllNeighbours(serverConfig, trans)
}
for count < limit {
if count != 0 || reach {
fmt.Println("Count is ", count, " to ", limit)
connection, err := serverListener.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
err = nil
continue
}
utilities.PrintConnection(connection, count)
err = json.NewDecoder(connection).Decode(&trans)
if err != nil {
fmt.Println("Error while decoding the transaction", err)
err = nil
continue
}
fmt.Println("Received back a transaction")
utilities.PrintTransaction(trans)
}
count++ // received message
if !reach {
reach = true
database = ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
database = ObjectStorageAPI.AddTransactionToBlobStorage(trans, database, objectStorage)
go Sender.SendTransactionToAllNeighbours(serverConfig, trans)
}
if !amIRoot && limit == 1 { // for end nodes
break
}
}
utilities.PrintTransactionDoneMessage()
}
func processRate(conn net.Conn, serverListener net.Listener, serverConfig Config, objectStorage Blob, mess Message, amIRoot bool) {
trans := utilities.TranslateMessageToTransaction(mess)
utilities.PrintTransaction(trans)
address := strings.Split(conn.RemoteAddr().String(), ":")[0]
vote(serverListener, serverConfig, trans, address, objectStorage, amIRoot)
}
// A node sends a request to all nodes to check if the transaction trans, stored locally,
// is the same as the ones stored in the other nodes. The result (rate) is a percentage representing the
// transactions which are the same as the one stored locally. This function relies on the broadcast by wave
// with ACK distributed algorithm. However, some adjustments are needed to implement this function.
func vote(server net.Listener, serverConfig Config, trans Transaction, parentAddress string, objectStorage Blob, amIRoot bool) AckTransaction {
ack := AckTransaction{
Id: "",
TotalNodes: 0,
AmountOfCorrectNode: 0,
}
var transactionToRate Transaction
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
for _, transactionInDatabase := range database {
if transactionInDatabase.Id == trans.Id {
ack.Id = trans.Id
transactionToRate = transactionInDatabase
}
}
if ack.Id == "" {
fmt.Println("Error retrieving the transaction")
return ack
}
ack = utilities.CompareTransactions(transactionToRate, trans, ack)
var newAck AckTransaction
reach := false
count := 0
limit := len(serverConfig.Neighbours)
if amIRoot {
reach = true
go Sender.SendVoteToAllNeighbours(serverConfig, trans, parentAddress)
}
var mess Message
for count < limit {
if count != 0 || reach {
fmt.Println("Count is ", count, " to ", limit)
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
err = nil
continue
}
utilities.PrintConnection(connection, count)
err = json.NewDecoder(connection).Decode(&mess)
if err != nil {
fmt.Println("Error while decoding the transactionInDatabase", err)
err = nil
continue
}
if mess.MessageType == "AckResponse" {
newAck = utilities.TranslateMessageToAckTransaction(mess)
if ack.Id == newAck.Id {
ack.TotalNodes += newAck.TotalNodes
ack.AmountOfCorrectNode += newAck.AmountOfCorrectNode
}
fmt.Println("Node correct : ", ack.AmountOfCorrectNode, "out of ", ack.TotalNodes)
}
}
count++ // received message
if !reach {
reach = true
go Sender.SendVoteToAllNeighbours(serverConfig, trans, parentAddress)
}
}
fmt.Println("***********************************")
fmt.Println("Transaction has been rated")
fmt.Println("Node correct : ", ack.AmountOfCorrectNode, "out of ", ack.TotalNodes)
fmt.Println("***********************************")
if !amIRoot {
address := parentAddress
for _, neighbour := range serverConfig.Neighbours {
fmt.Println("Comparing ", neighbour.Address, " with ", parentAddress)
if neighbour.Address == parentAddress {
address = address + ":" + strconv.Itoa(neighbour.Port)
fmt.Println("Calculated address is ", address)
}
}
fmt.Println("Sending to ", address)
go Sender.SendAckToNeighbour(serverConfig, ack, address)
}
return ack
}
func processFakeRequest(conn net.Conn, serverListener net.Listener, serverConfig Config, objectStorage Blob, mess Message, amIRoot bool) {
trans := utilities.TranslateMessageToTransaction(mess)
utilities.PrintTransaction(trans)
fakeTrans := Transaction{
Id: trans.Id,
Sender: utilities.RandomString(),
Receiver: utilities.RandomString(),
Amount: utilities.RandomString(),
}
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
database = ObjectStorageAPI.FakeTransaction(trans, fakeTrans, database)
ObjectStorageAPI.WriteDatabaseToBlobStorage(database, objectStorage)
}
func processVoteRequest(conn net.Conn, serverListener net.Listener, serverConfig Config, objectStorage Blob, mess Message, amIRoot bool) {
trans := utilities.TranslateMessageToTransaction(mess)
var transToRate Transaction
utilities.PrintTransaction(trans)
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
for _, transactionInDatabase := range database {
if trans.Id == transactionInDatabase.Id {
transToRate = transactionInDatabase
}
}
ack := vote(serverListener, serverConfig, transToRate, "", objectStorage, amIRoot)
utilities.PrintAckTransaction(ack)
}
func ProcessClient(conn net.Conn, server net.Listener, objectStorage Blob, serverConfig Config, amIRoot bool, mutex *sync.Mutex) {
// mutex.Lock()
// defer mutex.Unlock()
utilities.PrintConnection(conn, 0)
var mess Message
jsonDecoder := json.NewDecoder(conn)
err := jsonDecoder.Decode(&mess)
if err != nil || mess.MessageType == "error" {
fmt.Println("Error while decoding the message", err)
return
}
fmt.Println("The message is ", mess)
if mess.MessageType == "transaction" {
fmt.Println("Received a transaction.. processing")
trans := utilities.TranslateMessageToTransaction(mess)
isTransactionInDatabase := ObjectStorageAPI.IsTransactionInDatabase(objectStorage, trans)
if isTransactionInDatabase {
conn.Close()
return // transaction already in database
}
processTransaction(server, serverConfig, objectStorage, mess, amIRoot)
} else if mess.MessageType == "rate" {
fmt.Println("Received a rate")
processRate(conn, server, serverConfig, objectStorage, mess, amIRoot)
} else if mess.MessageType == "list" {
fmt.Println("Received an order to list all transactions")
listAllTransactionsToClient(conn, objectStorage)
} else if mess.MessageType == "fakeRequest" {
fmt.Println("Received a request to fake transaction")
processFakeRequest(conn, server, serverConfig, objectStorage, mess, amIRoot)
} else if mess.MessageType == "voteRequest" {
fmt.Println("Received a request to vote on a given transaction")
processVoteRequest(conn, server, serverConfig, objectStorage, mess, true)
} else if mess.MessageType == "listTransactionsRequest" {
fmt.Println("Received a request to list all transactions")
database := ObjectStorageAPI.ReadDatabaseFromBlobStorage(objectStorage)
utilities.PrintingDatabaseToConsole(database)
} else {
fmt.Println("Unknown message type")
}
}