Skip to content

Commit

Permalink
feat: printing dataset result for call command (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladfrangu authored Aug 21, 2024
1 parent 56c7d23 commit b052fb8
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 28 deletions.
59 changes: 57 additions & 2 deletions src/commands/call.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import process from 'node:process';

import { ACTOR_JOB_STATUSES } from '@apify/consts';
import { Args, Flags } from '@oclif/core';
import type { ActorStartOptions, ApifyClient } from 'apify-client';
import { type ActorStartOptions, type ApifyClient, type Dataset, DownloadItemsFormat } from 'apify-client';

import { ApifyCommand } from '../lib/apify_command.js';
import { getInputOverride } from '../lib/commands/resolve-input.js';
import { SharedRunOnCloudFlags, runActorOrTaskOnCloud } from '../lib/commands/run-on-cloud.js';
import { LOCAL_CONFIG_PATH } from '../lib/consts.js';
import { getLocalConfig, getLocalUserInfo, getLoggedClientOrThrow } from '../lib/utils.js';

const TerminalStatuses = [
ACTOR_JOB_STATUSES.SUCCEEDED,
ACTOR_JOB_STATUSES.ABORTED,
ACTOR_JOB_STATUSES.FAILED,
ACTOR_JOB_STATUSES.TIMED_OUT,
];

export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
static override description =
'Runs a specific Actor remotely on the Apify cloud platform.\n' +
Expand All @@ -32,6 +40,15 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
allowStdin: true,
exclusive: ['input'],
}),
silent: Flags.boolean({
char: 's',
description: 'Prevents printing the logs of the Actor run to the console.',
default: false,
}),
'output-dataset': Flags.boolean({
char: 'o',
description: 'Prints out the entire default dataset on successful run of the Actor.',
}),
};

static override args = {
Expand Down Expand Up @@ -84,7 +101,7 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
return;
}

