Migrate from xz to zstd for dumps compression #3215

Merged · 1 commit · Mar 7, 2025
10 changes: 5 additions & 5 deletions admin/create-dumps.sh
@@ -236,22 +236,22 @@ touch "$FTP_CURRENT_DUMP_DIR/.rsync-filter"

add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-public-dump-$DUMP_TIMESTAMP.tar.xz"
"listenbrainz-public-dump-$DUMP_TIMESTAMP.tar.zst"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-public-timescale-dump-$DUMP_TIMESTAMP.tar.xz"
"listenbrainz-public-timescale-dump-$DUMP_TIMESTAMP.tar.zst"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-listens-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar.xz"
"listenbrainz-listens-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar.zst"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-spark-dump-$DUMP_ID-$DUMP_TIMESTAMP-$DUMP_TYPE.tar"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-feedback-dump-$DUMP_TIMESTAMP.tar.xz"
"listenbrainz-feedback-dump-$DUMP_TIMESTAMP.tar.zst"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"listenbrainz-statistics-dump-$DUMP_TIMESTAMP.tar.xz"
"listenbrainz-statistics-dump-$DUMP_TIMESTAMP.tar.zst"
add_rsync_include_rule \
"$FTP_CURRENT_DUMP_DIR" \
"musicbrainz-canonical-dump-$DUMP_TIMESTAMP.tar.zst"
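For reference, the include rules above follow a fixed naming convention. A minimal Python sketch that reproduces the expected file names (the id, timestamp, and type values are hypothetical stand-ins for the shell variables):

# Hypothetical stand-ins for $DUMP_ID, $DUMP_TIMESTAMP and $DUMP_TYPE.
dump_id = "1042"
dump_timestamp = "20250307-000003"
dump_type = "full"

expected_archives = [
    # These dumps are now zstd-compressed tarballs.
    f"listenbrainz-public-dump-{dump_timestamp}.tar.zst",
    f"listenbrainz-public-timescale-dump-{dump_timestamp}.tar.zst",
    f"listenbrainz-listens-dump-{dump_id}-{dump_timestamp}-{dump_type}.tar.zst",
    f"listenbrainz-feedback-dump-{dump_timestamp}.tar.zst",
    f"listenbrainz-statistics-dump-{dump_timestamp}.tar.zst",
    # The spark dump stays a plain .tar, and the MusicBrainz canonical dump
    # was already .tar.zst before this change.
    f"listenbrainz-spark-dump-{dump_id}-{dump_timestamp}-{dump_type}.tar",
    f"musicbrainz-canonical-dump-{dump_timestamp}.tar.zst",
]

for name in expected_archives:
    print(name)
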
12 changes: 6 additions & 6 deletions docs/users/listenbrainz-dumps.rst
@@ -22,28 +22,28 @@ File Descriptions

A ListenBrainz data dump consists of three archives:

#. ``listenbrainz-public-dump.tar.xz``
#. ``listenbrainz-public-dump.tar.zst``

#. ``listenbrainz-listens-dump.tar.xz``
#. ``listenbrainz-listens-dump.tar.zst``

#. ``listenbrainz-listens-dump-spark.tar.xz``
#. ``listenbrainz-listens-dump-spark.tar.zst``


listenbrainz-public-dump.tar.xz
listenbrainz-public-dump.tar.zst
-------------------------------

This file contains information about ListenBrainz users, along with statistics
about users, artists, recordings, etc. derived from the listens submitted to ListenBrainz.


listenbrainz-listens-dump.tar.xz
listenbrainz-listens-dump.tar.zst
--------------------------------

This is the core ListenBrainz data dump. This file contains all the listens
submitted to ListenBrainz by its users.


listenbrainz-listens-dump-spark.tar.xz
listenbrainz-listens-dump-spark.tar.zst
--------------------------------------

This is also a dump of the core ListenBrainz listen data. These dumps are
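Each archive is a zstd-compressed tar file, so any zstd-aware tool can unpack it. A minimal Python sketch, assuming the zstd command line tool is installed and the file name points at a local download:

import subprocess
import tarfile

archive_path = "listenbrainz-listens-dump.tar.zst"  # illustrative local file name

# Decompress with the zstd CLI and stream the tar straight into tarfile,
# so the intermediate .tar never has to be written to disk.
zstd = subprocess.Popen(
    ["zstd", "--decompress", "--stdout", archive_path],
    stdout=subprocess.PIPE,
)
# mode='r|' reads the tar as a forward-only stream, which is what a pipe provides.
with tarfile.open(fileobj=zstd.stdout, mode="r|") as tar:
    tar.extractall(path="dump-contents")
zstd.stdout.close()
zstd.wait()

Recent GNU tar can do the same in one step with tar --zstd -xf <archive-name>.
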
25 changes: 11 additions & 14 deletions listenbrainz/db/dump.py
@@ -393,16 +393,13 @@ def _create_dump(location: str, db_engine: Optional[sqlalchemy.engine.Engine], d
dump_type=dump_type,
time=dump_time.strftime('%Y%m%d-%H%M%S')
)
archive_path = os.path.join(location, '{archive_name}.tar.xz'.format(
archive_name=archive_name,
))
archive_path = os.path.join(location, f'{archive_name}.tar.zst')

with open(archive_path, 'w') as archive:
zstd_command = ['zstd', '--compress', f'-T{threads}', '-10']
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)

xz_command = ['xz', '--compress', '-T{threads}'.format(threads=threads)]
xz = subprocess.Popen(xz_command, stdin=subprocess.PIPE, stdout=archive)

with tarfile.open(fileobj=xz.stdin, mode='w|') as tar:
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:

temp_dir = tempfile.mkdtemp()

@@ -462,9 +459,9 @@ def _create_dump(location: str, db_engine: Optional[sqlalchemy.engine.Engine], d

shutil.rmtree(temp_dir)

xz.stdin.close()
zstd.stdin.close()

xz.wait()
zstd.wait()
return archive_path


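The compression side of this change follows a single pattern: tarfile writes a tar stream into zstd's stdin, and zstd writes the compressed output to the archive file. A stripped-down sketch of that pattern, using the same level (-10) and thread flag shown above (paths and the added file are illustrative):

import subprocess
import tarfile

threads = 4
archive_path = "example-dump.tar.zst"  # illustrative output path

with open(archive_path, "wb") as archive:
    # zstd reads uncompressed tar data on stdin and writes .zst data to the file.
    zstd = subprocess.Popen(
        ["zstd", "--compress", f"-T{threads}", "-10"],
        stdin=subprocess.PIPE,
        stdout=archive,
    )
    # mode='w|' produces a forward-only tar stream suitable for piping.
    with tarfile.open(fileobj=zstd.stdin, mode="w|") as tar:
        tar.add("README.md", arcname="example-dump/README.md")  # any local file
    zstd.stdin.close()
    zstd.wait()

Compared with xz, zstd typically decompresses much faster at a modest cost in compression ratio.
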
@@ -831,7 +828,7 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
""" Import dump present in passed archive path into postgres db.

Arguments:
archive_path: path to the .tar.xz archive to be imported
archive_path: path to the .tar.zst archive to be imported
db_engine: an sqlalchemy Engine instance for making a connection
tables: dict of tables present in the archive with table name as key and
columns to import as values
@@ -840,13 +837,13 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
db.DUMP_DEFAULT_THREAD_COUNT
"""

xz_command = ['xz', '--decompress', '--stdout', archive_path, '-T{threads}'.format(threads=threads)]
xz = subprocess.Popen(xz_command, stdout=subprocess.PIPE)
zstd_command = ['zstd', '--decompress', '--stdout', archive_path, f'-T{threads}']
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)

connection = db_engine.raw_connection()
try:
cursor = connection.cursor()
with tarfile.open(fileobj=xz.stdout, mode='r|') as tar:
with tarfile.open(fileobj=zstd.stdout, mode='r|') as tar:
for member in tar:
file_name = member.name.split('/')[-1]

@@ -875,7 +872,7 @@ def _import_dump(archive_path, db_engine: sqlalchemy.engine.Engine,
current_app.logger.info('Imported table %s', file_name)
finally:
connection.close()
xz.stdout.close()
zstd.stdout.close()


def _update_sequence(db_engine: sqlalchemy.engine.Engine, seq_name, table_name):
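On the import side used by _import_dump above, the pipe runs the other way: zstd decompresses to stdout and tarfile consumes it as a stream, one member at a time. A sketch of that read pattern (the archive name is illustrative), with the stream-mode caveat spelled out:

import subprocess
import tarfile

archive_path = "listenbrainz-public-dump.tar.zst"  # illustrative path
threads = 4

zstd = subprocess.Popen(
    ["zstd", "--decompress", "--stdout", archive_path, f"-T{threads}"],
    stdout=subprocess.PIPE,
)
with tarfile.open(fileobj=zstd.stdout, mode="r|") as tar:
    for member in tar:
        fileobj = tar.extractfile(member)
        if fileobj is None:  # skip directories and other non-regular entries
            continue
        # In stream mode ('r|') a member must be read while it is the current
        # one; it cannot be revisited once the loop has advanced past it.
        file_name = member.name.split("/")[-1]
        print(file_name, "first line:", fileobj.readline()[:60])
zstd.stdout.close()
zstd.wait()
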
6 changes: 3 additions & 3 deletions listenbrainz/db/tests/test_dump.py
@@ -82,9 +82,9 @@ def test_create_stats_dump(self):

found = set()
found_stats = None
xz_command = ['xz', '--decompress', '--stdout', dump_location, '-T4']
xz = subprocess.Popen(xz_command, stdout=subprocess.PIPE)
with tarfile.open(fileobj=xz.stdout, mode='r|') as tar:
zstd_command = ['zstd', '--decompress', '--stdout', dump_location, '-T4']
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
with tarfile.open(fileobj=zstd.stdout, mode='r|') as tar:
for member in tar:
file_name = member.name.split('/')[-1]
if file_name.endswith(".jsonl"):
25 changes: 14 additions & 11 deletions listenbrainz/db/tests/test_dump_manager.py
@@ -21,6 +21,7 @@

import os
import shutil
import subprocess
import tarfile
import tempfile
import time
@@ -148,13 +149,13 @@ def test_create_full_db(self):
# dumps should contain the 7 archives
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 5)

private_archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir_private, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
private_archive_count += 1
self.assertEqual(private_archive_count, 2)

@@ -197,13 +198,13 @@ def test_create_full_dump_with_id(self):
# dumps should contain the 7 archives
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 5)

private_archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir_private, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
private_archive_count += 1
self.assertEqual(private_archive_count, 2)

@@ -276,7 +277,7 @@ def test_create_incremental(self):
# make sure that the dump contains a full listens and spark dump
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 2)

@@ -315,14 +316,16 @@ def test_create_full_when_incremental_exists(self):
# make sure that the dump contains a full listens and spark dump
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith(".tar.xz") or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 2)

dump_file_name = dump_name.replace("dump", "listens-dump") + ".tar.xz"
dump_file_name = dump_name.replace("dump", "listens-dump") + ".tar.zst"
listens_dump_file = os.path.join(self.tempdir, dump_name, dump_file_name)
with tarfile.open(listens_dump_file, "r:xz") as f:
for member in f.getmembers():
zstd_command = ["zstd", "--decompress", "--stdout", listens_dump_file, "-T4"]
zstd = subprocess.Popen(zstd_command, stdout=subprocess.PIPE)
with tarfile.open(fileobj=zstd.stdout, mode="r|") as f:
for member in f:
if member.name.endswith(".listens"):
lines = f.extractfile(member).readlines()
# five listens were dumped as expected as only five listens were created until the
@@ -353,7 +356,7 @@ def test_create_incremental_dump_with_id(self):
# dump should contain the listen and spark archive
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 2)

@@ -419,6 +422,6 @@ def test_create_feedback(self):
# make sure that the dump contains a feedback dump
archive_count = 0
for file_name in os.listdir(os.path.join(self.tempdir, dump_name)):
if file_name.endswith('.tar.xz') or file_name.endswith(".tar"):
if file_name.endswith(".tar.zst") or file_name.endswith(".tar"):
archive_count += 1
self.assertEqual(archive_count, 1)
14 changes: 7 additions & 7 deletions listenbrainz/listenstore/dump_listenstore.py
@@ -229,7 +229,7 @@ def write_listens(self, temp_dir, tar_file, archive_name,

def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
threads=DUMP_DEFAULT_THREAD_COUNT):
""" Dumps all listens in the ListenStore into a .tar.xz archive.
""" Dumps all listens in the ListenStore into a .tar.zst archive.

Files are created with UUIDs as names. Each file can contain listens for a number of users.
An index.json file is used to save which file contains the listens of which users.
@@ -254,12 +254,12 @@ def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
archive_name = '{}-full'.format(archive_name)
else:
archive_name = '{}-incremental'.format(archive_name)
archive_path = os.path.join(location, f'{archive_name}.tar.xz')
archive_path = os.path.join(location, f'{archive_name}.tar.zst')
with open(archive_path, 'w') as archive:
xz_command = ['xz', '--compress', f'-T{threads}']
xz = subprocess.Popen(xz_command, stdin=subprocess.PIPE, stdout=archive)
zstd_command = ['zstd', '--compress', f'-T{threads}', '-10']
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)

with tarfile.open(fileobj=xz.stdin, mode='w|') as tar:
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:
temp_dir = os.path.join(self.dump_temp_dir_root, str(uuid.uuid4()))
create_path(temp_dir)
self.write_dump_metadata(
@@ -276,9 +276,9 @@ def dump_listens(self, location, dump_id, start_time, end_time, dump_type,
# remove the temporary directory
shutil.rmtree(temp_dir)

xz.stdin.close()
zstd.stdin.close()

xz.wait()
zstd.wait()
self.log.info('ListenBrainz listen dump done!')
self.log.info('Dump present at %s!', archive_path)
return archive_path
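As the dump_listens docstring above says, a listens archive ships an index.json describing which dump file holds which user's listens. A sketch of reading it, assuming the index is a JSON object keyed by user (the exact schema is not visible in this diff and may differ):

import json
import subprocess
import tarfile

archive_path = "listenbrainz-listens-dump-1042-20250307-000003-full.tar.zst"  # illustrative

zstd = subprocess.Popen(
    ["zstd", "--decompress", "--stdout", archive_path],
    stdout=subprocess.PIPE,
)
index = None
with tarfile.open(fileobj=zstd.stdout, mode="r|") as tar:
    for member in tar:
        if member.name.endswith("index.json"):
            index = json.load(tar.extractfile(member))
            break
zstd.stdout.close()
zstd.wait()

if index is not None:
    # Assumed shape: one entry per user pointing at the file with that user's listens.
    for user, entry in list(index.items())[:5]:
        print(user, entry)
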
67 changes: 36 additions & 31 deletions listenbrainz/listenstore/tests/test_dumplistenstore.py
@@ -1,8 +1,10 @@
import os
import shutil
import subprocess
import tarfile
import tempfile
from datetime import datetime, timezone, timedelta
from tempfile import TemporaryDirectory

from psycopg2.extras import execute_values

@@ -196,7 +198,7 @@ def test_dump_and_import_listens_escaped(self):

# test test_import_dump_many_users is gone -- why are we testing user dump/restore here??

def create_test_dump(self, archive_name, archive_path, schema_version=None):
def create_test_dump(self, temp_dir, archive_name, archive_path, schema_version=None):
""" Creates a test dump to test the import listens functionality.
Args:
archive_name (str): the name of the archive
@@ -206,40 +208,43 @@ def create_test_dump(self, archive_name, archive_path, schema_version=None):
Returns:
the full path to the archive created
"""

temp_dir = tempfile.mkdtemp()
with tarfile.open(archive_path, mode='w|xz') as tar:
schema_version_path = os.path.join(temp_dir, 'SCHEMA_SEQUENCE')
with open(schema_version_path, 'w') as f:
f.write(str(schema_version or ' '))
tar.add(schema_version_path,
arcname=os.path.join(archive_name, 'SCHEMA_SEQUENCE'))

with open(archive_path, 'w') as archive:
zstd_command = ['zstd', '--compress', '-T4']
zstd = subprocess.Popen(zstd_command, stdin=subprocess.PIPE, stdout=archive)
with tarfile.open(fileobj=zstd.stdin, mode='w|') as tar:
schema_version_path = os.path.join(temp_dir, 'SCHEMA_SEQUENCE')
with open(schema_version_path, 'w') as f:
f.write(str(schema_version or ' '))
tar.add(schema_version_path,
arcname=os.path.join(archive_name, 'SCHEMA_SEQUENCE'))
zstd.stdin.close()
zstd.wait()
return archive_path

def test_schema_mismatch_exception_for_dump_incorrect_schema(self):
""" Tests that SchemaMismatchException is raised when the schema of the dump is old """

# create a temp archive with incorrect SCHEMA_VERSION_CORE
temp_dir = tempfile.mkdtemp()
archive_name = 'temp_dump'
archive_path = os.path.join(temp_dir, archive_name + '.tar.xz')
archive_path = self.create_test_dump(
archive_name=archive_name,
archive_path=archive_path,
schema_version=LISTENS_DUMP_SCHEMA_VERSION - 1
)
with self.assertRaises(SchemaMismatchException):
self.ls.import_listens_dump(archive_path)
with TemporaryDirectory() as temp_dir:
# create a temp archive with incorrect SCHEMA_VERSION_CORE
archive_name = 'temp_dump'
archive_path = os.path.join(temp_dir, archive_name + '.tar.zst')
archive_path = self.create_test_dump(
temp_dir=temp_dir,
archive_name=archive_name,
archive_path=archive_path,
schema_version=LISTENS_DUMP_SCHEMA_VERSION - 1
)
with self.assertRaises(SchemaMismatchException):
self.ls.import_listens_dump(archive_path)

def test_schema_mismatch_exception_for_dump_no_schema(self):
""" Tests that SchemaMismatchException is raised when there is no schema version in the archive """

temp_dir = tempfile.mkdtemp()
archive_name = 'temp_dump'
archive_path = os.path.join(temp_dir, archive_name + '.tar.xz')

archive_path = self.create_test_dump(archive_name=archive_name, archive_path=archive_path)

with self.assertRaises(SchemaMismatchException):
self.ls.import_listens_dump(archive_path)
with TemporaryDirectory() as temp_dir:
archive_name = 'temp_dump'
archive_path = os.path.join(temp_dir, archive_name + '.tar.zst')
archive_path = self.create_test_dump(
temp_dir=temp_dir,
archive_name=archive_name,
archive_path=archive_path
)
with self.assertRaises(SchemaMismatchException):
self.ls.import_listens_dump(archive_path)