-
-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Question]: Why i keep 1.76GB of memory when try to uploading files over http #676
Comments
Upgrade to the latest version v2.7.1 and see if this issue still exists. |
Why would you do this? The buffer returned by |
even i've commented those line, it still happening @panjf2000 |
Could you provide the full examples of the server and client so that I can reproduce this issue on my computer? |
can i dm you on discord for the example ? |
This is the server type HttpServer struct {
gnet.BuiltinEventEngine
port int
multicore bool
eng gnet.Engine
m *metrics.Metrics
mon_registry *prometheus.Registry
Router Router
repositoryPool repository.RepositoryPool
addOns any
Logger *choulog.HttpLogger
Timeout time.Duration
allowedHostHeader []string
allowedMethodHeader []string
allowedHeaders string
validator *validator.Validate
}
// NewHttpServer creates a new instance of HttpServer with the provided router and port.
// It initializes the server address, sets multicore to false, and creates a new Metrics instance.
//
// router: The Router interface that will handle incoming HTTP requests.
// port: The port number on which the server will listen for incoming connections.
//
// Returns: A pointer to a new instance of HttpServer.
func NewHttpServer(port int) *HttpServer {
reg := prometheus.NewRegistry()
m := metrics.NewMetrics(reg)
config.ReadFromVault()
repositoryPool := repository.NewRepositoryPool()
timeout := config.VaultData["server.timeout"]
corsAllowedHost := config.VaultData["server.cors.allowedHost"]
corsAllowedHttpMethod := config.VaultData["server.cors.allowedHttpMethod"]
corsAllowedHeaders := config.VaultData["server.cors.allowedHeaders"]
dur, err := time.ParseDuration(timeout)
if err != nil {
panic(err)
}
multicore := false
if runtime.NumCPU() > 1 {
multicore = true
}
return &HttpServer{
port: port,
multicore: multicore,
m: m,
mon_registry: reg,
Router: NewRouter(),
repositoryPool: repositoryPool,
Logger: choulog.NewHttpLogger(),
Timeout: dur,
allowedHostHeader: strings.Split(corsAllowedHost, ","),
allowedMethodHeader: strings.Split(corsAllowedHttpMethod, ","),
allowedHeaders: corsAllowedHeaders,
validator: validator.New(validator.WithRequiredStructEnabled()),
}
}
// AddRouter adds the routes from the provided Router to the HTTP server's router.
// It appends the routes for each HTTP method (GET, POST, DELETE, PUT, PATCH) to the corresponding map in the HTTP server's router.
//
// Parameters:
// - r Router: The Router object containing the routes to be added.
func (hs *HttpServer) AddRouter(r RouterEngine) {
initRouter := r.Init()
// append get router
// The maps.All function is used to get all key-value pairs from the r.getRoutes map.
// The maps.Insert function is used to insert these key-value pairs into the hs.Router.getRoutes map.
maps.Insert(hs.Router.getRoutes, maps.All(initRouter.getRoutes))
// append post router
// The same process as above is followed for the POST method.
maps.Insert(hs.Router.postRoutes, maps.All(initRouter.postRoutes))
// append delete router
// The same process as above is followed for the DELETE method.
maps.Insert(hs.Router.deleteRoutes, maps.All(initRouter.deleteRoutes))
// append put router
// The same process as above is followed for the PUT method.
maps.Insert(hs.Router.putRoutes, maps.All(initRouter.putRoutes))
// append patch router
// The same process as above is followed for the PATCH method.
maps.Insert(hs.Router.patchRoutes, maps.All(initRouter.patchRoutes))
}
// SetAddOns sets the additional data or state for the HTTP server.
// This function is used to store any additional data or state that needs to be accessible within the server.
//
// Parameters:
// - data: Any type that represents the additional data or state to be stored. This can be of any type,
// including primitive types, structs, slices, maps, or custom types.
//
// Return:
// - This function does not return any value. It is a setter function used to set the additional data or state.
func (hs *HttpServer) SetAddOns(data any) {
hs.addOns = data
}
// RegisterValidator registers a custom validation function with the given tag in the validator.
//
// Parameters:
// - tag: A string representing the tag for the custom validation function.
// - validateFunc: A validator.FuncCtx representing the custom validation function to be registered.
//
// The function panics if an error occurs during the registration process.
func (hs *HttpServer) RegisterValidator(tag string, validateFunc validator.FuncCtx) {
if err := hs.validator.RegisterValidationCtx(tag, validateFunc); err != nil {
panic(err)
}
}
func (hs *HttpServer) DBMigrator(dst ...interface{}) {
if config.VaultData["dbMigrate"] == "yes" {
hs.Logger.Info("Running DB Migration")
migrator := hs.repositoryPool.Migrator()
err := migrator.AutoMigrate(dst...)
if err != nil {
panic(err)
}
hs.Logger.Info("DB Migrated")
} else {
hs.Logger.Info("Skipping DB Migration")
}
}
// SpinUp starts the HTTP server and blocks until it exits.
// It initializes and runs the gnet server using the provided address and multicore settings.
// The server logs the exit message and any errors encountered during startup.
func (s *HttpServer) SpinUp() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
s.Logger.Info("Received signal: " + sig.String())
ctx, cancel := stdctx.WithTimeout(stdctx.Background(), 30*time.Second)
defer cancel()
select {
case <-ctx.Done():
s.Logger.Warn("Shutdown timeout")
default:
err := s.eng.Stop(ctx)
if err != nil {
panic(err)
}
}
}()
err := gnet.Run(s, fmt.Sprintf("tcp://0.0.0.0:%d", s.port), gnet.WithMulticore(s.multicore), gnet.WithEdgeTriggeredIO(true), gnet.WithReadBufferCap(1*1024*1024))
if err != nil {
s.Logger.Error(err)
}
}
func (hs *HttpServer) OnBoot(eng gnet.Engine) gnet.Action {
hs.eng = eng
green := color.New(color.FgGreen).SprintFunc()
yellow := color.New(color.FgYellow).SprintFunc()
blue := color.New(color.FgBlue).SprintFunc()
cyan := color.New(color.FgCyan).SprintFunc()
cores := strconv.Itoa(runtime.NumCPU())
fmt.Println(green("╔════════════════════════════════════════════════════╗"))
fmt.Println(green("║ Welcome to Data Infra HTTP Server ║"))
fmt.Println(blue("║ 🚀 Choukit Server 🚀 ║"))
fmt.Println(cyan("║ ======================================= ║"))
fmt.Printf("║%s%-22s%-5s║\n", yellow(" Version: "), choukit.Version, "")
fmt.Printf("║%s%-20s%-5s║\n", yellow(" CPU Cores: "), cores, "")
fmt.Printf("║%s%-20s%-5s║\n", yellow(" Multicore: "), strconv.FormatBool(hs.multicore), "")
fmt.Printf("║%s%-16s%-5s║\n", yellow(" Total Handler: "), strconv.Itoa(hs.Router.TotalRoutes()), "")
fmt.Printf("║%s%-25d%-5s║\n", yellow(" Port: "), hs.port, "")
fmt.Println(cyan("║ ======================================= ║"))
fmt.Println(green("╚════════════════════════════════════════════════════╝"))
hs.Logger.Info("choukit HTTP server has started")
return gnet.None
}
func (hs *HttpServer) OnShutdown(eng gnet.Engine) {
hs.Logger.Info("choukit HTTP server is shutting down")
db, _ := hs.repositoryPool.DB()
err := db.Close()
if err != nil {
hs.Logger.Error(err)
}
hs.Logger.Info("choukit Repository connection is shutdown gracefully")
workerPool.Release()
hs.Logger.Info("choukit worker pool is shutdown gracefully")
close(*hs.Logger.LoggingChannel)
}
func (hs *HttpServer) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
httpCodec := &http.HttpCodec{Parser: wildcat.NewHTTPParser(), ContentLength: -1}
context := &context.Context{
RepositoryConnection: hs.repositoryPool,
State: hs.addOns,
HttpCodec: httpCodec,
Logger: hs.Logger,
Timeout: hs.Timeout,
Validator: hs.validator,
}
c.SetContext(context)
return nil, gnet.None
}
// OnTraffic handles incoming traffic on the server. It reads data from the connection,
// parses HTTP requests, and dispatches them to the appropriate handler function.
//
// c: The gnet.Conn object representing the connection from which the traffic is received.
//
// Returns: A gnet.Action indicating the next action to be taken by the server.
func (hs *HttpServer) OnTraffic(c gnet.Conn) gnet.Action {
reqCtx := c.Context().(*context.Context) // HTTP codec for parsing and encoding HTTP requests and responses.
buf, _ := c.Peek(-1) // Read data from the connection without removing it from the buffer.
//n := len(buf) // Length of the data read from the connection.
pipeline:
nextOffset, _, err := reqCtx.HttpCodec.Parse(buf) // Parse the HTTP request from the buffer.
reqCtx.HttpCodec.ResetParser()
if err != nil {
goto response
}
if len(buf) < nextOffset { // If the parsed request is incomplete, continue reading more data.
goto response
}
// reqCtx.RequestBody = body
hs.writeResponse(c) // Dispatch the parsed request to the appropriate handler function.
buf = buf[nextOffset:] // Remove the parsed request from the buffer.
if len(buf) > 0 {
goto pipeline // Continue processing the remaining data in the buffer.
}
//body = nil
response:
if reqCtx.HttpCodec.Resp.Len() > 0 { // If there is a response to be sent, write it to the connection.
_, err := c.Write(reqCtx.HttpCodec.Resp.Bytes())
if err != nil {
reqCtx.Logger.Error(err)
}
}
reqCtx.HttpCodec.Reset() // Reset the HTTP codec to its initial state.
if len(buf) == 0 { // If the parsed request is incomplete, continue reading more data.
_, err = c.Discard(nextOffset)
if err != nil {
reqCtx.Logger.Error(err)
}
}
return gnet.None // Indicate that no further action is required by the server.
}
var (
workerPool = goroutine.Default()
)
func (hs *HttpServer) writeResponse(c gnet.Conn) {
ctx := c.Context().(*context.Context)
hs.m.Clients.Inc()
path := ctx.Path()
method := ctx.Method()
handler := hs.findRoutes(method, path)
additional_headers := make(map[string]string)
additional_headers["x-choukit-version"] = choukit.Version
additional_headers["x-choukit-author"] = choukit.Author
ctx.AdditionalResponseHeaders = additional_headers
if handler != nil {
err := workerPool.Submit(func() {
buf := new(bytes.Buffer)
duration := runHandler(handler, ctx)
resp_code := int(ctx.Response.StatusCode)
hs.corsMiddleware(ctx, additional_headers)
switch ctx.GetResponseType() {
case context.JSONResponse:
res, _ := ctx.Response.JSON()
ctx.AdditionalResponseHeaders["Content-Type"] = "application/json"
appendResponseWithBody(buf, "HTTP/1.1 "+ctx.Response.StatusCode.String(), res, additional_headers)
hs.asyncFlushResponseWithPrometheusTracking(c, buf, path, resp_code, duration, method)
return
case context.StreamResponse:
writeHttpStarterResponse(buf, "HTTP/1.1 200 OK", additional_headers)
asyncStream(buf, ctx, c)
return
}
})
// This is to make sure that when submitting task to go routine it can be run, if fails then give user error server is overload
if err != nil {
errorHandlerLayer(ctx, err)
res, _ := ctx.Response.JSON()
appendResponseWithBody(&ctx.HttpCodec.Resp, "HTTP/1.1 "+ctx.Response.StatusCode.String(), res, additional_headers)
}
} else if ctx.Path() == "/metrics" && ctx.Method() == "GET" {
additional_headers["Content-Type"] = "text/plain; version=0.0.4; charset=utf-8"
hs.prometheusResponse(&ctx.HttpCodec.Resp, additional_headers)
} else if ctx.Method() == "OPTIONS" {
hs.corsMiddleware(ctx, additional_headers)
appendResponseWithBody(&ctx.HttpCodec.Resp, "HTTP/1.1 204 No Content", "", additional_headers)
} else {
errorHandlerLayer(ctx, &errors.ErrPathNotFound)
res, _ := ctx.Response.JSON()
appendResponseWithBody(&ctx.HttpCodec.Resp, "HTTP/1.1 "+ctx.Response.StatusCode.String(), res, additional_headers)
*hs.Logger.LoggingChannel <- choulog.AccessLogField{Path: path, StatusCode: 404, Duration: 0}
}
}
// findRoutes searches for a matching handler function based on the HTTP method and path.
// It returns the handler function if a match is found, or nil if no matching handler is found.
//
// Parameters:
// - method: A string representing the HTTP method (e.g., "GET", "POST").
// - path: A string representing the requested path.
//
// Return:
// - HandlerFunc: A function that handles the HTTP request. If no matching handler is found, it returns nil.
func (hs *HttpServer) findRoutes(method string, path string) HandlerFunc {
switch method {
case "GET":
if handler, exists := hs.Router.getRoutes[path]; exists {
return handler
}
case "POST":
if handler, exists := hs.Router.postRoutes[path]; exists {
return handler
}
case "PUT":
if handler, exists := hs.Router.putRoutes[path]; exists {
return handler
}
case "DELETE":
if handler, exists := hs.Router.deleteRoutes[path]; exists {
return handler
}
case "PATCH":
if handler, exists := hs.Router.patchRoutes[path]; exists {
return handler
}
}
return nil
} this is the response util, package server import (
"bytes"
stdctx "context"
"fmt"
"io"
"strconv"
"time"
"github.com/panjf2000/gnet/v2"
"choukit/pkg/context"
"choukit/pkg/errors"
)
// asyncFlushResponseWithPrometheusTracking asynchronously writes the response to the connection,
// tracks the request duration and metrics using Prometheus, and logs the access details.
//
// Parameters:
// - c: The gnet.Conn object representing the connection to which the response will be written.
// - buf: A bytes.Buffer containing the response data to be written.
// - path: A string representing the requested path.
// - resp_code: An integer representing the HTTP response status code.
// - duration: A time.Duration representing the duration of the request.
// - method: A string representing the HTTP method (e.g., "GET", "POST").
func (hs *HttpServer) asyncFlushResponseWithPrometheusTracking(c gnet.Conn, buf *bytes.Buffer, path string, resp_code int, duration time.Duration, method string) {
err := c.AsyncWrite(buf.Bytes(), func(c gnet.Conn, err error) error {
buf.Reset()
buf = nil
return nil
})
if err != nil {
hs.Logger.Error(err)
}
go hs.m.Duration.WithLabelValues(method, strconv.Itoa(resp_code), path).Observe(float64(duration))
go hs.m.RequestTotal.WithLabelValues(method, strconv.Itoa(resp_code), path).Inc()
}
// runHandler executes the provided HandlerFunc with the given context and handles any errors or timeouts.
// It measures the duration of the execution and returns it.
//
// Parameters:
// - h: A HandlerFunc representing the function to be executed.
// - ctx: A *context.Context representing the context for the execution.
//
// Return:
// - time.Duration: The duration of the execution.
func runHandler(h HandlerFunc, ctx *context.Context) time.Duration {
startTime := time.Now()
var duration time.Duration
timectx, cancel := stdctx.WithTimeout(stdctx.Background(), ctx.Timeout)
defer cancel()
errChan := make(chan error, 1)
go func() {
ctx.Context = timectx
select {
case <-timectx.Done(): // If the context is already canceled, do not run
return
case errChan <- h(ctx):
}
}()
select {
case <-timectx.Done():
errorHandlerLayer(ctx, &errors.ErrRequestTimeout)
case err := <-errChan:
if err != nil {
errorHandlerLayer(ctx, err)
}
}
// if ctx.RequestBody !=nil {
// ctx.RequestBody.Close()
// }
duration = time.Since(startTime)
return duration
}
// asyncStream asynchronously streams data from a file reader to a gnet.Conn connection.
// It reads data in chunks of 1024 bytes and writes it to the connection.
// If an error occurs during the reading or writing process, it logs the error and returns it.
// After streaming the data, it discards any remaining data in the connection.
//
// Parameters:
// - buf: A bytes.Buffer containing initial data to be written to the connection.
// - ctx: A *context.Context providing contextual information for the streaming process.
// - c: A gnet.Conn representing the connection to which the data will be streamed.
func asyncStream(buf *bytes.Buffer, ctx *context.Context, c gnet.Conn) {
c.AsyncWrite(buf.Bytes(), func(c gnet.Conn, err error) error {
buf.Reset()
if err != nil {
return err
} else {
var buffer bytes.Buffer
defer ctx.FileReader.Close()
for {
n, err := io.CopyN(&buffer, ctx.FileReader, 1024)
if n > 0 {
_, writeErr := c.Write(buffer.Bytes())
if writeErr != nil {
ctx.Logger.Error(writeErr)
return writeErr
}
buffer.Reset()
}
if err != nil {
if err == io.EOF {
c.Write([]byte("\r\n\r\n"))
break
} else {
ctx.Logger.Error(err)
return err
}
}
}
}
buf = nil
_, err = c.Discard(-1)
if err != nil {
ctx.Logger.Error(err)
}
return nil
})
}
func appendResponseWithBody[T []byte | string](buf *bytes.Buffer, startLine string, msg T, additionalHeaders map[string]string) {
writeHttpStarterResponse(buf, startLine, additionalHeaders)
appendBody(buf, msg)
}
func writeHttpStarterResponse(buf *bytes.Buffer, startLine string, additionalHeaders map[string]string) {
buf.WriteString(startLine)
buf.WriteString("\r\nServer: choukit\r\nDate: ")
buf.WriteString(time.Now().Format("Mon, 02 Jan 2006 15:04:05 GMT"))
for k, v := range additionalHeaders {
buf.WriteString("\r\n")
buf.WriteString(fmt.Sprintf("%s: %s", k, v))
}
}
func appendBody[T []byte | string](buf *bytes.Buffer, msg T) {
buf.WriteString("\r\nContent-Length: ")
buf.WriteString(strconv.Itoa(len(msg)))
buf.WriteString("\r\n\r\n")
switch v := any(msg).(type) {
case []byte:
buf.Write(v)
case string:
buf.WriteString(v)
}
} This is the codec package http
import (
"bufio"
"bytes"
"errors"
"io"
"net/http"
"strconv"
"github.com/evanphx/wildcat"
)
var (
CRLF = []byte("\r\n\r\n")
lastChunk = []byte("0\r\n\r\n")
)
type HttpCodec struct {
Parser *wildcat.HTTPParser
ContentLength int
Resp bytes.Buffer
}
func (hc *HttpCodec) Parse(data []byte) (int, []byte, error) {
bodyOffset, err := hc.Parser.Parse(data)
if err != nil {
return 0, nil, err
}
contentLength := hc.GetContentLength()
if contentLength > -1 {
bodyEnd := bodyOffset + contentLength
var body []byte
if len(data) >= bodyEnd {
body = data[bodyOffset:bodyEnd]
}
return bodyEnd, body, nil
}
// Transfer-Encoding: chunked
if idx := bytes.Index(data[bodyOffset:], lastChunk); idx != -1 {
bodyEnd := idx + 5
var body []byte
if len(data) >= bodyEnd {
req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(data[:bodyEnd])))
if err != nil {
return bodyEnd, nil, err
}
body, _ = io.ReadAll(req.Body)
}
return bodyEnd, body, nil
}
// Request without a body.
if idx := bytes.Index(data, CRLF); idx != -1 {
return idx + 4, nil, nil
}
return 0, nil, errors.New("invalid http request")
}
var contentLengthKey = []byte("Content-Length")
func (hc *HttpCodec) GetContentLength() int {
if hc.ContentLength != -1 {
return hc.ContentLength
}
val := hc.Parser.FindHeader(contentLengthKey)
if val != nil {
i, err := strconv.ParseInt(string(val), 10, 0)
if err == nil {
hc.ContentLength = int(i)
}
}
return hc.ContentLength
}
func (hc *HttpCodec) ResetParser() {
hc.ContentLength = -1
}
func (hc *HttpCodec) Reset() {
hc.ResetParser()
hc.Resp.Reset()
} This is the application context package context
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"strconv"
"strings"
"time"
"github.com/go-playground/validator/v10"
"choukit/pkg/errors"
"choukit/pkg/http"
"choukit/pkg/log"
"choukit/pkg/repository"
)
type Context struct {
context.Context
Response http.Response
RequestBody []byte
RepositoryConnection repository.RepositoryPool
State any
HttpCodec *http.HttpCodec
Logger *log.HttpLogger
Timeout time.Duration
Validator *validator.Validate
AdditionalResponseHeaders HeaderMap
responseType ResponseType
FileReader io.ReadCloser
}
type ResponseType int
const (
JSONResponse ResponseType = iota
StreamResponse
)
type HeaderMap map[string]string
func (h *HeaderMap) Set(key string, value string) {
(*h)[key] = value
}
func (c *Context) Validate(i interface{}) error {
if err := c.Validator.StructCtx(c, i); err != nil {
return &errors.ErrValidateError
}
return nil
}
func (c *Context) DecodedBody(v any) error {
dec := json.NewDecoder(bytes.NewReader(c.RequestBody))
dec.DisallowUnknownFields() // Force errors
err := dec.Decode(v)
if err != nil {
return &errors.ErrDecodeError
}
return nil
}
func (c *Context) GetResponseType() ResponseType {
return c.responseType
}
func (c *Context) JSON(success bool, code http.StatusCode, data any, message string) {
c.responseType = JSONResponse
c.AdditionalResponseHeaders["Content-Type"] = "application/json"
c.Response = http.Response{
Success: success,
Message: message,
StatusCode: code,
Data: data,
}
}
func (c *Context) Stream(reader io.ReadCloser, fileName string, fileLength int64, contentType string) {
c.AdditionalResponseHeaders["Content-Length"] = strconv.Itoa(int(fileLength))
c.AdditionalResponseHeaders["Content-Type"] = contentType
c.AdditionalResponseHeaders["Content-Disposition"] = fmt.Sprintf("attachment; filename=\"%s\"", fileName)
c.responseType = StreamResponse
c.FileReader = reader
}
func (c *Context) GetHeader(key string) (string, error) {
res := c.HttpCodec.Parser.FindHeader([]byte(key))
if len(res) > 0 {
return string(res), nil
} else {
return "", &errors.ErrHeaderNotSet
}
}
func (c *Context) GetQueryParams(key string) (string, error) {
parts := strings.SplitN(string(c.HttpCodec.Parser.Path), "?", 2)
if len(parts) < 2 {
return "", &errors.ErrQueryParamNotSet
}
queryString := parts[1]
pairs := strings.Split(queryString, "&")
for _, pair := range pairs {
if strings.Contains(pair, key) {
kv := strings.SplitN(pair, "=", 2)
return kv[1], nil
}
}
return "", &errors.ErrQueryParamNotSet
}
func (c *Context) Path() string {
return strings.SplitN(string(c.HttpCodec.Parser.Path), "?", 2)[0]
}
func (c *Context) Method() string {
return string(c.HttpCodec.Parser.Method)
}
func (c *Context) FormFile(fileName string) (*multipart.Part, error) {
contentType, err := c.GetHeader("Content-Type")
if err != nil {
return nil, &errors.ErrHeaderNotSet
}
_, params, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, &errors.ErrMediaTypeNotSet
}
boundary, ok := params["boundary"]
if !ok {
return nil, &errors.ErrBoundaryNotSet
}
reader := multipart.NewReader(bytes.NewReader(c.RequestBody), boundary)
for {
part, err := reader.NextPart()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if part.FormName() == "file" && part.FileName() == fileName {
return part, nil
}
}
return nil, &errors.ErrFileNotFound
} |
I don't have any clue about this. I need more info, upload the pprof data here. |
Actions I've taken before I'm here
Questions with details
I create the http server ontop of gnet, as you know i use of the code from the example,
i'm trying to uploading files, file uploaded but why the memory usage is very high ?
This is the activity monitor from mac
This is the pprof
Code snippets (optional)
The text was updated successfully, but these errors were encountered: