From 98cf2c812fe4820eaaa4ef64b015c17f03948437 Mon Sep 17 00:00:00 2001 From: Boming Zhang Date: Sat, 1 Feb 2025 03:13:18 -0500 Subject: [PATCH] feat(executor/local): clock time limit --- internal/executor/local/executor.go | 175 ++++++++++++++++++---------- 1 file changed, 116 insertions(+), 59 deletions(-) diff --git a/internal/executor/local/executor.go b/internal/executor/local/executor.go index 78fe467..a9dd556 100644 --- a/internal/executor/local/executor.go +++ b/internal/executor/local/executor.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "math" "os" "os/exec" "strings" @@ -16,12 +17,81 @@ import ( type Local struct{} +func generateResult( + err error, + processState *os.ProcessState, + runTime time.Duration, + cmd stage.Cmd, + stdoutBuffer, stderrBuffer bytes.Buffer, +) stage.ExecutorResult { + result := stage.ExecutorResult{ + Status: stage.Status(envexec.StatusAccepted), + ExitStatus: processState.ExitCode(), + Error: "", + Time: func() uint64 { + nanos := processState.UserTime().Nanoseconds() + if nanos < 0 { + return 0 + } + return uint64(nanos) + }(), + Memory: func() uint64 { + usage := processState.SysUsage() + rusage, ok := usage.(*syscall.Rusage) + if !ok { + return 0 + } + maxRssKB := rusage.Maxrss + maxRssBytes := maxRssKB * 1024 + if maxRssBytes < 0 { + return 0 + } + return uint64(maxRssBytes) + }(), + RunTime: func() uint64 { + nanos := runTime.Nanoseconds() + if nanos < 0 { + return 0 + } + return uint64(nanos) + }(), + Files: map[string]string{}, + FileIDs: map[string]string{}, + } + + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + result.Status = stage.Status(envexec.StatusNonzeroExitStatus) + result.Error = exitErr.Error() + } else { + result.Status = stage.Status(envexec.StatusInternalError) + result.Error = err.Error() + } + } + + if cmd.Stdout != nil && cmd.Stdout.Name != nil { + result.Files[*cmd.Stdout.Name] = stdoutBuffer.String() + } + if cmd.Stderr != nil && cmd.Stderr.Name != nil { + result.Files[*cmd.Stderr.Name] = stderrBuffer.String() + } + + if err := handleCopyOut(&result, cmd); err != nil { + result.Status = stage.Status(envexec.StatusFileError) + result.Error = err.Error() + } + + return result +} + func (e *Local) Run(cmds []stage.Cmd) ([]stage.ExecutorResult, error) { var results []stage.ExecutorResult for _, cmd := range cmds { execCmd := exec.Command(cmd.Args[0], cmd.Args[1:]...) // #nosec G204 - + if cmd.CPULimit > 0 && cmd.ClockLimit <= 0 { + cmd.ClockLimit = cmd.CPULimit * 2 + } env := os.Environ() if len(cmd.Env) > 0 { env = append(env, cmd.Env...) @@ -50,68 +120,55 @@ func (e *Local) Run(cmds []stage.Cmd) ([]stage.ExecutorResult, error) { return nil, fmt.Errorf("failed to start command: %v", err) } - err = execCmd.Wait() - endTime := time.Now() - runTime := endTime.Sub(startTime) - processState := execCmd.ProcessState - result := stage.ExecutorResult{ - Status: stage.Status(envexec.StatusAccepted), - ExitStatus: processState.ExitCode(), - Error: "", - Time: func() uint64 { - nanos := processState.UserTime().Nanoseconds() - if nanos < 0 { - return 0 - } - return uint64(nanos) - }(), - Memory: func() uint64 { - usage := processState.SysUsage() - rusage, ok := usage.(*syscall.Rusage) - if !ok { - return 0 - } - maxRssKB := rusage.Maxrss - maxRssBytes := maxRssKB * 1024 - if maxRssBytes < 0 { - return 0 - } - return uint64(maxRssBytes) - }(), - RunTime: func() uint64 { - nanos := runTime.Nanoseconds() - if nanos < 0 { - return 0 - } - return uint64(nanos) - }(), - Files: map[string]string{}, - FileIDs: map[string]string{}, - } + done := make(chan error, 1) + go func() { + done <- execCmd.Wait() + }() - if err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - result.Status = stage.Status(envexec.StatusNonzeroExitStatus) - result.Error = exitErr.Error() + if cmd.ClockLimit > 0 { + var duration time.Duration + if cmd.ClockLimit > uint64(math.MaxInt64) { + duration = time.Duration(math.MaxInt64) } else { - result.Status = stage.Status(envexec.StatusInternalError) - result.Error = err.Error() + duration = time.Duration(cmd.ClockLimit) * time.Nanosecond // #nosec G115 } + select { + case err := <-done: + endTime := time.Now() + runTime := endTime.Sub(startTime) + result := generateResult( + err, + execCmd.ProcessState, + runTime, + cmd, + stdoutBuffer, + stderrBuffer, + ) + results = append(results, result) + case <-time.After(duration): + _ = execCmd.Process.Kill() + result := stage.ExecutorResult{ + Status: stage.Status(envexec.StatusTimeLimitExceeded), + Error: "", + Files: map[string]string{}, + FileIDs: map[string]string{}, + } + results = append(results, result) + } + } else { + err := <-done + endTime := time.Now() + runTime := endTime.Sub(startTime) + result := generateResult( + err, + execCmd.ProcessState, + runTime, + cmd, + stdoutBuffer, + stderrBuffer, + ) + results = append(results, result) } - - if cmd.Stdout != nil && cmd.Stdout.Name != nil { - result.Files[*cmd.Stdout.Name] = stdoutBuffer.String() - } - if cmd.Stderr != nil && cmd.Stderr.Name != nil { - result.Files[*cmd.Stderr.Name] = stderrBuffer.String() - } - - if err := handleCopyOut(&result, cmd); err != nil { - result.Status = stage.Status(envexec.StatusFileError) - result.Error = err.Error() - } - - results = append(results, result) } return results, nil