diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0777d82567347b030c1b26ab4468b4a627651ae2..630e2629357dabf0398ba9ab69d34ed2f2c48cee 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,29 +1,34 @@ stages: + - steve_jobs - build_webservice - - test_maxima +# - test_maxima variables: GIT_SUBMODULE_STRATEGY: recursive REGISTRY: "172.30.190.249:5000" - DB: "pgsql" - POSTGRES_USER: "postgres" - POSTGRES_PASSWORD: "" - POSTGRES_HOST_AUTH_METHOD: "trust" - TRAVIS_BUILD_DIR: "$CI_PROJECT_DIR" # gitlab ci script taken from https://gist.github.com/danielneis/5c6140ec8150c6151a54bccd26950278 - -cache: - paths: - - $HOME/.compose/cache +steve_jobs: + stage: steve + image: golang + tags: + - docker + script: + - go get github.com/prometheus/client_golang/prometheus + - go get github.com/prometheus/client_golang/prometheus/promhttp + - cd src/web + - GOBIN=${CI_PROJECT_DIR}/bin go install + artifacts: + paths: + - bin/web + expire_in: 1 week build_webservice: image: "docker:latest" stage: build_webservice needs: - - project: martin.heide/goe_web - job: steve_jobs + - job: steve_jobs ref: master artifacts: true tags: @@ -31,15 +36,20 @@ build_webservice: script: - ./build.sh ${REGISTRY} -test_maxima: - image: "$REGISTRY/moodle-ci-stack" - stage: test_maxima - services: - - postgres:latest - variables: - MOODLE_BRANCH: "MOODLE_37_STABLE" - QSTACK_VERSION: "v4.3.2" - tags: - - docker - script: - - bash testimage.sh +#test_maxima: +# image: "$REGISTRY/moodle-ci-stack" +# stage: test_maxima +# services: +# - postgres:latest +# variables: +# MOODLE_BRANCH: "MOODLE_37_STABLE" +# QSTACK_VERSION: "v4.3.2" +# DB: "pgsql" +# POSTGRES_USER: "postgres" +# POSTGRES_PASSWORD: "" +# POSTGRES_HOST_AUTH_METHOD: "trust" +# TRAVIS_BUILD_DIR: "$CI_PROJECT_DIR" +# tags: +# - docker +# script: +# - bash testimage.sh diff --git a/doc/How_it_works.md b/doc/How_it_works.md new file mode 100644 index 0000000000000000000000000000000000000000..d3a3685a4f87c2051f903ceadf22eb5b3f66ca56 --- /dev/null +++ 
b/doc/How_it_works.md @@ -0,0 +1,36 @@ +How it works +============ +API +--- +The basic API that stack uses for web access is simple: + +1. send a POST-request with `timeout`, `ploturlbase`, `version` and `input` +2. put the value of `ploturlbase` into a maxima variable +3. send `input` to the maxima process +4. if the maxima process is not finished after `timeout` milliseconds, return a 416 error +5. else return the output of `maxima` + +Implementation +-------------- +The web interface to the maxima processes is implemented in golang. + +At startup, the server process starts the maxima mother process. +The server process maintains a queue of some maxima processes derived from the mother process and ready to use. +Each maxima process gets assigned a user of the form `maxima-%d` which is a valid unix user (these are already there if you use the Docker image). +If a new request comes in, it takes a process from the queue and communicates with the process through some named pipes. +After it is finished (or the process timed out), all leftover processes with the process' username are kill-9'd and the temporary files/pipes are deleted. +The user id is then again available for other processes. + +Process Lifecycle +----------------- +First, the server process creates a temporary directory for the future process and creates 2 pipes, inpipe and outpipe, for stdin and stdout of maxima. +The server then sends to the mother process the maxima user id and the path of the temporary directory. + +The mother process is in a loop which is called at server startup: +`(forking-loop)` is called, which was loaded through `/assets/maxima-fork.lisp` which uses sbcl's FFI to call the forking loop `fork_new_process()` in `/src/maxima_fork.c`. +When it receives a new id and path, it forks a new maxima process from the mother process and does some setup. 
+ +Forking a process from an existing one is vastly more efficient (faster with a factor of around 10x) than execve'ing a new maxima process, and the memory consumption is reduced, too. +It does have the caveat that, at run time, there will be more page faults because of the copy-on-write mechanics, which slows the process down. + +After the child process responds that it is ready, the server process puts its description into a queue, where it is taken later for evaluating the request. diff --git a/src/web/web.go b/src/web/web.go new file mode 100644 index 0000000000000000000000000000000000000000..4c2408b964e1917b2445446c47efbee88e0b9e89 --- /dev/null +++ b/src/web/web.go @@ -0,0 +1,487 @@ +package main + +import ( + "archive/zip" + "bytes" + "bufio" + "fmt" + "path/filepath" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "os/exec" + "os/user" + "strconv" + "time" + "strings" + "syscall" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var tmp_prefix string +var debug uint + +type User struct { + Id uint + Name string + Uid int + Gid int +} + +type Metrics struct { + ResponseTime prometheus.Histogram + SpawnTime prometheus.Histogram + NumSuccess prometheus.Counter + NumIntError prometheus.Counter + NumTimeout prometheus.Counter + QueueLen prometheus.Gauge +} + +type MotherProcess struct { + Cmd *exec.Cmd + Input *io.PipeWriter + Output *io.PipeReader +} + +type ChildProcess struct { + User *User + Input *os.File + Output *bufio.Reader + Outfile *os.File + TempDir string +} + +func debugf(format string, a ...interface{}) { + if debug > 0 { + log.Printf(format, a...) 
+ } +} + +func new_mother_proc(binpath string, lib string) (*MotherProcess, error) { + cmd := exec.Command(binpath) + cmd.Stderr = os.Stderr + + in_pipe_w, in_pipe_r := io.Pipe() + cmd.Stdin = in_pipe_w + + out_pipe_w, out_pipe_r := io.Pipe() + cmd.Stdout = out_pipe_r + + err := cmd.Start() + if err != nil { + return nil, fmt.Errorf("cannot start process: %s", err) + } + + if lib != "" { + // this should be fine since this string is given at startup + lib = strings.ReplaceAll(lib, "\\", "\\\\") + lib = strings.ReplaceAll(lib, "\"", "\\\"") + debugf("Debug: load(\"%s\")$\n", lib) + _, err = fmt.Fprintf(in_pipe_r, "load(\"%s\")$\n", lib) + if err != nil { + return nil, fmt.Errorf("cannot send command to process: %s", err) + } + debugf("Debug: Loaded file") + } + + _, err = fmt.Fprint(in_pipe_r, ":lisp (maxima-fork:forking-loop)\n") + if err != nil { + return nil, fmt.Errorf("cannot send command to process: %s", err) + } + debugf("Debug: Attempting to find out readiness of mother process") + i := 0 + for { + shortbuf := make([]byte,1) + n, err := out_pipe_w.Read(shortbuf) + if err != nil { + return nil, fmt.Errorf("cannot read readiness text from process: %s", err) + } + i += 1 + if debug >= 2 { + fmt.Fprintf(os.Stderr, "%c", shortbuf[0]) + } + if n != 1 { + return nil, fmt.Errorf("unexpected short read") + } + if shortbuf[0] == '\x02' { + break; + } + } + return &MotherProcess{ + Cmd: cmd, + Input: in_pipe_r, + Output: out_pipe_w, + }, nil +} + + +func (p *MotherProcess) spawn_new(user *User) (*ChildProcess, float64, error) { + start := time.Now() + tmp_dir, err := ioutil.TempDir(tmp_prefix, "maxima-plot-") + if err != nil { + return nil, 0.0, fmt.Errorf("unable to create temp dir: %s", err) + } + // right permission for folders + err = os.Chown(tmp_dir, user.Uid, user.Gid) + if err != nil { + return nil, 0.0, fmt.Errorf("not able to change tempdir permission: %s", err) + } + + // create named pipe for process stdout + pipe_name_out := 
filepath.Clean(filepath.Join(tmp_dir, "outpipe")) + + err = syscall.Mkfifo(pipe_name_out, 0600) + if err != nil { + return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %s", err) + } + + // create named pipe for process stdin + pipe_name_in := filepath.Clean(filepath.Join(tmp_dir, "inpipe")) + + err = syscall.Mkfifo(pipe_name_in, 0600) + if err != nil { + return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %s", err) + } + + _, err = fmt.Fprintf(p.Input, "%d%s\n", user.Id, tmp_dir) + if err != nil { + return nil, 0.0, fmt.Errorf("unable to communicate with process: %s", err) + } + + debugf("Debug: Opening pipes to child") + // note: open outpipe before inpipe to avoid deadlock + out, err := os.OpenFile(pipe_name_out, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + return nil, 0.0, fmt.Errorf("could not open temp dir outpipe: %s", err) + } + + in, err := os.OpenFile(pipe_name_in, os.O_WRONLY, os.ModeNamedPipe) + if err != nil { + return nil, 0.0, fmt.Errorf("could not open temp dir inpipe: %s", err) + } + + out.SetReadDeadline(time.Now().Add(10*time.Second)) + bufout := bufio.NewReader(out) + _, err = fmt.Fscanf(bufout, "\nT") + if err != nil { + return nil, 0.0, fmt.Errorf("not able to find end marker: %s", err) + } + total := time.Since(start) + debugf("Debug: child took %s for startup", total) + return &ChildProcess { + User: user, + Input: in, + Output: bufout, + Outfile: out, + TempDir: tmp_dir, + }, float64(total.Microseconds())/1000, nil +} + +func (p *ChildProcess) eval_command(command string, timeout uint64) (*bytes.Buffer, float64, error) { + start := time.Now() + in_err := make(chan error, 1) + go func() { + p.Input.SetWriteDeadline(time.Now().Add(time.Duration(timeout)*time.Millisecond)) + _, err := io.Copy(p.Input, strings.NewReader(command)) + p.Input.Close() + in_err<-err + }() + var outbuf bytes.Buffer + p.Outfile.SetReadDeadline(time.Now().Add(time.Duration(timeout)*time.Millisecond)) + _, err := 
io.Copy(&outbuf, p.Output) + p.Outfile.Close() + input_err := <-in_err + if input_err != nil { + return nil, 0.0, input_err + } + if err != nil { + return nil, 0.0, err + } + total := time.Since(start) + debugf("Debug: child took %s for evaluation", total) + return &outbuf, float64(total.Microseconds())/1000, nil +} + +func write_500(w http.ResponseWriter) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "500 - internal server error\n") +} + +func process_cleanup(user *User, user_queue chan<- *User, tmp_dir string) { + defer os.RemoveAll(tmp_dir) + defer func() {user_queue <- user}() + + // TODO: replace with cgroups-v2 based freeze solution once docker support for cgroups2 lands + procs, err := ioutil.ReadDir("/proc") + if err != nil { + return + } + for _, dir := range procs { + pid, err := strconv.Atoi(dir.Name()) + if err != nil { + continue + } + stat, ok := dir.Sys().(*syscall.Stat_t) + if !ok { + continue + } + if int(stat.Uid) != user.Uid { + continue + } + syscall.Kill(pid, syscall.SIGKILL) + } + debugf("Debug: Process %d cleand up", user.Id) +} + +func handler(w http.ResponseWriter, r *http.Request, queue <-chan *ChildProcess, user_queue chan<- *User, metrics *Metrics) { + if r.Method == "GET" && r.FormValue("input") == "" { + hostname, _ := os.Hostname() + fmt.Fprintf(w, "Hostname: %s, version: 0.1.1\n", hostname) + return + } + // the maxima input to be evaluated + input := r.FormValue("input") + // template value for ploturl + ploturl := r.FormValue("ploturlbase") + timeout, err := strconv.ParseUint(r.FormValue("timeout"), 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "400 - bad request (invalid timeout)\n") + log.Printf("Warn: Invalid timeout: %s", err) + return + } + + if timeout > 30000 { + log.Printf("Warn: timeout %d was out of range range, reduced to 30000", timeout) + timeout = 30000 + } + // put the temporary directories into maxima variables + var real_input string + if ploturl != "!ploturl!" 
{ + real_input = fmt.Sprintf("URL_BASE: \"%s\"$\n%s", ploturl, input) + } else { + real_input = input + } + proc := <-queue + metrics.QueueLen.Dec() + user := proc.User + debugf("Debug: input (%d): %s", user.Id, input) + defer process_cleanup(user, user_queue, proc.TempDir) + proc_out := make(chan struct {buf *bytes.Buffer; time float64; err error}, 1) + go func() { + out, tim, err := proc.eval_command(real_input, timeout) + // grrr why doesn't go support tuples first-class!? + proc_out <- struct {buf *bytes.Buffer; time float64; err error}{out, tim, err} + }() + select { + case outstr := <-proc_out: + outbuf := outstr.buf + tim := outstr.time + err := outstr.err + if err != nil { + write_500(w) + metrics.NumIntError.Inc() + log.Printf("Error: Communicating with maxima failed: %s", err) + return + } + output_dir := filepath.Clean(filepath.Join(proc.TempDir, "output")) + "/" + // if there are any files inside the output dir, we give back a zip file containing all output + // files and the command output inside a file named OUTPUT + plots_output, err := ioutil.ReadDir(output_dir) + if err != nil { + // just return text if directory could not be read and assume no plots were generated + w.Header().Set("Content-Type", "text/plain;charset=UTF-8") + outbuf.WriteTo(w) + log.Printf("Warn: could not read temp directory of maxima process: %s", err) + metrics.ResponseTime.Observe(tim) + metrics.NumSuccess.Inc() + } + // if there are no files produced, just give back the output directly + if len(plots_output) == 0 { + debugf("Debug: output (%d) is text, len %d", user.Id, outbuf.Len()) + w.Header().Set("Content-Type", "text/plain;charset=UTF-8") + outbuf.WriteTo(w) + } else { + debugf("Debug: output (%d) is zip, OUTPUT len %d", user.Id, outbuf.Len()) + w.Header().Set("Content-Type", "application/zip;charset=UTF-8") + zipfile := zip.NewWriter(w) + out, err := zipfile.Create("OUTPUT") + if err != nil { + write_500(w) + log.Printf("Error: Could not add OUTPUT to zip archive: %s", 
err) + return + } + outbuf.WriteTo(out) + // loop over all plots in the output directory and put them into the zip + // that is to be returned + for _, file := range plots_output { + ffilein, err := os.Open(filepath.Join(output_dir, file.Name())) + defer ffilein.Close() + if err != nil { + write_500(w) + log.Printf("Error: could not open plot %s: %s", file.Name(), err) + return + } + fzipput, err := zipfile.Create("/" + file.Name()) + if err != nil { + write_500(w) + log.Printf("Error: could not add file %s to zip archive: %s", file.Name(), err) + return + } + io.Copy(fzipput, ffilein) + } + zipfile.Close() + } + metrics.ResponseTime.Observe(tim) + metrics.NumSuccess.Inc() + case <-time.After(time.Duration(timeout) * time.Millisecond): + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + fmt.Fprint(w, "416 - timeout\n") + log.Printf("Warn: Process %s had timeout", user.Name) + metrics.ResponseTime.Observe(float64(timeout)) + metrics.NumTimeout.Inc() + } +} + +func generate_maximas(binpath string, lib string, queue chan<- *ChildProcess, user_queue <-chan *User, metrics *Metrics) { + mother_proc := make(chan *MotherProcess, 0) + go func () { + mother, err := new_mother_proc(binpath, lib) + if err != nil { + log.Fatalf("Fatal: Could not start mother process: %s", err) + } + mother_proc <- mother + }() + var mother *MotherProcess + select { + case mom := <- mother_proc: + mother = mom + case <- time.After(10*time.Second): + log.Fatal("Fatal: Could not start the mother process, timed out") + } + fails := 0 + for { + user := <-user_queue + new_proc, time, err := mother.spawn_new(user) + if err != nil { + fails += 1 + log.Printf("Error: Could not spawn child process - fail %d, %s", fails, err) + if fails == 3 { + log.Fatal("Fatal: Failed to spawn child process 3 times in a row, giving up") + } + } else { + fails = 0 + } + debugf("Debug: Spawning process with id %d", user.Id) + metrics.QueueLen.Inc() + metrics.SpawnTime.Observe(time) + queue <- new_proc + 
// get_env_number_positive reads the environment variable varname and parses it
// as a non-negative decimal integer that fits into 32 bits.
//
// If the variable is not set at all, def is returned. If it is set but does not
// hold a valid number (empty, negative, non-decimal or too large), 0 and the
// parse error from strconv are returned.
func get_env_number_positive(varname string, def uint) (uint, error) {
	value, exists := os.LookupEnv(varname)
	if !exists {
		return def, nil
	}
	// ParseUint takes (s, base, bitSize). The two numeric arguments used to be
	// swapped, which parsed the value in base 32 and limited it to 10 bits.
	number, err := strconv.ParseUint(value, 10, 32)
	if err != nil {
		return 0, err
	}
	return uint(number), nil
}
number"); + } + debug, err = get_env_number_positive("GOEMAXIMA_DEBUG", 0) + if err != nil { + log.Fatal("Fatal: GOEMAXIMA_DEBUG contains invalid number"); + } + tmp_prefix = os.Getenv("GOEMAXIMA_TEMP_DIR") + if tmp_prefix == "" { + tmp_prefix = "/tmp/maxima" + } else if !strings.HasPrefix(tmp_prefix, "/") { + log.Fatal("Fatal: GOEMAXIMA_TEMP_DIR must be an absolute path"); + } + err = os.MkdirAll(tmp_prefix, 0711) + if err != nil { + log.Fatalf("Fatal: Cannot create %s: %s", tmp_prefix, err) + } + + queue := make(chan *ChildProcess, queue_len) + user_queue := make(chan *User, user_number) + for i := (uint)(1); i <= user_number; i++ { + user_name := fmt.Sprintf("maxima-%d", i) + user, err := user.Lookup(user_name) + if err != nil { + log.Fatalf("Fatal: Could not look up user with id %d: %s", i, err) + } + uid, err := strconv.Atoi(user.Uid) + if err != nil { + log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Uid, err) + } + gid, err := strconv.Atoi(user.Gid) + if err != nil { + log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Gid, err) + } + user_queue <- &User { + Id: i, + Name: user_name, + Uid: uid, + Gid: gid, + } + } + go generate_maximas(os.Args[1], os.Getenv("GOEMAXIMA_LIB_PATH"), queue, user_queue, &metrics) + http.Handle("/metrics", promhttp.Handler()) + http.HandleFunc("/maxima/", + func (w http.ResponseWriter, r *http.Request) { + handler(w, r, queue, user_queue, &metrics) + }) + log.Print("Info: goe handler started") + err = http.ListenAndServe(":8080", nil) + log.Printf("Fatal: http handler closed unexpectedly, %s", err) + return +}