Skip to content

Commit

Permalink
Remove skip storing MinSyncedTicket when the ticket is initial (#655)
Browse files Browse the repository at this point in the history
Previously, in order to enhance GC efficiency, when a client initially
attaches a document that has any changes, the process of storing
MinSyncedTicket was skipped. However, this approach is flawed because
the server stores the ticket based on the checkpoint of requests for
handling scenarios where the client's response may be lost.

So this commit removes the optimization for now.

This commit also revises the logs of the PushPull process to be more
simple.
  • Loading branch information
hackerwins authored Oct 27, 2023
1 parent a000a29 commit 107498e
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 24 deletions.
173 changes: 173 additions & 0 deletions pkg/units/size.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* copy from docker/go-units:
* https://github.com/docker/go-units/blob/master/size.go
*/

package units

import (
"fmt"
"strconv"
"strings"
)

// See: http://en.wikipedia.org/wiki/Binary_prefix
const (
// Decimal

KB = 1000
MB = 1000 * KB
GB = 1000 * MB
TB = 1000 * GB
PB = 1000 * TB

// Binary

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
)

type unitMap map[byte]int64

var (
decimalMap = unitMap{'k': KB, 'm': MB, 'g': GB, 't': TB, 'p': PB}
binaryMap = unitMap{'k': KiB, 'm': MiB, 'g': GiB, 't': TiB, 'p': PiB}
)

var (
decimapAbbrs = []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}
binaryAbbrs = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"}
)

func getSizeAndUnit(size float64, base float64, _map []string) (float64, string) {
i := 0
unitsLimit := len(_map) - 1
for size >= base && i < unitsLimit {
size = size / base
i++
}
return size, _map[i]
}

// CustomSize returns a human-readable approximation of a size
// using custom format.
func CustomSize(format string, size float64, base float64, _map []string) string {
size, unit := getSizeAndUnit(size, base, _map)
return fmt.Sprintf(format, size, unit)
}

// HumanSizeWithPrecision allows the size to be in any precision,
// instead of 4 digit precision used in units.HumanSize.
func HumanSizeWithPrecision(size float64, precision int) string {
size, unit := getSizeAndUnit(size, 1000.0, decimapAbbrs)
return fmt.Sprintf("%.*g%s", precision, size, unit)
}

// HumanSize returns a human-readable approximation of a size
// capped at 4 valid numbers (eg. "2.746 MB", "796 KB").
func HumanSize(size float64) string {
return HumanSizeWithPrecision(size, 4)
}

// BytesSize returns a human-readable size in bytes, kibibytes,
// mebibytes, gibibytes, or tebibytes (eg. "44kiB", "17MiB").
func BytesSize(size float64) string {
return CustomSize("%.4g%s", size, 1024.0, binaryAbbrs)
}

// FromHumanSize returns an integer from a human-readable specification of a
// size using SI standard (eg. "44kB", "17MB").
func FromHumanSize(size string) (int64, error) {
return parseSize(size, decimalMap)
}

// RAMInBytes parses a human-readable string representing an amount of RAM
// in bytes, kibibytes, mebibytes, gibibytes, or tebibytes and
// returns the number of bytes, or -1 if the string is unparseable.
// Units are case-insensitive, and the 'b' suffix is optional.
func RAMInBytes(size string) (int64, error) {
return parseSize(size, binaryMap)
}

// Parses the human-readable size string into the amount it represents.
func parseSize(sizeStr string, uMap unitMap) (int64, error) {
// TODO: rewrite to use strings.Cut if there's a space
// once Go < 1.18 is deprecated.
sep := strings.LastIndexAny(sizeStr, "01234567890. ")
if sep == -1 {
// There should be at least a digit.
return -1, fmt.Errorf("invalid size: '%s'", sizeStr)
}
var num, sfx string
if sizeStr[sep] != ' ' {
num = sizeStr[:sep+1]
sfx = sizeStr[sep+1:]
} else {
// Omit the space separator.
num = sizeStr[:sep]
sfx = sizeStr[sep+1:]
}

size, err := strconv.ParseFloat(num, 64)
if err != nil {
return -1, fmt.Errorf("parse size: %w", err)
}
// Backward compatibility: reject negative sizes.
if size < 0 {
return -1, fmt.Errorf("invalid size: '%s'", sizeStr)
}

if len(sfx) == 0 {
return int64(size), nil
}

// Process the suffix.

if len(sfx) > 3 { // Too long.
goto badSuffix
}
sfx = strings.ToLower(sfx)
// Trivial case: b suffix.
if sfx[0] == 'b' {
if len(sfx) > 1 { // no extra characters allowed after b.
goto badSuffix
}
return int64(size), nil
}
// A suffix from the map.
if mul, ok := uMap[sfx[0]]; ok {
size *= float64(mul)
} else {
goto badSuffix
}

// The suffix may have extra "b" or "ib" (e.g. KiB or MB).
switch {
case len(sfx) == 2 && sfx[1] != 'b':
goto badSuffix
case len(sfx) == 3 && sfx[1:] != "ib":
goto badSuffix
}

return int64(size), nil

badSuffix:
return -1, fmt.Errorf("invalid suffix: '%s'", sfx)
}
10 changes: 0 additions & 10 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,16 +1161,6 @@ func (d *DB) UpdateSyncedSeq(
return err
}

// NOTE: skip storing the initial ticket to prevent GC interruption.
// Documents in this state do not need to be saved because they do not
// have any tombstones to be referenced by other documents.
//
// (The initial ticket is used as the creation time of the root
// element that operations can not remove.)
if ticket.Compare(time.InitialTicket) == 0 {
return nil
}

raw, err := txn.First(
tblSyncedSeqs,
"doc_id_client_id",
Expand Down
10 changes: 0 additions & 10 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,16 +1349,6 @@ func (c *Client) UpdateSyncedSeq(
return err
}

// NOTE: skip storing the initial ticket to prevent GC interruption.
// Documents in this state do not need to be saved because they do not
// have any tombstones to be referenced by other documents.
//
// (The initial ticket is used as the creation time of the root
// element that operations can not remove.)
if ticket.Compare(time.InitialTicket) == 0 {
return nil
}

if _, err = c.collection(colSyncedSeqs).UpdateOne(ctx, bson.M{
"doc_id": encodedDocID,
"client_id": encodedClientID,
Expand Down
15 changes: 15 additions & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package packs
import (
"context"
"fmt"
"strconv"
gotime "time"

"go.uber.org/zap"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/units"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/sync"
Expand Down Expand Up @@ -119,6 +121,19 @@ func PushPull(
respPack.MinSyncedTicket = minSyncedTicket
respPack.ApplyDocInfo(docInfo)

pullLog := strconv.Itoa(respPack.ChangesLen())
if respPack.SnapshotLen() > 0 {
pullLog = units.HumanSize(float64(respPack.SnapshotLen()))
}
logging.From(ctx).Infof(
"SYNC: '%s' is synced by '%s', push: %d, pull: %s, min: %s",
docInfo.Key,
clientInfo.Key,
len(pushedChanges),
pullLog,
minSyncedTicket.StructureAsString(),
)

// 05. publish document change event then store snapshot asynchronously.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
be.Background.AttachGoroutine(func(ctx context.Context) {
Expand Down
6 changes: 3 additions & 3 deletions server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func pushChanges(
}

if len(reqPack.Changes) > 0 {
logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PUSH: '%s' pushes %d changes into '%s', rejected %d changes, serverSeq: %d -> %d, cp: %s",
clientInfo.Key,
len(pushedChanges),
Expand Down Expand Up @@ -162,7 +162,7 @@ func pullSnapshot(
return nil, err
}

logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PULL: '%s' build snapshot with changes(%d~%d) from '%s', cp: %s",
clientInfo.Key,
reqPack.Checkpoint.ServerSeq+1,
Expand Down Expand Up @@ -211,7 +211,7 @@ func pullChangeInfos(
cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

if len(pulledChanges) > 0 {
logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s, filtered changes: %d",
clientInfo.Key,
len(pulledChanges),
Expand Down
7 changes: 6 additions & 1 deletion server/rpc/interceptors/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func NewDefaultInterceptor() *DefaultInterceptor {
return &DefaultInterceptor{}
}

const (
// SlowThreshold is the threshold for slow RPC.
SlowThreshold = 100 * gotime.Millisecond
)

// Unary creates a unary server interceptor for default.
func (i *DefaultInterceptor) Unary() grpc.UnaryServerInterceptor {
return func(
Expand All @@ -50,7 +55,7 @@ func (i *DefaultInterceptor) Unary() grpc.UnaryServerInterceptor {
return nil, grpchelper.ToStatusError(err)
}

if gotime.Since(start) > 100*gotime.Millisecond {
if gotime.Since(start) > SlowThreshold {
reqLogger.Infof("RPC : %q %s", info.FullMethod, gotime.Since(start))
}
return resp, nil
Expand Down

0 comments on commit 107498e

Please sign in to comment.