| import csv |
| import io |
| import os |
| import unittest |
|
|
| from src.utils import upload_files, clear_files, convert_delimiter_to_comma |
|
|
|
|
| class UploadedFile: |
| def __init__(self, name, data, type): |
| self.name = name |
| self._data = data |
| self.type = type |
| self.size = len(data) |
|
|
| def getvalue(self): |
| return self._data |
|
|
| def read(self): |
| return self._data |
|
|
|
|
| class TestFileUtils(unittest.TestCase): |
|
|
| def setUp(self): |
| |
| self.sample_csv_file_comma = UploadedFile(name='sample_comma.csv', |
| data=b'col1,col2,col3\n1,2,3\n4,5,6', type='text/csv') |
| self.sample_csv_file_semicolon = UploadedFile(name='sample_semicolon.csv', |
| data=b'col1;col2;col3\n1;2;3\n4;5;6', type='text/csv') |
| self.sample_txt_file = UploadedFile(name='sample.txt', data=b'Sample text file content', type='text/plain') |
|
|
| def test_upload_non_csv_file(self): |
| |
| result = upload_files([self.sample_txt_file]) |
| self.assertEqual(len(result), 1) |
| self.assertTrue(result[0].endswith('sample.txt')) |
|
|
| def test_upload_csv_file_comma(self): |
| |
| result = upload_files([self.sample_csv_file_comma]) |
| self.assertEqual(len(result), 1) |
| self.assertTrue(result[0].endswith('sample_comma.csv')) |
|
|
| def test_upload_csv_file_semicolon(self): |
| |
| result = upload_files([self.sample_csv_file_semicolon]) |
| self.assertEqual(len(result), 1) |
| self.assertTrue(result[0].endswith('sample_semicolon.csv')) |
|
|
| |
| with open(result[0], 'r', encoding='utf-8') as file: |
| reader = csv.reader(file) |
| rows = list(reader) |
| self.assertEqual(rows, [['col1', 'col2', 'col3'], ['1', '2', '3'], ['4', '5', '6']]) |
|
|
| def test_clear_stored_files(self): |
| |
| upload_files([self.sample_txt_file, self.sample_csv_file_comma]) |
| clear_files("./tmp/upload_files/") |
| self.assertEqual(len(os.listdir("./tmp/upload_files/")), 0) |
|
|
| def test_comma_delimiter(self): |
| content = "a,b,c\n1,2,3\n" |
|
|
| |
| byte_content = content.encode() |
| bytes_stream = io.BytesIO(byte_content) |
|
|
| new_stream, converted = convert_delimiter_to_comma(bytes_stream) |
| self.assertEqual(converted, False) |
| self.assertEqual(new_stream.getvalue().decode(), content) |
|
|
| def test_semicolon_delimiter(self): |
| content = "a;b;c\n1;2;3\n" |
| expected_content = "a,b,c\n1,2,3\n" |
|
|
| |
| byte_content = content.encode() |
| bytes_stream = io.BytesIO(byte_content) |
|
|
| new_stream, converted = convert_delimiter_to_comma(bytes_stream) |
| self.assertEqual(converted, True) |
| self.assertEqual(new_stream.getvalue().decode(), expected_content) |
|
|
| def test_tab_delimiter(self): |
| content = "a\tb\tc\n1\t2\t3\n" |
| expected_content = "a,b,c\n1,2,3\n" |
| byte_content = content.encode() |
| bytes_stream = io.BytesIO(byte_content) |
| new_stream, converted = convert_delimiter_to_comma(bytes_stream) |
| self.assertEqual(converted, True) |
| string_content = new_stream.getvalue().decode() |
| self.assertEqual(string_content, expected_content) |
|
|
| def tearDown(self): |
| |
| clear_files("./tmp/upload_files/") |
|
|
|
|
| if __name__ == '__main__': |
| unittest.main() |
|
|