diff --git a/README.rst b/README.rst index 9bf1533..e27e73b 100644 --- a/README.rst +++ b/README.rst @@ -47,6 +47,13 @@ Links Changes ------- +v1.6.2 +~~~~~~ + +* **ADD** ``save_data_on_error`` param to ``VerticaBatch``. This option, + when set, will save all uncommitted lines to a file. + + v1.6.1 ~~~~~~ diff --git a/pyvertica/__init__.py b/pyvertica/__init__.py index bb64aa4..4a9b978 100644 --- a/pyvertica/__init__.py +++ b/pyvertica/__init__.py @@ -1 +1 @@ -__version__ = '1.6.1' +__version__ = '1.6.2' diff --git a/pyvertica/batch.py b/pyvertica/batch.py index 573a51c..df05e8d 100644 --- a/pyvertica/batch.py +++ b/pyvertica/batch.py @@ -54,6 +54,15 @@ class Query(object): :param exc_queue: An instance of class:`Queue.Queue` instance to put exceptions in. + :param drain_to_file: + A ``bolean``. This indicates whether or not data in the fifo should + be saved to a file if an error occurs. If false, all data in the + fifo when an exception occurs will bediscarded. If true, data will + be saved to a random directory in /tmp, and recover_dir on the + :py:class:`Query` instance with contain the full path to the + directory. It is possible that the contents of the fifo will overload + the disk. If that is the case, the data will be discarded. + Default: ``False``. """ daemon = True @@ -62,13 +71,15 @@ class Query(object): if only those are left. """ - def __init__( - self, cursor, sql_query_str, fifo_path, exc_queue): + def __init__(self, cursor, sql_query_str, fifo_path, exc_queue, + drain_to_file=False): super(Query, self).__init__() self.cursor = cursor self.sql_query_str = sql_query_str self.exc_queue = exc_queue self.fifo_path = fifo_path + self.drain_to_file = drain_to_file + self.recover_file = None def run_query(self): """ @@ -89,11 +100,39 @@ def run_query(self): # we need to consume the fifo, to make sure it isn't blocking the # write (and thus hanging forever). - for line in codecs.open(self.fifo_path, 'r', 'utf-8'): - pass + fifo = codecs.open(self.fifo_path, 'r', 'utf-8') + if self.drain_to_file: + self._drain_to_file(fifo) + else: + self._drain(fifo) logger.debug('Query done') + def _drain_to_file(self, fifo_obj): + """ + Pulls all data out of the fifo and saves it to a tmp file. + If an exception is thrown while saving data to the file, + it simply discards the data and removes the file. + """ + self.recover_file = os.path.join(tempfile.mkdtemp(), 'saved_inserts') + file = open(self.recover_file, 'w') + try: + for line in fifo_obj: + file.write(line) + logger.info("Insert data saved to file %s" % self.recover_file) + except IOError, ex: + logger.error("Error saving data to file: %s" % ex) + self._drain(fifo_obj) + os.remove(self.recover_file) + self.recover_file = None + + def _drain(self, fifo_obj): + """ + Discards all data in the fifo. + """ + for line in fifo_obj: + pass + class VerticaBatch(object): """ @@ -194,6 +233,15 @@ class VerticaBatch(object): closing all of its resources. Default: ``False``. *Optional*. + :param save_data_on_error: + A ``boolean``. If true, when an error occurs, uncommitted data will + be saved to a file. The file path will be accessible via + ``VerticaBatch.recover_file`` if an error occurred + after calling ``commit_batch``. + Default: ``False``. If the disk fills up while saving data, + the recovery will be marked as failed and the recovery + file will be removed. + """ copy_options_dict = { 'DELIMITER': ';', @@ -223,7 +271,8 @@ def __init__( column_list=[], copy_options={}, connection=None, - multi_batch=False): + multi_batch=False, + save_data_on_error=False): if connection and odbc_kwargs: raise ValueError("May only specify one of " @@ -363,6 +412,7 @@ def close_batch(self): logger.debug('Batch ended') if not self._query_exc_queue.empty(): + self.recover_file = self._query.recover_file raise self._query_exc_queue.get() self._batch_initialized = False diff --git a/pyvertica/tests/unit/test_batch.py b/pyvertica/tests/unit/test_batch.py index 334b2f4..231ed59 100644 --- a/pyvertica/tests/unit/test_batch.py +++ b/pyvertica/tests/unit/test_batch.py @@ -102,6 +102,77 @@ def test_run_raising_exception(self): os.remove(file_obj.name) self.assertTrue(isinstance(exc_queue.get(), Exception)) + def test_run_raising_exception_saving_data(self): + """ + Test :py:meth:`.QueryThread.run` raising an exception. + """ + file_obj = tempfile.NamedTemporaryFile(bufsize=0, delete=False) + file_obj.write('foo\nbar\n') + file_obj.close() + cursor = Mock() + cursor.execute.side_effect = Exception('boom!') + exc_queue = Queue() + + query = Query(cursor, Mock(), file_obj.name, exc_queue, + drain_to_file=True) + + task_thread = TaskThread(query.run_query) + task_thread.start() + task_thread.run_task() + task_thread.join_task(2) + task_thread.join(2) + + os.remove(file_obj.name) + self.assertTrue(isinstance(exc_queue.get(), Exception)) + self.assertIsNotNone(query.recover_file) + lines = open(query.recover_file).readlines() + self.assertEqual(2, len(lines)) + os.remove(query.recover_file) + + def test__drain(self): + file_mock = Mock() + file_mock.__iter__ = Mock(return_value=iter([1, 2])) + query = Query(None, None, None, None) + query._drain(file_mock) + file_mock.__iter__.assert_called_once_with() + + @patch('os.path.join') + @patch('__builtin__.open') + @patch('tempfile.mkdtemp') + def test__drain_to_file(self, mkdtemp_mock, open_mock, join): + join.return_value = "/tmp/fake/saved_inserts" + fifo_mock = Mock() + fifo_mock.__iter__ = Mock(return_value=iter([1])) + mkdtemp_mock.mk.return_value = "/tmp/fake" + file_obj = Mock() + open_mock.return_value = file_obj + query = Query(None, None, None, None) + query._drain_to_file(fifo_mock) + file_obj.write.assert_called_once_with(1) + fifo_mock.__iter__.assert_called_once_with() + self.assertEqual('/tmp/fake/saved_inserts', + query.recover_file) + + @patch('os.remove') + @patch('os.path.join') + @patch('__builtin__.open') + @patch('tempfile.mkdtemp') + def test__drain_to_file_throws(self, mkdtemp_mock, open_mock, join, + remove): + join.return_value = "/tmp/fake/saved_inserts" + fifo_mock = Mock() + fifo_mock.__iter__ = Mock(return_value=iter([1])) + mkdtemp_mock.mk.return_value = "/tmp/fake" + file_obj = Mock() + file_obj.write.side_effect = IOError + open_mock.return_value = file_obj + query = Query(None, None, None, None) + query._drain_to_file(fifo_mock) + file_obj.write.assert_called_once_with(1) + self.assertEqual(1, file_obj.write.call_count) + self.assertEqual(None, query.recover_file) + self.assertEqual(1, remove.call_count) + class VerticaBatchTestCase(unittest.TestCase): """