Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"archive/zip"
"bytes"
"bufio"
"fmt"
"path/filepath"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"os/user"
"strconv"
"time"
"strings"
"syscall"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
	// tmp_prefix is the directory below which the per-process temp dirs
	// (named pipes, plots) are created. It is set once in main.
	tmp_prefix string
	// debug is the debug level: values > 0 enable debugf output, values >= 2
	// additionally echo the startup output of the mother process to stderr.
	debug uint
)
// User describes one of the system accounts ("maxima-<Id>") that child
// processes are spawned for.
type User struct {
	// Id is the running number of the account; it is sent to the mother
	// process when a child is requested.
	Id uint
	// Name is the account name that was looked up.
	Name string
	// Uid and Gid are the numeric ids of the account. They are used to chown
	// the temp dir and to find the processes to kill during cleanup.
	Uid, Gid int
}
// Metrics bundles the prometheus collectors that are updated by the request
// handler and by the process generator.
type Metrics struct {
	// ResponseTime observes the evaluation time of a request and SpawnTime
	// the startup time of a child; both are reported in milliseconds.
	ResponseTime, SpawnTime prometheus.Histogram
	// Counters for successful, failed (500) and timed out requests.
	NumSuccess, NumIntError, NumTimeout prometheus.Counter
	// QueueLen is incremented when a child is queued and decremented when a
	// request takes one out of the queue.
	QueueLen prometheus.Gauge
}
// MotherProcess is the long-running maxima process that new children are
// requested from.
type MotherProcess struct {
	// Cmd is the started command.
	Cmd *exec.Cmd
	// Input is the pipe end we write to; it is connected to the stdin of Cmd.
	Input *io.PipeWriter
	// Output is the pipe end we read from; it is connected to the stdout of
	// Cmd.
	Output *io.PipeReader
}
// ChildProcess is a spawned maxima process that is ready to evaluate a single
// command.
type ChildProcess struct {
	// User is the account the child was requested for.
	User *User
	// Input is the opened "inpipe" fifo inside TempDir; commands are written
	// to it.
	Input *os.File
	// Output is a buffered reader on top of Outfile.
	Output *bufio.Reader
	// Outfile is the opened "outpipe" fifo inside TempDir.
	Outfile *os.File
	// TempDir is the temp directory that was created for this child.
	TempDir string
}
// debugf forwards its arguments to log.Printf, but only when the global debug
// level is greater than zero; otherwise it does nothing.
func debugf(format string, a ...interface{}) {
	if debug == 0 {
		return
	}
	log.Printf(format, a...)
}
// new_mother_proc starts the maxima binary at binpath and prepares it for
// forking children.
//
// If lib is non-empty it is loaded into maxima with load("...") first. Then
// the lisp forking loop is started and the function blocks until the process
// writes the byte 0x02 to its stdout, which is treated as the readiness
// marker. The stderr of the process is forwarded to our own stderr.
//
// It returns the running process together with the pipe ends used to talk to
// it, or an error if the process could not be started, written to or read
// from.
func new_mother_proc(binpath string, lib string) (*MotherProcess, error) {
	cmd := exec.Command(binpath)
	cmd.Stderr = os.Stderr

	// io.Pipe returns (reader, writer): the process gets the read end of the
	// stdin pipe and the write end of the stdout pipe, we keep the other ends.
	stdin_r, stdin_w := io.Pipe()
	cmd.Stdin = stdin_r
	stdout_r, stdout_w := io.Pipe()
	cmd.Stdout = stdout_w

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("cannot start process: %w", err)
	}

	if lib != "" {
		// this should be fine since this string is given at startup
		lib = strings.ReplaceAll(lib, "\\", "\\\\")
		lib = strings.ReplaceAll(lib, "\"", "\\\"")
		debugf("Debug: load(\"%s\")$\n", lib)
		if _, err := fmt.Fprintf(stdin_w, "load(\"%s\")$\n", lib); err != nil {
			return nil, fmt.Errorf("cannot send command to process: %w", err)
		}
		debugf("Debug: Loaded file")
	}

	// start the lisp forking loop, which calls a function from a c library
	// that loops over stdin input and forks a new process for each line
	if _, err := fmt.Fprint(stdin_w, ":lisp (maxima-fork:forking-loop)\n"); err != nil {
		return nil, fmt.Errorf("cannot send command to process: %w", err)
	}

	debugf("Debug: Attempting to find out readiness of mother process")
	// consume the startup output byte by byte until the readiness marker
	shortbuf := make([]byte, 1)
	for {
		n, err := stdout_r.Read(shortbuf)
		if err != nil {
			return nil, fmt.Errorf("cannot read readiness text from process: %w", err)
		}
		// validate the read before looking at the buffer content
		if n != 1 {
			return nil, fmt.Errorf("unexpected short read")
		}
		if debug >= 2 {
			fmt.Fprintf(os.Stderr, "%c", shortbuf[0])
		}
		if shortbuf[0] == '\x02' {
			break
		}
	}

	return &MotherProcess{
		Cmd:    cmd,
		Input:  stdin_w,
		Output: stdout_r,
	}, nil
}
// spawn_new asks the mother process to fork a child for the given user.
//
// It creates a fresh temp directory below tmp_prefix, hands its ownership to
// the user, creates the named pipes "outpipe" and "inpipe" in it and then
// sends "<user.Id><tmp_dir>\n" to the mother process. Afterwards both pipes
// are opened and the function waits (for at most 10 seconds) until the text
// "\nT" can be read from the outpipe.
//
// It returns the child, the time the startup took in milliseconds and an
// error. If anything fails before the mother process has been told about the
// temp directory, the directory is removed again; descriptors that were
// already opened are closed on every failure path.
func (p *MotherProcess) spawn_new(user *User) (*ChildProcess, float64, error) {
	start := time.Now()
	tmp_dir, err := ioutil.TempDir(tmp_prefix, "maxima-plot-")
	if err != nil {
		return nil, 0.0, fmt.Errorf("unable to create temp dir: %w", err)
	}
	// Until the request has been sent to the mother process nothing else
	// refers to tmp_dir, so a failure must not leave it behind.
	handed_over := false
	defer func() {
		if !handed_over {
			os.RemoveAll(tmp_dir)
		}
	}()

	// right permission for folders
	err = os.Chown(tmp_dir, user.Uid, user.Gid)
	if err != nil {
		return nil, 0.0, fmt.Errorf("not able to change tempdir permission: %w", err)
	}

	// create named pipe for process stdout
	pipe_name_out := filepath.Clean(filepath.Join(tmp_dir, "outpipe"))
	err = syscall.Mkfifo(pipe_name_out, 0600)
	if err != nil {
		return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %w", err)
	}

	// create named pipe for process stdin
	pipe_name_in := filepath.Clean(filepath.Join(tmp_dir, "inpipe"))
	err = syscall.Mkfifo(pipe_name_in, 0600)
	if err != nil {
		return nil, 0.0, fmt.Errorf("could not create named pipe in temp folder: %w", err)
	}

	_, err = fmt.Fprintf(p.Input, "%d%s\n", user.Id, tmp_dir)
	if err != nil {
		return nil, 0.0, fmt.Errorf("unable to communicate with process: %w", err)
	}
	// from here on the directory may be in use by the forked child
	handed_over = true

	debugf("Debug: Opening pipes to child")
	// note: open outpipe before inpipe to avoid deadlock
	out, err := os.OpenFile(pipe_name_out, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		return nil, 0.0, fmt.Errorf("could not open temp dir outpipe: %w", err)
	}

	in, err := os.OpenFile(pipe_name_in, os.O_WRONLY, os.ModeNamedPipe)
	if err != nil {
		out.Close()
		return nil, 0.0, fmt.Errorf("could not open temp dir inpipe: %w", err)
	}

	// bound the wait for the startup marker; the result is deliberately not
	// checked, a missing deadline only means the read below may block longer
	out.SetReadDeadline(time.Now().Add(10 * time.Second))
	bufout := bufio.NewReader(out)
	_, err = fmt.Fscanf(bufout, "\nT")
	if err != nil {
		in.Close()
		out.Close()
		return nil, 0.0, fmt.Errorf("not able to find end marker: %w", err)
	}

	total := time.Since(start)
	debugf("Debug: child took %s for startup", total)
	return &ChildProcess{
		User:    user,
		Input:   in,
		Output:  bufout,
		Outfile: out,
		TempDir: tmp_dir,
	}, float64(total.Microseconds()) / 1000, nil
}
// eval_command sends command to the child process and collects everything the
// child writes back.
//
// timeout is given in milliseconds and is applied as write deadline on the
// input pipe and as read deadline on the output pipe. Both pipes are closed
// by this call. It returns the collected output and the elapsed time in
// milliseconds; an error from writing the command takes precedence over an
// error from reading the output.
func (p *ChildProcess) eval_command(command string, timeout uint64) (*bytes.Buffer, float64, error) {
	began := time.Now()
	deadline := func() time.Time {
		return time.Now().Add(time.Duration(timeout) * time.Millisecond)
	}

	// the command is written from its own goroutine so that a child which
	// already produces output cannot deadlock against our write
	write_done := make(chan error, 1)
	go func() {
		p.Input.SetWriteDeadline(deadline())
		_, werr := io.Copy(p.Input, strings.NewReader(command))
		p.Input.Close()
		write_done <- werr
	}()

	var collected bytes.Buffer
	p.Outfile.SetReadDeadline(deadline())
	_, rerr := io.Copy(&collected, p.Output)
	p.Outfile.Close()

	if werr := <-write_done; werr != nil {
		return nil, 0.0, werr
	}
	if rerr != nil {
		return nil, 0.0, rerr
	}

	elapsed := time.Since(began)
	debugf("Debug: child took %s for evaluation", elapsed)
	return &collected, float64(elapsed.Microseconds()) / 1000, nil
}
// write_500 answers the request with status 500 and a short plain text body.
func write_500(w http.ResponseWriter) {
	const body = "500 - internal server error\n"
	w.WriteHeader(http.StatusInternalServerError)
	io.WriteString(w, body)
}
// kills all processes of user and remove temporary directories
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
func process_cleanup(user *User, user_queue chan<- *User, tmp_dir string) {
defer os.RemoveAll(tmp_dir)
defer func() {user_queue <- user}()
// TODO: replace with cgroups-v2 based freeze solution once docker support for cgroups2 lands
procs, err := ioutil.ReadDir("/proc")
if err != nil {
return
}
for _, dir := range procs {
pid, err := strconv.Atoi(dir.Name())
if err != nil {
continue
}
stat, ok := dir.Sys().(*syscall.Stat_t)
if !ok {
continue
}
if int(stat.Uid) != user.Uid {
continue
}
syscall.Kill(pid, syscall.SIGKILL)
}
debugf("Debug: Process %d cleand up", user.Id)
}
func handler(w http.ResponseWriter, r *http.Request, queue <-chan *ChildProcess, user_queue chan<- *User, metrics *Metrics) {
health := r.FormValue("health") == "1"
if r.Method == "GET" && r.FormValue("input") == "" && !health {
fmt.Fprintf(w, "Hostname: %s, version: 1.0.3\n", hostname)
return
}
// the maxima input to be evaluated
input := r.FormValue("input")
// template value for ploturl
ploturl := r.FormValue("ploturlbase")
var timeout uint64
var err error
if health {
input = "print(\"healthcheck successful\");"
timeout = 1000
debugf("Debug: doing healthcheck")
} else {
timeout, err = strconv.ParseUint(r.FormValue("timeout"), 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "400 - bad request (invalid timeout)\n")
log.Printf("Warn: Invalid timeout: %s", err)
return
}
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
if timeout > 30000 {
log.Printf("Warn: timeout %d was out of range range, reduced to 30000", timeout)
timeout = 30000
}
// put the temporary directories into maxima variables
var real_input string
if ploturl != "!ploturl!" {
real_input = fmt.Sprintf("URL_BASE: \"%s\"$\n%s", ploturl, input)
} else {
real_input = input
}
proc := <-queue
metrics.QueueLen.Dec()
user := proc.User
debugf("Debug: input (%d): %s", user.Id, input)
defer process_cleanup(user, user_queue, proc.TempDir)
proc_out := make(chan struct {buf *bytes.Buffer; time float64; err error}, 1)
go func() {
out, tim, err := proc.eval_command(real_input, timeout)
// grrr why doesn't go support tuples first-class!?
proc_out <- struct {buf *bytes.Buffer; time float64; err error}{out, tim, err}
}()
select {
case outstr := <-proc_out:
outbuf := outstr.buf
tim := outstr.time
err := outstr.err
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log.Printf("Error: Communicating with maxima failed: %s", err)
return
}
if bytes.Contains(outbuf.Bytes(), []byte("healthcheck successful")) {
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
debugf("Healthcheck passed")
// note: we don't update metrics here since they would get
// too polluted by the healthchecks
return
} else {
write_500(w)
metrics.NumIntError.Inc()
log.Printf("Error: Healthcheck did not pass")
return
}
}
output_dir := filepath.Clean(filepath.Join(proc.TempDir, "output")) + "/"
// if there are any files inside the output dir, we give back a zip file containing all output
// files and the command output inside a file named OUTPUT
plots_output, err := ioutil.ReadDir(output_dir)
if err != nil {
// just return text if directory could not be read and assume no plots were generated
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
log.Printf("Warn: could not read temp directory of maxima process: %s", err)
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
}
// if there are no files produced, just give back the output directly
if len(plots_output) == 0 {
debugf("Debug: output (%d) is text, len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
} else {
debugf("Debug: output (%d) is zip, OUTPUT len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "application/zip;charset=UTF-8")
zipfile := zip.NewWriter(w)
out, err := zipfile.Create("OUTPUT")
if err != nil {
write_500(w)
log.Printf("Error: Could not add OUTPUT to zip archive: %s", err)
return
}
outbuf.WriteTo(out)
// loop over all plots in the output directory and put them into the zip
// that is to be returned
for _, file := range plots_output {
ffilein, err := os.Open(filepath.Join(output_dir, file.Name()))
defer ffilein.Close()
if err != nil {
write_500(w)
log.Printf("Error: could not open plot %s: %s", file.Name(), err)
return
}
fzipput, err := zipfile.Create("/" + file.Name())
if err != nil {
write_500(w)
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
log.Printf("Error: could not add file %s to zip archive: %s", file.Name(), err)
return
}
io.Copy(fzipput, ffilein)
}
zipfile.Close()
}
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
case <-time.After(time.Duration(timeout) * time.Millisecond):
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprint(w, "416 - timeout\n")
log.Printf("Warn: Process %s had timeout", user.Name)
metrics.ResponseTime.Observe(float64(timeout))
metrics.NumTimeout.Inc()
}
}
func generate_maximas(binpath string, lib string, queue chan<- *ChildProcess, user_queue <-chan *User, metrics *Metrics) {
mother_proc := make(chan *MotherProcess, 0)
go func () {
mother, err := new_mother_proc(binpath, lib)
if err != nil {
log.Fatalf("Fatal: Could not start mother process: %s", err)
}
mother_proc <- mother
}()
var mother *MotherProcess
select {
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
mother = mom
case <- time.After(10*time.Second):
log.Fatal("Fatal: Could not start the mother process, timed out")
}
fails := 0
for {
user := <-user_queue
new_proc, time, err := mother.spawn_new(user)
if err != nil {
fails += 1
log.Printf("Error: Could not spawn child process - fail %d, %s", fails, err)
if fails == 3 {
log.Fatal("Fatal: Failed to spawn child process 3 times in a row, giving up")
}
} else {
fails = 0
}
debugf("Debug: Spawning process with id %d", user.Id)
metrics.QueueLen.Inc()
metrics.SpawnTime.Observe(time)
queue <- new_proc
debugf("Debug: Spawned process with id %d", user.Id)
}
}
// get_env_number_positive reads the environment variable varname as an
// unsigned decimal number.
//
// If the variable is not set, def is returned. If it is set but is not a
// decimal number that fits into 32 bits, 0 and the parse error are returned.
func get_env_number_positive(varname string, def uint) (uint, error) {
	value, exists := os.LookupEnv(varname)
	if !exists {
		return def, nil
	}
	// ParseUint takes (string, base, bitSize): decimal input, limited to 32
	// bits so that the conversion to uint cannot truncate on any platform
	number, err := strconv.ParseUint(value, 10, 32)
	if err != nil {
		return 0, err
	}
	return uint(number), nil
}
func main() {
// register/initialize various prometheus metrics
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
metrics := Metrics {
ResponseTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "maxima_response_time",
Help: "Response time of maxima processes",
Buckets:prometheus.ExponentialBuckets(1.19, 1.5, 25)}),
SpawnTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "maxima_spawn_time",
Help: "Spawn time of maxima child",
Buckets:prometheus.LinearBuckets(1.0, 1.0, 20)}),
NumSuccess: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_success_response",
Help: "Count of successful responses"}),
NumIntError: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_500_response",
Help: "Count of 500 responses"}),
NumTimeout: prometheus.NewCounter(prometheus.CounterOpts{
Name: "maxima_timeout_response",
Help: "Count of timeouts"}),
QueueLen: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "maxima_queue_len",
Help: "Number of maxima processes waiting in queue"})}
metrics.QueueLen.Set(0)
prometheus.MustRegister(metrics.ResponseTime)
prometheus.MustRegister(metrics.SpawnTime)
prometheus.MustRegister(metrics.NumSuccess)
prometheus.MustRegister(metrics.NumIntError)
prometheus.MustRegister(metrics.NumTimeout)
prometheus.MustRegister(metrics.QueueLen)
if len(os.Args) != 2 {
log.Fatal("Fatal: wrong cli-argument usage: web [path to maxima executable]")
}
user_number, err := get_env_number_positive("GOEMAXIMA_NUSER", 32)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_NUSER contains invalid number");
}
// length of queue of ready maxima processes (-1)
queue_len, err := get_env_number_positive("GOEMAXIMA_QUEUE_LEN", 3)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_QUEUE_LEN contains invalid number");
}
debug, err = get_env_number_positive("GOEMAXIMA_DEBUG", 0)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_DEBUG contains invalid number");
}
// where to store temp files (plots, named pipes)
// should preferrably be tmpfs since it needs to be fast
tmp_prefix = os.Getenv("GOEMAXIMA_TEMP_DIR")
if tmp_prefix == "" {
tmp_prefix = "/tmp/maxima"
} else if !strings.HasPrefix(tmp_prefix, "/") {
log.Fatal("Fatal: GOEMAXIMA_TEMP_DIR must be an absolute path");
}
err = os.MkdirAll(tmp_prefix, 0711)
if err != nil {
log.Fatalf("Fatal: Cannot create %s: %s", tmp_prefix, err)
}
queue := make(chan *ChildProcess, queue_len)
user_queue := make(chan *User, user_number)
for i := (uint)(1); i <= user_number; i++ {
user_name := fmt.Sprintf("maxima-%d", i)
user, err := user.Lookup(user_name)
if err != nil {
log.Fatalf("Fatal: Could not look up user with id %d: %s", i, err)
}
uid, err := strconv.Atoi(user.Uid)
if err != nil {
log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Uid, err)
}
gid, err := strconv.Atoi(user.Gid)
if err != nil {
log.Fatalf("Fatal: Cannot parse uid %s as number: %s", user.Gid, err)
}
user_queue <- &User {
Id: i,
Name: user_name,
Uid: uid,
Gid: gid,
}
}
// spawn maxima processes in separate goroutine
go generate_maximas(os.Args[1], os.Getenv("GOEMAXIMA_LIB_PATH"), queue, user_queue, &metrics)
http.Handle("/metrics", promhttp.Handler())
handler := func (w http.ResponseWriter, r *http.Request) {
handler(w, r, queue, user_queue, &metrics)
}
http.HandleFunc("/maxima", handler)
http.HandleFunc("/maxima/", handler)
http.HandleFunc("/goemaxima", handler)
http.HandleFunc("/goemaxima/", handler)
log.Print("Info: goe handler started")
err = http.ListenAndServe(":8080", nil)
log.Printf("Fatal: http handler closed unexpectedly, %s", err)
return
}