await runActorOrTaskOnCloud(apifyClient, {
let run = await runActorOrTaskOnCloud(apifyClient, {
actorOrTaskData: {
id: actorId,
userFriendlyId,
Expand All @@ -93,7 +110,45 @@ export class ActorCallCommand extends ApifyCommand<typeof ActorCallCommand> {
type: 'Actor',
waitForFinishMillis,
inputOverride: inputOverride?.input,
silent: this.flags.silent,
});

if (this.flags.outputDataset) {
// TODO: cleaner way to do this (aka move it to a util function, or integrate it into runActorOrTaskOnCloud)
while (!TerminalStatuses.includes(run.status as never)) {
run = (await apifyClient.run(run.id).get())!;

if (TerminalStatuses.includes(run.status as never)) {
break;
}

// Wait a second before checking again
await new Promise((resolve) => setTimeout(resolve, 1000));
}

const datasetId = run.defaultDatasetId;

let info: Dataset;
let retries = 5;

// Why is this needed? Sometimes, when fetching the dataset info right after the run ends, the object doesn't have the stats up-to-date.
// But sometimes it does!
do {
info = (await apifyClient.dataset(datasetId).get())!;

if (info?.itemCount) {
break;
}

await new Promise((resolve) => setTimeout(resolve, 250));
} while (retries--);

const dataset = await apifyClient.dataset(datasetId).downloadItems(DownloadItemsFormat.JSON, {
clean: true,
});

console.log(dataset.toString());
}
}

private static async resolveActorId({
Expand Down
61 changes: 35 additions & 26 deletions src/lib/commands/run-on-cloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import process from 'node:process';

import { ACTOR_JOB_STATUSES } from '@apify/consts';
import { Flags } from '@oclif/core';
import { ActorRun, ApifyClient, TaskStartOptions } from 'apify-client';
import type { ActorRun, ApifyClient, TaskStartOptions } from 'apify-client';

import { resolveInput } from './resolve-input.js';
import { CommandExitCodes } from '../consts.js';
Expand All @@ -19,25 +19,28 @@ export interface RunOnCloudOptions {
type: 'Actor' | 'Task';
waitForFinishMillis?: number;
inputOverride?: Record<string, unknown>;
silent?: boolean;
}

export async function runActorOrTaskOnCloud(apifyClient: ApifyClient, options: RunOnCloudOptions) {
const cwd = process.cwd();
const { actorOrTaskData, runOptions, type, waitForFinishMillis, inputOverride } = options;
const { actorOrTaskData, runOptions, type, waitForFinishMillis, inputOverride, silent } = options;

const clientMethod = type === 'Actor' ? 'actor' : 'task';

// Get input for actor
const actorInput = resolveInput(cwd, inputOverride);

if (type === 'Actor') {
runLog({ message: `Calling ${type} ${actorOrTaskData.userFriendlyId} (${actorOrTaskData.id})` });
} else if (actorOrTaskData.title) {
runLog({
message: `Calling ${type} ${actorOrTaskData.title} (${actorOrTaskData.userFriendlyId}, ${actorOrTaskData.id})`,
});
} else {
runLog({ message: `Calling ${type} ${actorOrTaskData.userFriendlyId} (${actorOrTaskData.id})` });
if (!silent) {
if (type === 'Actor') {
runLog({ message: `Calling ${type} ${actorOrTaskData.userFriendlyId} (${actorOrTaskData.id})` });
} else if (actorOrTaskData.title) {
runLog({
message: `Calling ${type} ${actorOrTaskData.title} (${actorOrTaskData.userFriendlyId}, ${actorOrTaskData.id})`,
});
} else {
runLog({ message: `Calling ${type} ${actorOrTaskData.userFriendlyId} (${actorOrTaskData.id})` });
}
}

let run: ActorRun;
Expand All @@ -62,28 +65,34 @@ export async function runActorOrTaskOnCloud(apifyClient: ApifyClient, options: R
else throw err;
}

try {
await outputJobLog(run, waitForFinishMillis);
} catch (err) {
warning({ message: 'Can not get log:' });
console.error(err);
if (!silent) {
try {
await outputJobLog(run, waitForFinishMillis);
} catch (err) {
warning({ message: 'Can not get log:' });
console.error(err);
}
}

run = (await apifyClient.run(run.id).get())!;

link({ message: `${type} run detail`, url: `https://console.apify.com/actors/${run.actId}#/runs/${run.id}` });
if (!silent) {
link({ message: `${type} run detail`, url: `https://console.apify.com/actors/${run.actId}#/runs/${run.id}` });

if (run.status === ACTOR_JOB_STATUSES.SUCCEEDED) {
success({ message: `${type} finished.` });
} else if (run.status === ACTOR_JOB_STATUSES.RUNNING) {
warning({ message: `${type} is still running!` });
} else if (run.status === ACTOR_JOB_STATUSES.ABORTED || run.status === ACTOR_JOB_STATUSES.ABORTING) {
warning({ message: `${type} was aborted!` });
process.exitCode = CommandExitCodes.RunAborted;
} else {
error({ message: `${type} failed!` });
process.exitCode = CommandExitCodes.RunFailed;
if (run.status === ACTOR_JOB_STATUSES.SUCCEEDED) {
success({ message: `${type} finished.` });
} else if (run.status === ACTOR_JOB_STATUSES.RUNNING) {
warning({ message: `${type} is still running!` });
} else if (run.status === ACTOR_JOB_STATUSES.ABORTED || run.status === ACTOR_JOB_STATUSES.ABORTING) {
warning({ message: `${type} was aborted!` });
process.exitCode = CommandExitCodes.RunAborted;
} else {
error({ message: `${type} failed!` });
process.exitCode = CommandExitCodes.RunFailed;
}
}

return run;
}

export const SharedRunOnCloudFlags = (type: 'Actor' | 'Task') => ({
Expand Down

0 comments on commit b052fb8

Please sign in to comment.