From a7ee7577c4bddb1723d3a7312010bedf9d85b964 Mon Sep 17 00:00:00 2001 From: Ade9 Date: Tue, 22 Apr 2025 19:31:34 +0300 Subject: [PATCH] updates --- .vscode/launch.json | 20 ++++++ llmproxymetrics.go | 157 ++++++++++++++++++++++++++++++-------------- 2 files changed, 127 insertions(+), 50 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..810545e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,20 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "name": "Launch Package", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "llmproxymetrics.go", + "env": { + "PORT": "6677", + "BASE_URL": "http://10.0.0.10:11434" + } + } + ] +} \ No newline at end of file diff --git a/llmproxymetrics.go b/llmproxymetrics.go index df02871..3d498b9 100644 --- a/llmproxymetrics.go +++ b/llmproxymetrics.go @@ -1,7 +1,7 @@ package main import ( - "bytes" + "bufio" "encoding/json" "fmt" "log" @@ -10,7 +10,7 @@ import ( "net/http/httputil" "net/url" "strconv" - "time" + "strings" "github.com/caarlos0/env/v11" ) @@ -28,67 +28,124 @@ func createProxy(target *url.URL) func(http.ResponseWriter, *http.Request) { r.URL.Scheme = target.Scheme r.URL.Host = target.Host - lrw := &LoggingResponseWriter{ResponseWriter: w, body: new(bytes.Buffer)} + // startTime := time.Now() proxy := httputil.NewSingleHostReverseProxy(target) + proxy.Director = func(req *http.Request) { + req.Header.Set("X-Forwarded-For", r.RemoteAddr) + req.Host = target.Host + } - startTime := time.Now() recorder := httptest.NewRecorder() + proxy.ServeHTTP(recorder, r) responseBody := recorder.Body.Bytes() + contentType := recorder.Result().Header.Get("Content-Type") - var jsonResponse map[string]interface{} - err := json.Unmarshal(responseBody, &jsonResponse) - if err != nil { - log.Printf("Error unmarshalling JSON response: %v", err) - lrw.Write(responseBody) - return - } + log.Printf("Response Content-Type: %s", contentType) + log.Printf("Response Body: %s", string(responseBody)) - // Add your metrics metadata here - jsonResponse["metrics"] = map[string]interface{}{ - "requestPath": r.URL.Path, - "statusCode": recorder.Code, - "responseTime": time.Since(startTime).Milliseconds(), - // Add more metrics as needed - } - - modifiedResponseBody, err := json.Marshal(jsonResponse) - if err != nil { - log.Printf("Error marshalling modified JSON response: %v", err) - lrw.Write(responseBody) - return - } - - for name, values := range recorder.Header() { - for _, value := range values { - w.Header().Add(name, value) + if strings.Contains(contentType, "application/json") { + var jsonResponse map[string]interface{} + err := json.Unmarshal(responseBody, &jsonResponse) + if err != nil { + log.Printf("Error unmarshalling JSON response: %v", err) + w.WriteHeader(recorder.Code) + w.Write(responseBody) + return } + + // jsonResponse["metrics"] = map[string]interface{}{ + // "requestPath": r.URL.Path, + // "statusCode": recorder.Code, + // "responseTime": time.Since(startTime).Milliseconds(), + // } + + modifiedResponseBody, err := json.Marshal(jsonResponse) + if err != nil { + log.Printf("Error marshalling modified JSON response: %v", err) + w.WriteHeader(recorder.Code) + w.Write(responseBody) + return + } + + for name, values := range recorder.Header() { + for _, value := range values { + w.Header().Add(name, value) + } + } + + w.WriteHeader(recorder.Code) + w.Write(modifiedResponseBody) + + // log.Printf("Modified Response Body: %s", string(modifiedResponseBody)) + } else if strings.Contains(contentType, "application/x-ndjson") { + var modifiedResponseBody []string + scanner := bufio.NewScanner(strings.NewReader(string(responseBody))) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + var jsonResponse map[string]interface{} + err := json.Unmarshal([]byte(line), &jsonResponse) + if err != nil { + log.Printf("Error unmarshalling NDJSON line: %v", err) + modifiedResponseBody = append(modifiedResponseBody, line) + continue + } + + // jsonResponse["metrics"] = map[string]interface{}{ + // "requestPath": r.URL.Path, + // "statusCode": recorder.Code, + // "responseTime": time.Since(startTime).Milliseconds(), + // } + + modifiedLine, err := json.Marshal(jsonResponse) + if err != nil { + log.Printf("Error marshalling modified NDJSON line: %v", err) + modifiedResponseBody = append(modifiedResponseBody, string([]byte(line))) + continue + } + modifiedResponseBody = append(modifiedResponseBody, string(modifiedLine)) + } + } + + if err := scanner.Err(); err != nil { + log.Printf("Error scanning NDJSON: %v", err) + w.WriteHeader(recorder.Code) + w.Write(responseBody) + return + } + + for name, values := range recorder.Header() { + for _, value := range values { + w.Header().Add(name, value) + } + } + + w.WriteHeader(recorder.Code) + w.Write([]byte(strings.Join(modifiedResponseBody, "\n"))) + + // log.Printf("Modified Response Body: %s", strings.Join(modifiedResponseBody, "\n")) + } else { + for name, values := range recorder.Header() { + for _, value := range values { + w.Header().Add(name, value) + } + } + + w.WriteHeader(recorder.Code) + w.Write(responseBody) + + // log.Printf("Response without metrics: %s", string(responseBody)) } - - w.WriteHeader(recorder.Code) - lrw.Write(modifiedResponseBody) - - log.Printf("Response with metrics: %s", lrw.Body()) } } -type LoggingResponseWriter struct { - http.ResponseWriter - body *bytes.Buffer -} - -func (lrw *LoggingResponseWriter) Write(b []byte) (int, error) { - lrw.body.Write(b) - return lrw.ResponseWriter.Write(b) -} - -func (lrw *LoggingResponseWriter) Body() string { - return lrw.body.String() -} - func main() { - env.Parse(&cfg) + err := env.Parse(&cfg) + if err != nil { + log.Fatalf("Error parsing environment variables: %v", err) + } targetURL, err := url.Parse(cfg.BaseURL) if err != nil { @@ -98,7 +155,7 @@ func main() { http.HandleFunc("/", createProxy(targetURL)) log.Printf("Starting proxy server on :%s", strconv.Itoa(cfg.Port)) - err = http.ListenAndServe(fmt.Sprintf(":%s", strconv.Itoa(cfg.Port)), nil) + err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil) if err != nil { log.Fatal(err) }