diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 84ff81e5a8..36a53a95fd 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -336,6 +336,59 @@ func debugInsert() { var labelsCopier = prometheus.Labels{"type": "metric", "subsystem": "copier"} +func copyWithoutConflict(ctx context.Context, tx pgx.Tx, tableName, schemaName string, isExemplar bool, req *copyRequest, rows [][]interface{}, insertedRows *[]int) (func() error, error) { + columns := schema.PromDataColumns + if isExemplar { + columns = schema.PromExemplarColumns + } + table := pgx.Identifier{schemaName, tableName} + inserted, err := tx.CopyFrom(ctx, table, columns, pgx.CopyFromRows(rows)) + if err != nil { + return nil, err + } + *insertedRows = append(*insertedRows, int(inserted)) + return nil, nil +} + +func copyWithConflict(ctx context.Context, tx pgx.Tx, tableName, schemaName string, isExemplar bool, req *copyRequest, rows [][]interface{}, insertedRows *[]int) (func() error, error) { + columns := schema.PromDataColumns + tempTablePrefix := fmt.Sprintf("s%d_", req.info.MetricID) + if isExemplar { + columns = schema.PromExemplarColumns + tempTablePrefix = fmt.Sprintf("e%d_", req.info.MetricID) + } + + // we append table prefix to make sure that temp table name is unique + table, err := createTempIngestTable(ctx, tx, tableName, schemaName, tempTablePrefix) + if err != nil { + return nil, err + } + _, err = tx.CopyFrom(ctx, table, columns, pgx.CopyFromRows(rows)) + if err != nil { + return nil, err + } + + //execute the transfer from the temp table to the hypertable at the end of the transaction. This minimizes locking. 
+ finalizer := func() error { + columns := schema.PromDataColumns + if isExemplar { + columns = schema.PromExemplarColumns + } + + res, err := tx.Exec(ctx, + fmt.Sprintf(sqlInsertIntoFrom, schemaName, pgx.Identifier{tableName}.Sanitize(), + strings.Join(columns[:], ","), table.Sanitize())) + if err != nil { + return err + } + inserted := res.RowsAffected() + + *insertedRows = append(*insertedRows, int(inserted)) + return nil + } + return finalizer, nil +} + // insertSeries performs the insertion of time-series into the DB. func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, reqs ...copyRequest) (error, int64) { _, span := tracer.Default().Start(ctx, "insert-series") @@ -361,6 +414,11 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re } } }() + copyFromFunc := copyWithoutConflict + if onConflict { + copyFromFunc = copyWithConflict + } + finalizers := make([]func() error, 0) for r := range reqs { req := &reqs[r] @@ -371,18 +429,6 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re numSamples, numExemplars := req.data.batch.Count() metrics.IngestorRowsPerInsert.With(labelsCopier).Observe(float64(numSamples + numExemplars)) - // flatten the various series into arrays. - // there are four main bottlenecks for insertion: - // 1. The round trip time. - // 2. The number of requests sent. - // 3. The number of individual INSERT statements. - // 4. The amount of data sent. - // While the first two of these can be handled by batching, for the latter - // two we need to actually reduce the work done. It turns out that simply - // collecting all the data into a postgres array and performing a single - // INSERT using that overcomes most of the performance issues for sending - // multiple data, and brings INSERT nearly on par with CopyFrom. In the - // future we may wish to send compressed data instead. 
var ( hasSamples bool hasExemplars bool @@ -422,52 +468,35 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re totalSamples += numSamples totalExemplars += numExemplars - copyFromFunc := func(tableName, schemaName string, isExemplar bool) error { - columns := schema.PromDataColumns - tempTablePrefix := fmt.Sprintf("s%d_", req.info.MetricID) - rows := sampleRows - if isExemplar { - columns = schema.PromExemplarColumns - tempTablePrefix = fmt.Sprintf("e%d_", req.info.MetricID) - rows = exemplarRows - } - table := pgx.Identifier{schemaName, tableName} - if onConflict { - // we append table prefix to make sure that temp table name is unique - table, err = createTempIngestTable(ctx, tx, tableName, schemaName, tempTablePrefix) - if err != nil { - return err - } - } - inserted, err := tx.CopyFrom(ctx, table, columns, pgx.CopyFromRows(rows)) - if err != nil { - return err - } - if onConflict { - res, err := tx.Exec(ctx, - fmt.Sprintf(sqlInsertIntoFrom, schemaName, pgx.Identifier{tableName}.Sanitize(), - strings.Join(columns[:], ","), table.Sanitize())) - if err != nil { - return err - } - inserted = res.RowsAffected() - - } - insertedRows = append(insertedRows, int(inserted)) - return nil - } - if hasSamples { numRowsPerInsert = append(numRowsPerInsert, numSamples) - if err = copyFromFunc(req.info.TableName, req.info.TableSchema, false); err != nil { + // Assign the function-level err, as the replaced code did, instead of shadowing it with :=. + var finalizer func() error + finalizer, err = copyFromFunc(ctx, tx, req.info.TableName, req.info.TableSchema, false, req, sampleRows, &insertedRows) + if err != nil { return err, lowestMinTime } + if finalizer != nil { + finalizers = append(finalizers, finalizer) + } } if hasExemplars { numRowsPerInsert = append(numRowsPerInsert, numExemplars) - if err = copyFromFunc(req.info.TableName, schema.PromDataExemplar, true); err != nil { + // Exemplar rows go to schema.PromDataExemplar, as in the replaced code, not req.info.TableSchema. + var finalizer func() error + finalizer, err = copyFromFunc(ctx, tx, req.info.TableName, schema.PromDataExemplar, true, req, exemplarRows, &insertedRows) + if err != nil { return err, lowestMinTime } + if finalizer != nil {
+ finalizers = append(finalizers, finalizer) + } + } + } + for _, f := range finalizers { + err = f() + if err != nil { + return err, lowestMinTime } }