Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qstash implementation #135

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.local.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ SLACK_MODERATOR_PASSWORD=
NEXT_PUBLIC_MUX_BYO_DOMAIN=
PLAYER_SPLIT_TESTING_PERCENTAGE=
TELEMETRY_ENDPOINT=
QSTASH_TOPIC=
QSTASH_TOKEN=
QSTASH_CURRENT_SIGNING_KEY=
QSTASH_NEXT_SIGNING_KEY=
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
<li><a href="#step-2-set-up-environment-variables">Step 2. Set Up Environment Variables</a></li>
<li><a href="#step-3-deploy-on-vercel">Step 3. Deploy on Vercel</a></li>
<li><a href="#step-4-optional-slackbot-moderator">Step 4. (Optional) Slackbot Moderator</a></li>
<li><a
<li><a
href="#step-5-optional-add-automatic-content-analysis-to-slackbot-moderator-google-vision-api"
>Step 5. (Optional) Add automatic content analysis to Slackbot Moderator (Google Vision API)</a></li>
<li><a
<li><a
href="#step-6-optional-add-automatic-content-analysis-to-slackbot-moderator-hive-ai"
>Step 6. (Optional) Add automatic content analysis to Slackbot Moderator Hive AI</a></li>
</ul>
Expand Down Expand Up @@ -246,6 +246,28 @@ Each dimension will have a score from 0-1 with a precision of 6 decimal places.
<img src="images/moderation-score-slack.png" width="80%" alt="Slackbot Moderation Message"></img>
</div>



## Step 7 (optional) Process Mux webhooks asynchronously ([QStash](https://docs.upstash.com/qstash/))

