diff --git a/.gitignore b/.gitignore index 9f11b755a17d8192c60f61cb17b8902dffbd9f23..aa97ea3369127fa17e889aa1d883718adabfc408 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -.idea/ +No more queryable resource, send RequestFailed back.idea/ diff --git a/helper/types.go b/helper/types.go index e999a7d902359a940b90357b48342c9187a4f965..b5bba85fb6353dbab3d4dac41d50af22e3035aef 100644 --- a/helper/types.go +++ b/helper/types.go @@ -29,25 +29,20 @@ const ( TRequestFailed ) -const ( - ACTION_QUERY = iota - ACTION_VIEW_STORAGE = iota -) - type Message struct { - Type int `json:"type"` - RequestContent RequestMessage - ResponseContent ResponseMessage + Type int `json:"type"` + RequestContent RequestMessage `json:"request_content"` + ResponseContent ResponseMessage `json:"response_content"` } type RequestMessage struct { - Query int - Sender Resource + Query int `json:"query"` + Sender Resource `json:"sender"` } type ResponseMessage struct { - QueryFound bool - Location Resource + Query int `json:"query"` + Location Resource `json:"location"` } func (r Resource) String() string { @@ -76,5 +71,5 @@ func (req RequestMessage) String() string { } func (res ResponseMessage) String() string { - return fmt.Sprintf("ResponseMessage{QueryFound=%v,Location=%s}", res.QueryFound, res.Location) + return fmt.Sprintf("ResponseMessage{Query=%d,Location=%s}", res.Query, res.Location) } diff --git a/server/neighbour-2.yaml b/server/neighbour-2.yaml index 27892801e0527ab7d7a3882dff01572a8aed629b..e8b7bcff76b9a044af66781908b1b5f29cf2e55d 100644 --- a/server/neighbour-2.yaml +++ b/server/neighbour-2.yaml @@ -10,4 +10,8 @@ storage: - id: 15 resource: name: "c" - address: "127.0.0.12" \ No newline at end of file + address: "127.0.0.12" + - id: 17 + resource: + name: "b" + address: "127.0.0.11" \ No newline at end of file diff --git a/server/server b/server/server index d7d06ef317ab8541c1ad0622ec160955be055267..8f8d21804c6852f012b23928521a5edeaf4e72c8 100755 Binary files a/server/server and b/server/server differ diff --git a/server/server.go b/server/server.go index 8be45ae5f1f2f53e54d9a22d827105e0cbde2b1f..d96eecaa350c15e97d6abb50aa0ba8568607b467 100644 --- a/server/server.go +++ b/server/server.go @@ -14,8 +14,11 @@ import ( var node helper.Node var queriedResourcesNames []string var port int -var requestsHistory []helper.RequestMessage -var mode string +var initiator = false +var hasProcessedRequest = false +var lastProcessedRequest helper.RequestMessage +var lastRequestSent helper.RequestMessage +var lastRequestDestination helper.Resource func main() { //read arguments passed to the program @@ -25,6 +28,7 @@ func main() { // mode is INIT or WAIT (INIT for the server that will run the search, WAIT for the servers that will listen) var err error var configNumber int + var mode string args := os.Args port, err = strconv.Atoi(args[1]) @@ -39,7 +43,7 @@ func main() { configFile := fmt.Sprintf("neighbour-%d.yaml", configNumber) node = parseConfigFile(configFile) - fmt.Printf("Node{%s}", node) + fmt.Printf("%s\n", node) if mode == "WAIT" { listenForIncomingMessages(node.Resource.Address, port) @@ -55,12 +59,14 @@ func main() { //run the search algorithm and send message to the servers //if WAIT //wait and listen for new connections - //debugTypesPrint() } func queryInitiator() { var query int + hasProcessedRequest = true + var foundResource = false + initiator = true //run the first query, proactive node acting as the client //while active //offer options to user @@ -75,21 +81,25 @@ func queryInitiator() { panic(err) } - foundId, isLocal, location := searchIdInStorage(query) - if foundId && isLocal { + foundId, _, location := searchIdInStorage(query) + if foundId /*&& isLocal*/ { fmt.Println("Found the queried value in the current node storage") continue } - location = getClosestResourceToQuery(query) + foundResource, location = getClosestResourceToQuery(query) + + if !foundResource { + continue + } //craft RequestMessage //Send to location request := helper.RequestMessage{ Query: query, Sender: node.Resource, } - sendRequestMessage(request, location) + sendMessage(helper.TRequest, request, helper.ResponseMessage{}, location) listenForIncomingMessages(node.Resource.Address, port) } } @@ -138,6 +148,8 @@ func processClient(connection net.Conn) { case helper.TResponse: handleResponseMessage(m.ResponseContent) break + case helper.TRequestFailed: + handleRequestFailed() default: fmt.Println("Received an unknown message format.") os.Exit(-1) @@ -145,26 +157,17 @@ func processClient(connection net.Conn) { connection.Close() } -func firstRequestFromSender(request helper.RequestMessage) bool { - for _, historyRequest := range requestsHistory { - if historyRequest.Sender == request.Sender { - return false - } - } - return true -} - func handleRequestMessage(request helper.RequestMessage) { - if !firstRequestFromSender(request) && mode != "INIT" { - if mode == "WAIT" { - fmt.Println("The sender already sent a RequestMessage, not allowed a second time.") - fmt.Printf("%s\n", request) - } else if mode == "INIT" { - fmt.Println("The initiator do not process messages of type RequestMessage") - sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, request.Sender) - } + //TODO: Split code into methods, too long currently + lastProcessedRequest = request //track last request so we can reply in the right order on the way back. + if hasProcessedRequest { + fmt.Println("The node already processed a request, not allowed a second time.") + fmt.Printf("%s\n", request) + //reply + sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, request.Sender) return } + //append request to the history, we only handle one request message per sender //search for the query in the current node storage //if found, return a response to the sender with the QueryFound attribute set to true @@ -172,40 +175,81 @@ func handleRequestMessage(request helper.RequestMessage) { //else query the neighbours starting from the neighbour with the closest value fmt.Println("Handling request message") fmt.Printf("%s\n", request) - requestsHistory = append(requestsHistory, request) - foundId, isLocal, location := searchIdInStorage(request.Query) + foundId, _, location := searchIdInStorage(request.Query) if foundId { - if isLocal { - fmt.Println("Found queried element in the storage...") - fmt.Printf("Location: %s\n", location) - fmt.Println("Sending ResponseMessage to the sender of the request") - //answer - //craft ResponseMessage - //handleResponseMessage - } else { + /*if isLocal {*/ + fmt.Println("Found queried element in the storage...") + fmt.Printf("Location: %s\n", location) + fmt.Println("Sending ResponseMessage to the sender of the request") + //answer + //craft ResponseMessage + //handleResponseMessage + response := helper.ResponseMessage{ + Query: request.Query, + Location: location, + } + sendMessage(helper.TResponse, helper.RequestMessage{}, response, request.Sender) + + /*} else { fmt.Println("Found queried element reference in the storage...") fmt.Printf("Location: %s\n", location) fmt.Println("Sending query to Location") //transmits //craft RequestMessage //handleRequestMessage + request := helper.RequestMessage{ + Query: request.Query, + Sender: node.Resource, + } + sendMessage(helper.TRequest, request, helper.ResponseMessage{}, location) - //handleRequestMessage(request) - - } + }*/ } else { if !allResourcesQueried() { - resource := getClosestResourceToQuery(request.Query) + foundResource, resource := getClosestResourceToQuery(request.Query) fmt.Printf("%s\n", queriedResourcesNames) fmt.Println("Retrieving closest resource to query") + fmt.Printf("%v\n", foundResource) fmt.Printf("%s\n", resource) - sendRequestMessage(request, resource) + //sendRequestMessage(request, resource) + req := helper.RequestMessage{ + Query: request.Query, + Sender: node.Resource, + } + sendMessage(helper.TRequest, req, helper.ResponseMessage{}, resource) } else { fmt.Println("All resources queried locally on this node, no more requests to send") + fmt.Printf("Sending RequestFailed to request sender %s\n", request.Sender) + sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, request.Sender) } } } +func handleRequestFailed() { + fmt.Printf("Handling request failed from %s\n", lastRequestDestination) + if initiator { + fmt.Printf("The queried value %d was not found on the network.\n\n", lastRequestSent.Query) + return + } + if allResourcesQueried() { + fmt.Println("No more queryable resource, send RequestFailed back") + fmt.Printf("%s\n", lastProcessedRequest) + sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, lastProcessedRequest.Sender) + return + } + + fmt.Println("Searching closest resource to query") + foundResource, resource := getClosestResourceToQuery(lastRequestSent.Query) + fmt.Printf("%v\n", foundResource) + fmt.Printf("Query: %d, %s\n", lastRequestSent.Query, resource) + request := helper.RequestMessage{ + Query: lastRequestSent.Query, + Sender: node.Resource, + } + + sendMessage(helper.TRequest, request, helper.ResponseMessage{}, resource) +} + func allResourcesQueried() bool { var count = 0 for _, storageElement := range node.Storage.Data { @@ -237,12 +281,13 @@ func searchIdInStorage(query int) (bool, bool, helper.Resource) { return foundId, isLocal, location } -func getClosestResourceToQuery(query int) helper.Resource { +func getClosestResourceToQuery(query int) (bool, helper.Resource) { var smallestDifference int = math.MaxInt var closestResource helper.Resource + var foundResource = false for _, storageElement := range node.Storage.Data { //skip if local resource - if storageElement.Resource.Name == node.Resource.Name || resourceAlreadyQueried() { + if storageElement.Resource.Name == node.Resource.Name || resourceAlreadyQueried(storageElement.Resource) { continue } value := storageElement.Id @@ -250,15 +295,18 @@ func getClosestResourceToQuery(query int) helper.Resource { if difference < smallestDifference { smallestDifference = difference closestResource = storageElement.Resource + foundResource = true } } - queriedResourcesNames = append(queriedResourcesNames, closestResource.Name) - return closestResource + if foundResource { + queriedResourcesNames = append(queriedResourcesNames, closestResource.Name) + } + return foundResource, closestResource } -func resourceAlreadyQueried() bool { - for _, resourceName := range queriedResourcesNames { - if resourceName == node.Resource.Name { +func resourceAlreadyQueried(resource helper.Resource) bool { + for _, queriedResourceName := range queriedResourcesNames { + if queriedResourceName == resource.Name { return true } } @@ -272,6 +320,15 @@ func sendMessage(messageType int, request helper.RequestMessage, response helper panic(err) } + switch messageType { + case helper.TRequest: + lastRequestSent = request + lastRequestDestination = destination + break + default: + break + } + m := helper.Message{ Type: messageType, RequestContent: request, @@ -294,6 +351,9 @@ func sendRequestMessage(request helper.RequestMessage, destination helper.Resour panic(err) } + lastRequestSent = request + lastRequestDestination = destination + m := helper.Message{ Type: helper.TRequest, RequestContent: helper.RequestMessage{ @@ -311,49 +371,47 @@ func sendRequestMessage(request helper.RequestMessage, destination helper.Resour defer connection.Close() } -func handleResponseMessage(message helper.ResponseMessage) { +func handleResponseMessage(response helper.ResponseMessage) { + fmt.Printf("Handling response message from %s\n", lastRequestDestination) + //store resource reference in the node storage + //if initiator inform the user that the query succeeded + //else transmits response back to processRequest sender fmt.Println("Received a response") -} - -func parseConfigFile(file string) helper.Node { - var node helper.Node - yamlFile, err := os.ReadFile(file) - if err != nil { - panic(err) + fmt.Printf("%s\n", response) + addResourceToStorage(response.Query, response.Location) + if initiator { + fmt.Println("ResponseMessage received on the initiator") + fmt.Printf("The queried value %d was found on the node %s at the address %s\n", response.Query, response.Location.Name, response.Location.Address) + return } - err = yaml.Unmarshal(yamlFile, &node) - return node -} -func usage() { + sendMessage(helper.TResponse, helper.RequestMessage{}, response, lastProcessedRequest.Sender) } -func debugTypesPrint() { - resource := helper.Resource{ - Name: "a", - Address: "127.0.0.10", - } +func addResourceToStorage(value int, resource helper.Resource) { + printStorage() storageElement := helper.StorageElement{ - Id: 15, - Resource: resource, - } - storageElement2 := helper.StorageElement{ - Id: 13, - Resource: resource, + Id: value, + Resource: helper.Resource{ + Name: resource.Name, + Address: resource.Address, + }, } + node.Storage.Data = append(node.Storage.Data, storageElement) + printStorage() +} - elements := make([]helper.StorageElement, 0) - elements = append(elements, storageElement) - elements = append(elements, storageElement2) - - storage := helper.Storage{ - Data: elements, - } +func printStorage() { + fmt.Printf("%s\n", node.Storage) +} - node := helper.Node{ - Resource: resource, - Storage: storage, +func parseConfigFile(file string) helper.Node { + var node helper.Node + yamlFile, err := os.ReadFile(file) + if err != nil { + panic(err) } - fmt.Printf(node.String()) + err = yaml.Unmarshal(yamlFile, &node) + return node }