Skip to content

Commit

Permalink
Windows pywin32 locking
Browse files Browse the repository at this point in the history
Locking ranges is not currently supported on Windows.

pywintypes.OVERLAPPED in the Windows lock implementation may
not be a class member (spack#22244)

Windows: Lock entire file even if lock-by-range is requested (spack#24183)

Windows lock timeout (spack#25189)

Fixup improper cross-platform lock catching

Add locking test to Windows CI (spack#25233)

Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
johnwparent and loulawrence committed Jan 25, 2022
1 parent 7134cab commit 718665d
Showing 1 changed file with 120 additions and 28 deletions.
148 changes: 120 additions & 28 deletions lib/spack/llnl/util/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import errno
import fcntl
import os
import socket
import time
from datetime import datetime
from sys import platform as _platform
from typing import Dict, Tuple # novm

import llnl.util.tty as tty

import spack.util.string

is_windows = _platform == 'win32'
if not is_windows:
import fcntl
else:
import win32con
import win32file

import pywintypes # isort:skip

__all__ = [
'Lock',
'LockDowngradeError',
Expand All @@ -29,8 +38,6 @@
'CantCreateLockError'
]

#: Mapping of supported locks to description
lock_type = {fcntl.LOCK_SH: 'read', fcntl.LOCK_EX: 'write'}

#: A useful replacement for functions that should return True when not provided
#: for example.
Expand Down Expand Up @@ -166,6 +173,26 @@ def _attempts_str(wait_time, nattempts):
return ' after {0:0.2f}s and {1}'.format(wait_time, attempts)


def lock_checking(func):
from functools import wraps

@wraps(func)
def win_lock(self, *args, **kwargs):
if is_windows and self._reads > 0:
self._partial_unlock()
try:
suc = func(self, *args, **kwargs)
except Exception as e:
if self._current_lock:
timeout = kwargs.get('timeout', None)
self._lock(self._current_lock, timeout=timeout)
raise e
else:
suc = func(self, *args, **kwargs)
return suc
return win_lock


class Lock(object):
"""This is an implementation of a filesystem lock using Python's lockf.
Expand Down Expand Up @@ -206,6 +233,7 @@ def __init__(self, path, start=0, length=0, default_timeout=None,
"""
self.path = path
self._file = None
self._file_mode = ""
self._reads = 0
self._writes = 0

Expand All @@ -228,6 +256,32 @@ def __init__(self, path, start=0, length=0, default_timeout=None,
self.pid = self.old_pid = None
self.host = self.old_host = None

if is_windows:
self.LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK # exclusive lock
self.LOCK_SH = 0 # shared lock, default
self.LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY # non-blocking
self.LOCK_CATCH = pywintypes.error
else:
self.LOCK_EX = fcntl.LOCK_EX
self.LOCK_SH = fcntl.LOCK_SH
self.LOCK_NB = fcntl.LOCK_NB
self.LOCK_UN = fcntl.LOCK_UN
self.LOCK_CATCH = IOError

# Mapping of supported locks to description
self.lock_type = {self.LOCK_SH: 'read', self.LOCK_EX: 'write'}
self._current_lock = None

def __lock_fail_condition(self, e):
if is_windows:
# 33 "The process cannot access the file because another
# process has locked a portion of the file."
# 32 "The process cannot access the file because it is being
# used by another process"
return e.args[0] not in (32, 33)
else:
return e.errno not in (errno.EAGAIN, errno.EACCES)

@staticmethod
def _poll_interval_generator(_wait_times=None):
"""This implements a backoff scheme for polling a contended resource
Expand Down Expand Up @@ -256,7 +310,8 @@ def __repr__(self):
"""Formal representation of the lock."""
rep = '{0}('.format(self.__class__.__name__)
for attr, value in self.__dict__.items():
rep += '{0}={1}, '.format(attr, value.__repr__())
if attr != "LOCK_CATCH":
rep += '{0}={1}, '.format(attr, value.__repr__())
return '{0})'.format(rep.strip(', '))

def __str__(self):
Expand All @@ -266,6 +321,7 @@ def __str__(self):
activity = '#reads={0}, #writes={1}'.format(self._reads, self._writes)
return '({0}, {1}, {2})'.format(location, timeout, activity)

@lock_checking
def _lock(self, op, timeout=None):
"""This takes a lock using POSIX locks (``fcntl.lockf``).
Expand All @@ -276,23 +332,23 @@ def _lock(self, op, timeout=None):
successfully acquired, the total wait time and the number of attempts
is returned.
"""
assert op in lock_type
assert op in self.lock_type

self._log_acquiring('{0} LOCK'.format(lock_type[op].upper()))
self._log_acquiring('{0} LOCK'.format(self.lock_type[op].upper()))
timeout = timeout or self.default_timeout

# Create file and parent directories if they don't exist.
if self._file is None:
self._ensure_parent_directory()
self._file = file_tracker.get_fh(self.path)

if op == fcntl.LOCK_EX and self._file.mode == 'r':
if op == self.LOCK_EX and self._file.mode == 'r':
# Attempt to upgrade to write lock w/a read-only file.
# If the file were writable, we'd have opened it 'r+'
raise LockROFileError(self.path)

self._log_debug("{0} locking [{1}:{2}]: timeout {3} sec"
.format(lock_type[op], self._start, self._length,
.format(self.lock_type[op], self._start, self._length,
timeout))

poll_intervals = iter(Lock._poll_interval_generator())
Expand All @@ -313,58 +369,69 @@ def _lock(self, op, timeout=None):
return total_wait_time, num_attempts

raise LockTimeoutError("Timed out waiting for a {0} lock."
.format(lock_type[op]))
.format(self.lock_type[op]))

def _poll_lock(self, op):
"""Attempt to acquire the lock in a non-blocking manner. Return whether
the locking attempt succeeds
"""
assert op in lock_type
assert op in self.lock_type

try:
# Try to get the lock (will raise if not available.)
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
self._length, self._start, os.SEEK_SET)
if is_windows:
hfile = win32file._get_osfhandle(self._file.fileno())
win32file.LockFileEx(hfile,
op | self.LOCK_NB, # flags
0,
0xffff0000,
pywintypes.OVERLAPPED())
else:
fcntl.lockf(self._file, op | self.LOCK_NB,
self._length, self._start, os.SEEK_SET)

# help for debugging distributed locking
if self.debug:
# All locks read the owner PID and host
self._read_log_debug_data()
self._log_debug('{0} locked {1} [{2}:{3}] (owner={4})'
.format(lock_type[op], self.path,
.format(self.lock_type[op], self.path,
self._start, self._length, self.pid))

# Exclusive locks write their PID/host
if op == fcntl.LOCK_EX:
if op == self.LOCK_EX:
self._write_log_debug_data()

return True

except IOError as e:
# EAGAIN and EACCES == locked by another process (so try again)
if e.errno not in (errno.EAGAIN, errno.EACCES):
except self.LOCK_CATCH as e:
# check if lock failure or lock is already held
if self.__lock_fail_condition(e):
raise

return False

def _ensure_parent_directory(self):
parent = os.path.dirname(self.path)

# relative paths to lockfiles in the current directory have no parent
if not parent:
return '.'

try:
os.makedirs(parent)
except OSError as e:
# makedirs can fail when diretory already exists.
# makedirs can fail when directory already exists.
if not (e.errno == errno.EEXIST and os.path.isdir(parent) or
e.errno == errno.EISDIR):
raise
return parent

def _read_log_debug_data(self):
"""Read PID and host data out of the file if it is there."""
if is_windows:
# Not implemented for windows
return

self.old_pid = self.pid
self.old_host = self.host

Expand All @@ -377,6 +444,10 @@ def _read_log_debug_data(self):

def _write_log_debug_data(self):
"""Write PID and host data to the file, recording old values."""
if is_windows:
# Not implemented for windows
return

self.old_pid = self.pid
self.old_host = self.host

Expand All @@ -390,18 +461,37 @@ def _write_log_debug_data(self):
self._file.flush()
os.fsync(self._file.fileno())

def _unlock(self):
def _partial_unlock(self):
"""Releases a lock using POSIX locks (``fcntl.lockf``)
Releases the lock regardless of mode. Note that read locks may
be masquerading as write locks, but this removes either.
"""
fcntl.lockf(self._file, fcntl.LOCK_UN,
self._length, self._start, os.SEEK_SET)

if is_windows:
hfile = win32file._get_osfhandle(self._file.fileno())
win32file.UnlockFileEx(hfile,
0,
0xffff0000,
pywintypes.OVERLAPPED())
else:
fcntl.lockf(self._file, self.LOCK_UN,
self._length, self._start, os.SEEK_SET)

def _unlock(self):
"""Releases a lock using POSIX locks (``fcntl.lockf``)
Releases the lock regardless of mode. Note that read locks may
be masquerading as write locks, but this removes either.
Reset all lock attributes to initial states
"""
self._partial_unlock()
file_tracker.release_fh(self.path)

self._file = None
self._file_mode = ""
self._reads = 0
self._writes = 0

Expand All @@ -420,8 +510,9 @@ def acquire_read(self, timeout=None):

if self._reads == 0 and self._writes == 0:
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
wait_time, nattempts = self._lock(self.LOCK_SH, timeout=timeout)
self._reads += 1
self._current_lock = self.LOCK_SH
# Log if acquired, which includes counts when verbose
self._log_acquired('READ LOCK', wait_time, nattempts)
return True
Expand All @@ -445,8 +536,9 @@ def acquire_write(self, timeout=None):

if self._writes == 0:
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
wait_time, nattempts = self._lock(self.LOCK_EX, timeout=timeout)
self._writes += 1
self._current_lock = self.LOCK_EX
# Log if acquired, which includes counts when verbose
self._log_acquired('WRITE LOCK', wait_time, nattempts)

Expand Down Expand Up @@ -486,10 +578,10 @@ def downgrade_write_to_read(self, timeout=None):
"""
timeout = timeout or self.default_timeout

if self._writes == 1 and self._reads == 0:
if self._writes == 1:
self._log_downgrading()
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_SH, timeout=timeout)
wait_time, nattempts = self._lock(self.LOCK_SH, timeout=timeout)
self._reads = 1
self._writes = 0
self._log_downgraded(wait_time, nattempts)
Expand All @@ -505,10 +597,10 @@ def upgrade_read_to_write(self, timeout=None):
"""
timeout = timeout or self.default_timeout

if self._reads == 1 and self._writes == 0:
if self._reads >= 1 and self._writes == 0:
self._log_upgrading()
# can raise LockError.
wait_time, nattempts = self._lock(fcntl.LOCK_EX, timeout=timeout)
wait_time, nattempts = self._lock(self.LOCK_EX, timeout=timeout)
self._reads = 0
self._writes = 1
self._log_upgraded(wait_time, nattempts)
Expand Down

0 comments on commit 718665d

Please sign in to comment.