stream.new can leverage [QStash](https://docs.upstash.com/qstash/) to process Mux webhooks asynchronously to address time-out issues. When stream.new receives a webhook from Mux, it will send the webhook data to QStash topic to be processed later. Data sent to QStash will be sent back to stream.new for the actual processing.

Follow these steps to set it up:

1. First, you will need to set up an account at [upstash.com](https://upstash.com/).
2. Navigate to QStash, create a topic under Topics tab
You need to enter webhook URL(s) where messages in topic will need to be sent
3. After setting up the topic, you will need to set following env vars:

- `QSTASH_TOKEN` - This is a token that will be used to send webhook data to QStash topic.
- `QSTASH_TOPIC` - Topic name to send webhook data to.
- `QSTASH_CURRENT_SIGNING_KEY` - Signing key to be used to verify the request from QStash.
- `QSTASH_NEXT_SIGNING_KEY` - Next signing key to be used to verify the request from QStash.

Once all environment variables set, stream.new will send webhook data to the QStash after verifying the request and then it will respond back to the Mux webhook request. You should be able to find the QStash message id in the console of your application. From QStash, it will send the webhook data back to stream.new for actual processing.

Read more about QStash [here](https://docs.upstash.com/qstash).

# Hidden playback features via query params:

- `time`: will start the video at a specific timestamp in seconds, for example `?time=10` will start at 10 seconds [like this](https://stream.new/v/XQDCNm01ZPyGg81GzK4mQfL7fxFoqP8uo?time=10)
Expand Down
50 changes: 50 additions & 0 deletions lib/mux-webhook-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {
sendSlackAssetReady,
sendSlackAutoDeleteMessage,
} from './slack-notifier';
import { getScores as moderationGoogle } from './moderation-google';
import { getScores as moderationHive } from './moderation-hive';
import { autoDelete } from './moderation-action';

type WebhookRequestBody = {
type: string;
object: any;
id: string;
environment: any;
data: any;
playback_ids?: any[];
duration: number;
created_at: string;
accessor_source: string;
accessor: string;
request_id: string;
}
Comment on lines +9 to +21
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a place where I can get all available fields of the webhook request?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has been some work on this recently by techops team, ask in #devex-dev channel

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the spec/webhook/samples directory in this PR: https://github.com/muxinc/openapi-specification/pull/181

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jaredsmith :)


export async function processMuxWebhook(data: WebhookRequestBody): Promise<boolean> {
const assetId = data.id;
const playbackId =
data.playback_ids && data.playback_ids[0] && data.playback_ids[0].id;
const duration = data.duration;

const googleScores = await moderationGoogle({ playbackId, duration });
const hiveScores = await moderationHive({ playbackId, duration });

const didAutoDelete = hiveScores
? await autoDelete({ assetId, playbackId, hiveScores })
: false;

if (didAutoDelete) {
await sendSlackAutoDeleteMessage({ assetId, duration, hiveScores });
console.log('Auto deleted this asset because it was bad');
} else {
await sendSlackAssetReady({
assetId,
playbackId,
duration,
googleScores,
hiveScores,
});
console.log('Notified myself about this');
}
return true;
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@mux/mux-video-react": "^0.5.1",
"@mux/upchunk": "^2.3",
"@types/probe-image-size": "^7.0.0",
"@upstash/qstash": "^0.1.7",
"copy-to-clipboard": "^3.3.1",
"got": "^11.8.3",
"hls.js": "^1.1.5",
Expand Down
84 changes: 47 additions & 37 deletions pages/api/webhooks/mux.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
import { NextApiRequest, NextApiResponse } from 'next';
import Mux from '@mux/mux-node';
import { processMuxWebhook } from '../../../lib/mux-webhook-processor';
import { buffer } from 'micro';
import { sendSlackAssetReady, sendSlackAutoDeleteMessage } from '../../../lib/slack-notifier';
import { getScores as moderationGoogle } from '../../../lib/moderation-google';
import { getScores as moderationHive } from '../../../lib/moderation-hive';
import { autoDelete } from '../../../lib/moderation-action';
import { Client } from '@upstash/qstash';

const webhookSignatureSecret = process.env.MUX_WEBHOOK_SIGNATURE_SECRET;
const qstashTopic = process.env.QSTASH_TOPIC;
const qstashToken = process.env.QSTASH_TOKEN;

const verifyWebhookSignature = (rawBody: string | Buffer, req: NextApiRequest) => {
const verifyWebhookSignature = (
rawBody: string | Buffer,
req: NextApiRequest
) => {
if (webhookSignatureSecret) {
// this will raise an error if signature is not valid
Mux.Webhooks.verifyHeader(rawBody, req.headers['mux-signature'] as string, webhookSignatureSecret);
Mux.Webhooks.verifyHeader(
rawBody,
req.headers['mux-signature'] as string,
webhookSignatureSecret
);
} else {
console.log('Skipping webhook sig verification because no secret is configured'); // eslint-disable-line no-console
console.log(
'Skipping webhook sig verification because no secret is configured'
); // eslint-disable-line no-console
}
return true;
};

const scheduleAsyncJob = async (rawBody: string) => {
const qstashClient = new Client({
token: `${qstashToken}`,
});
return await qstashClient.publishJSON({
url: qstashTopic,
body: rawBody,
});
};

//
// By default, NextJS will look at the content type and intelligently parse the body
// This is great. Except that for webhooks we need access to the raw body if we want
Expand All @@ -34,7 +53,10 @@ export const config = {
},
};

export default async function muxWebhookHandler (req: NextApiRequest, res: NextApiResponse): Promise<void> {
export default async function muxWebhookHandler(
req: NextApiRequest,
res: NextApiResponse
): Promise<void> {
const { method } = req;

switch (method) {
Expand All @@ -43,47 +65,35 @@ export default async function muxWebhookHandler (req: NextApiRequest, res: NextA
try {
verifyWebhookSignature(rawBody, req);
} catch (e) {
console.error('Error verifyWebhookSignature - is the correct signature secret set?', e);
console.error(
'Error verifyWebhookSignature - is the correct signature secret set?',
e
);
res.status(400).json({ message: (e as Error).message });
return;
}
const jsonBody = JSON.parse(rawBody);
const { data, type } = jsonBody;

if (type !== 'video.asset.ready') {
res.json({ message: 'thanks Mux' });
if (jsonBody.type !== 'video.asset.ready') {
res.json({ message: 'Thanks Mux, webhook received.' });
return;
}
try {
const assetId = data.id;
const playbackId = data.playback_ids && data.playback_ids[0] && data.playback_ids[0].id;
const duration = data.duration;

const googleScores = await moderationGoogle ({ playbackId, duration });
const hiveScores = await moderationHive ({ playbackId, duration });

const didAutoDelete = hiveScores ? (await autoDelete({ assetId, playbackId, hiveScores })) : false;

if (didAutoDelete) {
await sendSlackAutoDeleteMessage({ assetId, duration, hiveScores });
res.json({ message: 'thanks Mux, I autodeleted this asset because it was bad' });
try {
if (qstashTopic && qstashToken) {
const { messageId } = await scheduleAsyncJob(rawBody);
console.log('qstash messageId: ', messageId);
} else {
await sendSlackAssetReady({
assetId,
playbackId,
duration,
googleScores,
hiveScores,
});
res.json({ message: 'thanks Mux, I notified myself about this' });
await processMuxWebhook(jsonBody);
console.log('webhook processed sync');
}
res.json({ message: 'Thanks Mux, webhook received.' });
} catch (e) {
res.statusCode = 500;
console.error('Request error', e); // eslint-disable-line no-console
res.json({ error: 'Error handling webhook' });
console.error('Error in muxWebhookReceiver, request error: ', e); // eslint-disable-line no-console
res.status(500).json({ error: 'Error handling webhook' });
}
break;
} default:
}
default:
res.setHeader('Allow', ['POST']);
res.status(405).end(`Method ${method} Not Allowed`);
}
Expand Down
36 changes: 36 additions & 0 deletions pages/api/webhooks/qstash.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { processMuxWebhook } from '../../../lib/mux-webhook-processor';
import { verifySignature } from '@upstash/qstash/nextjs';

async function qstashWebhookHandler(
req: NextApiRequest,
res: NextApiResponse
): Promise<void> {
const { method, body } = req;

switch (method) {
case 'POST': {
try {
await processMuxWebhook(body);
res.json({ message: 'Success' });
} catch (e) {
res.statusCode = 500;
console.error('Request error', e); // eslint-disable-line no-console
res.json({ error: 'Error handling webhook' });
}
break;
}
default:
res.setHeader('Allow', ['POST']);
res.status(405).end(`Method ${method} Not Allowed`);
}
}

export const config = {
api: {
bodyParser: false,
},
};

// If QStash signing_keys are not defined in the env file, then it will respond back with error without processing the job
export default verifySignature(qstashWebhookHandler);
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==

"@deno/shim-crypto@~0.3.0":
version "0.3.1"
resolved "https://registry.yarnpkg.com/@deno/shim-crypto/-/shim-crypto-0.3.1.tgz#416155b2b6f9ad728f80ebcb422c803efc1023c2"
integrity sha512-ed4pNnfur6UbASEgF34gVxR9p7Mc3qF+Ygbmjiil8ws5IhNFhPDFy5vE5hQAUA9JmVsSxXPcVLM5Rf8LOZqQ5Q==

"@eslint/eslintrc@^0.4.3":
version "0.4.3"
resolved "https://registry.yarnpkg.com/@eslint/eslintrc/-/eslintrc-0.4.3.tgz#9e42981ef035beb3dd49add17acb96e8ff6f394c"
Expand Down Expand Up @@ -1245,6 +1250,13 @@
"@typescript-eslint/types" "4.33.0"
eslint-visitor-keys "^2.0.0"

"@upstash/qstash@^0.1.7":
version "0.1.7"
resolved "https://registry.yarnpkg.com/@upstash/qstash/-/qstash-0.1.7.tgz#637555fa6d13d044eb8ebefc0dad5a54517c21c5"
integrity sha512-QrQuT+m54cIAC+XPp3sWF89G/1gV63i+jiSQWonoMLC6vmXaEg+YSAvWkFLdFWBm1WPheFFQLuKwlZNtUH51SA==
dependencies:
"@deno/shim-crypto" "~0.3.0"

abab@^2.0.3, abab@^2.0.5:
version "2.0.5"
resolved "https://registry.yarnpkg.com/abab/-/abab-2.0.5.tgz#c0b678fb32d60fc1219c784d6a826fe385aeb79a"
Expand Down