From 770c04a0f23610ecdb78691a6308fee3ba6036b1 Mon Sep 17 00:00:00 2001 From: Jason Madden Date: Tue, 1 Sep 2020 12:22:57 -0500 Subject: [PATCH] Checkpoint on refactoring to better manage store connections. Refs #409 and refs #390 --- .pylintrc | 3 + src/relstorage/adapters/connections.py | 5 +- src/relstorage/cache/mvcc.py | 8 +- src/relstorage/storage/__init__.py | 54 +++--- src/relstorage/storage/interfaces.py | 112 +++++++++++++ src/relstorage/storage/tpc/__init__.py | 221 ++++++++++++++++--------- src/relstorage/storage/tpc/begin.py | 66 ++++---- src/relstorage/storage/tpc/finish.py | 28 ++-- src/relstorage/storage/tpc/restore.py | 32 ++-- src/relstorage/storage/tpc/vote.py | 90 +++++----- src/relstorage/tests/locking.py | 11 +- 11 files changed, 413 insertions(+), 217 deletions(-) diff --git a/.pylintrc b/.pylintrc index e04b198f..3d549c21 100644 --- a/.pylintrc +++ b/.pylintrc @@ -114,6 +114,9 @@ max-locals=20 # undefined-all-variable crash. unsafe-load-any-extension = yes +property-classes=zope.cachedescriptors.property.Lazy,zope.cachedescriptors.property.Cached,relstorage._util.Lazy + + # Local Variables: # mode: conf-space # End: diff --git a/src/relstorage/adapters/connections.py b/src/relstorage/adapters/connections.py index 9d4266fd..7f9abad3 100644 --- a/src/relstorage/adapters/connections.py +++ b/src/relstorage/adapters/connections.py @@ -213,7 +213,7 @@ def call(self, f, can_reconnect, *args, **kwargs): The function may be called up to twice, if the *fresh_connection_p* is false on the first call and a disconnected exception is raised. :keyword bool can_reconnect: If True, then we will attempt to reconnect - the connection and try again if an exception is raised if *f*. If False, + the connection and try again if a disconnected exception is raised in *f*. If False, we let that exception propagate. For example, if a transaction is in progress, set this to false. 
""" @@ -269,6 +269,7 @@ def __repr__(self): self._cursor ) + @implementer(interfaces.IManagedLoadConnection) class LoadConnection(AbstractManagedConnection): @@ -290,10 +291,12 @@ class StoreConnection(AbstractManagedConnection): def begin(self): self.connmanager.begin(*self.open_if_needed()) + class PrePackConnection(StoreConnection): __slots__ = () _NEW_CONNECTION_NAME = 'open_for_pre_pack' + @implementer(interfaces.IManagedDBConnection) class ClosedConnection(object): """ diff --git a/src/relstorage/cache/mvcc.py b/src/relstorage/cache/mvcc.py index 885793c9..8d672464 100644 --- a/src/relstorage/cache/mvcc.py +++ b/src/relstorage/cache/mvcc.py @@ -658,7 +658,7 @@ def _poll(self, cache, conn, cursor, if polled_tid == 0 or polled_tid < polling_since: assert change_iter is None - # Freshly zapped or empty database (tid==0) or stale and + # Freshly zapped or empty database (tid==0) or stale replica and # asked to revert (as opposed to raising # ReadConflictError). Mark not just this one, but all # other extent indexes as needing a full rebuild. @@ -668,7 +668,8 @@ def _poll(self, cache, conn, cursor, return None # Ok cool, we got data to move us forward. - # We must be careful to always consume the iterator, even if we exit early. + # We must be careful to always consume the iterator, even if we exit early + # (because it could be a server-side cursor holding connection state). # So we do that now. change_iter = list(change_iter) self.log( @@ -697,7 +698,8 @@ def _poll(self, cache, conn, cursor, # it's older than our poll, we take control; # but the history of the index must always move forward, # so we build it starting from what's currently installed. - # There could be some overlap. + # There could be some overlap. Since we moved the index forward, + # we can vacuum. 
change_index = self.object_index = installed_index.with_polled_changes( polled_tid, polling_since, diff --git a/src/relstorage/storage/__init__.py b/src/relstorage/storage/__init__.py index 25ef8ff3..53e9f99b 100644 --- a/src/relstorage/storage/__init__.py +++ b/src/relstorage/storage/__init__.py @@ -98,6 +98,7 @@ def close(self): stats = lambda s: {'closed': True} afterCompletion = lambda s, c: None + @implementer(IRelStorage) class RelStorage(LegacyMethodsMixin, ConflictResolution.ConflictResolvingStorage): @@ -112,8 +113,8 @@ class RelStorage(LegacyMethodsMixin, _options = None _is_read_only = False _read_only_error = ReadOnlyError - # _ltid is the ID of the last transaction committed by this instance. - _ltid = z64 + # ZODB TID of the last transaction committed by this instance. + _last_tid_i_committed_bytes = z64 # _closed is True after self.close() is called. _closed = False @@ -137,15 +138,15 @@ class RelStorage(LegacyMethodsMixin, _load_connection = ClosedConnection() _store_connection = ClosedConnection() - _tpc_begin_factory = None + #_tpc_begin_factory = None _oids = ReadOnlyOIDs() - # Attributes in our dictionary that shouldn't have stale()/no_longer_stale() - # called on them. At this writing, it's just the type object. - _STALE_IGNORED_ATTRS = ( - '_tpc_begin_factory', - ) + # # Attributes in our dictionary that shouldn't have stale()/no_longer_stale() + # # called on them. At this writing, it's just the type object. 
+ # _STALE_IGNORED_ATTRS = ( + # '_tpc_begin_factory', + # ) def __init__(self, adapter, name=None, create=None, options=None, cache=None, blobhelper=None, @@ -218,12 +219,12 @@ def __init__(self, adapter, name=None, create=None, else: self.blobhelper = BlobHelper(options=options, adapter=adapter) - self._tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree + tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree if hasattr(self._adapter.packundo, 'deleteObject'): interface.alsoProvides(self, ZODB.interfaces.IExternalGC) - self._tpc_phase = NotInTransaction.from_storage(self) + self._tpc_phase = NotInTransaction(tpc_begin_factory, self._is_read_only) if not self._is_read_only: self._oids = OIDs(self._adapter.oidallocator, self._store_connection) @@ -333,7 +334,7 @@ def before_instance(self, before): @property def highest_visible_tid(self): cache_tid = self._cache.highest_visible_tid or 0 - committed_tid = bytes8_to_int64(self._ltid) + committed_tid = bytes8_to_int64(self._last_tid_i_committed_bytes) # In case we haven't polled yet. return max(cache_tid, committed_tid) @@ -457,7 +458,7 @@ def checkCurrentSerialInTransaction(self, oid, serial, transaction): @metricmethod def tpc_begin(self, transaction, tid=None, status=' '): try: - self._tpc_phase = self._tpc_phase.tpc_begin(transaction, self._tpc_begin_factory) + self._tpc_phase = self._tpc_phase.tpc_begin(self, transaction) except: # Could be a database (connection) error, could be a programming # bug. Either way, we're fine to roll everything back and hope @@ -483,7 +484,7 @@ def tpc_vote(self, transaction): # the object has changed during the commit process, due to # conflict resolution or undo. 
try: - next_phase = self._tpc_phase.tpc_vote(transaction, self) + next_phase = self._tpc_phase.tpc_vote(self, transaction) except: self.tpc_abort(transaction, _force=True) raise @@ -494,7 +495,7 @@ def tpc_vote(self, transaction): @metricmethod def tpc_finish(self, transaction, f=None): try: - next_phase, committed_tid = self._tpc_phase.tpc_finish(transaction, f) + next_phase = self._tpc_phase.tpc_finish(self, transaction, f) except: # OH NO! This isn't supposed to happen! # It's unlikely tpc_abort will get called... @@ -503,7 +504,9 @@ def tpc_finish(self, transaction, f=None): # The store connection is either committed or rolledback; # the load connection is now rolledback. self._tpc_phase = next_phase - self._ltid = committed_tid + # XXX: De-dup this + committed_tid = int64_to_8bytes(next_phase.last_committed_tid_int) + self._last_tid_i_committed_bytes = committed_tid return committed_tid @metricmethod @@ -516,17 +519,18 @@ def tpc_abort(self, transaction, _force=False): if _force: # We're here under unexpected circumstances. It's possible something # might go wrong rolling back. - self._tpc_phase = NotInTransaction.from_storage(self) + self._tpc_phase = NotInTransaction(self._is_read_only, self.lastTransactionInt()) raise def lastTransaction(self): - if self._ltid == z64 and self._cache.highest_visible_tid is None: + if self._last_tid_i_committed_bytes == z64 and self._cache.highest_visible_tid is None: # We haven't committed *or* polled for transactions, # so our MVCC state is "floating". 
# Read directly from the database to get the latest value, return int64_to_8bytes(self._adapter.txncontrol.get_tid(self._load_connection.cursor)) - return max(self._ltid, int64_to_8bytes(self._cache.highest_visible_tid or 0)) + return max(self._last_tid_i_committed_bytes, + int64_to_8bytes(self._cache.highest_visible_tid or 0)) def lastTransactionInt(self): return bytes8_to_int64(self.lastTransaction()) @@ -729,8 +733,8 @@ def __stale(self, stale_error): replacements = {} my_ns = vars(self) for k, v in my_ns.items(): - if k in self._STALE_IGNORED_ATTRS: - continue + # if k in self._STALE_IGNORED_ATTRS: + # continue if callable(getattr(v, 'stale', None)): new_v = v.stale(stale_error) replacements[k] = new_v @@ -759,8 +763,8 @@ def __no_longer_stale(self, _conn, _cursor): replacements = { k: v.no_longer_stale() for k, v in my_ns.items() - if k not in self._STALE_IGNORED_ATTRS - and callable(getattr(v, 'no_longer_stale', None)) +# if k not in self._STALE_IGNORED_ATTRS + if callable(getattr(v, 'no_longer_stale', None)) } my_ns.update(replacements) @@ -786,8 +790,8 @@ def __on_load_first_use(self, conn, cursor): # by this connection, we don't want to ghost objects that we're sure # are up-to-date unless someone else has changed them. # Note that transactions can happen between us committing and polling. - if self._ltid is not None: - ignore_tid = bytes8_to_int64(self._ltid) + if self._last_tid_i_committed_bytes is not None: + ignore_tid = bytes8_to_int64(self._last_tid_i_committed_bytes) else: ignore_tid = None @@ -853,7 +857,7 @@ def __on_load_first_use(self, conn, cursor): # Hmm, ok, the Connection isn't polling us in a timely fashion. # Maybe we're the root storage? Maybe our APIs are being used # independently? 
At any rate, we're going to stop tracking now; - # if a connection eventually gets around to polling us, they'll + # if a Connection eventually gets around to polling us, they'll # need to clear their whole cache self.__queued_changes = None diff --git a/src/relstorage/storage/interfaces.py b/src/relstorage/storage/interfaces.py index cfa913a8..da723815 100644 --- a/src/relstorage/storage/interfaces.py +++ b/src/relstorage/storage/interfaces.py @@ -25,6 +25,7 @@ from __future__ import print_function from zope.interface import Interface +from zope.interface import Attribute from transaction.interfaces import TransientError from ZODB.POSException import StorageTransactionError @@ -67,3 +68,114 @@ class VoteReadConflictError(ReadConflictError): A read conflict (from Connection.readCurrent()) that wasn't detected until the storage voted. """ + +class ITPCState(Interface): + """ + An object representing the current state (phase) of the two-phase commit protocol, + and how to transition between it and other phases. + """ + + transaction = Attribute("The *transaction* object from ZODB.") + + def tpc_abort(storage, transaction, force=False): + """ + Clear any used resources, and return the object + representing :class:`ITPCStateNotInTransaction`. + + :param transaction: The transaction object from ZODB. + If this is not the current transaction, does nothing + unless *force* is true. + :keyword bool force: Whether to forcibly abort the transaction. + If this is true, then it doesn't matter if the *transaction* parameter + matches or not. Also, underlying RDBMS connections should also be closed + and discarded. + :return: The previous `ITPCStateNotInTransaction`. + """ + + +class ITPCStateNotInTransaction(ITPCState): + """ + The "idle" state. + + In this state, no store connection is available, + and the *transaction* is always `None`. + + Because ZODB tests this, this method has to define + a bunch of methods that are also defined by various other states. 
+ These methods should raise ``StorageTransactionError``, or + ``ReadOnlyError``, as appropriate. + """ + + last_committed_tid_int = Attribute( + """ + The TID of the last transaction committed to get us to this state. + + Initially, this is 0. In the value returned from :meth:`ITPCPhaseVoting.tpc_finish`, + it is the TID just committed. + """ + ) + + def tpc_begin(storage, transaction): + """ + Enter the two-phase commit protocol. + + :return: An implementation of :class:`ITPCStateBegan`. + """ + + def tpc_finish(*args, **kwargs): + """ + Raises ``StorageTransactionError`` + """ + + def tpc_vote(*args, **kwargs): + """ + As for `tpc_finish`. + """ + + def store(*args, **kwargs): + """ + Raises ``ReadOnlyError`` or ``StorageTransactionError`` + """ + + restore = deleteObject = undo = restoreBlob = store + +class ITPCStateDatabaseAvailable(ITPCState): + """ + A state where the database connection is usually available. + """ + + store_connection = Attribute("The IManagedStoreConnection in use.") + + +class ITPCStateBegan(ITPCStateDatabaseAvailable): + """ + The first phase where the database is available for storage. + + Storing objects, restoring objects, storing blobs, deleting objects, + all happen in this phase. + """ + + def tpc_vote(storage, transaction): + """ + Enter the voting state. + + :return: An implementation of :class:`ITPCStateVoting` + """ + +class ITPCStateBeganHP(ITPCStateBegan): + """ + The extra methods that are available for history-preserving + storages. + """ + +class ITPCPhaseVoting(ITPCStateDatabaseAvailable): + """ + The phase where voting happens. This follows the beginning phase. + """ + + def tpc_finish(storage, transaction, f=None): + """ + Finish the transaction. 
+ + :return: The next implementation of :class:`ITPCPhaseNotInTransaction` + """ diff --git a/src/relstorage/storage/tpc/__init__.py b/src/relstorage/storage/tpc/__init__.py index 3fb5f51f..a77d2d16 100644 --- a/src/relstorage/storage/tpc/__init__.py +++ b/src/relstorage/storage/tpc/__init__.py @@ -31,9 +31,16 @@ from transaction._transaction import rm_key from transaction import get as get_thread_local_transaction +from zope.interface import implementer + from ZODB.POSException import ReadOnlyError from ZODB.POSException import StorageTransactionError +from ..interfaces import ITPCStateNotInTransaction +from ..._util import Lazy + +from .temporary_storage import TemporaryStorage + log = logging.getLogger("relstorage") #: Set the ``RELSTORAGE_LOCK_EARLY`` environment variable if you @@ -61,17 +68,105 @@ def __init__(self, storage): self.cache = storage._cache self.read_only = storage._is_read_only -class AbstractTPCState(object): +class SharedTPCState(object): + """ + Contains attributes marking resources that *might* be used during the commit + process. If any of them are, then the `abort` method takes care of cleaning them up. + + Accessing a resource implicitly begins it, if needed. + """ + + prepared_txn = None + transaction = None + not_in_transaction_state = None + read_only = False # Or we wouldn't allocate this object. 
+ + def __init__(self, initial_state, storage, transaction): + self.initial_state = initial_state + self._storage = storage + self.transaction = transaction + + @Lazy + def store_connection(self): + # TODO: This is where we'd check one out of the pool + conn = self._storage._store_connection + conn.restart() + conn.begin() + return conn + + @Lazy + def load_connection(self): + return self._storage._load_connection + + @Lazy + def blobhelper(self): + blobhelper = self._storage.blobhelper + blobhelper.begin() + return blobhelper + + @Lazy + def cache(self): + return self._storage._cache + + @Lazy + def adapter(self): + return self._storage._adapter + + @Lazy + def temp_storage(self): + return TemporaryStorage() + + def has_temp_data(self): + return 'temp_storage' in self.__dict__ and self.temp_storage + + def abort(self, force=False): + # pylint:disable=no-member,using-constant-test + try: + # Drop locks first. + if 'store_connection' in self.__dict__: + store_connection = self.store_connection + if store_connection: + # It's possible that this connection/cursor was + # already closed if an error happened (which would + # release the locks). Don't try to re-open it. + self.adapter.locker.release_commit_lock(store_connection.cursor) + + # Though, this might re-open it. + self.adapter.txncontrol.abort( + store_connection, + self.prepared_txn) + + if force: + store_connection.drop() + if 'load_connection' in self.__dict__: + if force: + self.load_connection.drop() + else: + self.load_connection.rollback_quietly() + + if 'blobhelper' in self.__dict__: + self.blobhelper.abort() + + if 'temp_storage' in self.__dict__: + self.temp_storage.close() + finally: + for attr in 'store_connection', 'load_connection', 'blobhelper', 'temp_storage': + self.__dict__.pop(attr, None) + + + def release(self): + # TODO: Release the store_connection back to the pool. 
+ # pylint:disable=no-member + if 'temp_storage' in self.__dict__: + self.temp_storage.close() + del self.temp_storage + + + +class AbstractTPCStateDatabaseAvailable(object): __slots__ = ( - 'adapter', - 'load_connection', - 'store_connection', - 'transaction', - 'prepared_txn', - 'blobhelper', - 'cache', - 'read_only', + 'shared_state', ) # - store @@ -87,29 +182,17 @@ class AbstractTPCState(object): # read only, this needs to happen in the "not in transaction" # state. - @classmethod - def from_storage(cls, storage): - return cls(_StorageFacade(storage), None) - - def __init__(self, previous_state, transaction=None): - if 0: # pylint:disable=using-constant-test - # This block (elided from the bytecode) - # is for pylint static analysis - self.adapter = self.load_connection = self.store_connection = self.transaction = None - self.prepared_txn = self.blobhelper = None - self.cache = None # type: relstorage.cache.storage_cache.StorageCache - self.read_only = False - for attr in AbstractTPCState.__slots__: - val = getattr(previous_state, attr) - setattr(self, attr, val) + def __init__(self, shared_state): + self.shared_state = shared_state # type: SharedTPCState - self.transaction = transaction + @property + def transaction(self): + return self.shared_state.transaction def __repr__(self): - result = "<%s at 0x%x blobhelper=%r stored_count=%s %s" % ( + result = "<%s at 0x%x stored_count=%s %s" % ( type(self).__name__, id(self), - self.blobhelper, len(getattr(self, 'temp_storage', ()) or ()), self._tpc_state_transaction_data(), ) @@ -154,8 +237,7 @@ def _tpc_state_transaction_data(self): resources = sorted(global_tx._resources, key=rm_key) return "transaction=%r resources=%r" % (global_tx, resources) - - def tpc_finish(self, transaction, f=None): # pylint:disable=unused-argument + def tpc_finish(self, storage, transaction, f=None): # pylint:disable=unused-argument # For the sake of some ZODB tests, we need to implement this everywhere, # even if it's not actually 
usable, and the first thing it needs to # do is check the transaction. @@ -163,46 +245,18 @@ def tpc_finish(self, transaction, f=None): # pylint:disable=unused-argument raise StorageTransactionError('tpc_finish called with wrong transaction') raise NotImplementedError("tpc_finish not allowed in this state.") + def tpc_begin(self, _storage, transaction): + # Ditto as for tpc_finish + raise StorageTransactionError('tpc_begin not allowed in this state', type(self)) + def tpc_abort(self, transaction, force=False): if not force: if transaction is not self.transaction: return self - try: - # Drop locks first. - if self.store_connection: - # It's possible that this connection/cursor was - # already closed if an error happened (which would - # release the locks). Don't try to re-open it. - self.adapter.locker.release_commit_lock(self.store_connection.cursor) - self.adapter.txncontrol.abort( - self.store_connection, - self.prepared_txn) - - if force: - self.load_connection.drop() - self.store_connection.drop() - else: - self.load_connection.rollback_quietly() - self.blobhelper.abort() - finally: - self._clear_temp() - return NotInTransaction(self) - - def _clear_temp(self): - """ - Clear all attributes used for transaction commit. - Subclasses should override. Called on tpc_abort; subclasses - should call on other exit states. - """ - - def tpc_begin(self, transaction, begin_factory): - if transaction is self.transaction: - raise StorageTransactionError("Duplicate tpc_begin calls for same transaction.") - # XXX: Shouldn't we tpc_abort() first (well, not that exactly, because - # the transaction won't match, but logically)? The original storage - # code didn't do that, but it seems like it should. 
- return begin_factory(self, transaction) + self.shared_state.abort(force) + return self.shared_state.initial_state + def no_longer_stale(self): return self @@ -210,17 +264,23 @@ def no_longer_stale(self): def stale(self, e): return Stale(self, e) - -class NotInTransaction(AbstractTPCState): +@implementer(ITPCStateNotInTransaction) +class NotInTransaction(object): # The default state, when the storage is not attached to a # transaction. - __slots__ = () + __slots__ = ( + 'last_committed_tid_int', + 'read_only', + 'begin_factory', + ) - def __init__(self, previous_state, transaction=None): - super(NotInTransaction, self).__init__(previous_state) - # Reset some things that need to go away. - self.prepared_txn = None + transaction = None + + def __init__(self, begin_factory, read_only, committed_tid_int=0): + self.begin_factory = begin_factory + self.read_only = read_only + self.last_committed_tid_int = committed_tid_int def tpc_abort(self, *args, **kwargs): # pylint:disable=arguments-differ,unused-argument,signature-differs # Nothing to do @@ -238,18 +298,28 @@ def store(self, *_args, **_kwargs): restore = deleteObject = undo = restoreBlob = store - def tpc_begin(self, transaction, begin_factory): + def tpc_begin(self, storage, transaction): # XXX: Signature needs to change. if self.read_only: raise ReadOnlyError() - return super(NotInTransaction, self).tpc_begin(transaction, begin_factory) + if transaction is self.transaction: # Also handles None. + raise StorageTransactionError("Duplicate tpc_begin calls for same transaction.") + state = SharedTPCState(self, storage, transaction) + return self.begin_factory(state) + + # def no_longer_stale(self): + # return self + + # def stale(self, e): + # return Stale(self, e) + # This object appears to be false. def __bool__(self): return False __nonzero__ = __bool__ - -class Stale(AbstractTPCState): +@implementer(ITPCStateNotInTransaction) +class Stale(object): """ An error that lets us know we are stale was encountered. 
@@ -259,7 +329,6 @@ class Stale(AbstractTPCState): """ def __init__(self, previous_state, stale_error): - super(Stale, self).__init__(previous_state, None) self.previous_state = previous_state self.stale_error = stale_error diff --git a/src/relstorage/storage/tpc/begin.py b/src/relstorage/storage/tpc/begin.py index 7b7d1a0e..3cd815ae 100644 --- a/src/relstorage/storage/tpc/begin.py +++ b/src/relstorage/storage/tpc/begin.py @@ -31,8 +31,7 @@ from relstorage._compat import OID_TID_MAP_TYPE from relstorage._util import to_utf8 -from . import AbstractTPCState -from .temporary_storage import TemporaryStorage +from . import AbstractTPCStateDatabaseAvailable from .vote import DatabaseLockedForTid from .vote import HistoryFree as HFVoteFactory from .vote import HistoryPreserving as HPVoteFactory @@ -49,7 +48,7 @@ class _BadFactory(object): def enter(self, storage): raise NotImplementedError -class AbstractBegin(AbstractTPCState): +class AbstractBegin(AbstractTPCStateDatabaseAvailable): """ The phase we enter after ``tpc_begin`` has been called. """ @@ -71,21 +70,21 @@ class AbstractBegin(AbstractTPCState): # OIDs of things we have deleted or undone. # Stored in their 8 byte form 'invalidated_oids', + ) _DEFAULT_TPC_VOTE_FACTORY = _BadFactory # type: Callable[..., AbstractTPCState] - def __init__(self, previous_state, transaction): - super(AbstractBegin, self).__init__(previous_state, transaction) + def __init__(self, shared_state): + super(AbstractBegin, self).__init__(shared_state) self.invalidated_oids = () # We'll replace this later with the right type when it's needed. 
self.required_tids = {} # type: Dict[int, int] self.tpc_vote_factory = self._DEFAULT_TPC_VOTE_FACTORY # type: ignore - self.temp_storage = TemporaryStorage() - user = to_utf8(transaction.user) - desc = to_utf8(transaction.description) - ext = transaction.extension + user = to_utf8(self.transaction.user) + desc = to_utf8(self.transaction.description) + ext = self.transaction.extension if ext: ext = dumps(ext, 1) @@ -93,19 +92,20 @@ def __init__(self, previous_state, transaction): ext = b"" self.ude = user, desc, ext - # In many cases we can defer this; we only need it - # if we do deleteObject() or store a blob (which we're not fully in - # control of) - self.store_connection.restart() - - self.store_connection.begin() - self.blobhelper.begin() + # self.adapter = storage._adapter + # self.cache = storage._cache + # # XXX: This is where we would open one from the pool + # self.store_connection = storage._store_connection + # self.blobhelper = storage.blobhelper + # # In many cases we can defer this; we only need it + # # if we do deleteObject() or store a blob (which we're not fully in + # # control of) + # self.store_connection.restart() - def _clear_temp(self): - # Clear all attributes used for transaction commit. - self.temp_storage.close() + # self.store_connection.begin() + # self.blobhelper.begin() - def tpc_vote(self, transaction, storage): + def tpc_vote(self, storage, transaction): if transaction is not self.transaction: raise StorageTransactionError( "tpc_vote called with wrong transaction") @@ -136,7 +136,7 @@ def store(self, oid, previous_tid, data, transaction): # Save the data locally in a temporary place. Later, closer to commit time, # we'll send it all over at once. This lets us do things like use # COPY in postgres. 
- self.temp_storage.store_temp(oid_int, data, prev_tid_int) + self.shared_state.temp_storage.store_temp(oid_int, data, prev_tid_int) @metricmethod_sampled @@ -216,16 +216,16 @@ def deleteObject(self, oid, oldserial, transaction): # delete a specific version? Etc. oid_int = bytes8_to_int64(oid) tid_int = bytes8_to_int64(oldserial) - self.cache.remove_cached_data(oid_int, tid_int) + self.shared_state.cache.remove_cached_data(oid_int, tid_int) # We delegate the actual operation to the adapter's packundo, # just like native pack - cursor = self.store_connection.cursor + cursor = self.shared_state.store_connection.cursor # When this is done, we get a tpc_vote, # and a tpc_finish. # The interface doesn't specify a return value, so for testing # we return the count of rows deleted (should be 1 if successful) - deleted = self.adapter.packundo.deleteObject(cursor, oid, oldserial) + deleted = self.shared_state.adapter.packundo.deleteObject(cursor, oid, oldserial) self._invalidated_oids(oid) return deleted @@ -247,8 +247,8 @@ class HistoryPreserving(AbstractBegin): _DEFAULT_TPC_VOTE_FACTORY = HPVoteFactory - def __init__(self, storage, transaction): - AbstractBegin.__init__(self, storage, transaction) + def __init__(self, *args): + AbstractBegin.__init__(self, *args) self.committing_tid_lock = None def _obtain_commit_lock(self, cursor): @@ -258,7 +258,7 @@ def _obtain_commit_lock(self, cursor): # because the database adapters also acquire in that # order during packing. tid_lock = DatabaseLockedForTid.lock_database_for_next_tid( - cursor, self.adapter, self.ude) + cursor, self.shared_state.adapter, self.ude) self.committing_tid_lock = tid_lock def deleteObject(self, oid, oldserial, transaction): @@ -266,7 +266,7 @@ def deleteObject(self, oid, oldserial, transaction): # theoretically these are unreachable? Our custom # vote stage just removes this transaction anyway; maybe it # can skip the committing. 
- self._obtain_commit_lock(self.store_connection.cursor) + self._obtain_commit_lock(self.shared_state.store_connection.cursor) # A transaction that deletes objects can *only* delete objects. # That way we don't need to store an entry in the transaction table # (and add extra bloat to the DB; that kind of defeats the point of @@ -296,8 +296,8 @@ def undo(self, transaction_id, transaction): assert len(undo_tid) == 8 undo_tid_int = bytes8_to_int64(undo_tid) - adapter = self.adapter - cursor = self.store_connection.cursor + adapter = self.shared_state.adapter + cursor = self.shared_state.store_connection.cursor assert cursor is not None adapter.locker.hold_pack_lock(cursor) @@ -318,15 +318,15 @@ def undo(self, transaction_id, transaction): # we're probably just undoing the latest state. Still, play it # a bit safer. oid_ints = [oid_int for oid_int, _ in copied] - self.cache.remove_all_cached_data_for_oids(oid_ints) + self.shared_state.cache.remove_all_cached_data_for_oids(oid_ints) # Update the current object pointers immediately, so that # subsequent undo operations within this transaction will see # the new current objects. adapter.mover.update_current(cursor, self_tid_int) - self.blobhelper.copy_undone(copied, - self.committing_tid_lock.tid) + self.shared_state.blobhelper.copy_undone(copied, + self.committing_tid_lock.tid) oids = [int64_to_8bytes(oid_int) for oid_int in oid_ints] self._invalidated_oids(*oids) diff --git a/src/relstorage/storage/tpc/finish.py b/src/relstorage/storage/tpc/finish.py index 759682cc..9db34194 100644 --- a/src/relstorage/storage/tpc/finish.py +++ b/src/relstorage/storage/tpc/finish.py @@ -19,7 +19,7 @@ from . import NotInTransaction -def Finish(vote_state, needs_store_commit=True): +def Finish(vote_state, committed_tid_int, needs_store_commit=True): """ The state we enter with tpc_finish. @@ -27,23 +27,27 @@ def Finish(vote_state, needs_store_commit=True): to the not-in-transaction state. """ # Bring the load connection to current status. 
- vote_state.load_connection.rollback_quietly() + vote_state.shared_state.load_connection.rollback_quietly() if needs_store_commit: # We may have already committed the store connection, so there's # no point doing so again. Also no point in rolling it back either. - txn = vote_state.prepared_txn + txn = vote_state.shared_state.prepared_txn assert txn is not None - vote_state.adapter.txncontrol.commit_phase2( - vote_state.store_connection, + vote_state.shared_state.adapter.txncontrol.commit_phase2( + vote_state.shared_state.store_connection, txn, - vote_state.load_connection) + vote_state.shared_state.load_connection) - vote_state.committing_tid_lock.release_commit_lock(vote_state.store_connection.cursor) - vote_state.cache.after_tpc_finish(vote_state.committing_tid_lock.tid, - vote_state.temp_storage) + vote_state.committing_tid_lock.release_commit_lock(vote_state.shared_state.store_connection.cursor) + vote_state.shared_state.cache.after_tpc_finish(vote_state.committing_tid_lock.tid, + vote_state.shared_state.temp_storage) # Make sure we're not holding any elevated privileges still; # that would be a bug in the driver. 
- vote_state.load_connection.exit_critical_phase() - vote_state.store_connection.exit_critical_phase() - return NotInTransaction(vote_state) + vote_state.shared_state.load_connection.exit_critical_phase() + vote_state.shared_state.store_connection.exit_critical_phase() + return NotInTransaction( + vote_state.shared_state.initial_state.begin_factory, + vote_state.shared_state.initial_state.read_only, + committed_tid_int, + ) diff --git a/src/relstorage/storage/tpc/restore.py b/src/relstorage/storage/tpc/restore.py index 7e57219b..4627e3d8 100644 --- a/src/relstorage/storage/tpc/restore.py +++ b/src/relstorage/storage/tpc/restore.py @@ -54,7 +54,8 @@ class Restore(object): ) def __init__(self, begin_state, committing_tid, status): - # type: (AbstractBegin, DatabaseLockedForTid, str) -> None + # type: (relstorage.storage.tpc.begin.AbstractBegin, DatabaseLockedForTid, str) -> None + # This is an extension we use for copyTransactionsFrom; # it is not part of the IStorage API. assert committing_tid is not None @@ -65,8 +66,8 @@ def __init__(self, begin_state, committing_tid, status): # other than this transaction. We currently avoid the temp tables, # though, so if we do multiple things in a restore transaction, # we could still wind up with locking issues (I think?) - adapter = begin_state.adapter - cursor = begin_state.store_connection.cursor + adapter = begin_state.shared_state.adapter + cursor = begin_state.shared_state.store_connection.cursor packed = (status == 'p') try: committing_tid_lock = DatabaseLockedForTid.lock_database_for_given_tid( @@ -74,7 +75,7 @@ def __init__(self, begin_state, committing_tid, status): cursor, adapter, begin_state.ude ) except: - begin_state.store_connection.drop() + begin_state.shared_state.store_connection.drop() raise # This is now only used for restore() @@ -94,13 +95,14 @@ def __init__(self, begin_state, committing_tid, status): # the mover is unaware of it. 
factory = begin_state.tpc_vote_factory assert factory is HFVoteFactory or factory is HPVoteFactory - def tpc_vote_factory(state, + def tpc_vote_factory(begin_state, _f=_HFVoteFactory if factory is HFVoteFactory else _HPVoteFactory, _c=committing_tid_lock, _b=batcher): - return _f(state, _c, _b) + return _f(begin_state, _c, _b) begin_state.tpc_vote_factory = tpc_vote_factory - begin_state.temp_storage = _TempStorageWrapper(begin_state.temp_storage) + begin_state.shared_state.temp_storage = _TempStorageWrapper( + begin_state.shared_state.temp_storage) def restore(self, oid, this_tid, data, prev_txn, transaction): # Similar to store() (see comments in FileStorage.restore for @@ -113,16 +115,16 @@ def restore(self, oid, this_tid, data, prev_txn, transaction): if transaction is not state.transaction: raise StorageTransactionError(self, transaction) - adapter = state.adapter - cursor = state.store_connection.cursor + adapter = state.shared_state.adapter + cursor = state.shared_state.store_connection.cursor assert cursor is not None oid_int = bytes8_to_int64(oid) tid_int = bytes8_to_int64(this_tid) # Save the `data`. Note that `data` can be None. # Note also that this doesn't go through the cache. - state.temp_storage.max_restored_oid = max(state.temp_storage.max_restored_oid, - oid_int) + state.shared_state.temp_storage.max_restored_oid = max(state.shared_state.temp_storage.max_restored_oid, + oid_int) # TODO: Make it go through the cache, or at least the same # sort of queing thing, so that we can do a bulk COPY. # The way we do it now complicates restoreBlob() and it complicates voting. @@ -139,8 +141,8 @@ def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn): # (we'd prefer having DEFERRABLE INITIALLY DEFERRED FK # constraints, but as-of 8.0 MySQL doesn't support that.) 
self.batcher.flush() - cursor = state.store_connection.cursor - state.blobhelper.restoreBlob(cursor, oid, serial, blobfilename) + cursor = state.shared_state.store_connection.cursor + state.shared_state.blobhelper.restoreBlob(cursor, oid, serial, blobfilename) class _TempStorageWrapper(object): @@ -169,9 +171,9 @@ def __getattr__(self, name): class _VoteFactoryMixin(object): __slots__ = () - def __init__(self, state, committing_tid_lock, batcher): + def __init__(self, begin_state, committing_tid_lock, batcher): # type: (Restore, Optional[DatabaseLockedForTid], Any) -> None - super(_VoteFactoryMixin, self).__init__(state) + super(_VoteFactoryMixin, self).__init__(begin_state) # pylint:disable=assigning-non-slot self.committing_tid_lock = committing_tid_lock self.batcher = batcher diff --git a/src/relstorage/storage/tpc/vote.py b/src/relstorage/storage/tpc/vote.py index b5e7e102..767f4731 100644 --- a/src/relstorage/storage/tpc/vote.py +++ b/src/relstorage/storage/tpc/vote.py @@ -36,7 +36,7 @@ from ..interfaces import VoteReadConflictError from . import LOCK_EARLY -from . import AbstractTPCState +from . import AbstractTPCStateDatabaseAvailable from .finish import Finish @@ -94,7 +94,7 @@ def __repr__(self): self.local_allocation_time ) -class AbstractVote(AbstractTPCState): +class AbstractVote(AbstractTPCStateDatabaseAvailable): """ The state we're in following ``tpc_vote``. @@ -106,15 +106,13 @@ class AbstractVote(AbstractTPCState): __slots__ = ( # (user, description, extension) from the transaction. byte objects. 'ude', - # The TemporaryStorage. - 'temp_storage', # required_tids: {oid_int: tid_int}; confirms that certain objects # have not changed at commit. May be a BTree 'required_tids', # The DatabaseLockedForTid object 'committing_tid_lock', # {oid_bytes}: Things that get changed as part of the vote process - # and thus need to be invalidated. + # and thus need to be invalidated. TODO: Move to shared state? 
'invalidated_oids', # How many conflicts there were to resolve. None if we're not there yet. 'count_conflicts', @@ -130,10 +128,10 @@ def __init__(self, begin_state, committing_tid_lock=None): # If committing_tid is passed to this method, it means the # database has already been locked and the TID is locked in. # This is (only!) done when we're restoring transactions. - super(AbstractVote, self).__init__(begin_state, begin_state.transaction) + super(AbstractVote, self).__init__(begin_state.shared_state) self.required_tids = begin_state.required_tids or {} # type: Dict[int, int] - self.temp_storage = begin_state.temp_storage # type: .temporary_storage.TemporaryStorage + #self.temp_storage = begin_state.temp_storage # type: .temporary_storage.TemporaryStorage self.ude = begin_state.ude self.committing_tid_lock = committing_tid_lock # type: Optional[DatabaseLockedForTid] self.count_conflicts = None @@ -142,10 +140,6 @@ def __init__(self, begin_state, committing_tid_lock=None): # Anything that we've undone or deleted is also invalidated. self.invalidated_oids = begin_state.invalidated_oids or set() # type: Set[bytes] - def _clear_temp(self): - # Clear all attributes used for transaction commit. - self.temp_storage.close() - def _tpc_state_extra_repr_info(self): return { 'share_lock_count': len(self.required_tids), @@ -160,13 +154,13 @@ def enter(self, storage): @log_timed def _flush_temps_to_db(self, cursor): - if self.temp_storage: + if self.shared_state.has_temp_data(): # Don't bother if we're empty. 
- self.adapter.mover.store_temps(cursor, self.temp_storage) + self.shared_state.adapter.mover.store_temps(cursor, self.shared_state.temp_storage) def __enter_critical_phase_until_transaction_end(self): - self.load_connection.enter_critical_phase_until_transaction_end() - self.store_connection.enter_critical_phase_until_transaction_end() + self.shared_state.load_connection.enter_critical_phase_until_transaction_end() + self.shared_state.store_connection.enter_critical_phase_until_transaction_end() def _vote(self, storage): @@ -185,10 +179,10 @@ def _vote(self, storage): """ # It is assumed that self._lock.acquire was called before this # method was called. - cursor = self.store_connection.cursor - __traceback_info__ = self.store_connection, cursor + cursor = self.shared_state.store_connection.cursor + __traceback_info__ = self.shared_state.store_connection, cursor assert cursor is not None - adapter = self.adapter + adapter = self.shared_state.adapter # execute all remaining batch store operations. # This exists as an extension point. @@ -219,7 +213,7 @@ def _vote(self, storage): # used, or whether we're updating existing objects and avoid a # bit more overhead, but benchmarking suggests that it's not # worth it in common cases. - storage._oids.set_min_oid(self.temp_storage.max_stored_oid) + storage._oids.set_min_oid(self.shared_state.temp_storage.max_stored_oid) # Lock objects being modified and those registered with # readCurrent(). This could raise ReadConflictError or locking @@ -244,7 +238,7 @@ def _vote(self, storage): invalidated_oid_ints = self.__check_and_resolve_conflicts(storage, conflicts) blobs_must_be_moved_now = False - blobhelper = self.blobhelper + blobhelper = self.shared_state.blobhelper # TODO: Don't access unless we need to committing_tid_bytes = None if self.committing_tid_lock: # We've already picked a TID. Must have called undo(). 
@@ -359,8 +353,8 @@ def __check_and_resolve_conflicts(self, storage, conflicts): # priority and regain control ASAP. self.__enter_critical_phase_until_transaction_end() - old_states_and_tids = self.cache.prefetch_for_conflicts( - self.load_connection.cursor, + old_states_and_tids = self.shared_state.cache.prefetch_for_conflicts( + self.shared_state.load_connection.cursor, old_states_to_prefetch ) @@ -368,9 +362,9 @@ def __check_and_resolve_conflicts(self, storage, conflicts): storage, old_states_and_tids ).tryToResolveConflict - adapter = self.adapter - read_temp = self.temp_storage.read_temp - store_temp = self.temp_storage.store_temp + adapter = self.shared_state.adapter + read_temp = self.shared_state.temp_storage.read_temp + store_temp = self.shared_state.temp_storage.store_temp # The conflicts can be very large binary strings, no need to include # them in traceback info. (Plus they could be sensitive.) @@ -406,8 +400,8 @@ def __check_and_resolve_conflicts(self, storage, conflicts): # We resolved some conflicts, so we need to send them over to the database. adapter.mover.replace_temps( - self.store_connection.cursor, - self.temp_storage.iter_for_oids(invalidated_oid_ints) + self.shared_state.store_connection.cursor, + self.shared_state.temp_storage.iter_for_oids(invalidated_oid_ints) ) return invalidated_oid_ints @@ -426,9 +420,10 @@ def _lock_and_move(self, vote_only=False): # a shared blob dir. # # Returns True if we also committed to the database. - if self.prepared_txn: + if self.shared_state.prepared_txn: # Already done; *should* have been vote_only. - assert self.committing_tid_lock, (self.prepared_txn, self.committing_tid_lock) + assert self.committing_tid_lock, (self.shared_state.prepared_txn, + self.committing_tid_lock) return False kwargs = { @@ -438,11 +433,11 @@ def _lock_and_move(self, vote_only=False): kwargs['committing_tid_int'] = self.committing_tid_lock.tid_int if vote_only: # Must be voting. 
- blob_meth = self.blobhelper.vote + blob_meth = self.shared_state.blobhelper.vote # TODO: Don't access if we don't need to kwargs['after_selecting_tid'] = lambda tid_int: blob_meth(int64_to_8bytes(tid_int)) kwargs['commit'] = False - if vote_only or self.adapter.DEFAULT_LOCK_OBJECTS_AND_DETECT_CONFLICTS_INTERLEAVABLE: + if vote_only or self.shared_state.adapter.DEFAULT_LOCK_OBJECTS_AND_DETECT_CONFLICTS_INTERLEAVABLE: # If we're going to have to make two trips to the database, one to lock it and get a # tid and then one to commit and release locks, either because we're # just voting right now, not committing, or because the database doesn't @@ -452,14 +447,14 @@ def _lock_and_move(self, vote_only=False): # Note that this may commit the load_connection and make it not # viable for a historical view anymore. - committing_tid_int, prepared_txn = self.adapter.lock_database_and_move( - self.store_connection, self.load_connection, - self.blobhelper, + committing_tid_int, prepared_txn = self.shared_state.adapter.lock_database_and_move( + self.shared_state.store_connection, self.shared_state.load_connection, + self.shared_state.blobhelper, # TODO: Don't access this if we don't need to self.ude, **kwargs ) - self.prepared_txn = prepared_txn + self.shared_state.prepared_txn = prepared_txn committing_tid_lock = self.committing_tid_lock assert committing_tid_lock is None or committing_tid_int == committing_tid_lock.tid_int, ( committing_tid_int, committing_tid_lock) @@ -469,7 +464,7 @@ def _lock_and_move(self, vote_only=False): self.committing_tid_lock = DatabaseLockedForTid( int64_to_8bytes(committing_tid_int), committing_tid_int, - self.adapter + self.shared_state.adapter ) log_msg = "Adapter locked database and allocated tid: %s" @@ -478,7 +473,7 @@ def _lock_and_move(self, vote_only=False): return kwargs['commit'] @log_timed - def tpc_finish(self, transaction, f=None): + def tpc_finish(self, storage, transaction, f=None): if transaction is not self.transaction: raise 
StorageTransactionError( "tpc_finish called with wrong transaction") @@ -505,9 +500,10 @@ def tpc_finish(self, transaction, f=None): # and commit, releasing any locks it can (some adapters do, # some don't). So we may or may not have a database lock at # this point. - assert not self.blobhelper.NEEDS_DB_LOCK_TO_FINISH + # TODO: Could optimize this call away by checking to see if we used the blobhelper. + assert not self.shared_state.blobhelper.NEEDS_DB_LOCK_TO_FINISH try: - self.blobhelper.finish(self.committing_tid_lock.tid) + self.shared_state.blobhelper.finish(self.committing_tid_lock.tid) except (IOError, OSError): # If something failed to move, that's not really a problem: # if we did any moving now, we're just a cache. @@ -518,7 +514,7 @@ def tpc_finish(self, transaction, f=None): if f is not None: f(self.committing_tid_lock.tid) - next_phase = Finish(self, not did_commit) + next_phase = Finish(self, self.committing_tid_lock.tid_int, not did_commit) if not did_commit: locks_released = time.time() @@ -539,9 +535,9 @@ def tpc_finish(self, transaction, f=None): perf_logger ) - return next_phase, self.committing_tid_lock.tid + return next_phase finally: - self._clear_temp() + self.shared_state.release() class HistoryFree(AbstractVote): @@ -563,15 +559,15 @@ class HistoryPreservingDeleteOnly(HistoryPreserving): __slots__ = () def _vote(self, storage): - if self.temp_storage and self.temp_storage.stored_oids: + if self.shared_state.temp_storage and self.shared_state.temp_storage.stored_oids: raise StorageTransactionError("Cannot store and delete at the same time.") # We only get here if we've deleted objects, meaning we hold their row locks. # We only delete objects once we hold the commit lock. assert self.committing_tid_lock # Holding the commit lock put an entry in the transaction table, # but we don't want to bump the TID or store that data. 
- self.adapter.txncontrol.delete_transaction( - self.store_connection.cursor, + self.shared_state.adapter.txncontrol.delete_transaction( + self.shared_state.store_connection.cursor, self.committing_tid_lock.tid_int ) self.lock_and_vote_times[0] = time.time() @@ -580,8 +576,8 @@ def _vote(self, storage): def _lock_and_move(self, vote_only=False): # We don't do the final commit, # we just prepare. - self.prepared_txn = self.adapter.txncontrol.commit_phase1( - self.store_connection, + self.shared_state.prepared_txn = self.shared_state.adapter.txncontrol.commit_phase1( + self.shared_state.store_connection, self.committing_tid_lock.tid_int ) return False diff --git a/src/relstorage/tests/locking.py b/src/relstorage/tests/locking.py index d1d30f37..5835ad18 100644 --- a/src/relstorage/tests/locking.py +++ b/src/relstorage/tests/locking.py @@ -98,9 +98,10 @@ def __store_two_for_read_current_error(self): # still need. return obj1_oid, obj2_oid, obj1_tid, db - def __read_current_and_lock(self, storage, read_current_oid, lock_oid, tid): - tx = TransactionMetaData() - storage.tpc_begin(tx) + def __read_current_and_lock(self, storage, read_current_oid, lock_oid, tid, begin=True, tx=None): + tx = tx if tx is not None else TransactionMetaData() + if begin: + storage.tpc_begin(tx) if read_current_oid is not None: storage.checkCurrentSerialInTransaction(read_current_oid, tid, tx) storage.store(lock_oid, tid, b'bad pickle2', '', tx) @@ -247,7 +248,7 @@ def checkTL_ReadCurrentConflict_DoesNotTakeExclusiveLocks(self): # on obj1. We should immediately get a read current error and not conflict with the # exclusive lock. with self.assertRaisesRegex(VoteReadConflictError, "serial this txn started"): - self.__read_current_and_lock(storageB, obj2_oid, obj1_oid, tid) + self.__read_current_and_lock(storageB, obj2_oid, obj1_oid, tid, begin=False, tx=txb) # Which is still held because we cannot lock it. 
with self.assertRaises(UnableToLockRowsToModifyError): @@ -271,7 +272,7 @@ def checkTL_OverlappedReadCurrent_SharedLocksFirst(self): ) # The NOWAIT lock should be very quick to fire. if self._storage._adapter.locker.supports_row_lock_nowait: - self.assertLessEqual(duration_blocking, commit_lock_timeout) + self.assertLessEqual(duration_blocking, commit_lock_timeout * 1.3) else: # Sigh. Old MySQL. Very slow. This takes around 4.5s to run both iterations. self.assertLessEqual(duration_blocking, commit_lock_timeout * 2.5)