Skip to content

Commit

Permalink
Checkpoint on refactoring to better manage store connections. Refs #409
Browse files Browse the repository at this point in the history
… and refs #390
  • Loading branch information
jamadden committed Sep 1, 2020
1 parent 542bb72 commit 770c04a
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 217 deletions.
3 changes: 3 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ max-locals=20
# undefined-all-variable crash.
unsafe-load-any-extension = yes

property-classes=zope.cachedescriptors.property.Lazy,zope.cachedescriptors.property.Cached,relstorage._util.Lazy


# Local Variables:
# mode: conf-space
# End:
5 changes: 4 additions & 1 deletion src/relstorage/adapters/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def call(self, f, can_reconnect, *args, **kwargs):
The function may be called up to twice, if the *fresh_connection_p* is false
on the first call and a disconnected exception is raised.
:keyword bool can_reconnect: If True, then we will attempt to reconnect
the connection and try again if an exception is raised if *f*. If False,
the connection and try again if a disconnected exception is raised in *f*. If False,
we let that exception propagate. For example, if a transaction is in progress,
set this to false.
"""
Expand Down Expand Up @@ -269,6 +269,7 @@ def __repr__(self):
self._cursor
)


@implementer(interfaces.IManagedLoadConnection)
class LoadConnection(AbstractManagedConnection):

Expand All @@ -290,10 +291,12 @@ class StoreConnection(AbstractManagedConnection):
def begin(self):
self.connmanager.begin(*self.open_if_needed())


class PrePackConnection(StoreConnection):
__slots__ = ()
_NEW_CONNECTION_NAME = 'open_for_pre_pack'


@implementer(interfaces.IManagedDBConnection)
class ClosedConnection(object):
"""
Expand Down
8 changes: 5 additions & 3 deletions src/relstorage/cache/mvcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def _poll(self, cache, conn, cursor,

if polled_tid == 0 or polled_tid < polling_since:
assert change_iter is None
# Freshly zapped or empty database (tid==0) or stale and
# Freshly zapped or empty database (tid==0) or stale replica and
# asked to revert (as opposed to raising
# ReadConflictError). Mark not just this one, but all
# other extent indexes as needing a full rebuild.
Expand All @@ -668,7 +668,8 @@ def _poll(self, cache, conn, cursor,
return None

# Ok cool, we got data to move us forward.
# We must be careful to always consume the iterator, even if we exit early.
# We must be careful to always consume the iterator, even if we exit early
# (because it could be a server-side cursor holding connection state).
# So we do that now.
change_iter = list(change_iter)
self.log(
Expand Down Expand Up @@ -697,7 +698,8 @@ def _poll(self, cache, conn, cursor,
# it's older than our poll, we take control;
# but the history of the index must always move forward,
# so we build it starting from what's currently installed.
# There could be some overlap.
# There could be some overlap. Since we moved the index forward,
# we can vacuum.
change_index = self.object_index = installed_index.with_polled_changes(
polled_tid,
polling_since,
Expand Down
54 changes: 29 additions & 25 deletions src/relstorage/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def close(self):
stats = lambda s: {'closed': True}
afterCompletion = lambda s, c: None


@implementer(IRelStorage)
class RelStorage(LegacyMethodsMixin,
ConflictResolution.ConflictResolvingStorage):
Expand All @@ -112,8 +113,8 @@ class RelStorage(LegacyMethodsMixin,
_options = None
_is_read_only = False
_read_only_error = ReadOnlyError
# _ltid is the ID of the last transaction committed by this instance.
_ltid = z64
# ZODB TID of the last transaction committed by this instance.
_last_tid_i_committed_bytes = z64

# _closed is True after self.close() is called.
_closed = False
Expand All @@ -137,15 +138,15 @@ class RelStorage(LegacyMethodsMixin,

_load_connection = ClosedConnection()
_store_connection = ClosedConnection()
_tpc_begin_factory = None
#_tpc_begin_factory = None

_oids = ReadOnlyOIDs()

# Attributes in our dictionary that shouldn't have stale()/no_longer_stale()
# called on them. At this writing, it's just the type object.
_STALE_IGNORED_ATTRS = (
'_tpc_begin_factory',
)
# # Attributes in our dictionary that shouldn't have stale()/no_longer_stale()
# # called on them. At this writing, it's just the type object.
# _STALE_IGNORED_ATTRS = (
# '_tpc_begin_factory',
# )

def __init__(self, adapter, name=None, create=None,
options=None, cache=None, blobhelper=None,
Expand Down Expand Up @@ -218,12 +219,12 @@ def __init__(self, adapter, name=None, create=None,
else:
self.blobhelper = BlobHelper(options=options, adapter=adapter)

self._tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree
tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree

if hasattr(self._adapter.packundo, 'deleteObject'):
interface.alsoProvides(self, ZODB.interfaces.IExternalGC)

self._tpc_phase = NotInTransaction.from_storage(self)
self._tpc_phase = NotInTransaction(tpc_begin_factory, self._is_read_only)
if not self._is_read_only:
self._oids = OIDs(self._adapter.oidallocator, self._store_connection)

Expand Down Expand Up @@ -333,7 +334,7 @@ def before_instance(self, before):
@property
def highest_visible_tid(self):
cache_tid = self._cache.highest_visible_tid or 0
committed_tid = bytes8_to_int64(self._ltid)
committed_tid = bytes8_to_int64(self._last_tid_i_committed_bytes)
# In case we haven't polled yet.
return max(cache_tid, committed_tid)

Expand Down Expand Up @@ -457,7 +458,7 @@ def checkCurrentSerialInTransaction(self, oid, serial, transaction):
@metricmethod
def tpc_begin(self, transaction, tid=None, status=' '):
try:
self._tpc_phase = self._tpc_phase.tpc_begin(transaction, self._tpc_begin_factory)
self._tpc_phase = self._tpc_phase.tpc_begin(self, transaction)
except:
# Could be a database (connection) error, could be a programming
# bug. Either way, we're fine to roll everything back and hope
Expand All @@ -483,7 +484,7 @@ def tpc_vote(self, transaction):
# the object has changed during the commit process, due to
# conflict resolution or undo.
try:
next_phase = self._tpc_phase.tpc_vote(transaction, self)
next_phase = self._tpc_phase.tpc_vote(self, transaction)
except:
self.tpc_abort(transaction, _force=True)
raise
Expand All @@ -494,7 +495,7 @@ def tpc_vote(self, transaction):
@metricmethod
def tpc_finish(self, transaction, f=None):
try:
next_phase, committed_tid = self._tpc_phase.tpc_finish(transaction, f)
next_phase = self._tpc_phase.tpc_finish(self, transaction, f)
except:
# OH NO! This isn't supposed to happen!
# It's unlikely tpc_abort will get called...
Expand All @@ -503,7 +504,9 @@ def tpc_finish(self, transaction, f=None):
# The store connection is either committed or rolledback;
# the load connection is now rolledback.
self._tpc_phase = next_phase
self._ltid = committed_tid
# XXX: De-dup this
committed_tid = int64_to_8bytes(next_phase.last_committed_tid_int)
self._last_tid_i_committed_bytes = committed_tid
return committed_tid

@metricmethod
Expand All @@ -516,17 +519,18 @@ def tpc_abort(self, transaction, _force=False):
if _force:
# We're here under unexpected circumstances. It's possible something
# might go wrong rolling back.
self._tpc_phase = NotInTransaction.from_storage(self)
self._tpc_phase = NotInTransaction(self._is_read_only, self.lastTransactionInt())
raise

def lastTransaction(self):
if self._ltid == z64 and self._cache.highest_visible_tid is None:
if self._last_tid_i_committed_bytes == z64 and self._cache.highest_visible_tid is None:
# We haven't committed *or* polled for transactions,
# so our MVCC state is "floating".
# Read directly from the database to get the latest value,
return int64_to_8bytes(self._adapter.txncontrol.get_tid(self._load_connection.cursor))

return max(self._ltid, int64_to_8bytes(self._cache.highest_visible_tid or 0))
return max(self._last_tid_i_committed_bytes,
int64_to_8bytes(self._cache.highest_visible_tid or 0))

def lastTransactionInt(self):
return bytes8_to_int64(self.lastTransaction())
Expand Down Expand Up @@ -729,8 +733,8 @@ def __stale(self, stale_error):
replacements = {}
my_ns = vars(self)
for k, v in my_ns.items():
if k in self._STALE_IGNORED_ATTRS:
continue
# if k in self._STALE_IGNORED_ATTRS:
# continue
if callable(getattr(v, 'stale', None)):
new_v = v.stale(stale_error)
replacements[k] = new_v
Expand Down Expand Up @@ -759,8 +763,8 @@ def __no_longer_stale(self, _conn, _cursor):
replacements = {
k: v.no_longer_stale()
for k, v in my_ns.items()
if k not in self._STALE_IGNORED_ATTRS
and callable(getattr(v, 'no_longer_stale', None))
# if k not in self._STALE_IGNORED_ATTRS
if callable(getattr(v, 'no_longer_stale', None))
}

my_ns.update(replacements)
Expand All @@ -786,8 +790,8 @@ def __on_load_first_use(self, conn, cursor):
# by this connection, we don't want to ghost objects that we're sure
# are up-to-date unless someone else has changed them.
# Note that transactions can happen between us committing and polling.
if self._ltid is not None:
ignore_tid = bytes8_to_int64(self._ltid)
if self._last_tid_i_committed_bytes is not None:
ignore_tid = bytes8_to_int64(self._last_tid_i_committed_bytes)
else:
ignore_tid = None

Expand Down Expand Up @@ -853,7 +857,7 @@ def __on_load_first_use(self, conn, cursor):
# Hmm, ok, the Connection isn't polling us in a timely fashion.
# Maybe we're the root storage? Maybe our APIs are being used
# independently? At any rate, we're going to stop tracking now;
# if a connection eventually gets around to polling us, they'll
# if a Connection eventually gets around to polling us, they'll
# need to clear their whole cache
self.__queued_changes = None

Expand Down
112 changes: 112 additions & 0 deletions src/relstorage/storage/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from __future__ import print_function

from zope.interface import Interface
from zope.interface import Attribute

from transaction.interfaces import TransientError
from ZODB.POSException import StorageTransactionError
Expand Down Expand Up @@ -67,3 +68,114 @@ class VoteReadConflictError(ReadConflictError):
A read conflict (from Connection.readCurrent()) that wasn't
detected until the storage voted.
"""

class ITPCState(Interface):
"""
An object representing the current state (phase) of the two-phase commit protocol,
and how to transition between it and other phases.
"""

transaction = Attribute("The *transaction* object from ZODB.")

def tpc_abort(storage, transaction, force=False):
"""
Clear any used resources, and return the object
representing :class:`ITPCStateNotInTransaction`.
:param transaction: The transaction object from ZODB.
If this is not the current transaction, does nothing
unless *force* is true.
:keyword bool force: Whether to forcibly abort the transaction.
If this is true, then it doesn't matter if the *transaction* parameter
matches or not. Also, underlying RDBMS connections should also be closed
and discarded.
:return: The previous `ITPCStateNotInTransaction`.
"""


class ITPCStateNotInTransaction(ITPCState):
"""
The "idle" state.
In this state, no store connection is available,
and the *transaction* is always `None`.
Because ZODB tests this, this method has to define
a bunch of methods that are also defined by various other states.
These methods should raise ``StorageTransactionError``, or
``ReadOnlyError``, as appropriate.
"""

last_committed_tid_int = Attribute(
"""
The TID of the last transaction committed to get us to this state.
Initially, this is 0. In the value returned from :meth:`ITPCPhaseVoting.tpc_finish`,
it is the TID just committed.
"""
)

def tpc_begin(storage, transaction):
"""
Enter the two-phase commit protocol.
:return: An implementation of :class:`ITPCStateBegan`.
"""

def tpc_finish(*args, **kwargs):
"""
Raises ``StorageTranasctionError``
"""

def tpc_vote(*args, **kwargs):
"""
As for `tpc_finish`.
"""

def store(*args, **kwargs):
"""
Raises ``ReadOnlyError`` or ``StorageTranasctionError``
"""

restore = deleteObject = undo = restoreBlob = store

class ITPCStateDatabaseAvailable(ITPCState):
"""
A state where the database connection is usually available.
"""

store_connection = Attribute("The IManagedStoreConnection in use.")


class ITPCStateBegan(ITPCStateDatabaseAvailable):
"""
The first phase where the database is available for storage.
Storing objects, restoring objects, storing blobs, deleting objects,
all happen in this phase.
"""

def tpc_vote(storage, transaction):
"""
Enter the voting state.
:return: An implementation of :class:`ITPCStateVoting`
"""

class ITPCStateBeganHP(ITPCStateBegan):
"""
The extra methods that are available for history-preserving
storages.
"""

class ITPCPhaseVoting(ITPCStateDatabaseAvailable):
"""
The phase where voting happens. This follows the beginning phase.
"""

def tpc_finish(storage, transaction, f=None):
"""
Finish the transaction.
:return: The next implementation of :class:`ITPCPhaseNotInTransaction`
"""
Loading

0 comments on commit 770c04a

Please sign in to comment.