Skip to content
Snippets Groups Projects
Commit 6493ef15 authored by Lennart Kramer's avatar Lennart Kramer
Browse files

refactor overly long request handler function

parent 622405ed
No related branches found
No related tags found
No related merge requests found
......@@ -4,11 +4,10 @@ go 1.15
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
google.golang.org/protobuf v1.27.1 // indirect
)
......@@ -435,6 +435,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
......@@ -584,6 +585,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
......
......@@ -2,11 +2,10 @@ package main
import (
"archive/zip"
"bytes"
"bufio"
"bytes"
"errors"
"fmt"
"path/filepath"
"io"
"io/ioutil"
"log"
......@@ -14,10 +13,12 @@ import (
"os"
"os/exec"
"os/user"
"path/filepath"
"strconv"
"time"
"strings"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
......@@ -113,7 +114,7 @@ func new_mother_proc(binpath string, libs []string) (*MotherProcess, error) {
return nil, fmt.Errorf("unexpected short read")
}
if shortbuf[0] == '\x02' {
break;
break
}
}
return &MotherProcess{
......@@ -123,7 +124,6 @@ func new_mother_proc(binpath string, libs []string) (*MotherProcess, error) {
}, nil
}
func (p *MotherProcess) spawn_new(user *User) (*ChildProcess, float64, error) {
start := time.Now()
tmp_dir, err := ioutil.TempDir(tmp_prefix, "maxima-plot-")
......@@ -186,8 +186,14 @@ func (p *MotherProcess) spawn_new(user *User) (*ChildProcess, float64, error) {
}, float64(total.Microseconds()) / 1000, nil
}
type MaximaResponse struct {
Response *bytes.Buffer
Time float64
Err error
}
// takes a child process and evaluates a maxima command in it, while timing out if timeout is reached
func (p *ChildProcess) eval_command(command string, timeout uint64) (*bytes.Buffer, float64, error) {
func (p *ChildProcess) eval_command(command string, timeout uint64) MaximaResponse {
start := time.Now()
in_err := make(chan error, 1)
// write to stdin in separate goroutine to prevent deadlocks
......@@ -204,14 +210,14 @@ func (p *ChildProcess) eval_command(command string, timeout uint64) (*bytes.Buff
p.Outfile.Close()
input_err := <-in_err
if input_err != nil {
return nil, 0.0, input_err
return MaximaResponse{nil, 0.0, input_err}
}
if err != nil {
return nil, 0.0, err
return MaximaResponse{nil, 0.0, err}
}
total := time.Since(start)
debugf("Debug: child took %s for evaluation", total)
return &outbuf, float64(total.Microseconds())/1000, nil
return MaximaResponse{&outbuf, float64(total.Microseconds()) / 1000, nil}
}
func write_500(w http.ResponseWriter) {
......@@ -246,6 +252,140 @@ func process_cleanup(user *User, user_queue chan<- *User, tmp_dir string) {
debugf("Debug: Process %d cleand up", user.Id)
}
type MaximaRequest struct {
Health bool
Timeout uint64
Input string
Ploturl string
Proc *ChildProcess
Metrics *Metrics
User *User
UserQueue chan<- *User
W http.ResponseWriter
}
func (req *MaximaRequest) log_with_input(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Printf("%s - input: `%s`, timeout: %d", msg, req.Input, req.Timeout)
}
func (req *MaximaRequest) write_timeout_err() {
req.W.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprint(req.W, "416 - timeout\n")
req.log_with_input("Warn: Process %s had timeout", req.User.Name)
req.Metrics.ResponseTime.Observe(float64(req.Timeout))
req.Metrics.NumTimeout.Inc()
}
func (req *MaximaRequest) respond_with_error(format string, a ...interface{}) {
write_500(req.W)
req.Metrics.NumIntError.Inc()
req.log_with_input(format, a...)
}
func (req *MaximaRequest) WriteResponseWithoutPlots(response MaximaResponse) {
req.W.Header().Set("Content-Type", "text/plain;charset=UTF-8")
response.Response.WriteTo(req.W)
req.Metrics.ResponseTime.Observe(response.Time)
req.Metrics.NumSuccess.Inc()
}
func (req *MaximaRequest) WriteResponseWithPlots(response MaximaResponse, output_dir string, plots_output []os.FileInfo) {
debugf("Debug: output (%d) is zip, OUTPUT len %d", req.User.Id, response.Response.Len())
req.W.Header().Set("Content-Type", "application/zip;charset=UTF-8")
zipfile := zip.NewWriter(req.W)
out, err := zipfile.Create("OUTPUT")
if err != nil {
req.respond_with_error("Error: Could not add OUTPUT to zip archive: %s", err)
return
}
response.Response.WriteTo(out)
// loop over all plots in the output directory and put them into the zip
// that is to be returned
for _, file := range plots_output {
ffilein, err := os.Open(filepath.Join(output_dir, file.Name()))
if err != nil {
req.respond_with_error("Error: could not open plot %s: %s", file.Name(), err)
return
}
defer ffilein.Close()
fzipput, err := zipfile.Create("/" + file.Name())
if err != nil {
req.respond_with_error("Error: could not add file %s to zip archive: %s", file.Name(), err)
return
}
io.Copy(fzipput, ffilein)
}
zipfile.Close()
req.Metrics.ResponseTime.Observe(response.Time)
req.Metrics.NumSuccess.Inc()
}
func (req *MaximaRequest) WriteResponse(response MaximaResponse) {
err := response.Err
if errors.Is(err, os.ErrDeadlineExceeded) {
debugf("Timeout with I/O pipe timeout")
req.write_timeout_err()
return
}
if err != nil {
write_500(req.W)
req.Metrics.NumIntError.Inc()
req.log_with_input("Error: Communicating with maxima failed: %s", err)
return
}
if req.Health {
if bytes.Contains(response.Response.Bytes(), []byte("healthcheck successful")) {
req.W.Header().Set("Content-Type", "text/plain;charset=UTF-8")
response.Response.WriteTo(req.W)
debugf("Healthcheck passed")
// note: we don't update metrics here since they would get
// too polluted by the healthchecks
return
} else {
req.respond_with_error("Error: Healthcheck did not pass, output: %s", response.Response)
return
}
}
output_dir := filepath.Clean(filepath.Join(req.Proc.TempDir, "output")) + "/"
// if there are any files inside the output dir, we give back a zip file containing all output
// files and the command output inside a file named OUTPUT
plots_output, err := ioutil.ReadDir(output_dir)
if err != nil {
// just return text if directory could not be read and assume no plots were generated
req.log_with_input("Warn: could not read temp directory of maxima process: %s", err)
req.WriteResponseWithoutPlots(response)
return
}
// if there are no files produced, just give back the output directly
if len(plots_output) == 0 {
req.WriteResponseWithoutPlots(response)
} else {
req.WriteResponseWithPlots(response, output_dir, plots_output)
}
}
func (req *MaximaRequest) Respond() {
debugf("Debug: input (%d): %s", req.User.Id, req.Input)
defer process_cleanup(req.User, req.UserQueue, req.Proc.TempDir)
proc_out := make(chan MaximaResponse, 1)
go func() {
proc_out <- req.Proc.eval_command(req.Input, req.Timeout)
}()
select {
case outstr := <-proc_out:
req.WriteResponse(outstr)
return
case <-time.After(time.Duration(req.Timeout) * time.Millisecond):
debugf("Timeout with internal timer")
req.write_timeout_err()
return
}
}
func handler(w http.ResponseWriter, r *http.Request, queue <-chan *ChildProcess, user_queue chan<- *User, metrics *Metrics) {
health := r.FormValue("health") == "1"
if r.Method == "GET" && r.FormValue("input") == "" && !health {
......@@ -287,120 +427,24 @@ func handler(w http.ResponseWriter, r *http.Request, queue <-chan *ChildProcess,
proc := <-queue
metrics.QueueLen.Dec()
user := proc.User
log_with_input := func(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Printf("%s - input: `%s`, timeout: %d", msg, input, timeout);
}
debugf("Debug: input (%d): %s", user.Id, input)
write_timeout_err := func() {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprint(w, "416 - timeout\n")
log_with_input("Warn: Process %s had timeout", user.Name)
metrics.ResponseTime.Observe(float64(timeout))
metrics.NumTimeout.Inc()
request := MaximaRequest{
Health: health,
Timeout: timeout,
Input: real_input,
Ploturl: ploturl,
Proc: proc,
Metrics: metrics,
User: user,
UserQueue: user_queue,
W: w,
}
defer process_cleanup(user, user_queue, proc.TempDir)
proc_out := make(chan struct {buf *bytes.Buffer; time float64; err error}, 1)
go func() {
out, tim, err := proc.eval_command(real_input, timeout)
// grrr why doesn't go support tuples first-class!?
proc_out <- struct {buf *bytes.Buffer; time float64; err error}{out, tim, err}
}()
select {
case outstr := <-proc_out:
outbuf := outstr.buf
tim := outstr.time
err := outstr.err
if errors.Is(err, os.ErrDeadlineExceeded) {
debugf("Timeout with I/O pipe timeout")
write_timeout_err()
return
}
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log_with_input("Error: Communicating with maxima failed: %s", err)
return
}
if health {
if bytes.Contains(outbuf.Bytes(), []byte("healthcheck successful")) {
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
debugf("Healthcheck passed")
// note: we don't update metrics here since they would get
// too polluted by the healthchecks
return
} else {
write_500(w)
metrics.NumIntError.Inc()
log.Printf("Error: Healthcheck did not pass, output: %s", outbuf)
return
}
}
output_dir := filepath.Clean(filepath.Join(proc.TempDir, "output")) + "/"
// if there are any files inside the output dir, we give back a zip file containing all output
// files and the command output inside a file named OUTPUT
plots_output, err := ioutil.ReadDir(output_dir)
if err != nil {
// just return text if directory could not be read and assume no plots were generated
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
log_with_input("Warn: could not read temp directory of maxima process: %s", err)
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
}
// if there are no files produced, just give back the output directly
if len(plots_output) == 0 {
debugf("Debug: output (%d) is text, len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "text/plain;charset=UTF-8")
outbuf.WriteTo(w)
} else {
debugf("Debug: output (%d) is zip, OUTPUT len %d", user.Id, outbuf.Len())
w.Header().Set("Content-Type", "application/zip;charset=UTF-8")
zipfile := zip.NewWriter(w)
out, err := zipfile.Create("OUTPUT")
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log_with_input("Error: Could not add OUTPUT to zip archive: %s", err)
return
}
outbuf.WriteTo(out)
// loop over all plots in the output directory and put them into the zip
// that is to be returned
for _, file := range plots_output {
ffilein, err := os.Open(filepath.Join(output_dir, file.Name()))
defer ffilein.Close()
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log_with_input("Error: could not open plot %s: %s", file.Name(), err)
return
}
fzipput, err := zipfile.Create("/" + file.Name())
if err != nil {
write_500(w)
metrics.NumIntError.Inc()
log_with_input("Error: could not add file %s to zip archive: %s", file.Name(), err)
return
}
io.Copy(fzipput, ffilein)
}
zipfile.Close()
}
metrics.ResponseTime.Observe(tim)
metrics.NumSuccess.Inc()
case <-time.After(time.Duration(timeout) * time.Millisecond):
debugf("Timeout with internal timer")
write_timeout_err()
return
}
request.Respond()
}
func generate_maximas(binpath string, libs []string, queue chan<- *ChildProcess, user_queue <-chan *User, metrics *Metrics) {
mother_proc := make(chan *MotherProcess, 0)
mother_proc := make(chan *MotherProcess)
go func() {
mother, err := new_mother_proc(binpath, libs)
if err != nil {
......@@ -486,18 +530,18 @@ func main() {
user_number, err := get_env_number_positive("GOEMAXIMA_NUSER", 32)
log.Printf("Info: Maximum number of processes is %d", user_number)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_NUSER contains invalid number");
log.Fatal("Fatal: GOEMAXIMA_NUSER contains invalid number")
}
// length of queue of ready maxima processes (-1)
queue_len, err := get_env_number_positive("GOEMAXIMA_QUEUE_LEN", 3)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_QUEUE_LEN contains invalid number");
log.Fatal("Fatal: GOEMAXIMA_QUEUE_LEN contains invalid number")
}
log.Printf("Info: Ready process queue length is %d", queue_len)
// enable debug messages
debug, err = get_env_number_positive("GOEMAXIMA_DEBUG", 0)
if err != nil {
log.Fatal("Fatal: GOEMAXIMA_DEBUG contains invalid number");
log.Fatal("Fatal: GOEMAXIMA_DEBUG contains invalid number")
}
// where to store temp files (plots, named pipes)
// should preferrably be tmpfs since it needs to be fast
......@@ -505,7 +549,7 @@ func main() {
if tmp_prefix == "" {
tmp_prefix = "/tmp/maxima"
} else if !strings.HasPrefix(tmp_prefix, "/") {
log.Fatal("Fatal: GOEMAXIMA_TEMP_DIR must be an absolute path");
log.Fatal("Fatal: GOEMAXIMA_TEMP_DIR must be an absolute path")
}
err = os.MkdirAll(tmp_prefix, 0711)
if err != nil {
......@@ -555,5 +599,4 @@ func main() {
log.Print("Info: goe handler started")
err = http.ListenAndServe(":8080", nil)
log.Printf("Fatal: http handler closed unexpectedly, %s", err)
return
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment