Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Optimistic writes #210

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions packages/frontpage/app/(app)/_components/post-card.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import Link from "next/link";
import { createVote, deleteVote } from "@/lib/data/atproto/vote";
import { getVoteForPost } from "@/lib/data/db/vote";
import { ensureUser, getUser } from "@/lib/data/user";
import { TimeAgo } from "@/lib/components/time-ago";
import { VoteButton } from "./vote-button";
import { PostCollection, deletePost } from "@/lib/data/atproto/post";
import { PostCollection } from "@/lib/data/atproto/post";
import { getVerifiedHandle } from "@/lib/data/atproto/identity";
import { UserHoverCard } from "@/lib/components/user-hover-card";
import type { DID } from "@/lib/data/atproto/did";
Expand All @@ -15,6 +14,8 @@ import { revalidatePath } from "next/cache";
import { ReportDialogDropdownButton } from "./report-dialog";
import { DeleteButton } from "./delete-button";
import { ShareDropdownButton } from "./share-button";
import { createVote, deleteVote } from "@/lib/api/vote";
import { deletePost } from "@/lib/api/post";

type PostProps = {
id: number;
Expand Down Expand Up @@ -56,22 +57,24 @@ export async function PostCard({
"use server";
await ensureUser();
await createVote({
subjectAuthorDid: author,
subjectCid: cid,
subjectRkey: rkey,
subjectCollection: PostCollection,
subject: {
rkey,
cid,
authorDid: author,
collection: PostCollection,
},
});
}}
unvoteAction={async () => {
"use server";
await ensureUser();
const user = await ensureUser();
const vote = await getVoteForPost(id);
if (!vote) {
// TODO: Show error notification
console.error("Vote not found for post", id);
return;
}
await deleteVote(vote.rkey);
await deleteVote({ authorDid: user.did, rkey: vote.rkey });
}}
initialState={
(await getUser())?.did === author
Expand Down Expand Up @@ -130,6 +133,7 @@ export async function PostCard({
author,
})}
/>
{/* TODO: there's a bug here where delete shows on deleted posts */}
{user?.did === author ? (
<DeleteButton
deleteAction={deletePostAction.bind(null, rkey)}
Expand All @@ -146,8 +150,8 @@ export async function PostCard({

export async function deletePostAction(rkey: string) {
"use server";
await ensureUser();
await deletePost(rkey);
const user = await ensureUser();
await deletePost({ authorDid: user.did, rkey });

revalidatePath("/");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
"use server";

import {
CommentCollection,
createComment,
deleteComment,
} from "@/lib/data/atproto/comment";
import { CommentCollection } from "@/lib/data/atproto/comment";
import { DID } from "@/lib/data/atproto/did";
import { createVote, deleteVote } from "@/lib/data/atproto/vote";
import { getComment, uncached_doesCommentExist } from "@/lib/data/db/comment";
import { getComment } from "@/lib/data/db/comment";
import { getPost } from "@/lib/data/db/post";
import { parseReportForm } from "@/lib/data/db/report-shared";
import { createReport } from "@/lib/data/db/report";
import { getVoteForComment } from "@/lib/data/db/vote";
import { ensureUser } from "@/lib/data/user";
import { revalidatePath } from "next/cache";
import { createComment, deleteComment } from "@/lib/api/comment";
import { createVote, deleteVote } from "@/lib/api/vote";

export async function createCommentAction(
input: { parentRkey?: string; postRkey: string; postAuthorDid: DID },
Expand Down Expand Up @@ -41,32 +38,19 @@ export async function createCommentAction(
throw new Error(`[naughty] Cannot comment on deleted post. ${user.did}`);
}

const { rkey } = await createComment({
content,
post,
await createComment({
parent: comment,
post,
content,
repo: user.did,
});
await waitForComment(rkey);
revalidatePath(`/post`);
}

const MAX_POLLS = 15;
async function waitForComment(rkey: string) {
let exists = false;
let polls = 0;
while (!exists && polls < MAX_POLLS) {
exists = await uncached_doesCommentExist(rkey);
await new Promise((resolve) => setTimeout(resolve, 250));
polls++;
}
if (!exists) {
throw new Error(`Comment not found after polling: ${rkey}`);
}
revalidatePath(`/post`);
}

export async function deleteCommentAction(rkey: string) {
await ensureUser();
await deleteComment(rkey);
const user = await ensureUser();
await deleteComment({ rkey, authorDid: user.did });
revalidatePath("/post");
}

Expand Down Expand Up @@ -101,20 +85,22 @@ export async function commentVoteAction(input: {
}) {
await ensureUser();
await createVote({
subjectAuthorDid: input.authorDid,
subjectCid: input.cid,
subjectRkey: input.rkey,
subjectCollection: CommentCollection,
subject: {
rkey: input.rkey,
cid: input.cid,
authorDid: input.authorDid,
collection: CommentCollection,
},
});
}

export async function commentUnvoteAction(commentId: number) {
await ensureUser();
const vote = await getVoteForComment(commentId);
const user = await ensureUser();
const vote = await getVoteForComment(commentId, user.did);
if (!vote) {
console.error("Vote not found for comment", commentId);
return;
}

await deleteVote(vote.rkey);
await deleteVote({ authorDid: user.did, rkey: vote.rkey });
}
22 changes: 3 additions & 19 deletions packages/frontpage/app/(app)/post/new/_action.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"use server";

import { DID } from "@/lib/data/atproto/did";
import { createPost } from "@/lib/api/post";
import { getVerifiedHandle } from "@/lib/data/atproto/identity";
import { createPost } from "@/lib/data/atproto/post";
import { uncached_doesPostExist } from "@/lib/data/db/post";
import { DataLayerError } from "@/lib/data/error";
import { ensureUser } from "@/lib/data/user";
import { redirect } from "next/navigation";
Expand All @@ -27,25 +25,11 @@ export async function newPostAction(_prevState: unknown, formData: FormData) {
}

try {
const { rkey } = await createPost({ title, url });
const [handle] = await Promise.all([
getVerifiedHandle(user.did),
waitForPost(user.did, rkey),
]);
const { rkey } = await createPost({ title, url, createdAt: new Date() });
const handle = await getVerifiedHandle(user.did);
Comment on lines +28 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should in a Promise.all

redirect(`/post/${handle}/${rkey}`);
} catch (error) {
if (!(error instanceof DataLayerError)) throw error;
return { error: "Failed to create post" };
}
}

const MAX_POLLS = 10;
async function waitForPost(authorDid: DID, rkey: string) {
let exists = false;
let polls = 0;
while (!exists && polls < MAX_POLLS) {
exists = await uncached_doesPostExist(authorDid, rkey);
await new Promise((resolve) => setTimeout(resolve, 250));
polls++;
}
}
137 changes: 28 additions & 109 deletions packages/frontpage/app/api/receive_hook/route.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
import { db } from "@/lib/db";
import * as schema from "@/lib/schema";
import { atprotoGetRecord } from "@/lib/data/atproto/record";
import { Commit } from "@/lib/data/atproto/event";
import * as atprotoPost from "@/lib/data/atproto/post";
import * as dbPost from "@/lib/data/db/post";
import * as atprotoComment from "@/lib/data/atproto/comment";
import { VoteRecord } from "@/lib/data/atproto/vote";
import * as atprotoVote from "@/lib/data/atproto/vote";
import { getPdsUrl } from "@/lib/data/atproto/did";
import {
unauthed_createComment,
unauthed_deleteComment,
} from "@/lib/data/db/comment";
import {
unauthed_createPostVote,
unauthed_deleteVote,
unauthed_createCommentVote,
} from "@/lib/data/db/vote";
import { unauthed_createNotification } from "@/lib/data/db/notification";
import { handleComment, handlePost, handleVote } from "@/lib/api/relayHandler";
import { eq } from "drizzle-orm";

export async function POST(request: Request) {
const auth = request.headers.get("Authorization");
Expand All @@ -31,113 +21,42 @@ export async function POST(request: Request) {
}

const { ops, repo, seq } = commit.data;
const row = await db
.select()
.from(schema.ConsumedOffset)
.where(eq(schema.ConsumedOffset.offset, seq))
.limit(1);

const operationConsumed = Boolean(row[0]);
if (operationConsumed) {
console.log("Already consumed sequence:", seq);
return new Response("OK");
}

const service = await getPdsUrl(repo);
if (!service) {
throw new Error("No AtprotoPersonalDataServer service found");
}

const promises = ops.map(async (op) => {
const { collection, rkey } = op.path;
console.log("Processing", collection, rkey, op.action);

if (collection === atprotoPost.PostCollection) {
if (op.action === "create") {
const record = await atprotoGetRecord({
serviceEndpoint: service,
repo,
collection,
rkey,
});
const postRecord = atprotoPost.PostRecord.parse(record.value);
await dbPost.unauthed_createPost({
post: postRecord,
rkey,
authorDid: repo,
cid: record.cid,
offset: seq,
});
} else if (op.action === "delete") {
await dbPost.unauthed_deletePost({
rkey,
authorDid: repo,
offset: seq,
});
}
}
// repo is actually the did of the user
if (collection === atprotoComment.CommentCollection) {
if (op.action === "create") {
const comment = await atprotoComment.getComment({ rkey, repo });

const createdComment = await unauthed_createComment({
cid: comment.cid,
comment,
repo,
rkey,
});

const didToNotify = createdComment.parent
? createdComment.parent.authorDid
: createdComment.post.authordid;

if (didToNotify !== repo) {
await unauthed_createNotification({
commentId: createdComment.id,
did: didToNotify,
reason: createdComment.parent ? "commentReply" : "postComment",
});
}
} else if (op.action === "delete") {
await unauthed_deleteComment({ rkey, repo });
}

await db.transaction(async (tx) => {
await tx.insert(schema.ConsumedOffset).values({ offset: seq });
});
}

if (collection === "fyi.unravel.frontpage.vote") {
if (op.action === "create") {
const hydratedRecord = await atprotoGetRecord({
serviceEndpoint: service,
repo,
collection,
rkey,
});
const hydratedVoteRecordValue = VoteRecord.parse(hydratedRecord.value);

if (
hydratedVoteRecordValue.subject.uri.collection ===
atprotoPost.PostCollection
) {
await unauthed_createPostVote({
repo,
rkey,
vote: hydratedVoteRecordValue,
cid: hydratedRecord.cid,
});
} else if (
hydratedVoteRecordValue.subject.uri.collection ===
atprotoComment.CommentCollection
) {
await unauthed_createCommentVote({
cid: hydratedRecord.cid,
vote: hydratedVoteRecordValue,
repo,
rkey,
});
}
} else if (op.action === "delete") {
await unauthed_deleteVote(rkey, repo);
}

await db.transaction(async (tx) => {
await tx.insert(schema.ConsumedOffset).values({ offset: seq });
});
switch (collection) {
case atprotoPost.PostCollection:
await handlePost({ op, repo, rkey });
break;
case atprotoComment.CommentCollection:
await handleComment({ op, repo, rkey });
break;
case atprotoVote.VoteCollection:
await handleVote({ op, repo, rkey });
break;
default:
throw new Error(`Unknown collection: ${collection}, ${op}`);
}
});

await Promise.all(promises);

await db.insert(schema.ConsumedOffset).values({ offset: seq });
return new Response("OK");
}
Loading