updates
This commit is contained in:
20
.vscode/launch.json
vendored
Normal file
20
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
|
||||||
|
{
|
||||||
|
"name": "Launch Package",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "llmproxymetrics.go",
|
||||||
|
"env": {
|
||||||
|
"PORT": "6677",
|
||||||
|
"BASE_URL": "http://10.0.0.10:11434"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"strings"
|
||||||
|
|
||||||
"github.com/caarlos0/env/v11"
|
"github.com/caarlos0/env/v11"
|
||||||
)
|
)
|
||||||
@@ -28,35 +28,44 @@ func createProxy(target *url.URL) func(http.ResponseWriter, *http.Request) {
|
|||||||
r.URL.Scheme = target.Scheme
|
r.URL.Scheme = target.Scheme
|
||||||
r.URL.Host = target.Host
|
r.URL.Host = target.Host
|
||||||
|
|
||||||
lrw := &LoggingResponseWriter{ResponseWriter: w, body: new(bytes.Buffer)}
|
// startTime := time.Now()
|
||||||
proxy := httputil.NewSingleHostReverseProxy(target)
|
proxy := httputil.NewSingleHostReverseProxy(target)
|
||||||
|
proxy.Director = func(req *http.Request) {
|
||||||
|
req.Header.Set("X-Forwarded-For", r.RemoteAddr)
|
||||||
|
req.Host = target.Host
|
||||||
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
|
|
||||||
proxy.ServeHTTP(recorder, r)
|
proxy.ServeHTTP(recorder, r)
|
||||||
|
|
||||||
responseBody := recorder.Body.Bytes()
|
responseBody := recorder.Body.Bytes()
|
||||||
|
contentType := recorder.Result().Header.Get("Content-Type")
|
||||||
|
|
||||||
|
log.Printf("Response Content-Type: %s", contentType)
|
||||||
|
log.Printf("Response Body: %s", string(responseBody))
|
||||||
|
|
||||||
|
if strings.Contains(contentType, "application/json") {
|
||||||
var jsonResponse map[string]interface{}
|
var jsonResponse map[string]interface{}
|
||||||
err := json.Unmarshal(responseBody, &jsonResponse)
|
err := json.Unmarshal(responseBody, &jsonResponse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error unmarshalling JSON response: %v", err)
|
log.Printf("Error unmarshalling JSON response: %v", err)
|
||||||
lrw.Write(responseBody)
|
w.WriteHeader(recorder.Code)
|
||||||
|
w.Write(responseBody)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add your metrics metadata here
|
// jsonResponse["metrics"] = map[string]interface{}{
|
||||||
jsonResponse["metrics"] = map[string]interface{}{
|
// "requestPath": r.URL.Path,
|
||||||
"requestPath": r.URL.Path,
|
// "statusCode": recorder.Code,
|
||||||
"statusCode": recorder.Code,
|
// "responseTime": time.Since(startTime).Milliseconds(),
|
||||||
"responseTime": time.Since(startTime).Milliseconds(),
|
// }
|
||||||
// Add more metrics as needed
|
|
||||||
}
|
|
||||||
|
|
||||||
modifiedResponseBody, err := json.Marshal(jsonResponse)
|
modifiedResponseBody, err := json.Marshal(jsonResponse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error marshalling modified JSON response: %v", err)
|
log.Printf("Error marshalling modified JSON response: %v", err)
|
||||||
lrw.Write(responseBody)
|
w.WriteHeader(recorder.Code)
|
||||||
|
w.Write(responseBody)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,28 +76,76 @@ func createProxy(target *url.URL) func(http.ResponseWriter, *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(recorder.Code)
|
w.WriteHeader(recorder.Code)
|
||||||
lrw.Write(modifiedResponseBody)
|
w.Write(modifiedResponseBody)
|
||||||
|
|
||||||
log.Printf("Response with metrics: %s", lrw.Body())
|
// log.Printf("Modified Response Body: %s", string(modifiedResponseBody))
|
||||||
|
} else if strings.Contains(contentType, "application/x-ndjson") {
|
||||||
|
var modifiedResponseBody []string
|
||||||
|
scanner := bufio.NewScanner(strings.NewReader(string(responseBody)))
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
if line != "" {
|
||||||
|
var jsonResponse map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(line), &jsonResponse)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error unmarshalling NDJSON line: %v", err)
|
||||||
|
modifiedResponseBody = append(modifiedResponseBody, line)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// jsonResponse["metrics"] = map[string]interface{}{
|
||||||
|
// "requestPath": r.URL.Path,
|
||||||
|
// "statusCode": recorder.Code,
|
||||||
|
// "responseTime": time.Since(startTime).Milliseconds(),
|
||||||
|
// }
|
||||||
|
|
||||||
|
modifiedLine, err := json.Marshal(jsonResponse)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error marshalling modified NDJSON line: %v", err)
|
||||||
|
modifiedResponseBody = append(modifiedResponseBody, string([]byte(line)))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
modifiedResponseBody = append(modifiedResponseBody, string(modifiedLine))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
log.Printf("Error scanning NDJSON: %v", err)
|
||||||
|
w.WriteHeader(recorder.Code)
|
||||||
|
w.Write(responseBody)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, values := range recorder.Header() {
|
||||||
|
for _, value := range values {
|
||||||
|
w.Header().Add(name, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(recorder.Code)
|
||||||
|
w.Write([]byte(strings.Join(modifiedResponseBody, "\n")))
|
||||||
|
|
||||||
|
// log.Printf("Modified Response Body: %s", strings.Join(modifiedResponseBody, "\n"))
|
||||||
|
} else {
|
||||||
|
for name, values := range recorder.Header() {
|
||||||
|
for _, value := range values {
|
||||||
|
w.Header().Add(name, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(recorder.Code)
|
||||||
|
w.Write(responseBody)
|
||||||
|
|
||||||
|
// log.Printf("Response without metrics: %s", string(responseBody))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type LoggingResponseWriter struct {
|
|
||||||
http.ResponseWriter
|
|
||||||
body *bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lrw *LoggingResponseWriter) Write(b []byte) (int, error) {
|
|
||||||
lrw.body.Write(b)
|
|
||||||
return lrw.ResponseWriter.Write(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lrw *LoggingResponseWriter) Body() string {
|
|
||||||
return lrw.body.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
env.Parse(&cfg)
|
err := env.Parse(&cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error parsing environment variables: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
targetURL, err := url.Parse(cfg.BaseURL)
|
targetURL, err := url.Parse(cfg.BaseURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -98,7 +155,7 @@ func main() {
|
|||||||
http.HandleFunc("/", createProxy(targetURL))
|
http.HandleFunc("/", createProxy(targetURL))
|
||||||
|
|
||||||
log.Printf("Starting proxy server on :%s", strconv.Itoa(cfg.Port))
|
log.Printf("Starting proxy server on :%s", strconv.Itoa(cfg.Port))
|
||||||
err = http.ListenAndServe(fmt.Sprintf(":%s", strconv.Itoa(cfg.Port)), nil)
|
err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user