Skip to content
Snippets Groups Projects
Commit a228c992 authored by dylan.peiry's avatar dylan.peiry
Browse files

feat(freenet): implemented request messages

parent 5c24177d
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,33 @@ type StorageElement struct { ...@@ -23,6 +23,33 @@ type StorageElement struct {
Resource `json:"resource"` Resource `json:"resource"`
} }
const (
TRequest = iota
TResponse = iota
TRequestFailed
)
const (
ACTION_QUERY = iota
ACTION_VIEW_STORAGE = iota
)
type Message struct {
Type int `json:"type"`
RequestContent RequestMessage
ResponseContent ResponseMessage
}
type RequestMessage struct {
Query int
Sender Resource
}
type ResponseMessage struct {
QueryFound bool
Location Resource
}
func (r Resource) String() string { func (r Resource) String() string {
return fmt.Sprintf("Resource{Name=%s,Address=%s}", r.Name, r.Address) return fmt.Sprintf("Resource{Name=%s,Address=%s}", r.Name, r.Address)
} }
...@@ -43,3 +70,11 @@ func (s Storage) String() string { ...@@ -43,3 +70,11 @@ func (s Storage) String() string {
func (n Node) String() string { func (n Node) String() string {
return fmt.Sprintf("Node{\n\t%s,\n\t%s}\n", n.Resource, n.Storage) return fmt.Sprintf("Node{\n\t%s,\n\t%s}\n", n.Resource, n.Storage)
} }
func (req RequestMessage) String() string {
return fmt.Sprintf("RequestMessage{Query=%d,Sender=%s}", req.Query, req.Sender)
}
func (res ResponseMessage) String() string {
return fmt.Sprintf("ResponseMessage{QueryFound=%v,Location=%s}", res.QueryFound, res.Location)
}
No preview for this file type
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"freenet/helper" "freenet/helper"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"math"
"net"
"os" "os"
"strconv"
) )
var node helper.Node var node helper.Node
var queriedResourcesNames []string
var port int
var requestsHistory []helper.RequestMessage
var mode string
func main() { func main() {
//read arguments passed to the program //read arguments passed to the program
...@@ -15,17 +23,296 @@ func main() { ...@@ -15,17 +23,296 @@ func main() {
// port is the port the server will listen on // port is the port the server will listen on
// config_number represents de number of the neighbour-x.yaml file to load // config_number represents de number of the neighbour-x.yaml file to load
// mode is INIT or WAIT (INIT for the server that will run the search, WAIT for the servers that will listen) // mode is INIT or WAIT (INIT for the server that will run the search, WAIT for the servers that will listen)
var err error
var configNumber int
args := os.Args args := os.Args
configFile := fmt.Sprintf("neighbour-%s.yaml", args[1]) port, err = strconv.Atoi(args[1])
if err != nil {
panic(err)
}
configNumber, err = strconv.Atoi(args[2])
if err != nil {
panic(err)
}
mode = args[3]
configFile := fmt.Sprintf("neighbour-%d.yaml", configNumber)
node = parseConfigFile(configFile) node = parseConfigFile(configFile)
fmt.Printf("Node{%s}", node) fmt.Printf("Node{%s}", node)
if mode == "WAIT" {
listenForIncomingMessages(node.Resource.Address, port)
} else if mode == "INIT" {
queryInitiator()
} else {
fmt.Printf("Invalid mode")
os.Exit(-1)
}
//initialize socket connection //initialize socket connection
//if INIT //if INIT
//run the search algorithm and send message to the servers //run the search algorithm and send message to the servers
//if WAIT //if WAIT
//wait and listen for new connections //wait and listen for new connections
//debugTypesPrint() //debugTypesPrint()
}
func queryInitiator() {
var query int
//run the first query, proactive node acting as the client
//while active
//offer options to user
//user choose value to query
//run query on proactive node
//wait for response
//repeat steps above
for true {
fmt.Print("Which value do you wanna search on the network ? : ")
_, err := fmt.Scanf("%d", &query)
if err != nil {
panic(err)
}
foundId, isLocal, location := searchIdInStorage(query)
if foundId && isLocal {
fmt.Println("Found the queried value in the current node storage")
continue
}
location = getClosestResourceToQuery(query)
//craft RequestMessage
//Send to location
request := helper.RequestMessage{
Query: query,
Sender: node.Resource,
}
sendRequestMessage(request, location)
listenForIncomingMessages(node.Resource.Address, port)
}
}
func listenForIncomingMessages(address string, port int) {
server, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer server.Close()
fmt.Printf("Listening on %s:%d\n", address, port)
fmt.Println("Waiting for client...")
for {
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
fmt.Println("client connected: \n" + connection.RemoteAddr().String())
go processClient(connection)
}
}
func processClient(connection net.Conn) {
buffer := make([]byte, 2048)
mlen, err := connection.Read(buffer)
//receive incoming message
if err != nil {
fmt.Println("Error reading:", err.Error())
}
//Request -> the value we search, the sender
//Response -> Resource -> Name,Address
var m helper.Message
err = json.Unmarshal(buffer[:mlen], &m)
if err != nil {
panic(err)
}
switch m.Type {
case helper.TRequest:
handleRequestMessage(m.RequestContent)
break
case helper.TResponse:
handleResponseMessage(m.ResponseContent)
break
default:
fmt.Println("Received an unknown message format.")
os.Exit(-1)
}
connection.Close()
}
func firstRequestFromSender(request helper.RequestMessage) bool {
for _, historyRequest := range requestsHistory {
if historyRequest.Sender == request.Sender {
return false
}
}
return true
}
func handleRequestMessage(request helper.RequestMessage) {
if !firstRequestFromSender(request) && mode != "INIT" {
if mode == "WAIT" {
fmt.Println("The sender already sent a RequestMessage, not allowed a second time.")
fmt.Printf("%s\n", request)
} else if mode == "INIT" {
fmt.Println("The initiator do not process messages of type RequestMessage")
sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, request.Sender)
}
return
}
//append request to the history, we only handle one request message per sender
//search for the query in the current node storage
//if found, return a response to the sender with the QueryFound attribute set to true
//and with the Location attribute set to the current node Resource
//else query the neighbours starting from the neighbour with the closest value
fmt.Println("Handling request message")
fmt.Printf("%s\n", request)
requestsHistory = append(requestsHistory, request)
foundId, isLocal, location := searchIdInStorage(request.Query)
if foundId {
if isLocal {
fmt.Println("Found queried element in the storage...")
fmt.Printf("Location: %s\n", location)
fmt.Println("Sending ResponseMessage to the sender of the request")
//answer
//craft ResponseMessage
//handleResponseMessage
} else {
fmt.Println("Found queried element reference in the storage...")
fmt.Printf("Location: %s\n", location)
fmt.Println("Sending query to Location")
//transmits
//craft RequestMessage
//handleRequestMessage
//handleRequestMessage(request)
}
} else {
if !allResourcesQueried() {
resource := getClosestResourceToQuery(request.Query)
fmt.Printf("%s\n", queriedResourcesNames)
fmt.Println("Retrieving closest resource to query")
fmt.Printf("%s\n", resource)
sendRequestMessage(request, resource)
} else {
fmt.Println("All resources queried locally on this node, no more requests to send")
}
}
}
func allResourcesQueried() bool {
var count = 0
for _, storageElement := range node.Storage.Data {
if storageElement.Resource.Name != node.Resource.Name {
count++
}
}
return count == len(queriedResourcesNames)
}
func searchIdInStorage(query int) (bool, bool, helper.Resource) {
var foundId = false
var isLocal bool = false
var location helper.Resource
for index, storageElement := range node.Storage.Data {
value := storageElement.Id
fmt.Printf("Query:%d || Index:%d,Value:%d\n", query, index, value)
if value == query {
foundId = true
isLocal = storageElement.Resource.Name == node.Resource.Name
if isLocal {
location = node.Resource
} else {
location = storageElement.Resource
}
break
}
}
return foundId, isLocal, location
}
func getClosestResourceToQuery(query int) helper.Resource {
var smallestDifference int = math.MaxInt
var closestResource helper.Resource
for _, storageElement := range node.Storage.Data {
//skip if local resource
if storageElement.Resource.Name == node.Resource.Name || resourceAlreadyQueried() {
continue
}
value := storageElement.Id
difference := int(math.Abs(float64(query - value)))
if difference < smallestDifference {
smallestDifference = difference
closestResource = storageElement.Resource
}
}
queriedResourcesNames = append(queriedResourcesNames, closestResource.Name)
return closestResource
}
func resourceAlreadyQueried() bool {
for _, resourceName := range queriedResourcesNames {
if resourceName == node.Resource.Name {
return true
}
}
return false
}
func sendMessage(messageType int, request helper.RequestMessage, response helper.ResponseMessage, destination helper.Resource) {
fmt.Printf("Sending message of type %d to server %s:%d\n", messageType, destination.Address, port)
connection, err := net.Dial("tcp", fmt.Sprintf("%s:%d", destination.Address, port))
if err != nil {
panic(err)
}
m := helper.Message{
Type: messageType,
RequestContent: request,
ResponseContent: response,
}
fmt.Println(m)
data, err := json.Marshal(m)
if err != nil {
panic(err)
}
_, err = connection.Write(data)
defer connection.Close()
}
func sendRequestMessage(request helper.RequestMessage, destination helper.Resource) {
fmt.Printf("Sending RequestMessage to server %s:%d\n", destination.Address, port)
connection, err := net.Dial("tcp", fmt.Sprintf("%s:%d", destination.Address, port))
if err != nil {
panic(err)
}
m := helper.Message{
Type: helper.TRequest,
RequestContent: helper.RequestMessage{
Query: request.Query,
Sender: node.Resource,
},
}
fmt.Println(m)
data, err := json.Marshal(m)
if err != nil {
panic(err)
}
_, err = connection.Write(data)
defer connection.Close()
}
func handleResponseMessage(message helper.ResponseMessage) {
fmt.Println("Received a response")
} }
func parseConfigFile(file string) helper.Node { func parseConfigFile(file string) helper.Node {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment