Skip to content

Commit

Permalink
⚡️ Update osmcha:new-mapper command
Browse files Browse the repository at this point in the history
- Improve performance
- Add performance report
  • Loading branch information
jbelien committed Oct 20, 2024
1 parent 3c22628 commit adacee1
Showing 1 changed file with 104 additions and 127 deletions.
231 changes: 104 additions & 127 deletions src/Command/NewMapperCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use App\Service\OSMChaAPI;
use App\Service\RegionsProvider;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
Expand All @@ -21,6 +20,7 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\HttpClient\Exception\ClientException;
use Symfony\Component\Stopwatch\Stopwatch;
use Symfony\Component\Validator\Constraints\Date;
use Symfony\Component\Validator\Validator\ValidatorInterface;

Expand All @@ -31,14 +31,14 @@
class NewMapperCommand extends Command
{
public function __construct(
private readonly Stopwatch $stopwatch,
private readonly ValidatorInterface $validator,
private readonly EntityManagerInterface $entityManager,
private readonly RegionsProvider $regionsProvider,
private readonly ChangesetProvider $changesetProvider,
private readonly MapperProvider $mapperProvider,
private readonly OSMChaAPI $osmcha,
private readonly OpenStreetMapAPI $osm,
private readonly CacheItemPoolInterface $cache,
) {
parent::__construct();
}
Expand All @@ -61,122 +61,105 @@ protected function initialize(InputInterface $input, OutputInterface $output)

protected function execute(InputInterface $input, OutputInterface $output): int
{
$usersDeletedCache = $this->cache->getItem(DeletedUsersCommand::CACHE_KEY);
if (!$usersDeletedCache->isHit()) {
$deletedUsersCommand = $this->getApplication()->find('osm:deleted-users');
$deletedUsersCommand->run(new ArrayInput([]), $output);
$io = new SymfonyStyle($input, $output);

$usersDeletedCache = $this->cache->getItem(DeletedUsersCommand::CACHE_KEY);
}
$this->stopwatch->start('initialize-region');

$usersDeleted = $usersDeletedCache->get();
$key = $input->getArgument('region');

$io = new SymfonyStyle($input, $output);
$config = $this->regionsProvider->getRegion(null, $key);
$region = $this->regionsProvider->getEntity($key);

$this->stopwatch->stop('initialize-region');

$this->stopwatch->start('initialize-date');

$key = $input->getArgument('region');
$date = $input->getOption('date');
$region = $this->regionsProvider->getRegion(null, $key);

if (null === $date) {
/** @var Region|null */
$r = $this->entityManager->find(Region::class, $key);
$lastUpdate = null === $r ? null : $r->getLastUpdate();
if (null === $region) {
$region = new Region();
$region->setId($key);

if (null === $r || null === $lastUpdate) {
// If there never was an update, get new mappers from the last 5 days
$date = (new \DateTime())->sub(new \DateInterval('P5D'))->format('Y-m-d');
$io->note(sprintf('Region was not processed yet, get new mappers from %s.', $date));
$io->info(sprintf('Region was not processed yet, get new mappers from %s.', $date));
} else {
$date = $lastUpdate->sub(new \DateInterval('P1D'))->format('Y-m-d');
$io->note(sprintf('Get new mappers from %s.', $date));
$date = $region->getLastUpdate()->sub(new \DateInterval('P1D'))->format('Y-m-d');
}
} elseif (null === $region) {
$region = new Region();
$region->setId($key);
}

$this->stopwatch->stop('initialize-date');

$this->stopwatch->start('update-aoi');

$io->title(sprintf('Update the Area of Interest "%s" (%s)', $key, $date));

$aoiCommand = $this->getApplication()->find('osmcha:aoi');
$aoiCommand->run(new ArrayInput([
'region' => $key,
'-d' => $date,
]), $output);

try {
/** @var int[] */
$usersId = [];
/** @var int[] */
$changesetsId = [];

$changesetsResponse = $this->osmcha->getAreaOfInterestChangesets($region['osmcha.id']);
$this->stopwatch->stop('update-aoi');

$io->text(
sprintf('%s %s', $changesetsResponse->getInfo('http_method'), $changesetsResponse->getInfo('url'))
);
$this->stopwatch->start('process');

$changesetsCollection = $changesetsResponse->toArray();
$io->title(sprintf('Get new mappers from Area of Interest "%s"', $key));

$features = array_filter($changesetsCollection['features'], fn (array $feature) => !\in_array((int) $feature['properties']['uid'], $usersDeleted, true));
try {
/** @var int[] $usersId */
$usersId = [];

$usersId = array_unique(array_map(fn (array $feature) => (int) $feature['properties']['uid'], $features), \SORT_NUMERIC);
$changesets = $this->getNewChangesets($config, $usersId, $io);

/** @var Mapper[] */
$mappers = [];
$usersIdChunks = array_chunk($usersId, 50);
foreach ($usersIdChunks as $i => $chunk) {
$mappers = array_merge($mappers, $this->getUsers($key, $chunk, $io));
}
for ($i = 0; $i < \count($usersId); ++$i) {
try {
$mapper = $this->entityManager->find(Mapper::class, $usersId[$i]);
if (null === $mapper) {
$mapper = $this->getMapper($usersId[$i], $io);
$firstChangeset = $this->getFirstChangeset($usersId[$i], $io);

$changesetsId = array_map(fn (array $feature) => (int) $feature['id'], $features);
if (true === \in_array($firstChangeset->getId(), array_column($changesets, 'id'), true)) {
$mapper->addRegion($region);

/** @var Changeset[] */
$changesets = array_map(function (array $feature) use ($io, $mappers): Changeset {
$mapper = current(array_filter($mappers, fn (Mapper $mapper): bool => $mapper->getId() === (int) $feature['properties']['uid']));
$mapperChangesets = array_filter($changesets, function (array $changeset) use ($mapper) { return (int) $changeset['properties']['uid'] === $mapper->getId(); });
foreach ($mapperChangesets as $changeset) {
$changeset = $this->changesetProvider->fromOSMCha($changeset);

$changeset = $this->changesetProvider->fromOSMCha($feature);
$mapper->addChangeset($changeset);

if (false === $mapper) {
$io->warning(sprintf('Can\'t find mapper #%d', $feature['properties']['uid']));
} else {
$changeset->setMapper($mapper);
}
$this->entityManager->persist($changeset);
}

return $changeset;
}, $features);
$this->entityManager->persist($mapper);

foreach ($mappers as $mapper) {
$firstChangeset = $this->getFirstChangeset($mapper, $io);

/* @todo Add first changeset check date ?? */
if (true === \in_array($firstChangeset->getId(), $changesetsId, true)) {
if (null === $this->entityManager->find(Mapper::class, $mapper->getId())) {
$this->entityManager->persist($mapper);
$io->info(sprintf('Mapper %s (%d) added with %d changeset(s) (first changeset: %d)', $mapper->getDisplayName(), $mapper->getId(), count($mapperChangesets), $firstChangeset->getId()));
}
} else {
$io->note(sprintf('Mapper #%d already exists', $usersId[$i]));
}
} catch (\Exception $e) {
$io->error($e->getMessage());
$io->block($e->getTraceAsString());
}

$mapperChangesets = array_filter($changesets, fn (Changeset $changeset): bool => $changeset->getMapper() === $mapper);
foreach ($mapperChangesets as $changeset) {
$this->entityManager->persist($changeset);
}
$this->stopwatch->lap('process');
}

$this->entityManager->flush();
$region->setLastUpdate(new \DateTime());

$io->success(
sprintf(
'[%s] %s : %s changeset(s)',
$firstChangeset->getCreatedAt()->format('r'),
$mapper->getDisplayName(),
\count($mapperChangesets)
)
);
}
}
$this->entityManager->persist($region);

/** @var Region|null */
$r = $this->entityManager->find(Region::class, $key);
if (null === $r) {
$r = new Region();
$r->setId($key);
}
$r->setLastUpdate(new \DateTime());
$this->entityManager->persist($r);
$this->entityManager->flush();

$this->stopwatch->stop('process');

$this->getPerformance($io);

return Command::SUCCESS;
} catch (ClientException $e) {
$io->error($e->getMessage());
Expand All @@ -186,11 +169,27 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}
}

private function getFirstChangeset(Mapper $mapper, SymfonyStyle $io): Changeset
private function getNewChangesets(array $region, array &$users, SymfonyStyle $io): array
{
$response = $this->osm->getChangesetsByUser($mapper->getId());
$response = $this->osmcha->getAreaOfInterestChangesets($region['osmcha.id']);

// $io->text(sprintf('%s %s', $response->getInfo('http_method'), $response->getInfo('url')));

$geojson = $response->toArray();
$features = $geojson['features'];

$users = array_values(array_unique(array_map(fn (array $feature) => (int) $feature['properties']['uid'], $features), \SORT_NUMERIC));

$io->text(sprintf('%s %s', $response->getInfo('http_method'), $response->getInfo('url')));
$io->success(sprintf('Found %d new changeset(s) from %d new user(s)', \count($features), \count($users)));

return $features;
}

private function getFirstChangeset(int $userId, SymfonyStyle $io): Changeset
{
$response = $this->osm->getChangesetsByUser($userId);

// $io->text(sprintf('%s %s', $response->getInfo('http_method'), $response->getInfo('url')));

$xml = new \SimpleXMLElement($response->getContent());

Expand All @@ -209,57 +208,35 @@ private function getFirstChangeset(Mapper $mapper, SymfonyStyle $io): Changeset
return $changesets[0];
}

private function getUsers(string $key, array $ids, SymfonyStyle $io): array
private function getMapper(int $userId, SymfonyStyle $io): Mapper
{
try {
$getUsersResponse = $this->osm->getUsers($ids);
$response = $this->osm->getUsers([$userId]);

if (404 === $getUsersResponse->getStatusCode() && \count($ids) > 1) {
$io->warning(sprintf('%s %s (%d)', $getUsersResponse->getInfo('http_method'), $getUsersResponse->getInfo('url'), $getUsersResponse->getStatusCode()));
// $io->text(sprintf('%s %s', $response->getInfo('http_method'), $response->getInfo('url')));

$users = [];
foreach ($ids as $id) {
$getUserResponse = $this->osm->getUsers([$id]);
$response = $response->toArray(true);

if (404 !== $getUserResponse->getStatusCode()) {
$io->text(sprintf('%s %s', $getUserResponse->getInfo('http_method'), $getUserResponse->getInfo('url')));
$users = $response['users'];

$response = $getUserResponse->toArray();
$users = array_merge($users, $response['users']);
} else {
$io->warning(sprintf('%s %s (%d)', $getUserResponse->getInfo('http_method'), $getUserResponse->getInfo('url'), $getUserResponse->getStatusCode()));
}
}
} else {
$io->text(sprintf('%s %s', $getUsersResponse->getInfo('http_method'), $getUsersResponse->getInfo('url')));

$response = $getUsersResponse->toArray();
$users = $response['users'];
}

/** @var Mapper[] */
$mappers = array_map(function (array $array) use ($key): Mapper {
$region = $this->regionsProvider->getEntity($key);

if (null === $region) {
$region = new Region();
$region->setId($key);
$region->setLastUpdate(new \DateTime('1970-01-01'));

$this->entityManager->persist($region);
}

$mapper = $this->mapperProvider->fromOSM($array);
$mapper->addRegion($region);
if (0 === \count($users)) {
throw new \InvalidArgumentException(sprintf('User #%d not found', $userId));
}

return $mapper;
}, $users);
$mapper = $this->mapperProvider->fromOSM($users[0]);

return $mappers;
} catch (\Exception $exception) {
$io->warning($exception->getMessage());
return $mapper;
}

return [];
}
private function getPerformance(SymfonyStyle $io): void
{
$perf = [
['Initialize region', round($this->stopwatch->getEvent('initialize-region')->getDuration()), round($this->stopwatch->getEvent('initialize-region')->getMemory() / 1024 / 1024, 1)],
['Initialize date', round($this->stopwatch->getEvent('initialize-date')->getDuration()), round($this->stopwatch->getEvent('initialize-date')->getMemory() / 1024 / 1024, 1)],
['Update AOI', round($this->stopwatch->getEvent('update-aoi')->getDuration()), round($this->stopwatch->getEvent('update-aoi')->getMemory() / 1024 / 1024, 1)],
['Process', round($this->stopwatch->getEvent('process')->getDuration()), round($this->stopwatch->getEvent('process')->getMemory() / 1024 / 1024, 1)],
];

$io->table(['Event', 'Duration (ms)', 'Memory (MB)'], $perf);
$io->text(sprintf('Total: %.2f seconds - %.1f MB', array_sum(array_column($perf, 1)) / 1000, array_sum(array_column($perf, 2))));
}
}

0 comments on commit adacee1

Please sign in to comment.