package vms
import (
"encoding/base64"
"errors"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"sync"
"syscall"

"github.com/google/uuid"

"nexus-common/caps"
"nexus-common/params"
vmc "nexus-common/vm"
"nexus-server/config"
"nexus-server/exec"
"nexus-server/logger"
"nexus-server/qga"
"nexus-server/users"
"nexus-server/utils"
)
type (
VMKeeperFn func(vm *VM) bool
VMs struct {
m map[string]*VM
dir string // Base directory where VMs are stored
rwlock *sync.RWMutex // RWlock to ensure the VMs' map (m) coherency
usedPorts [65536]bool // Ports used by VMs
usedRAM int // RAM used by running VMs (in MB)
}
)
var log = logger.GetInstance()
var conf = config.GetInstance()
var vms *VMs
// Returns a VMs singleton to access this module's public functions.
// IMPORTANT: the InitVMs function must have been previously called!
// Concurrency: safe
func GetVMsInstance() *VMs {
return vms
}
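// Usage sketch (illustrative; assumes InitVMs succeeded at startup and that
// someVMID holds a valid VM's uuid.UUID):
//
//	vms := GetVMsInstance()
//	vm, err := vms.GetVM(someVMID)
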
// Creates all VMs from their files on disk and initializes the VMs singleton.
// REMARK: VMs are read from conf.VMsDir, the root directory where VMs reside.
// Concurrency: unsafe
func InitVMs() error {
vmsDir := conf.VMsDir
vms = &VMs{m: make(map[string]*VM), dir: vmsDir, rwlock: new(sync.RWMutex), usedRAM: 0}
vms.usedPorts[conf.Core.APIDefaultPort] = true
errMsg := "Failed reading VMs directory: "
dirs1, err := utils.GetSubDirs(vmsDir)
if err != nil {
return errors.New(errMsg+err.Error())
}
for _, dir1 := range dirs1 {
dirs2, err := utils.GetSubDirs(dir1)
if err != nil {
return errors.New(errMsg+err.Error())
}
for _, dir2 := range dirs2 {
dirs3, err := utils.GetSubDirs(dir2)
if err != nil {
return errors.New(errMsg+err.Error())
}
for _, vmDir := range dirs3 {
filename := filepath.Join(vmDir, vmConfFile)
vm, err := newVMFromFile(filename)
if err != nil {
log.Warn("Skipping VM: failed reading \""+filename+"\": "+err.Error())
continue
}
// Checks that the VM's disk image exists and is readable.
filename = filepath.Join(vmDir, vmDiskFile)
f, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
log.Warn("Skipping VM: failed reading disk file: "+err.Error())
continue
}
f.Close()
vms.m[vm.v.ID.String()] = vm
}
}
}
return nil
}
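// For reference, the layout walked above is three directory levels below
// conf.VMsDir, each leaf directory holding one VM's files (names are illustrative):
//
//	<VMsDir>/<level1>/<level2>/<vm-dir>/
//		<vmConfFile>  (VM config, read by newVMFromFile)
//		<vmDiskFile>  (VM disk image)
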
// Returns the list of serialized VMs for which VMKeeperFn is true.
// Concurrency: safe
func (vms *VMs) GetNetworkSerializedVMs(keepFn VMKeeperFn) []vmc.VMNetworkSerialized {
vms.rwlock.RLock()
list := []vmc.VMNetworkSerialized{}
for _, vm := range vms.m {
vm.mutex.Lock()
if keepFn(vm) {
list = append(list, vm.SerializeToNetwork())
}
vm.mutex.Unlock()
}
vms.rwlock.RUnlock()
// Sorts the VMs by name.
sort.Slice(list, func(i, j int) bool {
return list[i].Name < list[j].Name
})
return list
}
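// Example keeper function (illustrative): keeps only the VMs currently running.
//
//	running := vms.GetNetworkSerializedVMs(func(vm *VM) bool {
//		return vm.IsRunning()
//	})
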
// Returns a VM by its ID. Concurrency-safe version.
// Concurrency: safe
func (vms *VMs) GetVM(vmID uuid.UUID) (*VM, error) {
vms.rwlock.RLock()
defer vms.rwlock.RUnlock()
return vms.getVMUnsafe(vmID)
}
// Returns a VM by its ID. Concurrency-unsafe version.
// Concurrency: unsafe
func (vms *VMs) getVMUnsafe(vmID uuid.UUID) (*VM, error) {
vm, exists := vms.m[vmID.String()]
if !exists {
return nil, errors.New("VM not found")
}
return vm, nil
}
// Deletes a VM by its ID and deletes its associated files.
// Concurrency: safe
func (vms *VMs) DeleteVM(vmID uuid.UUID) error {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
if vm.IsRunning() {
return errors.New("Failed deleting VM: VM must be stopped")
}
if vm.IsDiskBusy() {
return errors.New("Failed deleting VM: disk in use (busy)")
}
// Deletes the VM's files (and directories).
if err := vm.delete(); err != nil {
return err
}
// Removes the VM from the map.
delete(vms.m, vmID.String())
return nil
}
// Adds a new VM and writes its associated files.
// Concurrency: safe
func (vms *VMs) AddVM(vm *VM) error {
vm.mutex.Lock()
defer vm.mutex.Unlock()
// Writes the VM files first, since this step can fail.
err := vm.writeFiles()
if err != nil {
return err
}
// Adds VM to the map of VMs.
vms.rwlock.Lock()
key := vm.v.ID.String()
vms.m[key] = vm
vms.rwlock.Unlock()
return nil
}
// Starts a VM by its ID, using the specified port and password.
// If checkPort is true, the port is first checked and the function fails with an
// error if it is already in use.
// Concurrency: safe
func (vms *VMs) StartVMWithCreds(vmID uuid.UUID, port int, checkPort bool, pwd string) error {
prefix := "Failed starting VM: "
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
if vm.IsRunning() {
return errors.New(prefix+"already running")
}
if vm.IsDiskBusy() {
return errors.New(prefix+"disk in use (busy)")
}
if checkPort {
if vms.usedPorts[port] {
return errors.New(prefix+"port already in use")
} else if !utils.IsPortAvailable(port) {
return errors.New(prefix+"port not available")
}
}
totalRAM, availRAM, err := utils.GetRAM()
if err != nil {
return errors.New(prefix+"failed obtaining memory info: "+err.Error())
}
// Estimates the VM's effective RAM footprint: KSM page sharing across VMs is
// expected to save a fraction of RAM given by conf.Limits.KsmRamSaving (e.g. ~30% for 0.3).
estimatedVmRAM := int(math.Round(float64(vm.v.Ram)*(1.-conf.Limits.KsmRamSaving)))
vms.usedRAM += estimatedVmRAM
// Checks that enough RAM would remain available once the VM has started;
// otherwise, refuses to run it in order to avoid RAM saturation.
if availRAM - vms.usedRAM <= int(math.Round(float64(totalRAM)*(1.-conf.Limits.RamUsageLimit))) {
vms.usedRAM -= estimatedVmRAM
return errors.New(prefix+"insufficient free RAM")
}
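// Worked example (illustrative numbers): with KsmRamSaving=0.3, a VM with
// Ram=2048 adds round(2048*0.7)=1434 MB to usedRAM; with totalRAM=16384 and
// RamUsageLimit=0.9, the start is refused unless availRAM-usedRAM stays above
// round(16384*0.1)=1638 MB.
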
// Function that executes the VM in QEMU using the specified SPICE port and password.
runQemuFn := func(vm *VM, port int, pwd, pwdFile string, endofExecFn endOfExecCallback) error {
certsDir := conf.CertsDir
cmd, err := exec.NewQemuSystem(vm.qgaSock, vm.v.Cpus, vm.v.Ram, string(vm.v.Nic), vm.v.UsbDevs, filepath.Join(vm.dir, vmDiskFile), port, pwdFile, certsDir)
if err != nil {
log.Error(prefix+"filepath join error: "+err.Error())
return err
}
if err := cmd.Start(); err != nil {
log.Error(prefix+vm.v.ID.String()+": exec.Start error: "+err.Error())
log.Error("Failed cmd: "+cmd.String())
return err
}
vm.Run = runStates{State: vmc.StateRunning, Pid: cmd.Process.Pid, Port: port, Pwd: pwd}
vm.DiskBusy = true
// Goroutine that executes cmd.Wait(), which is a blocking call (hence the goroutine).
// From here on, there are two flows of execution!
go func() {
if err := cmd.Wait(); err != nil {
log.Error(prefix+vm.v.ID.String()+": exec.Wait error: "+err.Error())
log.Error("Failed cmd: "+cmd.String())
}
endofExecFn(vm)
} ()
return nil
}
// Function that deletes a VM's secret file.
removeSecretFileFn := func(vm *VM) {
pwdFile := filepath.Join(vm.dir, vmSecretFile)
os.Remove(pwdFile)
}
// Function that starts a VM on the given port with the given password.
// endofExecFn is called once the VM's execution is over.
startFn := func(vm *VM, port int, pwd string, endofExecFn endOfExecCallback) error {
// Writes the password in Base64 in a secret file inside the VM's directory.
pwdBase64 := base64.StdEncoding.EncodeToString([]byte(pwd))
content := []byte(pwdBase64)
pwdFile := filepath.Join(vm.dir, vmSecretFile)
if err := ioutil.WriteFile(pwdFile, content, 0600); err != nil {
msg := prefix+"error creating secret file: "+err.Error()
log.Error(msg)
return errors.New(msg)
}
if err := runQemuFn(vm, port, pwd, pwdFile, endofExecFn); err != nil {
removeSecretFileFn(vm)
os.Remove(vm.qgaSock) // If QEMU fails, the Guest Agent socket file it created is likely still there.
return errors.New(prefix+"error running QEMU: "+err.Error())
}
return nil
}
// Function that resets the VM's states, frees its port and deletes its secret file.
// Called once the VM's execution is over.
endofExecFn := func(vm *VM) {
vms.rwlock.Lock()
vms.usedPorts[port] = false
vms.usedRAM -= estimatedVmRAM
vms.rwlock.Unlock()
vm.mutex.Lock()
removeSecretFileFn(vm)
// Resets VM states and disk busy flag.
vm.Run = runStates{State: vmc.StateStopped, Pid: 0, Port: 0, Pwd: ""}
vm.DiskBusy = false
vm.mutex.Unlock()
}
vms.usedPorts[port] = true
if err = startFn(vm, port, pwd, endofExecFn); err != nil {
vms.usedPorts[port] = false
return err
}
return nil
}
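// Usage sketch (illustrative; someVMID is assumed valid and port 5901 assumed free):
//
//	err := vms.StartVMWithCreds(someVMID, 5901, true, "s3cretPwd")
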
// Allocates and returns a free port randomly chosen within [VMSpiceMinPort,VMSpiceMaxPort].
// REMARK: this function marks the returned port as used in the usedPorts table.
// Concurrency: safe
func (vms *VMs) allocateFreeRandomPort() int {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
for {
port := utils.Rand(conf.Core.VMSpiceMinPort, conf.Core.VMSpiceMaxPort)
if !vms.usedPorts[port] {
if utils.IsPortAvailable(port) {
vms.usedPorts[port] = true
return port
}
}
}
}
// Starts a VM by its ID, using a randomly generated port number and password.
// Returns the port on which the VM is running and the access password.
// Concurrency: safe
func (vms *VMs) StartVM(vmID uuid.UUID) (int, string, error) {
port := vms.allocateFreeRandomPort()
// Randomly generates a password of conf.VMPwdLength characters containing
// conf.VMPwdDigitCount digits and conf.VMPwdSymbolCount symbols, allowing upper
// and lower case letters; conf.VMPwdRepeatChars controls whether characters may repeat.
pwd, err := passwordGen.Generate(conf.VMPwdLength, conf.VMPwdDigitCount, conf.VMPwdSymbolCount, false, conf.VMPwdRepeatChars)
if err != nil {
msg := "Failed starting VM: password generation error: "+err.Error()
log.Error(msg)
return -1, "", errors.New(msg)
}
if err = vms.StartVMWithCreds(vmID, port, false, pwd); err != nil {
return -1, "", err
}
return port, pwd, nil
}
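// Usage sketch (illustrative; port and pwd are typically returned to the client
// so it can open its SPICE session):
//
//	port, pwd, err := vms.StartVM(someVMID)
//	if err != nil {
//		// handle the error
//	}
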
// Kills a VM by its ID.
// Concurrency: safe
func (vms *VMs) KillVM(vmID uuid.UUID) error {
vms.rwlock.RLock()
defer vms.rwlock.RUnlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
if !vm.IsRunning() {
return errors.New("Failed killing VM: VM must be running")
}
// Sends a SIGTERM signal to terminate the QEMU process.
// Note that QEMU terminates with status code 0 in this case (i.e. no error).
if err := syscall.Kill(vm.Run.Pid, syscall.SIGTERM); err != nil {
msg := "Failed killing VM: "+err.Error()
log.Error(msg)
return errors.New(msg)
}
return nil
}
// Gracefully stops a VM by its ID.
// Concurrency: safe
func (vms *VMs) ShutdownVM(vmID uuid.UUID) error {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
if !vm.IsRunning() {
return errors.New("Shutdown failed: VM must be running")
}
// Function that gracefully shuts down a running VM.
// Uses QGA commands to talk to the guest OS, which means QEMU Guest Agent must be
// running in the guest, otherwise nothing will happen. Furthermore, the guest OS must
// already be running and ready to accept QGA commands.
shutdownFn := func(vm *VM) error {
prefix := "Shutdown failed: "
// Sends a QGA command to order the VM to shutdown.
con := qga.New()
if err := con.Open(vm.qgaSock); err != nil {
log.Error(prefix+"(open): "+err.Error())
return errors.New(prefix+"(open): "+err.Error())
}
if err := con.SendShutdown(); err != nil {
con.Close()
log.Error(prefix+"(send): "+err.Error())
return errors.New(prefix+"(send): "+err.Error())
}
if err := con.Close(); err != nil {
log.Error(prefix+"(close): "+err.Error())
return errors.New(prefix+"(close): "+err.Error())
}
return nil
}
return shutdownFn(vm)
}
// Reboots a VM by its ID.
// Concurrency: safe
func (vms *VMs) RebootVM(vmID uuid.UUID) error {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
if !vm.IsRunning() {
return errors.New("Reboot failed: VM must be running")
}
// Function that reboots a running VM.
// Uses QGA commands to talk to the VM, which means QEMU Guest Agent must be
// running in the VM, otherwise it won't work.
rebootFn := func(vm *VM) error {
prefix := "Reboot failed: "
// Sends a QGA command to order the VM to reboot.
con := qga.New()
if err := con.Open(vm.qgaSock); err != nil {
log.Error(prefix+"(open): "+err.Error())
return errors.New(prefix+"(open): "+err.Error())
}
if err := con.SendReboot(); err != nil {
con.Close()
log.Error(prefix+"(send): "+err.Error())
return errors.New(prefix+"(send): "+err.Error())
}
if err := con.Close(); err != nil {
log.Error(prefix+"(close): "+err.Error())
return errors.New(prefix+"(close): "+err.Error())
}
return nil
}
return rebootFn(vm)
}
// Returns true if the given template is used by a VM.
// Concurrency: safe
func (vms *VMs) IsTemplateUsed(templateID string) bool {
vms.rwlock.RLock()
defer vms.rwlock.RUnlock()
for _, vm := range vms.m {
vm.mutex.Lock()
if vm.v.TemplateID.String() == templateID {
vm.mutex.Unlock()
return true
}
vm.mutex.Unlock()
}
return false
}
// Edits a VM's specs: name, CPUs, RAM, NIC and USB devices.
// Concurrency: safe
func (vms *VMs) EditVM(vmID uuid.UUID, p *params.VMEdit) error {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
// Only updates fields that have changed.
oriVal := vm.v // Saves original VM values.
if p.Name != "" {
vm.v.Name = p.Name
}
if p.CpusUpdated {
vm.v.Cpus = p.Cpus
}
if p.RamUpdated {
vm.v.Ram = p.Ram
}
if p.Nic != "" {
vm.v.Nic = p.Nic
}
if p.UsbDevs != nil {
vm.v.UsbDevs = p.UsbDevs
}
if err = vm.validate(); err != nil {
// Restores original VM values.
vm.v = oriVal
return err
}
if err = vm.writeConfig(); err != nil {
return err
}
return nil
}
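// Usage sketch (illustrative; only the fields flagged or non-empty in
// params.VMEdit are applied, as shown above):
//
//	err := vms.EditVM(someVMID, &params.VMEdit{Name: "new-name", RamUpdated: true, Ram: 2048})
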
// Sets a VM's access for a given user (email).
// user is the currently logged-in user.
// destUserEmail is the email of the user for which to modify the access.
// Concurrency: safe
func (vms *VMs) SetVMAccess(vmID uuid.UUID, user *users.User, destUserEmail string, newAccess caps.Capabilities) error {
if err := caps.ValidateVMAccessCaps(newAccess); err != nil {
return err
}
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
// Retrieves the VM for which the access caps must be changed.
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
// If user has VM_SET_ACCESS_ANY, modification is allowed.
if !user.HasCapability(caps.CAP_VM_SET_ACCESS_ANY) {
// If user is the VM's owner, modification is allowed.
if !vm.IsOwner(user.Email) {
// Otherwise, user must hold VM_SET_ACCESS on this VM for modification to be allowed.
userCaps := vm.v.Access[user.Email]
_, exists := userCaps[caps.CAP_VM_SET_ACCESS]
if !exists {
return errors.New("Insufficient capability")
}
}
}
vm.v.Access[destUserEmail] = newAccess
if err = vm.writeConfig(); err != nil {
return err
}
return nil
}
// Removes a VM's access for a given user (email).
// user is the currently logged-in user.
// destUserEmail is the email of the user for which to remove the access.
// Concurrency: safe
func (vms *VMs) DeleteVMAccess(vmID uuid.UUID, user *users.User, destUserEmail string) error {
vms.rwlock.Lock()
defer vms.rwlock.Unlock()
// Retrieves the VM for which the access caps must be changed.
vm, err := vms.getVMUnsafe(vmID)
if err != nil {
return err
}
vm.mutex.Lock()
defer vm.mutex.Unlock()
// If user has VM_SET_ACCESS_ANY, modification is allowed.
if !user.HasCapability(caps.CAP_VM_SET_ACCESS_ANY) {
// If user is the VM's owner, modification is allowed.
if !vm.IsOwner(user.Email) {
// Otherwise, user must hold VM_SET_ACCESS on this VM for modification to be allowed.
userCaps := vm.v.Access[user.Email]
_, exists := userCaps[caps.CAP_VM_SET_ACCESS]
if !exists {
return errors.New("Insufficient capability")
}
}
}
// Removes the user from the Access map, but only if an access actually existed.
if _, exists := vm.v.Access[destUserEmail]; exists {
delete(vm.v.Access, destUserEmail)
} else {
return errors.New("User "+destUserEmail+" has no VM access")
}
if err = vm.writeConfig(); err != nil {
return err
}
return nil
}
// Exports a VM's directory and its subdirectories into a tar.gz archive on the host.
// Technically, extracting files from a running VM should work, but some files might be inconsistent.
// Consequently, we forbid this action on a running VM.
// Concurrency: safe
func (vms *VMs) ExportVMFiles(vm *VM, vmDir, tarGzFile string) error {
prefix := "Failed exporting files from VM: "
vm.mutex.Lock()
defer vm.mutex.Unlock()
if vm.IsRunning() {
return errors.New(prefix+"VM must be stopped")
}
if vm.IsDiskBusy() {
return errors.New(prefix+"disk in use (busy)")
}
vm.DiskBusy = true
defer func(vm *VM) { vm.DiskBusy = false }(vm)
vmDisk := vm.getDiskPath()
if err := exec.CopyFromVM(vmDisk, vmDir, tarGzFile); err != nil {
msg := prefix+err.Error()
log.Error(msg)
return errors.New(msg)
}
return nil
}
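// Usage sketch (illustrative paths; vmDir is a directory inside the guest's
// filesystem, tarGzFile the destination archive on the host):
//
//	err := vms.ExportVMFiles(vm, "/home/student/work", "/tmp/vm-export.tar.gz")
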
// Imports files from a tar.gz archive into a VM disk image (guest's filesystem), in a specified directory.
// Concurrency: safe
func (vms *VMs) ImportFilesToVM(vm *VM, tarGzFile, vmDir string) error {
prefix := "Failed importing files into VM: "
vm.mutex.Lock()
defer vm.mutex.Unlock()
if vm.IsRunning() {
return errors.New(prefix+"VM must be stopped")
}
if vm.IsDiskBusy() {
return errors.New(prefix+"disk in use (busy)")
}
vm.DiskBusy = true
defer func(vm *VM) { vm.DiskBusy = false }(vm)
vmDisk := vm.getDiskPath()
if err := exec.CopyToVM(vmDisk, tarGzFile, vmDir); err != nil {
msg := prefix+err.Error()
log.Error(msg)
return errors.New(msg)
}
return nil
}