Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • xavier.perret/perso-distributed-systems
1 result
Select Git revision
  • master
1 result
Show changes
package sender
import (
"encoding/json"
"fmt"
manage_connection "node/manage-connection"
. "node/types"
"os"
"strconv"
)
func SendVoteToNeighbour(config Config, trans Transaction, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn := manage_connection.CreateConnectionWithSpecifiedLocalAddress(config.Address, address)
mess := Message{
MessageType: "rate",
MessageBody: trans,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
err = conn.Close()
if err != nil {
fmt.Println("Error while closing the process-manage-connection", err)
}
fmt.Println("MessageBody successfully sent to neighbour")
}
func SendAckToNeighbour(config Config, ack AckTransaction, address string) {
fmt.Println("*******************STARTING TO SEND ACK*******************")
fmt.Println("Trying to connect to ", address)
var ackToSend AckTransaction
ackToSend.Id = ack.Id
ackToSend.AmountOfCorrectNode = ack.AmountOfCorrectNode
ackToSend.TotalNodes = ack.TotalNodes
conn := manage_connection.CreateConnectionWithSpecifiedLocalAddress(config.Address, address)
mess := Message{
MessageType: "AckResponse",
MessageBody: ackToSend,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
fmt.Println("MessageBody successfully sent to neighbour")
fmt.Println("*******************FINISHED ACK*******************")
}
func SendVoteToAllNeighbours(config Config, trans Transaction, parentIp string) {
for neighbour := range config.Neighbours {
ip := config.Neighbours[neighbour].Address
port := strconv.Itoa(config.Neighbours[neighbour].Port)
address := ip + ":" + port
if parentIp != ip {
go SendVoteToNeighbour(config, trans, address)
}
}
}
// SendTransactionToAllNeighbours is a function to send a transaction to all neighbours
func SendTransactionToAllNeighbours(config Config, trans Transaction) {
for neighbour := range config.Neighbours {
fmt.Println("Sending transaction to neighbour ", config.Neighbours[neighbour].ID)
ip := config.Neighbours[neighbour].Address
port := strconv.Itoa(config.Neighbours[neighbour].Port)
SendTransactionToNeighbour(config, trans, ip, port)
}
}
// SendTransactionToNeighbour is a function to send a transaction to a neighbour, used both by server/client
func SendTransactionToNeighbour(config Config, trans Transaction, destinationIp string, destinationPort string) {
fmt.Println()
ipAddr := destinationIp + ":" + destinationPort
fmt.Println("Trying to connect to ", destinationIp, ":", destinationPort)
conn := manage_connection.CreateConnectionWithSpecifiedLocalAddress(config.Address, ipAddr)
mess := Message{
MessageType: "transaction",
MessageBody: trans,
}
fmt.Println("Sending message to neighbour", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
fmt.Println("MessageBody successfully sent to neighbour")
}
// SendVoteRequestToNode is a function to send a vote request to a node, used strictly by client
func SendVoteRequestToNode(clientConfig Config, transIDInt int64, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn := manage_connection.CreateConnection(address)
trans := Transaction{
Id: strconv.FormatInt(transIDInt, 10),
Sender: "",
Receiver: "",
Amount: "",
}
mess := Message{
MessageType: "voteRequest",
MessageBody: trans,
}
fmt.Println("Sending vote request to node", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
}
// SendFakeRequestToNode is a function to send a fake request to a node, used strictly by client
func SendFakeRequestToNode(clientConfig Config, transIDInt int64, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn := manage_connection.CreateConnection(address)
trans := Transaction{
Id: strconv.FormatInt(transIDInt, 10),
Sender: "",
Receiver: "",
Amount: "",
}
mess := Message{
MessageType: "fakeRequest",
MessageBody: trans,
}
fmt.Println("Sending vote request to node", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
}
// SendListTransactionsRequestToNode is a function to ask a node to locally list its transaction in its cli, used strictly by client
func SendListTransactionsRequestToNode(clientConfig Config, address string) {
fmt.Println()
fmt.Println("Trying to connect to ", address)
conn := manage_connection.CreateConnection(address)
mess := Message{
MessageType: "listTransactionsRequest",
MessageBody: "",
}
fmt.Println("Sending vote request to node", mess)
encoder := json.NewEncoder(conn)
err := encoder.Encode(mess)
if err != nil {
fmt.Println("Error while encoding the transaction", err)
os.Exit(1)
}
conn.Close()
}
package main
import (
"context"
"fmt"
"gopkg.in/yaml.v3"
"log"
"net"
command_line "node/command-line"
ObjectStorage "node/object-storage"
ProcessConnection "node/process-connection"
. "node/types"
"node/utilities"
"os"
"strconv"
"sync"
)
/**
* @file Server.go - Server for the blockchain
* @brief Distributed Systems - Blockchain, each instance of this program is a node in the blockchain
* @author Xavier Perret
* @date 28/10/2022
* @version 1.3
*/
func listenForConnections(serverConfig Config, objectStorage Blob, addressToListenOn string, amIRoot bool) {
port := strconv.FormatInt(int64(serverConfig.Port), 10)
completeAddress := addressToListenOn + ":" + port
server, err := net.Listen("tcp", completeAddress)
if err != nil {
fmt.Println("Error while creating the server :", err)
os.Exit(1)
}
fmt.Println("Server is ready to accept connections")
fmt.Println("listening on ", completeAddress)
var mu sync.Mutex
for {
// Listening for connections
conn, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
} else {
ProcessConnection.ProcessClient(conn, server, objectStorage, serverConfig, amIRoot, &mu)
}
}
}
func main() {
ctx := context.Background()
argsLen := len(os.Args)
isThereEnoughArgs := argsLen <= 1
if isThereEnoughArgs {
fmt.Println("First argument should be the path of the config file '--path=<config.yaml>'")
os.Exit(1)
}
// init configuration
args := os.Args[1:]
fmt.Println("Program started with arguments", args)
fmt.Println("config file path is ", args[0][7:])
buf, err := os.ReadFile(args[0][7:])
if err != nil {
fmt.Println("Error while reading the config file", err)
fmt.Println("Exiting...")
os.Exit(1)
}
var serverConfig Config
err = yaml.Unmarshal(buf, &serverConfig)
if err != nil {
os.Exit(1)
}
utilities.PrintConfig(serverConfig)
amIRoot := argsLen == 3
if amIRoot {
if os.Args[3] == "--root" {
amIRoot = true
}
}
if !amIRoot {
fmt.Println("Third argument is not --root meaning this node is not meant to be directly stimulated by the client")
}
azureCreds := ObjectStorage.InitAzureCredentials("hepiadistributedsystems")
blobName := "blob-number-" + strconv.Itoa(serverConfig.ID)
objectStorage := ObjectStorage.InitializeBlobFromObjectStorageCreds(blobName, azureCreds)
addressToListenOn := "0.0.0.0"
go listenForConnections(serverConfig, objectStorage, addressToListenOn, amIRoot)
command_line.ServerUserInputLoop(serverConfig, true, objectStorage)
_, err = objectStorage.BlockBlobClient.Delete(ctx, nil)
if err != nil {
fmt.Println("Error while deleting the blob", err)
fmt.Println("This previous error is normal if the blob was already deleted or wasn't created")
}
// Delete the container
fmt.Println("Deleting the container " + objectStorage.BlobName)
_, err = objectStorage.ContainerClient.Delete(ctx, nil)
if err != nil {
log.Fatalf("Failure: %+v", err)
}
}
package main
import (
"fmt"
. "node/object-storage"
)
func main() {
fmt.Println("Testing file for the application")
TestObjectStorage()
}
package types
import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
// Message - MessageBody can be of any type but will be use here for Transaction, AckTransaction
type Message struct {
MessageType string `json:"messageType"`
MessageBody interface{} `json:"messageBody"`
}
// Transaction - represents a transaction in the blockchain
type Transaction struct {
Id string `json:"id"`
Sender string `json:"sender"`
Receiver string `json:"receiver"`
Amount string `json:"amount"`
}
// AckTransaction - represents an acknowledgement of a transaction @see vote function
type AckTransaction struct {
Id string `json:"id"`
AmountOfCorrectNode int `json:"amountOfCorrectNode"`
TotalNodes int `json:"totalNodes"`
}
type Database []Transaction
type AzureCredentials struct {
Url string
Credential *azidentity.DefaultAzureCredential
ServiceClient azblob.ServiceClient
}
type Blob struct {
BlobName string
ContainerName string
ContainerClient azblob.ContainerClient
BlockBlobClient azblob.BlockBlobClient
}
// Config - is the configuration of a node, it's id, address, port and neighbours
type Config struct {
ID int `yaml:"id"`
Address string `yaml:"address"`
Port int `yaml:"port"`
Neighbours []Neighbors `yaml:"neighbors"`
}
type Neighbors struct {
ID int `yaml:"id"`
Address string `yaml:"address"`
Port int `yaml:"port"`
}
package utilities
import (
"fmt"
"math/rand"
"net"
. "node/types"
"strconv"
"time"
)
func PrintTransaction(trans Transaction) {
fmt.Println("--------------------")
fmt.Println("Printing transaction")
fmt.Println("The id is ", trans.Id)
fmt.Println("The sender is ", trans.Sender)
fmt.Println("The receiver is ", trans.Receiver)
fmt.Println("The amount is ", trans.Amount)
fmt.Println("--------------------")
}
func ListAllTransactions(db Database) {
for i, transaction := range db {
fmt.Println("***********INDEX N=,", i, "****************")
PrintTransaction(transaction)
fmt.Println("***********************************")
}
}
func PrintNeighbors(neighbors []Neighbors) {
fmt.Println("Printing neighbors")
for i, neighbor := range neighbors {
fmt.Println("***********INDEX N=,", i, "****************")
fmt.Println("The id is ", neighbor.ID)
fmt.Println("The address is ", neighbor.Address)
fmt.Println("The port is ", neighbor.Port)
fmt.Println("***********************************")
}
}
func PrintConfig(config Config) {
fmt.Println("Printing config")
fmt.Println("The id is ", config.ID)
fmt.Println("The address is ", config.Address)
fmt.Println("The port is ", config.Port)
PrintNeighbors(config.Neighbours)
}
func PrintingDatabaseToConsole(database Database) {
for i, trans := range database {
fmt.Println("--------------------")
fmt.Printf("\nListing transactions number %d in database\n", i)
fmt.Printf("Transaction id: %s\n", trans.Id)
fmt.Printf("Transaction sender: %s\n", trans.Sender)
fmt.Printf("Transaction receiver: %s\n", trans.Receiver)
fmt.Printf("Transaction amount: %s\n", trans.Amount)
fmt.Println("--------------------")
}
}
func CompareTransactions(trans1 Transaction, trans2 Transaction, ack AckTransaction) AckTransaction {
newAck := AckTransaction{
Id: ack.Id,
AmountOfCorrectNode: ack.AmountOfCorrectNode,
TotalNodes: ack.TotalNodes,
}
if trans1.Id != trans2.Id {
return newAck
}
areBothTransactionEquals := true
if trans1.Receiver != trans2.Receiver {
areBothTransactionEquals = false
}
if trans1.Sender != trans2.Sender {
areBothTransactionEquals = false
}
if trans1.Amount != trans2.Amount {
areBothTransactionEquals = false
}
if areBothTransactionEquals {
newAck.AmountOfCorrectNode += 1
}
newAck.TotalNodes += 1
return newAck
}
func PrintConnection(connection net.Conn, connectionNumber int) {
fmt.Println("*****************************************")
fmt.Println("Processing client request number ", connectionNumber)
fmt.Println("The process-manage-connection is ", connection)
fmt.Println("The remote address is ", connection.RemoteAddr())
fmt.Println("The local address is ", connection.LocalAddr())
fmt.Println("*****************************************")
}
// TranslateMessageToAckTransaction translates a message to an ack transaction
// the geniuses who made unmarshable take int as float64
func TranslateMessageToAckTransaction(mess Message) AckTransaction {
var transactionId string
var amountOfCorrectNode int
var totalNode int
var body map[string]interface{} = mess.MessageBody.(map[string]interface{})
transactionId = body["id"].(string)
amountOfCorrectNode = int(body["amountOfCorrectNode"].(float64))
totalNode = int(body["totalNodes"].(float64))
return AckTransaction{
Id: transactionId,
AmountOfCorrectNode: amountOfCorrectNode,
TotalNodes: totalNode,
}
}
func TranslateMessageToTransaction(mess Message) Transaction {
var newTrans Transaction
var body map[string]interface{} = mess.MessageBody.(map[string]interface{})
newTrans.Id = body["id"].(string)
newTrans.Sender = body["sender"].(string)
newTrans.Receiver = body["receiver"].(string)
newTrans.Amount = body["amount"].(string)
return newTrans
}
func RandomString() string {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return strconv.Itoa(r.Int())
}
func PrintTransactionDoneMessage() {
fmt.Println("***********************************")
fmt.Println("All transactions have been received")
fmt.Println("***********************************")
}
func PrintAckTransaction(ack AckTransaction) {
fmt.Println("***********************************")
fmt.Println("Printing ack transaction")
fmt.Println("The id is ", ack.Id)
fmt.Println("The amount of correct node is ", ack.AmountOfCorrectNode)
fmt.Println("The total nodes is ", ack.TotalNodes)
fmt.Println("***********************************")
}
apiVersion: v1
kind: Pod
metadata:
name: api
labels:
component: api
app: todo
spec:
containers:
- name: api
image: icclabcna/ccp2-k8s-todo-api
ports:
- containerPort: 8081
resources:
limits:
cpu: 100m
env:
- name: REDIS_ENDPOINT
value: redis-svc
- name: REDIS_PWD
value: ccp2
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis-deploy
labels:
app: todo
spec:
replicas: 1
selector:
matchLabels:
app: todo
template:
metadata:
name: redis
labels:
component: redis
app: todo
spec:
containers:
- name: redis
image: redis
ports:
- containerPort: 6379
resources:
limits:
cpu: 100m
args:
- redis-server
- --requirepass ccp2
- --appendonly yes
\ No newline at end of file
apiVersion: v1
kind: Pod
metadata:
name: redis-pod
labels:
component: redis
app: todo
spec:
containers:
- name: redis-container
image: redis
ports:
- containerPort: 6379
resources:
limits:
cpu: 100m
args:
- redis-server
- --requirepass ccp2
- --appendonly yes