Skip to content

Commit

Permalink
feat(files): add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nunogoncalves03 committed Dec 2, 2024
1 parent 20de2c5 commit 43b24d4
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 1 deletion.
18 changes: 18 additions & 0 deletions singlestoredb/tests/test.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test Notebook"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
18 changes: 18 additions & 0 deletions singlestoredb/tests/test2.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test Notebook 2"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
289 changes: 288 additions & 1 deletion singlestoredb/tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ def test_os_rename(self):
'rename_test_2/nest_1/nested_rename_test_3.sql', overwrite=True,
)

def test_stage_object(self):
def test_file_object(self):
st = self.wg.stage

st.mkdir('obj_test')
Expand Down Expand Up @@ -1028,3 +1028,290 @@ def test_job_with_database_target(self):
assert deleted
job = job_manager.get(job.job_id)
assert job.terminated_at is not None


@pytest.mark.management
class TestFileSpaces(unittest.TestCase):

manager = None
personal_space = None
shared_space = None

@classmethod
def setUpClass(cls):
cls.manager = s2.manage_files()
cls.personal_space = cls.manager.personal_space
cls.shared_space = cls.manager.shared_space

@classmethod
def tearDownClass(cls):
cls.manager = None
cls.personal_space = None
cls.shared_space = None

def test_upload_file(self):
for space in [self.personal_space, self.shared_space]:
root = space.info('/')
assert str(root.path) == '/'
assert root.type == 'directory'

# Upload files
f = space.upload_file(
TEST_DIR / 'test.ipynb',
'upload_test.ipynb',
)
assert str(f.path) == 'upload_test.ipynb'
assert f.type == 'notebook'

if space is self.personal_space:
space.upload_file(
TEST_DIR / 'test.sql',
'upload_test.sql',
)

# Cleanup
space.remove('upload_test.sql')
elif space is self.shared_space:
with self.assertRaises(s2.ManagementError):
space.upload_file(
TEST_DIR / 'test.sql',
'upload_test.sql',
)

# Download and compare to original
txt = f.download(encoding='utf-8')
assert txt == open(TEST_DIR / 'test.ipynb').read()

# Make sure we can't overwrite
with self.assertRaises(OSError):
space.upload_file(
TEST_DIR / 'test.ipynb',
'upload_test.ipynb',
)

# Force overwrite with new content
f = space.upload_file(
TEST_DIR / 'test2.ipynb',
'upload_test.ipynb', overwrite=True,
)
assert str(f.path) == 'upload_test.ipynb'
assert f.type == 'notebook'

# Verify new content
txt = f.download(encoding='utf-8')
assert txt == open(TEST_DIR / 'test2.ipynb').read()

# Make sure we can't upload a folder
with self.assertRaises(s2.ManagementError):
space.upload_folder(TEST_DIR, 'test')

# Cleanup
space.remove('upload_test.ipynb')

def test_open(self):
for space in [self.personal_space, self.shared_space]:
# See if error is raised for non-existent file
with self.assertRaises(s2.ManagementError):
space.open('open_test.ipynb', 'r')

# Load test file
space.upload_file(TEST_DIR / 'test.ipynb', 'open_test.ipynb')

# Read file using `open`
with space.open('open_test.ipynb', 'r') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()

# Read file using `open` with 'rt' mode
with space.open('open_test.ipynb', 'rt') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()

# Read file using `open` with 'rb' mode
with space.open('open_test.ipynb', 'rb') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()

# Read file using `open` with 'rb' mode
with self.assertRaises(ValueError):
with space.open('open_test.ipynb', 'b') as rfile:
pass

# Attempt overwrite file using `open` with mode 'x'
with self.assertRaises(OSError):
with space.open('open_test.ipynb', 'x') as wfile:
pass

# Attempt overwrite file using `open` with mode 'w'
with space.open('open_test.ipynb', 'w') as wfile:
wfile.write(open(TEST_DIR / 'test2.ipynb').read())

txt = space.download_file('open_test.ipynb', encoding='utf-8')

assert txt == open(TEST_DIR / 'test2.ipynb').read()

# Test writer without context manager
wfile = space.open('open_raw_test.ipynb', 'w')
for line in open(TEST_DIR / 'test.ipynb'):
wfile.write(line)
wfile.close()

txt = space.download_file(
'open_raw_test.ipynb',
encoding='utf-8',
)

assert txt == open(TEST_DIR / 'test.ipynb').read()

# Test reader without context manager
rfile = space.open('open_raw_test.ipynb', 'r')
txt = ''
for line in rfile:
txt += line
rfile.close()

assert txt == open(TEST_DIR / 'test.ipynb').read()

# Cleanup
space.remove('open_test.ipynb')
space.remove('open_raw_test.ipynb')

def test_obj_open(self):
for space in [self.personal_space, self.shared_space]:
# Load test file
f = space.upload_file(
TEST_DIR / 'test.ipynb',
'obj_open_test.ipynb',
)

# Read file using `open`
with f.open() as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()

# Make sure directories error out
with self.assertRaises(s2.ManagementError):
space.mkdir('obj_open_dir')

# Write file using `open`
with f.open('w', encoding='utf-8') as wfile:
wfile.write(open(TEST_DIR / 'test2.ipynb').read())

assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.ipynb').read()

# Test writer without context manager
wfile = f.open('w')
for line in open(TEST_DIR / 'test.ipynb'):
wfile.write(line)
wfile.close()

txt = space.download_file(f.path, encoding='utf-8')

assert txt == open(TEST_DIR / 'test.ipynb').read()

# Test reader without context manager
rfile = f.open('r')
txt = ''
for line in rfile:
txt += line
rfile.close()

assert txt == open(TEST_DIR / 'test.ipynb').read()

# Cleanup
space.remove('obj_open_test.ipynb')

def test_os_directories(self):
for space in [self.personal_space, self.shared_space]:
# Make sure directories error out
with self.assertRaises(s2.ManagementError):
space.mkdir('mkdir_test_1')

with self.assertRaises(s2.ManagementError):
space.exists('mkdir_test_1/')

out = space.listdir('/')
assert 'mkdir_test_1/' not in out

with self.assertRaises(s2.ManagementError):
space.rmdir('mkdir_test_1/')

def test_os_rename(self):
for space in [self.personal_space, self.shared_space]:
space.upload_file(
TEST_DIR / 'test.ipynb',
'rename_test.ipynb',
)
assert 'rename_test.ipynb' in space.listdir('/')
assert 'rename_test_2.ipynb' not in space.listdir('/')

space.rename(
'rename_test.ipynb',
'rename_test_2.ipynb',
)
assert 'rename_test.ipynb' not in space.listdir('/')
assert 'rename_test_2.ipynb' in space.listdir('/')

# non-existent file
with self.assertRaises(OSError):
space.rename('rename_foo.ipynb', 'rename_foo_2.ipynb')

space.upload_file(
TEST_DIR / 'test.ipynb',
'rename_test_3.ipynb',
)

# overwrite
with self.assertRaises(OSError):
space.rename(
'rename_test_2.ipynb',
'rename_test_3.ipynb',
)

space.rename(
'rename_test_2.ipynb',
'rename_test_3.ipynb', overwrite=True,
)

# Cleanup
space.remove('rename_test_3.ipynb')

def test_file_object(self):
for space in [self.personal_space, self.shared_space]:
f = space.upload_file(
TEST_DIR / 'test.ipynb',
'obj_test.ipynb',
)

assert not f.is_dir()
assert f.is_file()

# abspath / basename / dirname / exists
assert f.abspath() == 'obj_test.ipynb'
assert f.basename() == 'obj_test.ipynb'
assert f.dirname() == '/'
assert f.exists()

# download
assert f.download(encoding='utf-8') == \
open(TEST_DIR / 'test.ipynb', 'r').read()
assert f.download() == open(TEST_DIR / 'test.ipynb', 'rb').read()

assert space.is_file('obj_test.ipynb')
f.remove()
assert not space.is_file('obj_test.ipynb')

# mtime / ctime
assert f.getmtime() > 0
assert f.getctime() > 0

# rename
f = space.upload_file(
TEST_DIR / 'test.ipynb',
'obj_test.ipynb',
)
assert space.exists('obj_test.ipynb')
assert not space.exists('obj_test_2.ipynb')
f.rename('obj_test_2.ipynb')
assert not space.exists('obj_test.ipynb')
assert space.exists('obj_test_2.ipynb')
assert f.abspath() == 'obj_test_2.ipynb'

# Cleanup
space.remove('obj_test_2.ipynb')

0 comments on commit 43b24d4

Please sign in to comment.