package main import ( "bufio" "encoding/json" "fmt" "io" "log" "net/http" "net/http/httputil" "net/url" "strconv" "strings" "github.com/caarlos0/env/v11" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) var cfg config type config struct { BaseURL string `env:"BASE_URL"` Port int `env:"PORT"` } func createProxy(target *url.URL) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { r.URL.Path = strings.TrimPrefix(r.URL.Path, "/proxy") r.Host = target.Host r.URL.Scheme = target.Scheme r.URL.Host = target.Host director := func(req *http.Request) { req.Header.Set("X-Forwarded-For", r.RemoteAddr) req.URL = r.URL req.Host = target.Host req.URL.Scheme = target.Scheme req.URL.Host = target.Host } var jsonData map[string]interface{} json.NewDecoder(r.Body).Decode(&jsonData) if jsonData["stream"] != nil { if !jsonData["stream"].(bool) { notStreamed.Inc() } else { streamed.Inc() } } modifyResponse := func(response *http.Response) error { pr, pw := io.Pipe() body := response.Body response.Body = pr go func() { defer pw.Close() reader := bufio.NewReader(body) for { line, err := reader.ReadBytes('\n') if err != nil { if err == io.EOF { handleJsonLine([]byte(string(line))) pw.Write(line) break } return } handleJsonLine(line) pw.Write(line) } }() return nil } proxy := httputil.NewSingleHostReverseProxy(target) proxy.Director = director if r.URL.Path == "/api/generate" || r.URL.Path == "/api/chat" { proxy.ModifyResponse = modifyResponse } proxy.ServeHTTP(w, r) } } func handleJsonLine(line []byte) { var jsonData map[string]interface{} err := json.Unmarshal([]byte(line), &jsonData) if err != nil { fmt.Println("Error parsing JSON:", err) return } if jsonData["done"].(bool) { duration := jsonData["eval_duration"].(float64) / 1000000000.0 fmt.Printf("Duration: %.2f seconds\n", duration) opsProcessed.Inc() tokens_out.Add(jsonData["prompt_eval_count"].(float64)) eval_time.Observe(duration) } } var opsProcessed = promauto.NewCounter(prometheus.CounterOpts{ Name: "llmproxymetrics_total_requests", Help: "The total number of processed events", }) var notStreamed = promauto.NewCounter(prometheus.CounterOpts{ Name: "llmproxymetrics_non_streamed_requests", Help: "The total number of processed non-streamed events", }) var streamed = promauto.NewCounter(prometheus.CounterOpts{ Name: "llmproxymetrics_streamed_requests", Help: "The total number of processed streamed events", }) var tokens_out = promauto.NewCounter(prometheus.CounterOpts{ Name: "llmproxymetrics_tokens_out", Help: "Tokens generated.", }) var eval_time = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "llmproxymetrics_eval_time", Help: "Tokens generated.", Buckets: prometheus.LinearBuckets(0, 2.5, 20), }) func main() { err := env.Parse(&cfg) if err != nil { log.Fatalf("Error parsing environment variables: %v", err) } targetURL, err := url.Parse(cfg.BaseURL) if err != nil { log.Fatal(err) } http.HandleFunc("/proxy/", createProxy(targetURL)) http.Handle("/metrics", promhttp.Handler()) log.Printf("Starting proxy server on :%s", strconv.Itoa(cfg.Port)) err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil) if err != nil { log.Fatal(err) } }