Skip to content
Snippets Groups Projects
Commit 4feea4cb authored by Lennart Kramer's avatar Lennart Kramer
Browse files

add doc + pull in web.go into repo

parent 6ff0e14d
No related branches found
No related tags found
No related merge requests found
stages:
- steve_jobs
- build_webservice
- test_maxima
# - test_maxima
variables:
GIT_SUBMODULE_STRATEGY: recursive
REGISTRY: "172.30.190.249:5000"
DB: "pgsql"
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: ""
POSTGRES_HOST_AUTH_METHOD: "trust"
TRAVIS_BUILD_DIR: "$CI_PROJECT_DIR"
# gitlab ci script taken from https://gist.github.com/danielneis/5c6140ec8150c6151a54bccd26950278
cache:
steve_jobs:
stage: steve
image: golang
tags:
- docker
script:
- go get github.com/prometheus/client_golang/prometheus
- go get github.com/prometheus/client_golang/prometheus/promhttp
- cd src/web
- GOBIN=${CI_PROJECT_DIR}/bin go install
artifacts:
paths:
- $HOME/.compose/cache
- bin/web
expire_in: 1 week
build_webservice:
image: "docker:latest"
stage: build_webservice
needs:
- project: martin.heide/goe_web
job: steve_jobs
- job: steve_jobs
ref: master
artifacts: true
tags:
......@@ -31,15 +36,20 @@ build_webservice:
script:
- ./build.sh ${REGISTRY}
test_maxima:
image: "$REGISTRY/moodle-ci-stack"
stage: test_maxima
services:
- postgres:latest
variables:
MOODLE_BRANCH: "MOODLE_37_STABLE"
QSTACK_VERSION: "v4.3.2"
tags:
- docker
script:
- bash testimage.sh
#test_maxima:
# image: "$REGISTRY/moodle-ci-stack"
# stage: test_maxima
# services:
# - postgres:latest
# variables:
# MOODLE_BRANCH: "MOODLE_37_STABLE"
# QSTACK_VERSION: "v4.3.2"
# DB: "pgsql"
# POSTGRES_USER: "postgres"
# POSTGRES_PASSWORD: ""
# POSTGRES_HOST_AUTH_METHOD: "trust"
# TRAVIS_BUILD_DIR: "$CI_PROJECT_DIR"
# tags:
# - docker
# script:
# - bash testimage.sh
How it works
============
API
---
The basic API stack uses for web access is simple:
1. send a POST-request with `timeout`, `ploturlbase`, `version` and `input`
2. put the value of `ploturlbase` into a maxima variable
3. send `input` to the maxima process
4. if the maxima process is not finished after `timeout` milliseconds, return a 416 error
5. else return the output of `maxima`
Implementation
--------------
The web interface to the maxima processes is implemented in golang.
At startup, the server process starts the maxima mother process.
The server process maintains a queue of some maxima processes derived from the mother process and ready to use.
Each maxima process gets assigned a user of the form `maxima-%d` which is a valid unix user (these are already there if you use the Docker image).
If a new request comes in, it takes a process from the queue and communicates with the process through some named pipes.
After it is finished (or the process timed out) all leftover process with the process' username are kill-9'd and temporary files/pipes deleted.
The user id is then again available for other processes.
Process Lifecycle
-----------------
At first, the server process creates a temporary directory for the future process and creates 2 pipes, inpipe and outpipe for stdin and stdout of maxima.
The server then sends to the mother process the maxima user id and the path of the temporary directory.
The mother process is in a loop which is called at server startup:
`(forking-loop)` is called, which was loaded through `/assets/maxima-fork.lisp` which uses sbcl's FFI to call the forking loop `fork_new_process()` in `/src/maxima_fork.c`.
When it receives a new id and path, it forks a new maxima process from the mother process and does some setup.
Forking a process from an existing one is vastly more efficient (faster with a factor of around 10x) than execve'ing a new maxima process, and the memory consumption is reduced, too.
It does have the caveat that, at run time, there will be more page faults because of the copy-on-write mechanics, which slows the process down.
After the child process responds that it is ready, the server process puts its description into a queue, where it is taken later for evaluating the request.
package main
import (
"archive/zip"
"bytes"
"bufio"
"fmt"
"path/filepath"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"os/user"
"strconv"
"time"
"strings"
"syscall"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var tmp_prefix string
var debug uint
type User struct {
Id uint
Name string
Uid int
Gid int
}
type Metrics struct {
ResponseTime prometheus.Histogram
SpawnTime prometheus.Histogram
NumSuccess prometheus.Counter
NumIntError prometheus.Counter
NumTimeout prometheus.Counter
QueueLen prometheus.Gauge
}
type MotherProcess struct {
Cmd *exec.Cmd
Input *io.PipeWriter
Output *io.PipeReader
}
type ChildProcess struct {
User *User
Input *os.File
Output *bufio.Reader
Outfile *os.File
TempDir string
}
func debugf(format string, a ...interface{}) {
if debug > 0 {
log.Printf(format, a...)
}
}
func new_mother_proc(binpath string, lib string) (*MotherProcess, error) {
cmd := exec.Command(binpath)
cmd.Stderr = os.Stderr
in_pipe_w, in_pipe_r := io.Pipe()
cmd.Stdin = in_pipe_w
out_pipe_w, out_pipe_r := io.Pipe()
cmd.Stdout = out_pipe_r
err := cmd.Start()
if err != nil {
return nil, fmt.Errorf("cannot start process: %s", err)
}
if lib != "" {
// this should be fine since this string is given at startup
lib = strings.ReplaceAll(lib, "\\", "\\\\")
lib = strings.ReplaceAll(lib, "\"", "\\\"")
debugf("Debug: load(\"%s\")$\n", lib)
_, err = fmt.Fprintf(in_pipe_r, "load(\"%s\")$\n", lib)
if err != nil {
return nil, fmt.Errorf("cannot send command to process: %s", err)
}
debugf("Debug: Loaded file")
}
_, err = fmt.Fprint(in_pipe_r, ":lisp (maxima-fork:forking-loop)\n")
if err != nil {
return nil, fmt.Errorf("cannot send command to process: %s", err)
}
debugf("Debug: Attempting to find out readiness of mother process")
i := 0
for {
shortbuf := make([]byte,1)
n, err := out_pipe_w.Read(shortbuf)
if err != nil {
return nil, fmt.Errorf("cannot read readiness text from process: %s", err)
}
i += 1
if debug >= 2 {
fmt.Fprintf(os.Stderr, "%c", shortbuf[0])
}
if n != 1 {
return nil, fmt.Errorf("unexpected short read")
}
if shortbuf[0] == '\x02' {
break;
}
}
return &MotherProcess{
Cmd: cmd,
Input: in_pipe_r,
Output: out_pipe_w,
}, nil
}
func (p *MotherProcess) spawn_new(user *User) (*ChildProcess, float64, error) {
start := time.Now()
tmp_dir, err := ioutil.TempDir(tmp_prefix, "maxima-plot-")
if err != nil {
return nil, 0.0, fmt.Errorf("unable to create temp dir: %s", err)
}
// right permission for folders
err = os.Chown(tmp_dir, user.Uid, user.Gid)
if err != nil {
return nil, 0.0, fmt.Errorf("not able to change tempdir permission: %s", err)
}
// create named pipe for process stdout
pipe_name_out := filepath.Clean(filepath.Join(tmp_dir, "outpipe"))
err = syscall.Mkfifo(pipe_name_out, 0600)
if err != nil {
return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %s", err)
}
// create named pipe for process stdin
pipe_name_in := filepath.Clean(filepath.Join(tmp_dir, "inpipe"))
err = syscall.Mkfifo(pipe_name_in, 0600)
if err != nil {
return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %s", err)
}
_, err = fmt.Fprintf(p.Input, "%d%s\n", user.Id, tmp_dir)
if err != nil {
return nil, 0.0, fmt.Errorf("unable to communicate with process: %s", err)
}
debugf("Debug: Opening pipes to child")
// note: open outpipe before inpipe to avoid deadlock
out, err := os.OpenFile(pipe_name_out, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
return nil, 0.0, fmt.Errorf("could not open temp dir outpipe: %s", err)
}
in, err := os.OpenFile(pipe_name_in, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
return nil, 0.0, fmt.Errorf("could not open temp dir inpipe: %s", err)
}
out.SetReadDeadline(time.Now().Add(10*time.Second))
bufout := bufio.NewReader(out)
_, err = fmt.Fscanf(bufout, "\nT")
if err != nil {
return nil, 0.0, fmt.Errorf("not able to find end marker: %s", err)
}
total := time.Since(start)
debugf("Debug: child took %s for startup", total)
return &ChildProcess {
User: user,
Input: in,
Output: bufout,
Outfile: out,
TempDir: tmp_dir,
}, float64(total.Microseconds())/1000, nil
}
func (p *ChildProcess) eval_command(command string, timeout uint64) (*bytes.Buffer, float64, error) {
start := time.Now()
in_err := make(chan error, 1)
go func() {
p.Input.SetWriteDeadline(time.Now().Add(time.Duration(timeout)*time.Millisecond))
_, err := io.Copy(p.Input, strings.NewReader(command))
p.Input.Close()
in_err<-err
}()
var outbuf bytes.Buffer
p.Outfile.SetReadDeadline(time.Now().Add(time.Duration(timeout)*time.Millisecond))
_, err := io.Copy(&outbuf, p.Output)
p.Outfile.Close()
input_err := <-in_err
if input_err != nil {
return nil, 0.0, input_err
}
if err != nil {
return nil, 0.0, err
}
total := time.Since(start)
debugf("Debug: child took %s for evaluation", total)
return &outbuf, float64(total.Microseconds())/1000, nil
}
func write_500(w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "500 - internal server error\n")
}
func process_cleanup(user *User, user_queue chan<- *User, tmp_dir string) {
defer os.RemoveAll(tmp_dir)
defer func() {user_queue <- user}()
// TODO: replace with cgroups-v2 based freeze solution once docker support for cgroups2 lands
procs, err := ioutil.ReadDir("/proc")
if err != nil {
return
}
for _, dir := range procs {
pid, err := strconv.Atoi(dir.Name())
if err != nil {
continue
}
stat, ok := dir.Sys().(*syscall.Stat_t)
if !ok {
continue
}
if int(stat.Uid) != user.Uid {
continue
}
syscall.Kill(pid, syscall.SIGKILL)
}
debugf("Debug: Process %d cleand up", user.Id)
}
func handler(w http.ResponseWriter, r *http.Request, queue <-chan *ChildProcess, user_queue chan<- *User, metrics *Metrics) {
if r.Method == "GET" && r.FormValue("input") == "" {
hostname, _ := os.Hostname()
fmt.Fprintf(w, "Hostname: %s, version: 0.1.1\n", hostname)
return
}
// the maxima input to be evaluated
input := r.FormValue("input")
// template value for ploturl
ploturl := r.FormValue("ploturlbase")
timeout, err := strconv.ParseUint(r.FormValue("timeout"), 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "400 - bad request (invalid timeout)\n")
log.Printf("Warn: Invalid timeout: %s", err)
return
}
if timeout > 30000 {
log.Printf("Warn: timeout %d was out of range range, reduced to 30000", timeout)
timeout = 30000
}
// put the temporary directories into maxima variables
var real_input string
if ploturl != "!ploturl!" {
real_input = fmt.Sprintf("URL_BASE: \"%s\"$\n%s", ploturl, input)
} else {
real_input = input
}
proc := <-queue
metrics.QueueLen.Dec()
user := proc.User
debugf("Debug: input (%d): %s", user.Id, input)
defer process_cleanup(user, user_queue, proc.TempDir)
proc_out := make(chan struct {buf *bytes.Buffer; time float64; err error}, 1)
go func() {
out, tim, err := proc.eval_command(real_input, timeout)
// grrr why doesn't go support tuples first-class!?
proc_out <- struct {buf *bytes.Buffer; time float64; err error}{out, tim, err}
}()
select {
case outstr := <-proc_out:
outbuf := outstr.buf
tim := outstr.time
err := outstr.err
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log.Printf("Error: Communicating with maxima failed: %s", err)
return
}
output_dir := filepath.Clean(filepath.Join(proc.TempDir, "output")) + "/"
// if there are any files inside the output dir, we give back a zip file containing all output
// files and the command output inside a file named OUTPUT
plots_output, err := ioutil.ReadDir(output_dir)
if err != nil {
// just return text if directory could not be read and assume no plots were generated
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
log.Printf("Warn: could not read temp directory of maxima process: %s", err)
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
}
// if there are no files produced, just give back the output directly
if len(plots_output) == 0 {
debugf("Debug: output (%d) is text, len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
} else {
debugf("Debug: output (%d) is zip, OUTPUT len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "application/zip;charset=UTF-8")
zipfile := zip.NewWriter(w)
out, err := zipfile.Create("OUTPUT")
if err != nil {
write_500(w)
log.Printf("Error: Could not add OUTPUT to zip archive: %s", err)
return
}
outbuf.WriteTo(out)
// loop over all plots in the output directory and put them into the zip
// that is to be returned
for _, file := range plots_output {
ffilein, err := os.Open(filepath.Join(output_dir, file.Name()))
defer ffilein.Close()
if err != nil {
write_500(w)
log.Printf("Error: could not open plot %s: %s", file.Name(), err)
return
}
fzipput, err := zipfile.Create("/" + file.Name())
if err != nil {
write_500(w)
log.Printf("Error: could not add file %s to zip archive: %s", file.Name(), err)
return
}
io.Copy(fzipput, ffilein)
}
zipfile.Close()
}
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
case <-time.After(time.Duration(timeout) * time.Millisecond):
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprint(w, "416 - timeout\n")
log.Printf("Warn: Process %s had timeout", user.Name)
metrics.ResponseTime.Observe(float64(timeout))
metrics.NumTimeout.Inc()
}
}
func generate_maximas(binpath string, lib string, queue chan<- *ChildProcess, user_queue <-chan *User, metrics *Metrics) {
mother_proc := make(chan *MotherProcess, 0)
go func () {
mother, err := new_mother_proc(binpath, lib)
if err != nil {
log.Fatalf("Fatal: Could not start mother process: %s", err)
}
mother_proc <- mother
}()
var mother *MotherProcess
select {
case mom := <- mother_proc:
mother = mom
case <- time.After(10*time.Second):
log.Fatal("Fatal: Could not start the mother process, timed out")
}
fails := 0
for {
user := <-user_queue
new_proc, time, err := mother.spawn_new(user)
if err != nil {
fails += 1
log.Printf("Error: Could not spawn child process - fail %d, %s", fails, err)
if fails == 3 {
log.Fatal("Fatal: Failed to spawn child process 3 times in a row, giving up")
}
} else {
fails = 0
}
debugf("Debug: Spawning process with id %d", user.Id)
metrics.QueueLen.Inc()
metrics.SpawnTime.Observe(time)
queue <- new_proc
debugf("Debug: Spawned process with id %d", user.Id)
}
}
func get_env_number_positive(varname string, def uint) (uint, error) {
value, exists := os.LookupEnv(varname)
if !exists {
return def, nil
}
number, err := strconv.ParseUint(value, 32, 10)
if err != nil {
return 0, err
}
return uint(number), nil
}
func main() {
metrics := Metrics {
ResponseTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "maxima_response_time",
Help: "Response time of maxima processes",
Buckets:prometheus.ExponentialBuckets(1.19, 1.5, 25)}),
SpawnTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "maxima_spawn_time",
Help: "Spawn time of maxima child",
Buckets:prometheus.LinearBuckets(1.0, 1.0, 20)}),
NumSuccess: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_success_response",
Help: "Count of successful responses"}),
NumIntError: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_500_response",
Help: "Count of 500 responses"}),
NumTimeout: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_timeout_response",
Help: "Count of timeouts"}),
QueueLen: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "maxima_queue_len",
Help: "Number of maxima processes waiting in queue"})}
metrics.QueueLen.Set(0)
prometheus.MustRegister(metrics.ResponseTime)
prometheus.MustRegister(metrics.SpawnTime)
prometheus.MustRegister(metrics.NumSuccess)
prometheus.MustRegister(metrics.NumIntError)
prometheus.MustRegister(metrics.NumTimeout)
prometheus.MustRegister(metrics.QueueLen)
if len(os.Args) != 2 {
log.Fatal("Fatal: wrong cli-argument usage: web [path to maxima executable]")
}
user_number, err := get_env_number_positive("GOEMAXIMA_NUSER", 16)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_NUSER contains invalid number");
}
queue_len, err := get_env_number_positive("GOEMAXIMA_QUEUE_LEN", 3)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_QUEUE_LEN contains invalid number");
}
debug, err = get_env_number_positive("GOEMAXIMA_DEBUG", 0)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_DEBUG contains invalid number");
}
tmp_prefix = os.Getenv("GOEMAXIMA_TEMP_DIR")
if tmp_prefix == "" {
tmp_prefix = "/tmp/maxima"
} else if !strings.HasPrefix(tmp_prefix, "/") {
log.Fatal("Fatal: GOEMAXIMA_TEMP_DIR must be an absolute path");
}
err = os.MkdirAll(tmp_prefix, 0711)
if err != nil {
log.Fatalf("Fatal: Cannot create %s: %s", tmp_prefix, err)
}
queue := make(chan *ChildProcess, queue_len)
user_queue := make(chan *User, user_number)
for i := (uint)(1); i <= user_number; i++ {
user_name := fmt.Sprintf("maxima-%d", i)
user, err := user.Lookup(user_name)
if err != nil {
log.Fatalf("Fatal: Could not look up user with id %d: %s", i, err)
}
uid, err := strconv.Atoi(user.Uid)
if err != nil {
log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Uid, err)
}
gid, err := strconv.Atoi(user.Gid)
if err != nil {
log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Gid, err)
}
user_queue <- &User {
Id: i,
Name: user_name,
Uid: uid,
Gid: gid,
}
}
go generate_maximas(os.Args[1], os.Getenv("GOEMAXIMA_LIB_PATH"), queue, user_queue, &metrics)
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/maxima/",
func (w http.ResponseWriter, r *http.Request) {
handler(w, r, queue, user_queue, &metrics)
})
log.Print("Info: goe handler started")
err = http.ListenAndServe(":8080", nil)
log.Printf("Fatal: http handler closed unexpectedly, %s", err)
return
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment