
feat(backend): implement ListDocumentDrafts API
burdiyan committed Apr 11, 2024
1 parent 525e055 commit 1535e5a
Showing 9 changed files with 839 additions and 444 deletions.
backend/daemon/api/documents/v1alpha/documents.go (321 changes: 184 additions & 137 deletions)
@@ -11,8 +11,10 @@ import (
"mintter/backend/core"
"mintter/backend/daemon/api/documents/v1alpha/docmodel"
groups "mintter/backend/daemon/api/groups/v1alpha"
"mintter/backend/daemon/apiutil"
documents "mintter/backend/genproto/documents/v1alpha"
groups_proto "mintter/backend/genproto/groups/v1alpha"
"mintter/backend/hlc"
"mintter/backend/mttnet"
"strconv"
"strings"
@@ -259,164 +261,209 @@ func (api *Server) GetDraft(ctx context.Context, in *documents.GetDraftRequest)
return mut.Hydrate(ctx, api.blobs)
}

-var qListAllDrafts = dqb.Str(`
-WITH RECURSIVE resource_authors AS (
-	SELECT
-		r.iri,
-		r.create_time,
-		r.owner,
-		mv.meta,
-		pk.principal AS author_raw,
-		sb.ts,
-		sb.id AS blob_id
-	FROM
-		resources r
-		JOIN structural_blobs sb ON r.id = sb.resource
-		JOIN public_keys pk ON sb.author = pk.id
-		JOIN meta_view mv ON r.iri = mv.iri
-	WHERE
-		sb.author IS NOT NULL
-		AND r.iri GLOB :pattern
-		AND sb.id in (SELECT distinct blob from drafts)
-	UNION ALL
-	SELECT
-		ra.iri,
-		ra.create_time,
-		ra.owner,
-		sb.meta,
-		pk.principal,
-		sb.ts,
-		sb.id
-	FROM
-		resource_authors ra
-		JOIN structural_blobs sb ON ra.iri = sb.resource
-		JOIN public_keys pk ON sb.author = pk.id
-	WHERE
-		sb.author IS NOT NULL
-		AND ra.iri GLOB :pattern
-),
-owners_raw AS (
-	SELECT
-		id,
-		principal AS owner_raw
-	FROM
-		public_keys
-),
-latest_blobs AS (
-	SELECT
-		ra.iri,
-		MAX(ra.ts) AS latest_ts,
-		b.multihash,
-		b.codec
-	FROM
-		resource_authors ra
-		JOIN blobs b ON ra.blob_id = b.id
-	GROUP BY ra.iri
-)
-SELECT
-	ra.iri,
-	ra.create_time,
-	GROUP_CONCAT(DISTINCT HEX(ra.author_raw)) AS authors_hex,
-	ra.meta,
-	MAX(ra.ts) AS latest_ts,
-	HEX(oraw.owner_raw),
-	ra.blob_id
-FROM
-	resource_authors ra
-	LEFT JOIN owners_raw oraw ON ra.owner = oraw.id
-	LEFT JOIN latest_blobs lb ON ra.iri = lb.iri
-WHERE ra.blob_id <= :idx
-GROUP BY
-	ra.iri, ra.create_time, ra.meta
-ORDER BY ra.blob_id asc LIMIT :page_size;
-`)

// ListDrafts implements the corresponding gRPC method.
func (api *Server) ListDrafts(ctx context.Context, req *documents.ListDraftsRequest) (*documents.ListDraftsResponse, error) {
-	var (
-		entities []hyper.EntityID
-		err      error
-	)
-	me, ok := api.me.Get()
-	if !ok {
-		return nil, fmt.Errorf("account is not initialized yet")
-	}
-	conn, cancel, err := api.db.Conn(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("Can't get a connection from the db: %w", err)
-	}
-	defer cancel()
-	resp := &documents.ListDraftsResponse{
-		Documents: make([]*documents.Document, 0, len(entities)),
-	}
-	var cursorBlobID int64 = math.MaxInt32
	me, err := api.getMe()
	if err != nil {
		return nil, err
	}

	resp := &documents.ListDraftsResponse{}

	if req.PageSize == 0 {
		req.PageSize = 30
	}
-	if req.PageToken != "" {
-		pageTokenBytes, _ := base64.StdEncoding.DecodeString(req.PageToken)
-		if err != nil {
-			return nil, fmt.Errorf("Token encoding not valid: %w", err)
-		}
-		clearPageToken, err := me.DeviceKey().Decrypt(pageTokenBytes)
-		if err != nil {
-			return nil, fmt.Errorf("Token not valid: %w", err)
-		}
-		pageToken, err := strconv.ParseUint(string(clearPageToken), 10, 32)
-		if err != nil {
-			return nil, fmt.Errorf("Token not valid: %w", err)
-		}
-		cursorBlobID = int64(pageToken)
-	}
-	pattern := "hm://d/*"
-	var lastBlobID int64
-	err = sqlitex.Exec(conn, qListAllDrafts(), func(stmt *sqlite.Stmt) error {
-		var (
-			id          = stmt.ColumnText(0)
-			createTime  = stmt.ColumnInt64(1)
-			editorsStr  = stmt.ColumnText(2)
-			title       = stmt.ColumnText(3)
-			updatedTime = stmt.ColumnInt64(4)
-			ownerHex    = stmt.ColumnText(5)
-		)
-		lastBlobID = stmt.ColumnInt64(6)
-		editors := []string{}
-		for _, editorHex := range strings.Split(editorsStr, ",") {
-			editorBin, err := hex.DecodeString(editorHex)
-			if err != nil {
-				return err
-			}
-			editors = append(editors, core.Principal(editorBin).String())
-		}
-		ownerBin, err := hex.DecodeString(ownerHex)
-		if err != nil {
-			return err
-		}
-		pub := &documents.Document{
-			Id:         id,
-			Title:      title,
-			Author:     core.Principal(ownerBin).String(),
-			Editors:    editors,
-			CreateTime: timestamppb.New(time.Unix(int64(createTime), 0)),
-			UpdateTime: timestamppb.New(time.Unix(int64(updatedTime/1000000), (updatedTime%1000000)*1000)),
-		}
-		resp.Documents = append(resp.Documents, pub)
-		return nil
-	}, pattern, cursorBlobID, req.PageSize)
-	if err != nil {
-		return nil, err
-	}
-
-	pageToken, err := me.DeviceKey().Encrypt([]byte(strconv.Itoa(int(lastBlobID - 1))))
-	if err != nil {
-		return nil, err
-	}
-	if lastBlobID != 0 && req.PageSize == int32(len(resp.Documents)) {
-		resp.NextPageToken = base64.StdEncoding.EncodeToString(pageToken)
-	}

	type Cursor struct {
		Ts       int64 `json:"t"`
		Resource int64 `json:"r"`
	}

	var cursor Cursor
	if req.PageToken == "" {
		cursor.Ts = math.MaxInt64
		cursor.Resource = math.MaxInt64
	} else {
		if err := apiutil.DecodePageToken(req.PageToken, &cursor, me.DeviceKey()); err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "%v", err)
		}
	}

	var lastCursor Cursor

	if err := api.db.WithSave(ctx, func(conn *sqlite.Conn) error {
		var count int32
		return sqlitex.Exec(conn, qListAllDrafts(), func(stmt *sqlite.Stmt) error {
			// This is necessary to always return empty page token when we reach the last result.
			if count == req.PageSize {
				var err error
				resp.NextPageToken, err = apiutil.EncodePageToken(lastCursor, me.DeviceKey())
				return err
			}
			count++

			var (
				iri            = stmt.ColumnText(0)
				createTime     = stmt.ColumnInt64(1)
				updateTime     = stmt.ColumnInt64(2)
				author         = stmt.ColumnBytesUnsafe(3)
				editors        = strings.Split(stmt.ColumnText(4), ",")
				meta           = stmt.ColumnText(5)
				cursorResource = stmt.ColumnInt64(6)
			)

			for i, x := range editors {
				data, err := hex.DecodeString(x)
				if err != nil {
					return fmt.Errorf("failed to decode editor: %w", err)
				}

				editors[i] = core.Principal(data).String()
			}

			lastCursor.Resource = cursorResource
			lastCursor.Ts = updateTime

			doc := &documents.Document{
				Id:         iri,
				Title:      meta,
				Author:     core.Principal(author).String(),
				Editors:    editors,
				CreateTime: timestamppb.New(time.Unix(createTime, 0)),
				UpdateTime: timestamppb.New(hlc.Timestamp(updateTime).Time()),
			}

			resp.Documents = append(resp.Documents, doc)

			return nil
		}, cursor.Ts, cursor.Resource, req.PageSize)
	}); err != nil {
		return nil, err
	}

	return resp, nil
}
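
The handler above pages with an encrypted keyset cursor: every response carries NextPageToken, and an empty token marks the last page. A minimal client-side sketch of that loop, assuming a locally running daemon and the Drafts client generated from documents/v1alpha (the address, port, and client constructor here are assumptions, not part of this commit):

package main

import (
	"context"
	"fmt"
	"log"

	documents "mintter/backend/genproto/documents/v1alpha"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumed daemon gRPC address; adjust to your setup.
	conn, err := grpc.Dial("localhost:55002", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := documents.NewDraftsClient(conn)

	// Keep feeding NextPageToken back until it comes back empty.
	var pageToken string
	for {
		resp, err := client.ListDrafts(context.Background(), &documents.ListDraftsRequest{
			PageSize:  30,
			PageToken: pageToken,
		})
		if err != nil {
			log.Fatal(err)
		}
		for _, d := range resp.Documents {
			fmt.Println(d.Id, d.Title, d.UpdateTime.AsTime())
		}
		if resp.NextPageToken == "" {
			break
		}
		pageToken = resp.NextPageToken
	}
}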

var qListAllDrafts = dqb.Str(`
WITH RECURSIVE
-- Finding the drafts we want, sorted as desired in the output,
-- and filtered out to find the requested page.
subset AS (
SELECT structural_blobs.*
FROM drafts
JOIN structural_blobs ON structural_blobs.id = drafts.blob
WHERE structural_blobs.ts < :cursor_ts AND drafts.resource < :cursor_resource
ORDER BY structural_blobs.ts DESC, structural_blobs.resource DESC
LIMIT :page_size + 1
),
-- Resolving the DAG of changes for each document
-- starting from the draft changes and following the dependency links.
cset AS (
SELECT * FROM subset
UNION
SELECT structural_blobs.*
FROM structural_blobs
JOIN blob_links ON blob_links.target = structural_blobs.id
JOIN cset ON cset.id = blob_links.source
WHERE blob_links.type = 'change/dep'
)
-- Reducing the DAG of changes for each document to get the current state.
SELECT
resources.iri AS iri,
resources.create_time AS create_time,
MAX(cset.ts) AS update_time,
authors.principal AS author,
GROUP_CONCAT(DISTINCT HEX(public_keys.principal)) AS editors,
(JSONB_GROUP_ARRAY(cset.meta ORDER BY ts DESC, public_keys.principal DESC) FILTER (WHERE cset.meta IS NOT NULL))->>'0' AS meta,
MAX(resources.id) AS cursor_resource
FROM cset
JOIN public_keys ON public_keys.id = cset.author
JOIN resources ON resources.id = cset.resource
JOIN public_keys authors ON authors.id = resources.owner
GROUP BY resource
ORDER BY update_time DESC, resources.id DESC;
`)
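
The query above asks for page_size + 1 rows ordered by (ts, resource) descending: the extra row is never returned to the caller, it only signals that another page exists, which is why the handler emits the cursor once count reaches req.PageSize. A standalone sketch of that probe-row pattern with hypothetical row and cursor types (not the daemon's code):

package keyset

// row stands in for one draft row, already sorted by (Ts, Resource) descending.
type row struct {
	Ts       int64
	Resource int64
}

// cursor mirrors the Cursor struct the handler encrypts into the page token.
type cursor struct {
	Ts       int64
	Resource int64
}

// page consumes the result of a "LIMIT pageSize + 1" query: it returns at most
// pageSize rows and a cursor for the next page, or nil when the probe row is absent.
func page(rows []row, pageSize int) ([]row, *cursor) {
	if len(rows) <= pageSize {
		return rows, nil // no extra row: this was the last page
	}
	out := rows[:pageSize]
	last := out[len(out)-1]
	// The next query filters with ts < cursor.Ts AND resource < cursor.Resource.
	return out, &cursor{Ts: last.Ts, Resource: last.Resource}
}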

func (api *Server) ListDocumentDrafts(ctx context.Context, in *documents.ListDocumentDraftsRequest) (*documents.ListDocumentDraftsResponse, error) {
	if in.DocumentId == "" {
		return nil, status.Errorf(codes.InvalidArgument, "must specify document ID to get the draft")
	}

	resp := &documents.ListDocumentDraftsResponse{}

	if err := api.db.WithSave(ctx, func(conn *sqlite.Conn) error {
		edb, err := hypersql.EntitiesLookupID(conn, in.DocumentId)
		if err != nil {
			return err
		}
		if edb.ResourcesID == 0 {
			return status.Errorf(codes.NotFound, "document %s not found", in.DocumentId)
		}

		return sqlitex.Exec(conn, qListDocumentDrafts(), func(stmt *sqlite.Stmt) error {
			var (
				createTime = stmt.ColumnInt64(0)
				updateTime = stmt.ColumnInt64(1)
				author     = stmt.ColumnBytesUnsafe(2)
				editors    = strings.Split(stmt.ColumnText(3), ",")
				meta       = stmt.ColumnText(4)
			)
			for i, x := range editors {
				data, err := hex.DecodeString(x)
				if err != nil {
					return fmt.Errorf("failed to decode editor: %w", err)
				}

				editors[i] = core.Principal(data).String()
			}

			doc := &documents.Document{
				Id:         in.DocumentId,
				Title:      meta,
				Author:     core.Principal(author).String(),
				Editors:    editors,
				CreateTime: timestamppb.New(time.Unix(createTime, 0)),
				UpdateTime: timestamppb.New(hlc.Timestamp(updateTime).Time()),
			}

			resp.Drafts = append(resp.Drafts, doc)

			return err
		}, edb.ResourcesID)
	}); err != nil {
		return nil, err
	}

	return resp, nil
}
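
The new RPC is keyed by a single document ID rather than paginated. A hedged usage sketch, reusing the generated client assumed in the earlier example:

// printDocumentDrafts lists the draft(s) of one document via the new RPC.
// documents.DraftsClient and the IRI format below are assumptions for illustration.
func printDocumentDrafts(ctx context.Context, client documents.DraftsClient, documentID string) error {
	resp, err := client.ListDocumentDrafts(ctx, &documents.ListDocumentDraftsRequest{
		DocumentId: documentID, // e.g. an "hm://d/..." IRI
	})
	if err != nil {
		return err
	}
	for _, d := range resp.Drafts {
		fmt.Printf("%s %q by %s (updated %s)\n", d.Id, d.Title, d.Author, d.UpdateTime.AsTime())
	}
	return nil
}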

var qListDocumentDrafts = dqb.Str(`
WITH RECURSIVE
-- Resolve the change DAG starting from the draft change for a given resource.
cset (id) AS (
SELECT blob FROM drafts
WHERE resource = :resource
UNION
SELECT blob_links.target
FROM blob_links
JOIN cset ON cset.id = blob_links.source AND blob_links.type = 'change/dep'
)
-- Process the resolved change DAG to get the current state of the resource.
SELECT
resources.create_time AS create_time,
MAX(sb.ts) AS update_time,
authors.principal AS author,
GROUP_CONCAT(DISTINCT HEX(editors.principal)) AS editors,
(JSONB_GROUP_ARRAY(sb.meta ORDER BY ts DESC, editors.principal DESC) FILTER (WHERE sb.meta IS NOT NULL))->>'0' AS meta
FROM cset
JOIN structural_blobs sb ON sb.id = cset.id
JOIN resources ON resources.id = sb.resource
JOIN public_keys editors ON editors.id = sb.author
JOIN public_keys authors ON authors.id = resources.owner
GROUP BY sb.resource
ORDER BY update_time DESC, resources.id DESC;
`)
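
Both queries rely on the same recursive CTE: seed the set with the draft blob(s), then keep following change/dep links until every ancestor change is included. A self-contained toy run of that walk, using the crawshaw.io/sqlite driver and a stripped-down stand-in schema (both are assumptions for illustration, not the daemon's real tables):

package main

import (
	"fmt"
	"log"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
)

func main() {
	conn, err := sqlite.OpenConn("file::memory:?mode=memory",
		sqlite.SQLITE_OPEN_READWRITE|sqlite.SQLITE_OPEN_CREATE|sqlite.SQLITE_OPEN_URI)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Minimal stand-ins for the drafts and blob_links tables referenced above.
	if err := sqlitex.ExecScript(conn, `
		CREATE TABLE drafts (resource INTEGER, blob INTEGER);
		CREATE TABLE blob_links (source INTEGER, target INTEGER, type TEXT);
		-- Draft change 4 of resource 1 depends on change 3, which depends on change 2.
		INSERT INTO drafts VALUES (1, 4);
		INSERT INTO blob_links VALUES (4, 3, 'change/dep'), (3, 2, 'change/dep');
	`); err != nil {
		log.Fatal(err)
	}

	const q = `
		WITH RECURSIVE cset (id) AS (
			SELECT blob FROM drafts WHERE resource = :resource
			UNION
			SELECT blob_links.target
			FROM blob_links
			JOIN cset ON cset.id = blob_links.source AND blob_links.type = 'change/dep'
		)
		SELECT id FROM cset ORDER BY id;`

	// Prints 2, 3 and 4: the draft change plus every dependency it transitively references.
	if err := sqlitex.Exec(conn, q, func(stmt *sqlite.Stmt) error {
		fmt.Println(stmt.ColumnInt64(0))
		return nil
	}, 1); err != nil {
		log.Fatal(err)
	}
}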

// PublishDraft implements the corresponding gRPC method.
func (api *Server) PublishDraft(ctx context.Context, in *documents.PublishDraftRequest) (*documents.Publication, error) {
if in.DocumentId == "" {