Skip to content
Snippets Groups Projects
Select Git revision
  • 595d379c74e0b5bf89beb36b232baa0035145481
  • live_exam_os_ubuntu default protected
2 results

vms.go

Blame
  • vms.go 19.84 KiB
    package vms
    
    import (
        "os"
        "sort"
        "sync"
        "math"
        "errors"
        "syscall"
        "io/ioutil"
        "path/filepath"
        "encoding/base64"
        vmc "nexus-common/vm"
        "nexus-server/qga"
        "nexus-common/caps"
        "nexus-common/params"
        "nexus-server/exec"
        "nexus-server/utils"
        "nexus-server/users"
        "nexus-server/logger"
        "nexus-server/config"
        "github.com/google/uuid"
    )
    
    type (
        // VMKeeperFn is a predicate over a VM. GetNetworkSerializedVMs only keeps
        // the VMs for which it returns true.
        VMKeeperFn func(vm *VM) bool
    
        // VMs is the registry of all VMs known to the server, along with the
        // bookkeeping of the ports and RAM reserved for them.
        VMs struct {
            m map[string]*VM       // Known VMs, keyed by the string form of their ID
            dir string             // Base directory where VMs are stored
            rwlock *sync.RWMutex   // RWlock to ensure the VMs' map (m) coherency; also held when usedPorts/usedRAM are updated
            usedPorts [65536]bool  // Ports marked as taken, indexed by port number (InitVMs also marks the API default port)
            usedRAM int            // Sum of the estimated RAM reserved by StartVMWithCreds for running VMs (in MB)
        }
    )
    
    // Logger and configuration singletons shared by the whole package.
    var log = logger.GetInstance()
    var conf = config.GetInstance()
    // Package singleton: nil until InitVMs assigns it, then returned by GetVMsInstance.
    var vms *VMs
    
    // Gives access to the package-wide VMs singleton.
    // IMPORTANT: InitVMs must have been called beforehand, otherwise the returned
    // pointer is nil.
    // Concurrency: safe
    func GetVMsInstance() *VMs {
        instance := vms
        return instance
    }
    
    // Creates all VMs from their files on disk and (re)initializes the package singleton.
    // VM directories are looked up three directory levels below conf.VMsDir. A VM whose
    // config file cannot be loaded, or whose disk file cannot be opened for reading, is
    // skipped with a warning instead of aborting the whole initialization.
    // Returns an error only if one of the directory levels cannot be listed.
    // Concurrency: unsafe
    func InitVMs() error {
        vmsDir := conf.VMsDir
        vms = &VMs { m: make(map[string]*VM), dir: vmsDir, rwlock: new(sync.RWMutex), usedRAM: 0 }
        // The API's own port must never be handed out to a VM.
        vms.usedPorts[conf.Core.APIDefaultPort] = true

        errMsg := "Failed reading VMs directory: "
        dirs1, err := utils.GetSubDirs(vmsDir)
        if err != nil {
            return errors.New(errMsg+err.Error())
        }

        for _, dir1 := range dirs1 {
            dirs2, err := utils.GetSubDirs(dir1)
            if err != nil {
                return errors.New(errMsg+err.Error())
            }

            for _, dir2 := range dirs2 {
                dirs3, err := utils.GetSubDirs(dir2)
                if err != nil {
                    return errors.New(errMsg+err.Error())
                }

                for _, vmDir := range dirs3 {
                    confPath := filepath.Join(vmDir, vmConfFile)
                    vm, err := newVMFromFile(confPath)
                    if err != nil {
                        log.Warn("Skipping VM: failed reading \""+confPath+"\" "+err.Error())
                        continue
                    }

                    // The disk image is only opened to make sure it exists and is readable.
                    diskPath := filepath.Join(vmDir, vmDiskFile)
                    f, err := os.OpenFile(diskPath, os.O_RDONLY, 0)
                    if err != nil {
                        log.Warn("Skipping VM: failed reading disk file: "+err.Error())
                        continue
                    }
                    f.Close()

                    vms.m[vm.v.ID.String()] = vm
                }
            }
        }

        return nil
    }
    
    // Builds the network-serialized form of every VM accepted by keepFn and returns
    // them sorted by name. The result is an empty (non-nil) slice when nothing matches.
    // keepFn is invoked while the VM's mutex is held.
    // Concurrency: safe
    func (vms *VMs)GetNetworkSerializedVMs(keepFn VMKeeperFn) []vmc.VMNetworkSerialized {
        kept := []vmc.VMNetworkSerialized{}

        vms.rwlock.RLock()
        for _, candidate := range vms.m {
            candidate.mutex.Lock()
            if accepted := keepFn(candidate); accepted {
                kept = append(kept, candidate.SerializeToNetwork())
            }
            candidate.mutex.Unlock()
        }
        vms.rwlock.RUnlock()

        // Map iteration order is random: orders the result by VM name.
        byName := func(a, b int) bool {
            return kept[a].Name < kept[b].Name
        }
        sort.Slice(kept, byName)
        return kept
    }
    
    // Looks up a VM by its ID while holding the read lock on the VMs' map.
    // Returns an error if no VM has this ID.
    // Concurrency: safe
    func (vms *VMs)GetVM(vmID uuid.UUID) (*VM, error) {
        vms.rwlock.RLock()
        found, lookupErr := vms.getVMUnsafe(vmID)
        vms.rwlock.RUnlock()
        return found, lookupErr
    }
    
    // Looks up a VM by its ID without taking any lock: the caller must already
    // hold vms.rwlock. Returns a "VM not found" error if the ID is unknown.
    // Concurrency: unsafe
    func (vms *VMs)getVMUnsafe(vmID uuid.UUID) (*VM, error) {
        if found, ok := vms.m[vmID.String()]; ok {
            return found, nil
        }
        return nil, errors.New("VM not found")
    }
    
    // Removes a VM, identified by its ID, from disk and then from the VMs' map.
    // The VM must be known, stopped and its disk must not be busy; otherwise an
    // error is returned and nothing is deleted. The map entry is only removed
    // once the VM's files have been successfully deleted.
    // Concurrency: safe
    func (vms *VMs)DeleteVM(vmID uuid.UUID) error {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        target, lookupErr := vms.getVMUnsafe(vmID)
        if lookupErr != nil {
            return lookupErr
        }

        target.mutex.Lock()
        defer target.mutex.Unlock()

        switch {
        case target.IsRunning():
            return errors.New("Failed deleting VM: VM must be stopped")
        case target.IsDiskBusy():
            return errors.New("Failed deleting VM: disk in use (busy)")
        }

        // Files (and directories) go first since their removal can fail.
        if delErr := target.delete(); delErr != nil {
            return delErr
        }

        delete(vms.m, vmID.String())
        return nil
    }
    
    // Registers a new VM: its files are written to disk first and, only if that
    // succeeds, the VM is inserted in the VMs' map under its ID.
    // Returns the error reported while writing the files, if any.
    // Concurrency: safe
    func (vms *VMs)AddVM(vm *VM) error {
        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        // Writing can fail, so it is done before touching the map.
        if writeErr := vm.writeFiles(); writeErr != nil {
            return writeErr
        }

        vms.rwlock.Lock()
        vms.m[vm.v.ID.String()] = vm
        vms.rwlock.Unlock()

        return nil
    }
    
    // Starts a VM by its ID, using the specified port and password.
    // If checkPort is true, a check is performed on the specified port and if it is already
    // in use, the function fails and returns a corresponding error.
    // If checkPort is false, the port is used as is (StartVM passes a port it already reserved).
    // An error is returned if the port cannot index the usedPorts table, if the VM is unknown,
    // already running or has a busy disk, if the port is rejected, if not enough RAM would be
    // left, or if the secret file or the QEMU process cannot be created.
    // When launching QEMU fails, both the port and the RAM reservation are released.
    // Concurrency: safe
    func (vms *VMs)StartVMWithCreds(vmID uuid.UUID, port int, checkPort bool, pwd string) error {
        prefix := "Failed starting VM: "

        // usedPorts is a fixed-size array: indexing it with an out-of-range port would panic.
        if port < 0 || port >= len(vms.usedPorts) {
            return errors.New(prefix+"invalid port")
        }

        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        vm, err := vms.getVMUnsafe(vmID)
        if err != nil {
            return err
        }

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        if vm.IsRunning() {
            return errors.New(prefix+"already running")
        }

        if vm.IsDiskBusy() {
            return errors.New(prefix+"disk in use (busy)")
        }

        if checkPort {
            if vms.usedPorts[port] {
                return errors.New(prefix+"port already in use")
            } else if !utils.IsPortAvailable(port) {
                return errors.New(prefix+"port not available")
            }
        }

        totalRAM, availRAM, err := utils.GetRAM()
        if err != nil {
            return errors.New(prefix+"failed obtaining memory info: "+err.Error())
        }

        // The VM's RAM is discounted by conf.Limits.KsmRamSaving, the fraction expected to be
        // saved thanks to KSM (page sharing across VMs).
        estimatedVmRAM := int(math.Round(float64(vm.v.Ram)*(1.-conf.Limits.KsmRamSaving)))

        vms.usedRAM += estimatedVmRAM

        // Checks that enough available RAM would be left after the VM has started,
        // otherwise, refuses to run it in order to avoid RAM saturation.
        if availRAM - vms.usedRAM <= int(math.Round(float64(totalRAM)*(1.-conf.Limits.RamUsageLimit))) {
            vms.usedRAM -= estimatedVmRAM
            return errors.New(prefix+"insufficient free RAM")
        }

        // Function that executes the VM in QEMU using the specified spice port and password.
        // It only returns an error before the waiting goroutine is started, so endofExecFn
        // is never called for a start that failed.
        runQemuFn := func(vm *VM, port int, pwd, pwdFile string, endofExecFn endOfExecCallback) error {
            certsDir := conf.CertsDir
            cmd, err := exec.NewQemuSystem(vm.qgaSock, vm.v.Cpus, vm.v.Ram, string(vm.v.Nic), vm.v.UsbDevs, filepath.Join(vm.dir, vmDiskFile), port, pwdFile, certsDir)
            if err != nil {
                log.Error(prefix+"filepath join error: "+err.Error())
                return err
            }

            if err := cmd.Start(); err != nil {
                log.Error(prefix+vm.v.ID.String()+": exec.Start error: "+err.Error())
                log.Error("Failed cmd: "+cmd.String())
                return err
            }

            vm.Run = runStates { State: vmc.StateRunning, Pid: cmd.Process.Pid, Port: port, Pwd: pwd }
            vm.DiskBusy = true

            // Go routine that executes cmd.Wait() which is a blocking call (hence the go routine).
            // From here on, there are 2 flows of execution!
            go func() {
                if err := cmd.Wait(); err != nil {
                    log.Error(prefix+vm.v.ID.String()+": exec.Wait error: "+err.Error())
                    log.Error("Failed cmd: "+cmd.String())
                }
                endofExecFn(vm)
            } ()

            return nil
        }

        // Function that deletes a VM's secret file (best effort: a removal error is ignored).
        removeSecretFileFn := func(vm *VM) {
            pwdFile := filepath.Join(vm.dir, vmSecretFile)
            os.Remove(pwdFile)
        }

        // Function that starts a VM on the given port with the given password.
        // endofExecFn is called once the VM's execution is over.
        startFn := func(vm *VM, port int, pwd string, endofExecFn endOfExecCallback) error {
            // Writes the password in Base64 in a secret file inside the VM's directory.
            pwdBase64 := base64.StdEncoding.EncodeToString([]byte(pwd))
            content := []byte(pwdBase64)
            pwdFile := filepath.Join(vm.dir, vmSecretFile)
            if err := ioutil.WriteFile(pwdFile, content, 0600); err != nil {
                msg := prefix+"error creating secret file: "+err.Error()
                log.Error(msg)
                return errors.New(msg)
            }

            if err := runQemuFn(vm, port, pwd, pwdFile, endofExecFn); err != nil {
                removeSecretFileFn(vm)
                os.Remove(vm.qgaSock)  // If QEMU fails it's likely the Guest Agent file it created is still there.
                return errors.New(prefix+"error running QEMU: "+err.Error())
            }

            return nil
        }

        // Function that resets the VM's states, frees its port and RAM reservation and
        // deletes its secret file. Called once the VM's execution is over.
        // The two locks are taken one after the other (never nested) in this goroutine.
        endofExecFn := func(vm *VM) {
            vms.rwlock.Lock()
            vms.usedPorts[port] = false
            vms.usedRAM -= estimatedVmRAM
            vms.rwlock.Unlock()
            vm.mutex.Lock()
            removeSecretFileFn(vm)
            // Resets VM states and disk busy flag.
            vm.Run = runStates { State: vmc.StateStopped, Pid: 0, Port: 0, Pwd: "" }
            vm.DiskBusy = false
            vm.mutex.Unlock()
        }

        vms.usedPorts[port] = true

        if err = startFn(vm, port, pwd, endofExecFn); err != nil {
            // The VM never ran, so endofExecFn will not be called: releases here both the
            // port and the RAM reserved above, otherwise the RAM estimate would leak.
            vms.usedPorts[port] = false
            vms.usedRAM -= estimatedVmRAM
            return err
        }

        return nil
    }
    
    // Picks random ports within [VMSpiceMinPort,VMSpiceMaxPort] until one is found that is
    // neither flagged in usedPorts nor reported unavailable by utils.IsPortAvailable, then
    // flags it as used and returns it.
    // REMARK: the write lock is held for the whole search, which only ends once a port is found.
    // Concurrency: safe
    func (vms *VMs)allocateFreeRandomPort() int {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        for {
            candidate := utils.Rand(conf.Core.VMSpiceMinPort, conf.Core.VMSpiceMaxPort)
            // The availability probe only runs for ports that are not already flagged.
            if vms.usedPorts[candidate] || !utils.IsPortAvailable(candidate) {
                continue
            }
            vms.usedPorts[candidate] = true
            return candidate
        }
    }
    
    // Starts a VM by its ID using randomly generated port number and password.
    // Returns the port on which the VM is running and the access password.
    // On failure, returns -1, an empty password and the error; the port that had been
    // reserved for this start is released so that failed attempts do not exhaust the port range.
    // Concurrency: safe
    func (vms *VMs)StartVM(vmID uuid.UUID) (int, string, error) {
        // Generates the password before reserving a port, so that nothing has to be
        // released if the generation fails. Length, digit/symbol counts and the repeat
        // policy come from the configuration.
        pwd, err := passwordGen.Generate(conf.VMPwdLength, conf.VMPwdDigitCount, conf.VMPwdSymbolCount, false, conf.VMPwdRepeatChars)
        if err != nil {
            msg := "Failed starting VM: password generation error: "+err.Error()
            log.Error(msg)
            return -1, "", errors.New(msg)
        }

        port := vms.allocateFreeRandomPort()

        if err = vms.StartVMWithCreds(vmID, port, false, pwd); err != nil {
            // StartVMWithCreds can fail before it takes ownership of the port (unknown VM,
            // already running, busy disk, not enough RAM...), in which case the reservation
            // made above would never be freed.
            // NOTE(review): if the failure happened after StartVMWithCreds had already freed
            // the port, this is a redundant write; another start could in theory have picked
            // the same port in between, which is very unlikely but worth keeping in mind.
            vms.rwlock.Lock()
            vms.usedPorts[port] = false
            vms.rwlock.Unlock()
            return -1, "", err
        }
        return port, pwd, nil
    }
    
    // Forcefully terminates a running VM, identified by its ID, by signalling its QEMU process.
    // Returns an error if the VM is unknown, not running, or if the signal cannot be sent.
    // Concurrency: safe
    func (vms *VMs)KillVM(vmID uuid.UUID) error {
        vms.rwlock.RLock()
        defer vms.rwlock.RUnlock()

        target, lookupErr := vms.getVMUnsafe(vmID)
        if lookupErr != nil {
            return lookupErr
        }

        target.mutex.Lock()
        defer target.mutex.Unlock()

        if running := target.IsRunning(); !running {
            return errors.New("Failed killing VM: VM must be running")
        }

        // Sends a SIGTERM signal to the QEMU process recorded when the VM was started.
        // NOTE(review): the original author states QEMU exits with status code 0 in this case.
        killErr := syscall.Kill(target.Run.Pid, syscall.SIGTERM)
        if killErr == nil {
            return nil
        }

        msg := "Failed killing VM: "+killErr.Error()
        log.Error(msg)
        return errors.New(msg)
    }
    
    // Asks the guest OS of a running VM, identified by its ID, to shut down.
    // The request goes through a QGA connection opened on the VM's guest agent socket:
    // per the original author, QEMU Guest Agent must be running in the guest and the
    // guest OS must be ready to accept QGA commands, otherwise nothing will happen.
    // Returns an error if the VM is unknown or not running, or if opening the
    // connection, sending the command or closing the connection fails.
    // Concurrency: safe
    func (vms *VMs)ShutdownVM(vmID uuid.UUID) error {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        target, lookupErr := vms.getVMUnsafe(vmID)
        if lookupErr != nil {
            return lookupErr
        }

        target.mutex.Lock()
        defer target.mutex.Unlock()

        if !target.IsRunning() {
            return errors.New("Shutdown failed: VM must be running")
        }

        // Logs the failure of a given step and returns it as an error.
        fail := func(step string, cause error) error {
            msg := "Shutdown failed: "+step+cause.Error()
            log.Error(msg)
            return errors.New(msg)
        }

        agent := qga.New()
        if openErr := agent.Open(target.qgaSock); openErr != nil {
            return fail("(open): ", openErr)
        }

        if sendErr := agent.SendShutdown(); sendErr != nil {
            // The connection is closed before reporting; a close error is ignored here.
            agent.Close()
            return fail("(send): ", sendErr)
        }

        if closeErr := agent.Close(); closeErr != nil {
            return fail("(close): ", closeErr)
        }

        return nil
    }
    
    // Asks the guest OS of a running VM, identified by its ID, to reboot.
    // The request goes through a QGA connection opened on the VM's guest agent socket:
    // per the original author, QEMU Guest Agent must be running in the VM, otherwise
    // it won't work.
    // Returns an error if the VM is unknown or not running, or if opening the
    // connection, sending the command or closing the connection fails.
    // Concurrency: safe
    func (vms *VMs)RebootVM(vmID uuid.UUID) error {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        target, lookupErr := vms.getVMUnsafe(vmID)
        if lookupErr != nil {
            return lookupErr
        }

        target.mutex.Lock()
        defer target.mutex.Unlock()

        if !target.IsRunning() {
            return errors.New("Reboot failed: VM must be running")
        }

        // Logs the failure of a given step and returns it as an error.
        fail := func(step string, cause error) error {
            msg := "Reboot failed: "+step+cause.Error()
            log.Error(msg)
            return errors.New(msg)
        }

        // Sends a QGA command to order the VM to reboot.
        agent := qga.New()
        if openErr := agent.Open(target.qgaSock); openErr != nil {
            return fail("(open): ", openErr)
        }

        if sendErr := agent.SendReboot(); sendErr != nil {
            // The connection is closed before reporting; a close error is ignored here.
            agent.Close()
            return fail("(send): ", sendErr)
        }

        if closeErr := agent.Close(); closeErr != nil {
            return fail("(close): ", closeErr)
        }

        return nil
    }
    
    // Tells whether at least one known VM references the given template, by comparing
    // templateID with the string form of each VM's template ID.
    // Concurrency: safe
    func (vms *VMs)IsTemplateUsed(templateID string) bool {
        vms.rwlock.RLock()
        defer vms.rwlock.RUnlock()

        for _, candidate := range vms.m {
            candidate.mutex.Lock()
            inUse := candidate.v.TemplateID.String() == templateID
            candidate.mutex.Unlock()

            if inUse {
                return true
            }
        }
        return false
    }
    
    // Edits a VM's specs: name, cpus, ram, nic and USB devices.
    // Only the fields flagged as updated (or non-empty/non-nil) in p are modified.
    // Returns an error if the VM is unknown, if the edited VM does not pass validation or
    // if its config cannot be written; in the last two cases the VM's original values are
    // restored so that the in-memory VM never differs from what was last persisted.
    // Concurrency: safe
    func (vms *VMs)EditVM(vmID uuid.UUID, p *params.VMEdit) error {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        vm, err := vms.getVMUnsafe(vmID)
        if err != nil {
            return err
        }

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        // Only updates fields that have changed.
        // NOTE(review): the rollbacks below assume vm.v is a value (not a pointer), so that
        // oriVal is an independent copy — confirm against the VM type.
        oriVal := vm.v  // Saves original VM values.

        if p.Name != "" {
            vm.v.Name = p.Name
        }
        if p.CpusUpdated {
            vm.v.Cpus = p.Cpus
        }
        if p.RamUpdated {
            vm.v.Ram = p.Ram
        }
        if p.Nic != "" {
            vm.v.Nic = p.Nic
        }
        if p.UsbDevs != nil {
            vm.v.UsbDevs = p.UsbDevs
        }

        if err = vm.validate(); err != nil {
            // Restores original VM values.
            vm.v = oriVal
            return err
        }

        if err = vm.writeConfig(); err != nil {
            // The edit was not persisted: restores original VM values as well, otherwise
            // the caller would get an error while the VM is nonetheless modified in memory.
            vm.v = oriVal
            return err
        }

        return nil
    }
    
    // Sets a VM's access for a given user (email).
    // user is the currently logged user
    // destUserEmail is the email of the user for which to modify the access
    // newAccess is validated with caps.ValidateVMAccessCaps before anything else.
    // The change is allowed if user has CAP_VM_SET_ACCESS_ANY, owns the VM, or has
    // CAP_VM_SET_ACCESS in its own access entry for this VM.
    // Returns an error if newAccess is invalid, the VM is unknown, user is not allowed to
    // modify accesses, or the config cannot be written; in the latter case the previous
    // access of destUserEmail is restored.
    // Concurrency: safe
    func (vms *VMs)SetVMAccess(vmID uuid.UUID, user *users.User, destUserEmail string, newAccess caps.Capabilities) error {
        if err := caps.ValidateVMAccessCaps(newAccess); err != nil {
            return err
        }

        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        // Retrieves the VM for which the access caps must be changed.
        vm, err := vms.getVMUnsafe(vmID)
        if err != nil {
            return err
        }

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        // If user has VM_SET_ACCESS_ANY, modify is allowed.
        if !user.HasCapability(caps.CAP_VM_SET_ACCESS_ANY) {
            // If user is the VM's owner, modify is allowed.
            if !vm.IsOwner(user.Email) {
                // If user has VM_SET_ACCESS and VM's VM access is present for the same user, modify is allowed.
                userCaps := vm.v.Access[user.Email]
                _, exists := userCaps[caps.CAP_VM_SET_ACCESS]
                if !exists {
                    return errors.New("Insufficient capability")
                }
            }
        }

        // Remembers the previous state of the entry to be able to undo the change.
        prevAccess, hadAccess := vm.v.Access[destUserEmail]
        vm.v.Access[destUserEmail] = newAccess

        if err = vm.writeConfig(); err != nil {
            // The change was not persisted: undoes it so memory and disk stay consistent.
            if hadAccess {
                vm.v.Access[destUserEmail] = prevAccess
            } else {
                delete(vm.v.Access, destUserEmail)
            }
            return err
        }

        return nil
    }
    
    // Removes a VM's access for a given user (email).
    // user is the currently logged user
    // destUserEmail is the email of the user for which to remove the access
    // The removal is allowed if user has CAP_VM_SET_ACCESS_ANY, owns the VM, or has
    // CAP_VM_SET_ACCESS in its own access entry for this VM.
    // Returns an error if the VM is unknown, user is not allowed to modify accesses,
    // destUserEmail has no access to the VM, or the config cannot be written; in the
    // latter case the removed access is put back.
    // Concurrency: safe
    func (vms *VMs)DeleteVMAccess(vmID uuid.UUID, user *users.User, destUserEmail string) error {
        vms.rwlock.Lock()
        defer vms.rwlock.Unlock()

        // Retrieves the VM for which the access caps must be changed.
        vm, err := vms.getVMUnsafe(vmID)
        if err != nil {
            return err
        }

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        // If user has VM_SET_ACCESS_ANY, modify is allowed.
        if !user.HasCapability(caps.CAP_VM_SET_ACCESS_ANY) {
            // If user is the VM's owner, modify is allowed.
            if !vm.IsOwner(user.Email) {
                // If user has VM_SET_ACCESS and VM's VM access is present for the same user, modify is allowed.
                userCaps := vm.v.Access[user.Email]
                _, exists := userCaps[caps.CAP_VM_SET_ACCESS]
                if !exists {
                    return errors.New("Insufficient capability")
                }
            }
        }

        // Only removes the user from the Access map if it actually had an access.
        removedAccess, exists := vm.v.Access[destUserEmail]
        if !exists {
            return errors.New("User "+destUserEmail+" has no VM access")
        }
        delete(vm.v.Access, destUserEmail)

        if err = vm.writeConfig(); err != nil {
            // The removal was not persisted: puts the access back so memory and disk stay consistent.
            vm.v.Access[destUserEmail] = removedAccess
            return err
        }

        return nil
    }
    
    // Extracts the guest directory vmDir (and its subdirectories) from a VM's disk image
    // into the tar.gz archive tarGzFile on the host.
    // Per the original author, extracting from a running VM should technically work but
    // files might be inconsistent, hence a running VM is rejected.
    // The VM's disk is flagged busy for the duration of the copy; the VM's mutex is held
    // during the whole operation.
    // Returns an error if the VM is running, its disk is busy, or the copy fails.
    // Concurrency: safe
    func (vms *VMs)ExportVMFiles(vm *VM, vmDir, tarGzFile string) error {
        prefix := "Failed exporting files from VM: "

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        switch {
        case vm.IsRunning():
            return errors.New(prefix+"VM must be stopped")
        case vm.IsDiskBusy():
            return errors.New(prefix+"disk in use (busy)")
        }

        // Deferred calls run in reverse order: the flag is cleared before the mutex is released.
        vm.DiskBusy = true
        defer func() { vm.DiskBusy = false }()

        copyErr := exec.CopyFromVM(vm.getDiskPath(), vmDir, tarGzFile)
        if copyErr == nil {
            return nil
        }

        msg := prefix+copyErr.Error()
        log.Error(msg)
        return errors.New(msg)
    }
    
    // Copies the content of the tar.gz archive tarGzFile into a VM's disk image (guest's
    // filesystem), inside the guest directory vmDir.
    // The VM's disk is flagged busy for the duration of the copy; the VM's mutex is held
    // during the whole operation.
    // Returns an error if the VM is running, its disk is busy, or the copy fails.
    // Concurrency: safe
    func (vms *VMs)ImportFilesToVM(vm *VM, tarGzFile, vmDir string) error {
        prefix := "Failed importing files into VM: "

        vm.mutex.Lock()
        defer vm.mutex.Unlock()

        switch {
        case vm.IsRunning():
            return errors.New(prefix+"VM must be stopped")
        case vm.IsDiskBusy():
            return errors.New(prefix+"disk in use (busy)")
        }

        // Deferred calls run in reverse order: the flag is cleared before the mutex is released.
        vm.DiskBusy = true
        defer func() { vm.DiskBusy = false }()

        copyErr := exec.CopyToVM(vm.getDiskPath(), tarGzFile, vmDir)
        if copyErr == nil {
            return nil
        }

        msg := prefix+copyErr.Error()
        log.Error(msg)
        return errors.New(msg)
    }