Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qstash implementation #135

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.local.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ SLACK_MODERATOR_PASSWORD=
NEXT_PUBLIC_MUX_BYO_DOMAIN=
PLAYER_SPLIT_TESTING_PERCENTAGE=
TELEMETRY_ENDPOINT=
QSTASH_TOPIC=
QSTASH_TOKEN=
QSTASH_CURRENT_SIGNING_KEY=
QSTASH_NEXT_SIGNING_KEY=
50 changes: 50 additions & 0 deletions lib/mux-webhook-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {
sendSlackAssetReady,
sendSlackAutoDeleteMessage,
} from './slack-notifier';
import { getScores as moderationGoogle } from './moderation-google';
import { getScores as moderationHive } from './moderation-hive';
import { autoDelete } from './moderation-action';

type WebhookRequestBody = {
type: string;
object: any;
id: string;
environment: any;
data: any;
playback_ids?: any[];
duration: number;
created_at: string;
accessor_source: string;
accessor: string;
request_id: string;
}
Comment on lines +9 to +21
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a place where I can get all available fields of the webhook request?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has been some work on this recently by techops team, ask in #devex-dev channel

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the spec/webhook/samples directory in this PR: https://github.com/muxinc/openapi-specification/pull/181

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jaredsmith :)


export async function processMuxWebhook(data: WebhookRequestBody): Promise<boolean> {
const assetId = data.id;
const playbackId =
data.playback_ids && data.playback_ids[0] && data.playback_ids[0].id;
const duration = data.duration;

const googleScores = await moderationGoogle({ playbackId, duration });
const hiveScores = await moderationHive({ playbackId, duration });

const didAutoDelete = hiveScores
? await autoDelete({ assetId, playbackId, hiveScores })
: false;

if (didAutoDelete) {
await sendSlackAutoDeleteMessage({ assetId, duration, hiveScores });
console.log('Auto deleted this asset because it was bad');
} else {
await sendSlackAssetReady({
assetId,
playbackId,
duration,
googleScores,
hiveScores,
});
console.log('Notified myself about this');
}
return true;
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@mux/mux-video-react": "^0.5.1",
"@mux/upchunk": "^2.3",
"@types/probe-image-size": "^7.0.0",
"@upstash/qstash": "^0.1.7",
"copy-to-clipboard": "^3.3.1",
"got": "^11.8.3",
"hls.js": "^1.1.5",
Expand Down
84 changes: 47 additions & 37 deletions pages/api/webhooks/mux.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
import { NextApiRequest, NextApiResponse } from 'next';
import Mux from '@mux/mux-node';
import { processMuxWebhook } from '../../../lib/mux-webhook-processor';
import { buffer } from 'micro';
import { sendSlackAssetReady, sendSlackAutoDeleteMessage } from '../../../lib/slack-notifier';
import { getScores as moderationGoogle } from '../../../lib/moderation-google';
import { getScores as moderationHive } from '../../../lib/moderation-hive';
import { autoDelete } from '../../../lib/moderation-action';
import { Client } from '@upstash/qstash';

const webhookSignatureSecret = process.env.MUX_WEBHOOK_SIGNATURE_SECRET;
const qstashTopic = process.env.QSTASH_TOPIC;
const qstashToken = process.env.QSTASH_TOKEN;

const verifyWebhookSignature = (rawBody: string | Buffer, req: NextApiRequest) => {
const verifyWebhookSignature = (
rawBody: string | Buffer,
req: NextApiRequest
) => {
if (webhookSignatureSecret) {
// this will raise an error if signature is not valid
Mux.Webhooks.verifyHeader(rawBody, req.headers['mux-signature'] as string, webhookSignatureSecret);
Mux.Webhooks.verifyHeader(
rawBody,
req.headers['mux-signature'] as string,
webhookSignatureSecret
);
} else {
console.log('Skipping webhook sig verification because no secret is configured'); // eslint-disable-line no-console
console.log(
'Skipping webhook sig verification because no secret is configured'
); // eslint-disable-line no-console
}
return true;
};

const scheduleAsyncJob = async (rawBody: string) => {
const qstashClient = new Client({
token: `${qstashToken}`,
});
return await qstashClient.publishJSON({
url: qstashTopic,
body: rawBody,
});
};

//
// By default, NextJS will look at the content type and intelligently parse the body
// This is great. Except that for webhooks we need access to the raw body if we want
Expand All @@ -34,7 +53,10 @@ export const config = {
},
};

export default async function muxWebhookHandler (req: NextApiRequest, res: NextApiResponse): Promise<void> {
export default async function muxWebhookHandler(
req: NextApiRequest,
res: NextApiResponse
): Promise<void> {
const { method } = req;

switch (method) {
Expand All @@ -43,47 +65,35 @@ export default async function muxWebhookHandler (req: NextApiRequest, res: NextA
try {
verifyWebhookSignature(rawBody, req);
} catch (e) {
console.error('Error verifyWebhookSignature - is the correct signature secret set?', e);
console.error(
'Error verifyWebhookSignature - is the correct signature secret set?',
e
);
res.status(400).json({ message: (e as Error).message });
return;
}
const jsonBody = JSON.parse(rawBody);
const { data, type } = jsonBody;

if (type !== 'video.asset.ready') {
res.json({ message: 'thanks Mux' });
if (jsonBody.type !== 'video.asset.ready') {
res.json({ message: 'Thanks Mux, webhook received.' });
return;
}
try {
const assetId = data.id;
const playbackId = data.playback_ids && data.playback_ids[0] && data.playback_ids[0].id;
const duration = data.duration;

const googleScores = await moderationGoogle ({ playbackId, duration });
const hiveScores = await moderationHive ({ playbackId, duration });

const didAutoDelete = hiveScores ? (await autoDelete({ assetId, playbackId, hiveScores })) : false;

if (didAutoDelete) {
await sendSlackAutoDeleteMessage({ assetId, duration, hiveScores });
res.json({ message: 'thanks Mux, I autodeleted this asset because it was bad' });
try {
if (qstashTopic && qstashToken) {
const { messageId } = await scheduleAsyncJob(rawBody);
console.log('qstash messageId: ', messageId);
} else {
await sendSlackAssetReady({
assetId,
playbackId,
duration,
googleScores,
hiveScores,
});
res.json({ message: 'thanks Mux, I notified myself about this' });
await processMuxWebhook(jsonBody);
console.log('webhook processed sync');
}
res.json({ message: 'Thanks Mux, webhook received.' });
} catch (e) {
res.statusCode = 500;
console.error('Request error', e); // eslint-disable-line no-console
res.json({ error: 'Error handling webhook' });
console.error('Error in muxWebhookReceiver, request error: ', e); // eslint-disable-line no-console
res.status(500).json({ error: 'Error handling webhook' });
}
break;
} default:
}
default:
res.setHeader('Allow', ['POST']);
res.status(405).end(`Method ${method} Not Allowed`);
}
Expand Down
36 changes: 36 additions & 0 deletions pages/api/webhooks/qstash.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { processMuxWebhook } from '../../../lib/mux-webhook-processor';
import { verifySignature } from '@upstash/qstash/nextjs';

async function qstashWebhookHandler(
req: NextApiRequest,
res: NextApiResponse
): Promise<void> {
const { method, body } = req;

switch (method) {
case 'POST': {
try {
await processMuxWebhook(body);
res.json({ message: 'Success' });
} catch (e) {
res.statusCode = 500;
console.error('Request error', e); // eslint-disable-line no-console
res.json({ error: 'Error handling webhook' });
}
break;
}
default:
res.setHeader('Allow', ['POST']);
res.status(405).end(`Method ${method} Not Allowed`);
}
}

export const config = {
api: {
bodyParser: false,
},
};

// If QStash signing_keys are not defined in the env file, then it will respond back with error without processing the job
export default verifySignature(qstashWebhookHandler);
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==

"@deno/shim-crypto@~0.3.0":
version "0.3.1"
resolved "https://registry.yarnpkg.com/@deno/shim-crypto/-/shim-crypto-0.3.1.tgz#416155b2b6f9ad728f80ebcb422c803efc1023c2"
integrity sha512-ed4pNnfur6UbASEgF34gVxR9p7Mc3qF+Ygbmjiil8ws5IhNFhPDFy5vE5hQAUA9JmVsSxXPcVLM5Rf8LOZqQ5Q==

"@eslint/eslintrc@^0.4.3":
version "0.4.3"
resolved "https://registry.yarnpkg.com/@eslint/eslintrc/-/eslintrc-0.4.3.tgz#9e42981ef035beb3dd49add17acb96e8ff6f394c"
Expand Down Expand Up @@ -1245,6 +1250,13 @@
"@typescript-eslint/types" "4.33.0"
eslint-visitor-keys "^2.0.0"

"@upstash/qstash@^0.1.7":
version "0.1.7"
resolved "https://registry.yarnpkg.com/@upstash/qstash/-/qstash-0.1.7.tgz#637555fa6d13d044eb8ebefc0dad5a54517c21c5"
integrity sha512-QrQuT+m54cIAC+XPp3sWF89G/1gV63i+jiSQWonoMLC6vmXaEg+YSAvWkFLdFWBm1WPheFFQLuKwlZNtUH51SA==
dependencies:
"@deno/shim-crypto" "~0.3.0"

abab@^2.0.3, abab@^2.0.5:
version "2.0.5"
resolved "https://registry.yarnpkg.com/abab/-/abab-2.0.5.tgz#c0b678fb32d60fc1219c784d6a826fe385aeb79a"
Expand Down