2025-04-20 13:47:17 +03:00
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
2025-04-22 19:31:34 +03:00
|
|
|
"bufio"
|
2025-04-25 09:06:47 +03:00
|
|
|
"bytes"
|
2025-04-20 13:47:17 +03:00
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
2025-04-23 14:46:49 +03:00
|
|
|
"io"
|
2025-04-20 13:47:17 +03:00
|
|
|
"log"
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/http/httputil"
|
|
|
|
|
"net/url"
|
|
|
|
|
"strconv"
|
2025-04-23 15:15:34 +03:00
|
|
|
"strings"
|
2025-04-20 13:47:17 +03:00
|
|
|
|
|
|
|
|
"github.com/caarlos0/env/v11"
|
2025-04-23 15:15:34 +03:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
2025-04-20 13:47:17 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// config holds the settings that main loads from the environment with
// env.Parse.
type config struct {
	// BaseURL is the upstream server that /proxy/ requests are forwarded to.
	BaseURL string `env:"BASE_URL"`
	// Port is the TCP port this proxy listens on.
	Port int `env:"PORT"`
}

// cfg is the process-wide configuration, filled in once at startup by main.
var cfg config
|
|
|
|
|
|
|
|
|
|
func createProxy(target *url.URL) func(http.ResponseWriter, *http.Request) {
|
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
2025-04-23 15:15:34 +03:00
|
|
|
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/proxy")
|
2025-04-20 13:47:17 +03:00
|
|
|
r.Host = target.Host
|
|
|
|
|
r.URL.Scheme = target.Scheme
|
|
|
|
|
r.URL.Host = target.Host
|
|
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
director := func(req *http.Request) {
|
2025-04-22 19:31:34 +03:00
|
|
|
req.Header.Set("X-Forwarded-For", r.RemoteAddr)
|
2025-04-23 15:15:34 +03:00
|
|
|
req.URL = r.URL
|
2025-04-22 19:31:34 +03:00
|
|
|
req.Host = target.Host
|
2025-04-23 14:46:49 +03:00
|
|
|
req.URL.Scheme = target.Scheme
|
|
|
|
|
req.URL.Host = target.Host
|
2025-04-22 19:31:34 +03:00
|
|
|
}
|
2025-04-25 09:30:34 +03:00
|
|
|
wasstreamed := -1
|
2025-04-25 09:06:47 +03:00
|
|
|
data, err := io.ReadAll(r.Body)
|
|
|
|
|
if err == nil {
|
|
|
|
|
r.Body = io.NopCloser(bytes.NewReader(data))
|
|
|
|
|
var jsonData map[string]interface{}
|
|
|
|
|
json.NewDecoder(r.Body).Decode(&jsonData)
|
|
|
|
|
|
|
|
|
|
if jsonData["stream"] != nil {
|
|
|
|
|
if !jsonData["stream"].(bool) {
|
2025-04-25 09:30:34 +03:00
|
|
|
wasstreamed = 0
|
2025-04-25 09:06:47 +03:00
|
|
|
} else {
|
2025-04-25 09:30:34 +03:00
|
|
|
wasstreamed = 1
|
2025-04-25 09:06:47 +03:00
|
|
|
}
|
2025-04-25 08:43:03 +03:00
|
|
|
}
|
2025-04-25 09:06:47 +03:00
|
|
|
} else {
|
|
|
|
|
fmt.Println("Error reading body:", err)
|
2025-04-25 08:31:06 +03:00
|
|
|
}
|
2025-04-25 09:06:47 +03:00
|
|
|
r.Body = io.NopCloser(bytes.NewReader(data))
|
2025-04-20 13:47:17 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
modifyResponse := func(response *http.Response) error {
|
|
|
|
|
pr, pw := io.Pipe()
|
|
|
|
|
body := response.Body
|
|
|
|
|
response.Body = pr
|
2025-04-22 19:31:34 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
go func() {
|
|
|
|
|
defer pw.Close()
|
2025-04-22 19:31:34 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
reader := bufio.NewReader(body)
|
|
|
|
|
for {
|
|
|
|
|
line, err := reader.ReadBytes('\n')
|
2025-04-22 19:31:34 +03:00
|
|
|
if err != nil {
|
2025-04-23 14:46:49 +03:00
|
|
|
if err == io.EOF {
|
2025-04-25 09:30:34 +03:00
|
|
|
handleJsonLine([]byte(string(line)), wasstreamed)
|
2025-04-23 14:46:49 +03:00
|
|
|
pw.Write(line)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
return
|
2025-04-22 19:31:34 +03:00
|
|
|
}
|
2025-04-25 09:30:34 +03:00
|
|
|
handleJsonLine(line, wasstreamed)
|
2025-04-23 14:46:49 +03:00
|
|
|
pw.Write(line)
|
2025-04-22 19:31:34 +03:00
|
|
|
}
|
2025-04-20 13:47:17 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
}()
|
2025-04-20 13:47:17 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
return nil
|
|
|
|
|
}
|
2025-04-20 13:47:17 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
proxy := httputil.NewSingleHostReverseProxy(target)
|
|
|
|
|
proxy.Director = director
|
2025-04-23 15:32:32 +03:00
|
|
|
if r.URL.Path == "/api/generate" || r.URL.Path == "/api/chat" {
|
2025-04-23 15:23:13 +03:00
|
|
|
proxy.ModifyResponse = modifyResponse
|
|
|
|
|
}
|
2025-04-20 13:47:17 +03:00
|
|
|
|
2025-04-23 14:46:49 +03:00
|
|
|
proxy.ServeHTTP(w, r)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-04-25 09:30:34 +03:00
|
|
|
func handleJsonLine(line []byte, wasstreamed int) {
|
2025-04-25 09:06:47 +03:00
|
|
|
if len(line) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
2025-04-23 14:46:49 +03:00
|
|
|
var jsonData map[string]interface{}
|
|
|
|
|
err := json.Unmarshal([]byte(line), &jsonData)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("Error parsing JSON:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if jsonData["done"].(bool) {
|
2025-04-25 08:31:06 +03:00
|
|
|
duration := jsonData["eval_duration"].(float64) / 1000000000.0
|
|
|
|
|
fmt.Printf("Duration: %.2f seconds\n", duration)
|
2025-04-23 15:15:34 +03:00
|
|
|
opsProcessed.Inc()
|
2025-04-25 09:06:47 +03:00
|
|
|
tokens_out.Add(jsonData["eval_count"].(float64))
|
|
|
|
|
tokens_in.Add(jsonData["prompt_eval_count"].(float64))
|
2025-04-25 08:31:06 +03:00
|
|
|
eval_time.Observe(duration)
|
2025-04-25 09:40:10 +03:00
|
|
|
if wasstreamed == 1 {
|
|
|
|
|
streamed.Inc()
|
|
|
|
|
} else if wasstreamed == 0 {
|
|
|
|
|
notStreamed.Inc()
|
|
|
|
|
}
|
2025-04-25 09:30:34 +03:00
|
|
|
}
|
2025-04-20 13:47:17 +03:00
|
|
|
}
|
|
|
|
|
|
2025-04-23 15:15:34 +03:00
|
|
|
var opsProcessed = promauto.NewCounter(prometheus.CounterOpts{
|
2025-04-24 15:10:57 +03:00
|
|
|
Name: "llmproxymetrics_total_requests",
|
2025-04-23 15:15:34 +03:00
|
|
|
Help: "The total number of processed events",
|
|
|
|
|
})
|
|
|
|
|
|
2025-04-25 08:31:06 +03:00
|
|
|
var notStreamed = promauto.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "llmproxymetrics_non_streamed_requests",
|
|
|
|
|
Help: "The total number of processed non-streamed events",
|
|
|
|
|
})
|
|
|
|
|
var streamed = promauto.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "llmproxymetrics_streamed_requests",
|
|
|
|
|
Help: "The total number of processed streamed events",
|
|
|
|
|
})
|
|
|
|
|
var tokens_out = promauto.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "llmproxymetrics_tokens_out",
|
|
|
|
|
Help: "Tokens generated.",
|
|
|
|
|
})
|
2025-04-25 09:06:47 +03:00
|
|
|
var tokens_in = promauto.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "llmproxymetrics_tokens_in",
|
|
|
|
|
Help: "Tokens input.",
|
|
|
|
|
})
|
2025-04-25 08:31:06 +03:00
|
|
|
var eval_time = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
|
|
|
Name: "llmproxymetrics_eval_time",
|
2025-04-24 15:10:57 +03:00
|
|
|
Help: "Tokens generated.",
|
2025-04-25 08:31:06 +03:00
|
|
|
Buckets: prometheus.LinearBuckets(0, 2.5, 20),
|
2025-04-24 15:10:57 +03:00
|
|
|
})
|
|
|
|
|
|
2025-04-20 13:47:17 +03:00
|
|
|
func main() {
|
2025-04-22 19:31:34 +03:00
|
|
|
err := env.Parse(&cfg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatalf("Error parsing environment variables: %v", err)
|
|
|
|
|
}
|
2025-04-20 13:47:17 +03:00
|
|
|
|
|
|
|
|
targetURL, err := url.Parse(cfg.BaseURL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
}
|
|
|
|
|
|
2025-04-23 15:15:34 +03:00
|
|
|
http.HandleFunc("/proxy/", createProxy(targetURL))
|
|
|
|
|
http.Handle("/metrics", promhttp.Handler())
|
2025-04-20 13:47:17 +03:00
|
|
|
|
|
|
|
|
log.Printf("Starting proxy server on :%s", strconv.Itoa(cfg.Port))
|
2025-04-22 19:31:34 +03:00
|
|
|
err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil)
|
2025-04-20 13:47:17 +03:00
|
|
|
if err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
}
|
|
|
|
|
}
|