diff --git a/helper/types.go b/helper/types.go index 1a44181c979d62a52e29d5a3230f184a3012d8f1..e999a7d902359a940b90357b48342c9187a4f965 100644 --- a/helper/types.go +++ b/helper/types.go @@ -23,6 +23,33 @@ type StorageElement struct { Resource `json:"resource"` } +const ( + TRequest = iota + TResponse = iota + TRequestFailed +) + +const ( + ACTION_QUERY = iota + ACTION_VIEW_STORAGE = iota +) + +type Message struct { + Type int `json:"type"` + RequestContent RequestMessage + ResponseContent ResponseMessage +} + +type RequestMessage struct { + Query int + Sender Resource +} + +type ResponseMessage struct { + QueryFound bool + Location Resource +} + func (r Resource) String() string { return fmt.Sprintf("Resource{Name=%s,Address=%s}", r.Name, r.Address) } @@ -43,3 +70,11 @@ func (s Storage) String() string { func (n Node) String() string { return fmt.Sprintf("Node{\n\t%s,\n\t%s}\n", n.Resource, n.Storage) } + +func (req RequestMessage) String() string { + return fmt.Sprintf("RequestMessage{Query=%d,Sender=%s}", req.Query, req.Sender) +} + +func (res ResponseMessage) String() string { + return fmt.Sprintf("ResponseMessage{QueryFound=%v,Location=%s}", res.QueryFound, res.Location) +} diff --git a/server/server b/server/server index 1c4e7b85cb37d691d28674e4e669db9f6aa83b05..d7d06ef317ab8541c1ad0622ec160955be055267 100755 Binary files a/server/server and b/server/server differ diff --git a/server/server.go b/server/server.go index 3eb702f8179e7838a6412ddb6b82749df9b97169..8be45ae5f1f2f53e54d9a22d827105e0cbde2b1f 100644 --- a/server/server.go +++ b/server/server.go @@ -1,13 +1,21 @@ package main import ( + "encoding/json" "fmt" "freenet/helper" "gopkg.in/yaml.v2" + "math" + "net" "os" + "strconv" ) var node helper.Node +var queriedResourcesNames []string +var port int +var requestsHistory []helper.RequestMessage +var mode string func main() { //read arguments passed to the program @@ -15,17 +23,296 @@ func main() { // port is the port the server will listen on // config_number represents de number of the neighbour-x.yaml file to load // mode is INIT or WAIT (INIT for the server that will run the search, WAIT for the servers that will listen) + var err error + var configNumber int + args := os.Args - configFile := fmt.Sprintf("neighbour-%s.yaml", args[1]) + port, err = strconv.Atoi(args[1]) + if err != nil { + panic(err) + } + configNumber, err = strconv.Atoi(args[2]) + if err != nil { + panic(err) + } + mode = args[3] + + configFile := fmt.Sprintf("neighbour-%d.yaml", configNumber) node = parseConfigFile(configFile) fmt.Printf("Node{%s}", node) + if mode == "WAIT" { + listenForIncomingMessages(node.Resource.Address, port) + } else if mode == "INIT" { + queryInitiator() + } else { + fmt.Printf("Invalid mode") + os.Exit(-1) + } + //initialize socket connection //if INIT //run the search algorithm and send message to the servers //if WAIT //wait and listen for new connections //debugTypesPrint() + +} + +func queryInitiator() { + var query int + //run the first query, proactive node acting as the client + //while active + //offer options to user + //user choose value to query + //run query on proactive node + //wait for response + //repeat steps above + for true { + fmt.Print("Which value do you wanna search on the network ? : ") + _, err := fmt.Scanf("%d", &query) + if err != nil { + panic(err) + } + + foundId, isLocal, location := searchIdInStorage(query) + if foundId && isLocal { + fmt.Println("Found the queried value in the current node storage") + continue + } + + location = getClosestResourceToQuery(query) + //craft RequestMessage + //Send to location + request := helper.RequestMessage{ + Query: query, + Sender: node.Resource, + } + sendRequestMessage(request, location) + + listenForIncomingMessages(node.Resource.Address, port) + } +} + +func listenForIncomingMessages(address string, port int) { + server, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) + if err != nil { + fmt.Println("Error listening:", err.Error()) + os.Exit(1) + } + defer server.Close() + fmt.Printf("Listening on %s:%d\n", address, port) + fmt.Println("Waiting for client...") + for { + connection, err := server.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + fmt.Println("client connected: \n" + connection.RemoteAddr().String()) + go processClient(connection) + } +} + +func processClient(connection net.Conn) { + buffer := make([]byte, 2048) + mlen, err := connection.Read(buffer) + //receive incoming message + if err != nil { + fmt.Println("Error reading:", err.Error()) + } + + //Request -> the value we search, the sender + //Response -> Resource -> Name,Address + + var m helper.Message + err = json.Unmarshal(buffer[:mlen], &m) + if err != nil { + panic(err) + } + + switch m.Type { + case helper.TRequest: + handleRequestMessage(m.RequestContent) + break + case helper.TResponse: + handleResponseMessage(m.ResponseContent) + break + default: + fmt.Println("Received an unknown message format.") + os.Exit(-1) + } + connection.Close() +} + +func firstRequestFromSender(request helper.RequestMessage) bool { + for _, historyRequest := range requestsHistory { + if historyRequest.Sender == request.Sender { + return false + } + } + return true +} + +func handleRequestMessage(request helper.RequestMessage) { + if !firstRequestFromSender(request) && mode != "INIT" { + if mode == "WAIT" { + fmt.Println("The sender already sent a RequestMessage, not allowed a second time.") + fmt.Printf("%s\n", request) + } else if mode == "INIT" { + fmt.Println("The initiator do not process messages of type RequestMessage") + sendMessage(helper.TRequestFailed, helper.RequestMessage{}, helper.ResponseMessage{}, request.Sender) + } + return + } + //append request to the history, we only handle one request message per sender + //search for the query in the current node storage + //if found, return a response to the sender with the QueryFound attribute set to true + //and with the Location attribute set to the current node Resource + //else query the neighbours starting from the neighbour with the closest value + fmt.Println("Handling request message") + fmt.Printf("%s\n", request) + requestsHistory = append(requestsHistory, request) + foundId, isLocal, location := searchIdInStorage(request.Query) + if foundId { + if isLocal { + fmt.Println("Found queried element in the storage...") + fmt.Printf("Location: %s\n", location) + fmt.Println("Sending ResponseMessage to the sender of the request") + //answer + //craft ResponseMessage + //handleResponseMessage + } else { + fmt.Println("Found queried element reference in the storage...") + fmt.Printf("Location: %s\n", location) + fmt.Println("Sending query to Location") + //transmits + //craft RequestMessage + //handleRequestMessage + + //handleRequestMessage(request) + + } + } else { + if !allResourcesQueried() { + resource := getClosestResourceToQuery(request.Query) + fmt.Printf("%s\n", queriedResourcesNames) + fmt.Println("Retrieving closest resource to query") + fmt.Printf("%s\n", resource) + sendRequestMessage(request, resource) + } else { + fmt.Println("All resources queried locally on this node, no more requests to send") + } + } +} + +func allResourcesQueried() bool { + var count = 0 + for _, storageElement := range node.Storage.Data { + if storageElement.Resource.Name != node.Resource.Name { + count++ + } + } + return count == len(queriedResourcesNames) +} + +func searchIdInStorage(query int) (bool, bool, helper.Resource) { + var foundId = false + var isLocal bool = false + var location helper.Resource + for index, storageElement := range node.Storage.Data { + value := storageElement.Id + fmt.Printf("Query:%d || Index:%d,Value:%d\n", query, index, value) + if value == query { + foundId = true + isLocal = storageElement.Resource.Name == node.Resource.Name + if isLocal { + location = node.Resource + } else { + location = storageElement.Resource + } + break + } + } + return foundId, isLocal, location +} + +func getClosestResourceToQuery(query int) helper.Resource { + var smallestDifference int = math.MaxInt + var closestResource helper.Resource + for _, storageElement := range node.Storage.Data { + //skip if local resource + if storageElement.Resource.Name == node.Resource.Name || resourceAlreadyQueried() { + continue + } + value := storageElement.Id + difference := int(math.Abs(float64(query - value))) + if difference < smallestDifference { + smallestDifference = difference + closestResource = storageElement.Resource + } + } + queriedResourcesNames = append(queriedResourcesNames, closestResource.Name) + return closestResource +} + +func resourceAlreadyQueried() bool { + for _, resourceName := range queriedResourcesNames { + if resourceName == node.Resource.Name { + return true + } + } + return false +} + +func sendMessage(messageType int, request helper.RequestMessage, response helper.ResponseMessage, destination helper.Resource) { + fmt.Printf("Sending message of type %d to server %s:%d\n", messageType, destination.Address, port) + connection, err := net.Dial("tcp", fmt.Sprintf("%s:%d", destination.Address, port)) + if err != nil { + panic(err) + } + + m := helper.Message{ + Type: messageType, + RequestContent: request, + ResponseContent: response, + } + fmt.Println(m) + + data, err := json.Marshal(m) + if err != nil { + panic(err) + } + _, err = connection.Write(data) + defer connection.Close() +} + +func sendRequestMessage(request helper.RequestMessage, destination helper.Resource) { + fmt.Printf("Sending RequestMessage to server %s:%d\n", destination.Address, port) + connection, err := net.Dial("tcp", fmt.Sprintf("%s:%d", destination.Address, port)) + if err != nil { + panic(err) + } + + m := helper.Message{ + Type: helper.TRequest, + RequestContent: helper.RequestMessage{ + Query: request.Query, + Sender: node.Resource, + }, + } + fmt.Println(m) + + data, err := json.Marshal(m) + if err != nil { + panic(err) + } + _, err = connection.Write(data) + defer connection.Close() +} + +func handleResponseMessage(message helper.ResponseMessage) { + fmt.Println("Received a response") } func parseConfigFile(file string) helper.Node {