From 9a43b27772e0380b7a66bdc7b16c1679d2e9bf17 Mon Sep 17 00:00:00 2001 From: LeXofLeviafan Date: Sat, 24 Aug 2024 01:04:01 +0200 Subject: [PATCH] [jarun#753] implement thread safety --- buku | 843 +++++++++++++++++++++++++---------------------- setup.py | 1 + tests/pytest.ini | 2 + tox.ini | 2 + 4 files changed, 450 insertions(+), 398 deletions(-) diff --git a/buku b/buku index 7dc1bd88..6ca928d5 100755 --- a/buku +++ b/buku @@ -478,6 +478,7 @@ class BukuDb: self.chatty = chatty self.colorize = colorize self.conn, self.cur = BukuDb.initdb(dbfile, self.chatty) + self.lock = threading.RLock() # repeatable lock, only blocks *concurrent* access self._to_export = None # type: Optional[Dict[str, str | BookmarkVar]] self._to_delete = None # type: Optional[int | Sequence[int] | Set[int] | range] @@ -584,32 +585,42 @@ class BukuDb: return (conn, cur) - def _fetch(self, query: str, *args) -> List[BookmarkVar]: - self.cur.execute(query, args) - return [BookmarkVar(*x) for x in self.cur.fetchall()] + def _fetch(self, query: str, *args, lock: bool = True) -> List[BookmarkVar]: + if not lock: + self.cur.execute(query, args) + return [BookmarkVar(*x) for x in self.cur.fetchall()] + with self.lock: + return self._fetch(query, *args, lock=False) - def _fetch_first(self, query: str, *args) -> Optional[BookmarkVar]: - rows = self._fetch(query + ' LIMIT 1', *args) + def _fetch_first(self, query: str, *args, lock: bool = True) -> Optional[BookmarkVar]: + rows = self._fetch(query + ' LIMIT 1', *args, lock=lock) return rows[0] if rows else None - def get_rec_all(self): + def get_rec_all(self, *, lock: bool = True): """Get all the bookmarks in the database. + Parameters + ---------- + lock : bool + Whether to restrict concurrent access (True by default). + Returns ------- list A list of tuples representing bookmark records. """ - return self._fetch('SELECT * FROM bookmarks') + return self._fetch('SELECT * FROM bookmarks', lock=lock) - def get_rec_by_id(self, index: int) -> Optional[BookmarkVar]: + def get_rec_by_id(self, index: int, *, lock: bool = True) -> Optional[BookmarkVar]: """Get a bookmark from database by its ID. Parameters ---------- index : int DB index of bookmark record. + lock : bool + Whether to restrict concurrent access (True by default). Returns ------- @@ -617,15 +628,17 @@ class BukuDb: Bookmark data, or None if index is not found. """ - return self._fetch_first('SELECT * FROM bookmarks WHERE id = ?', index) + return self._fetch_first('SELECT * FROM bookmarks WHERE id = ?', index, lock=lock) - def get_rec_all_by_ids(self, indices: Sequence[int] | Set[int] | range): # Ints + def get_rec_all_by_ids(self, indices: Sequence[int] | Set[int] | range, *, lock: bool = True): # Ints """Get all the bookmarks in the database. Parameters ---------- indices : int[] | int{} | range DB indices of bookmark records. + lock : bool + Whether to restrict concurrent access (True by default). Returns ------- @@ -634,15 +647,17 @@ class BukuDb: """ placeholder = ', '.join(['?'] * len(indices)) - return indices and self._fetch(f'SELECT * FROM bookmarks WHERE id IN ({placeholder})', *list(indices)) + return indices and self._fetch(f'SELECT * FROM bookmarks WHERE id IN ({placeholder})', *list(indices), lock=lock) - def get_rec_id(self, url): + def get_rec_id(self, url: str, *, lock: bool = True): """Check if URL already exists in DB. Parameters ---------- url : str A URL to search for in the DB. + lock : bool + Whether to restrict concurrent access (True by default). 
Returns ------- @@ -650,16 +665,18 @@ class BukuDb: DB index, or None if URL not found in DB. """ - row = self._fetch_first('SELECT * FROM bookmarks WHERE url = ?', url) + row = self._fetch_first('SELECT * FROM bookmarks WHERE url = ?', url, lock=lock) return row and row.id - def get_rec_ids(self, urls: Sequence[str] | Set[str]): # Values[str] + def get_rec_ids(self, urls: Sequence[str] | Set[str], *, lock: bool = True): # Values[str] """Check if URL already exists in DB. Parameters ---------- urls : str[] | str{} URLs to search for in the DB. + lock : bool + Whether to restrict concurrent access (True by default). Returns ------- @@ -669,21 +686,32 @@ class BukuDb: if not urls: return [] - placeholder = ', '.join(['?'] * len(urls)) - self.cur.execute(f'SELECT id FROM bookmarks WHERE url IN ({placeholder})', list(urls)) - return [x[0] for x in self.cur.fetchall()] - - def get_max_id(self) -> int: + if not lock: + placeholder = ', '.join(['?'] * len(urls)) + self.cur.execute(f'SELECT id FROM bookmarks WHERE url IN ({placeholder})', list(urls)) + return [x[0] for x in self.cur.fetchall()] + with self.lock: + return self.get_rec_ids(urls, lock=False) + + def get_max_id(self, *, lock: bool = True) -> int: """Fetch the ID of the last record. + Parameters + ---------- + lock : bool + Whether to restrict concurrent access (True by default). + Returns ------- int ID of the record if any record exists, else None. """ - self.cur.execute('SELECT MAX(id) FROM bookmarks') - return self.cur.fetchall()[0][0] + if not lock: + self.cur.execute('SELECT MAX(id) FROM bookmarks') + return self.cur.fetchall()[0][0] + with self.lock: + return self.get_max_id(lock=False) def add_rec( self, @@ -779,12 +807,13 @@ class BukuDb: flagset |= FLAG_IMMUTABLE qry = 'INSERT INTO bookmarks(URL, metadata, tags, desc, flags) VALUES (?, ?, ?, ?, ?)' - self.cur.execute(qry, (url, title, tags_in, desc, flagset)) - if not delay_commit: - self.conn.commit() - if self.chatty: - self.print_rec(self.cur.lastrowid) - return self.cur.lastrowid + with self.lock: + self.cur.execute(qry, (url, title, tags_in, desc, flagset)) + if not delay_commit: + self.conn.commit() + if self.chatty: + self.print_rec(self.cur.lastrowid) + return self.cur.lastrowid except Exception as e: LOGERR('add_rec(): %s', e) return None @@ -812,30 +841,31 @@ class BukuDb: return True indices = (None if not index else [index] if isinstance(index, int) else index) - if not indices: - resp = read_in('Append the tags to ALL bookmarks? (y/n): ') - if resp != 'y': - return False + with self.lock: + if not indices: + resp = read_in('Append the tags to ALL bookmarks? (y/n): ') + if resp != 'y': + return False - self.cur.execute('SELECT id, tags FROM bookmarks ORDER BY id ASC') - else: - placeholder = ', '.join(['?'] * len(indices)) - self.cur.execute(f'SELECT id, tags FROM bookmarks WHERE id IN ({placeholder}) ORDER BY id ASC', tuple(indices)) + self.cur.execute('SELECT id, tags FROM bookmarks ORDER BY id ASC') + else: + placeholder = ', '.join(['?'] * len(indices)) + self.cur.execute(f'SELECT id, tags FROM bookmarks WHERE id IN ({placeholder}) ORDER BY id ASC', tuple(indices)) - resultset = self.cur.fetchall() - if resultset: - query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' 
- for row in resultset: - tags = row[1] + tags_in[1:] - tags = parse_tags([tags]) - self.cur.execute(query, (tags, row[0],)) - if self.chatty and not delay_commit: - self.print_rec(row[0]) - else: - return False + resultset = self.cur.fetchall() + if resultset: + query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' + for row in resultset: + tags = row[1] + tags_in[1:] + tags = parse_tags([tags]) + self.cur.execute(query, (tags, row[0],)) + if self.chatty and not delay_commit: + self.print_rec(row[0]) + else: + return False - if not delay_commit: - self.conn.commit() + if not delay_commit: + self.conn.commit() return True @@ -877,25 +907,29 @@ class BukuDb: query += ' AND id IN ({})'.format(', '.join(['?'] * len(indices))) count = 0 - for tag in tags_to_delete: - tag = delim_wrap(tag) - args = (tag, DELIM, '%'+like_escape(tag, '`')+'%') + tuple(indices or []) - self.cur.execute(query, args) - count += self.cur.rowcount + with self.lock: + for tag in tags_to_delete: + tag = delim_wrap(tag) + args = (tag, DELIM, '%'+like_escape(tag, '`')+'%') + tuple(indices or []) + self.cur.execute(query, args) + count += self.cur.rowcount - if count > 0 and not delay_commit: - self.conn.commit() - if self.chatty: - print('%d record(s) updated' % count) + if count > 0 and not delay_commit: + self.conn.commit() + if self.chatty: + print('%d record(s) updated' % count) return True # Process a single index # Use SELECT and UPDATE to handle multiple tags at once - query = 'SELECT id, tags FROM bookmarks WHERE id = ? LIMIT 1' - self.cur.execute(query, list(indices)) - resultset = self.cur.fetchall() - if resultset: + with self.lock: + query = 'SELECT id, tags FROM bookmarks WHERE id = ? LIMIT 1' + self.cur.execute(query, list(indices)) + resultset = self.cur.fetchall() + if not resultset: + return False + query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' 
for row in resultset: tags = row[1] @@ -909,8 +943,6 @@ class BukuDb: if not delay_commit: self.conn.commit() - else: - return False return True @@ -1103,25 +1135,26 @@ class BukuDb: LOGDBG('update_rec query: "%s", args: %s', query, arguments) - try: - self.cur.execute(query, arguments) - self.conn.commit() - if self.cur.rowcount > 0 and self.chatty: - self.print_rec(index) - elif self.cur.rowcount == 0: - if single: - LOGERR('No matching index %d', index) - else: - LOGERR('No matches found') + with self.lock: + try: + self.cur.execute(query, arguments) + self.conn.commit() + if self.cur.rowcount > 0 and self.chatty: + self.print_rec(index) + elif self.cur.rowcount == 0: + if single: + LOGERR('No matching index %d', index) + else: + LOGERR('No matches found') + return False + except sqlite3.IntegrityError: + LOGERR('URL already exists') return False - except sqlite3.IntegrityError: - LOGERR('URL already exists') - return False - except sqlite3.OperationalError as e: - LOGERR(e) - return False - finally: - self.commit_delete() + except sqlite3.OperationalError as e: + LOGERR(e) + return False + finally: + self.commit_delete() return True @@ -1195,18 +1228,19 @@ class BukuDb: LOGERR('custom_url is only supported for a singular index') return False - if not indices: - self.cur.execute('SELECT id, url, tags, flags FROM bookmarks ORDER BY id ASC') - else: - placeholder = ', '.join(['?'] * len(indices)) - self.cur.execute(f'SELECT id, url, tags, flags FROM bookmarks WHERE id IN ({placeholder}) ORDER BY id ASC', - tuple(indices)) - - resultset = self.cur.fetchall() - recs = len(resultset) - if not recs: - LOGERR('No matching index or title immutable or empty DB') - return False + with self.lock: + if not indices: + self.cur.execute('SELECT id, url, tags, flags FROM bookmarks ORDER BY id ASC') + else: + placeholder = ', '.join(['?'] * len(indices)) + self.cur.execute(f'SELECT id, url, tags, flags FROM bookmarks WHERE id IN ({placeholder}) ORDER BY id ASC', + tuple(indices)) + + resultset = self.cur.fetchall() + recs = len(resultset) + if not recs: + LOGERR('No matching index or title immutable or empty DB') + return False # Set up strings to be printed if self.colorize: @@ -1231,146 +1265,139 @@ class BukuDb: if not MYHEADERS: gen_headers() - cond = threading.Condition() - cond.acquire() - - def refresh(count, cond): + def refresh(thread_idx, cond): """Inner function to fetch titles and update records. Parameters ---------- - count : int - Dummy input to adhere to convention. + thread_idx : int + Thread index/ID. cond : threading condition object. """ - count = 0 + _count = 0 while True: query = 'UPDATE bookmarks SET' arguments = [] - cond.acquire() - if resultset: - id, url, tags, flags = resultset.pop() - else: - cond.release() - break - cond.release() + with cond: + if resultset: + id, url, tags, flags = resultset.pop() + else: + break result = fetch_data(custom_url or url, http_head=(flags & FLAG_IMMUTABLE) > 0) - count += 1 + _count += 1 - cond.acquire() + with cond: + if result.bad: + print(bad_url_str % id) + if custom_tags: + self.cur.execute('UPDATE bookmarks SET tags = ? WHERE id = ?', (custom_tags, id)) + continue - if result.bad: - print(bad_url_str % id) - if custom_tags: - self.cur.execute('UPDATE bookmarks SET tags = ? 
WHERE id = ?', (custom_tags, id)) - cond.release() - continue + if result.fetch_status in (del_error or []): + if result.fetch_status in export_on: + self._to_export[url] = self.get_rec_by_id(id, lock=False) + LOGERR('HTTP error %s', result.fetch_status) + self._to_delete += [id] + if result.mime and self.chatty: + print(mime_str % id) + if custom_tags: + self.cur.execute('UPDATE bookmarks SET tags = ? WHERE id = ?', (custom_tags, id)) + continue - if result.fetch_status in (del_error or []): - if result.fetch_status in export_on: - self._to_export[url] = self.get_rec_by_id(id) - LOGERR('HTTP error %s', result.fetch_status) - self._to_delete += [id] - if result.mime and self.chatty: - print(mime_str % id) - if custom_tags: - self.cur.execute('UPDATE bookmarks SET tags = ? WHERE id = ?', (custom_tags, id)) - cond.release() - continue + if result.mime: + if self.chatty: + print(mime_str % id) + if custom_tags: + self.cur.execute('UPDATE bookmarks SET tags = ? WHERE id = ?', (custom_tags, id)) + continue - if result.mime: - if self.chatty: - print(mime_str % id) - if custom_tags: - self.cur.execute('UPDATE bookmarks SET tags = ? WHERE id = ?', (custom_tags, id)) - cond.release() - continue + if not result.title: + LOGERR(blank_url_str, id) + elif update_title: + query += ' metadata = ?,' + arguments += (result.title,) - if not result.title: - LOGERR(blank_url_str, id) - elif update_title: - query += ' metadata = ?,' - arguments += (result.title,) + if update_title and result.desc: + query += ' desc = ?,' + arguments += (result.desc,) - if update_title and result.desc: - query += ' desc = ?,' - arguments += (result.desc,) + _url = url + if url_redirect and result.url != url: + query += ' url = ?,' + arguments += (result.url,) + _url = result.url - _url = url - if url_redirect and result.url != url: - query += ' url = ?,' - arguments += (result.url,) - _url = result.url + if result.fetch_status in export_on: + self._to_export[_url] = url - if result.fetch_status in export_on: - self._to_export[_url] = url + _tags = result.tags(keywords=False, redirect=tag_redirect, error=tag_error) + if _tags: + query += ' tags = ?,' + arguments += (taglist_str((custom_tags or tags) + DELIM + _tags),) + elif custom_tags: + query += ' tags = ?,' + arguments += (taglist_str(custom_tags),) - _tags = result.tags(keywords=False, redirect=tag_redirect, error=tag_error) - if _tags: - query += ' tags = ?,' - arguments += (taglist_str((custom_tags or tags) + DELIM + _tags),) - elif custom_tags: - query += ' tags = ?,' - arguments += (taglist_str(custom_tags),) - - if not arguments: # nothing to update - cond.release() - continue + if not arguments: # nothing to update + continue - query = query[:-1] + ' WHERE id = ?' - arguments += (id,) - LOGDBG('refreshdb query: "%s", args: %s', query, arguments) + query = query[:-1] + ' WHERE id = ?' 
+ arguments += (id,) + LOGDBG('refreshdb query: "%s", args: %s', query, arguments) - self.cur.execute(query, arguments) + self.cur.execute(query, arguments) - # Save after fetching 32 titles per thread - if count % 32 == 0: - self.conn.commit() + # Save after fetching 32 titles per thread + if _count % 32 == 0: + self.conn.commit() - if self.chatty: - print(success_str % (result.title, id)) - cond.release() + if self.chatty: + print(success_str % (result.title, id)) if INTERRUPTED: break - LOGDBG('Thread %d: processed %d', threading.get_ident(), count) + LOGDBG('Thread %d: processed %d', threading.get_ident(), _count) with cond: done['value'] += 1 - processed['value'] += count + processed['value'] += _count cond.notify() - threads = min(threads, recs) + with self.lock: # preventing external concurrent access + cond = threading.Condition() + with cond: # preventing concurrent access between workers + threads = min(threads, recs) - for i in range(threads): - thread = threading.Thread(target=refresh, args=(i, cond)) - thread.start() + for i in range(threads): + thread = threading.Thread(target=refresh, args=(i, cond)) + thread.start() - while done['value'] < threads: - cond.wait() - LOGDBG('%d threads completed', done['value']) + while done['value'] < threads: + cond.wait() + LOGDBG('%d threads completed', done['value']) - # Guard: records found == total records processed - if recs != processed['value']: - LOGERR('Records: %d, processed: %d !!!', recs, processed['value']) + # Guard: records found == total records processed + if recs != processed['value']: + LOGERR('Records: %d, processed: %d !!!', recs, processed['value']) + + if delay_delete: + self.conn.commit() + else: + self.commit_delete() - cond.release() - if delay_delete: - self.conn.commit() - else: - self.commit_delete() return True def commit_delete(self, apply: bool = True): """Commit delayed delete commands.""" if apply and self._to_delete is not None: - for id in sorted(set(self._to_delete), reverse=True): - self.delete_rec(id, delay_commit=True, chatty=False) - self.conn.commit() + with self.lock: + for id in sorted(set(self._to_delete), reverse=True): + self.delete_rec(id, delay_commit=True, chatty=False) + self.conn.commit() self._to_delete = None def edit_update_rec(self, index, immutable=None): @@ -1443,8 +1470,9 @@ class BukuDb: else: end = int(val[1]) qtemp = 'SELECT id FROM bookmarks ORDER BY id DESC limit {0}'.format(end) - self.cur.execute(qtemp, []) - part_ids = list(chain.from_iterable(self.cur.fetchall())) + with self.lock: + self.cur.execute(qtemp, []) + part_ids = list(chain.from_iterable(self.cur.fetchall())) q0 += ','.join(list(map(str, part_ids))) else: q0 += idx + ',' @@ -1722,10 +1750,11 @@ class BukuDb: if max_id > index: results = self._fetch(query1, max_id) for row in results: - self.cur.execute(query2, (row.id,)) - self.cur.execute(query3, (index, row.url, row.title, row.tags_raw, row.desc, row.flags)) - if not delay_commit: - self.conn.commit() + with self.lock: + self.cur.execute(query2, (row.id,)) + self.cur.execute(query3, (index, row.url, row.title, row.tags_raw, row.desc, row.flags)) + if not delay_commit: + self.conn.commit() if self.chatty: print('Index %d moved to %d' % (row.id, index)) @@ -1834,9 +1863,10 @@ class BukuDb: try: if chatty: - self.cur.execute('SELECT COUNT(*) from bookmarks where id ' - 'BETWEEN ? AND ?', (low, high)) - count = self.cur.fetchone() + with self.lock: + self.cur.execute('SELECT COUNT(*) from bookmarks where id ' + 'BETWEEN ? 
AND ?', (low, high)) + count = self.cur.fetchone() if count[0] < 1: print('Index %d-%d: 0 deleted' % (low, high)) return False @@ -1847,19 +1877,21 @@ class BukuDb: return False query = 'DELETE from bookmarks where id BETWEEN ? AND ?' - self.cur.execute(query, (low, high)) - print('Index %d-%d: %d deleted' % (low, high, self.cur.rowcount)) - if not self.cur.rowcount: - return False + with self.lock: + self.cur.execute(query, (low, high)) + print('Index %d-%d: %d deleted' % (low, high, self.cur.rowcount)) + if not self.cur.rowcount: + return False # Compact DB by ascending order of index to ensure # the existing higher indices move only once # Delayed commit is forced - for index in range(low, high + 1): - self.compactdb(index, delay_commit=True) + with self.lock: + for index in range(low, high + 1): + self.compactdb(index, delay_commit=True) - if not delay_commit: - self.conn.commit() + if not delay_commit: + self.conn.commit() except IndexError: LOGERR('No matching index') return False @@ -1868,9 +1900,10 @@ class BukuDb: else: # Remove a single entry try: if chatty: - self.cur.execute('SELECT COUNT(*) FROM bookmarks WHERE ' - 'id = ? LIMIT 1', (index,)) - count = self.cur.fetchone() + with self.lock: + self.cur.execute('SELECT COUNT(*) FROM bookmarks WHERE ' + 'id = ? LIMIT 1', (index,)) + count = self.cur.fetchone() if count[0] < 1: LOGERR('No matching index %d', index) return False @@ -1880,16 +1913,17 @@ class BukuDb: if resp != 'y': return False - query = 'DELETE FROM bookmarks WHERE id = ?' - self.cur.execute(query, (index,)) - if self.cur.rowcount == 1: - print('Index %d deleted' % index) - self.compactdb(index, delay_commit=True) - if not delay_commit: - self.conn.commit() - else: - LOGERR('No matching index %d', index) - return False + with self.lock: + query = 'DELETE FROM bookmarks WHERE id = ?' + self.cur.execute(query, (index,)) + if self.cur.rowcount == 1: + print('Index %d deleted' % index) + self.compactdb(index, delay_commit=True) + if not delay_commit: + self.conn.commit() + else: + LOGERR('No matching index %d', index) + return False except IndexError: LOGERR('No matching index %d', index) return False @@ -1924,16 +1958,13 @@ class BukuDb: return False # delete records in reverse order - pos = len(results) - 1 - while pos >= 0: - idx = results[pos][0] - self.delete_rec(idx, delay_commit=True) - - # Commit at every 200th removal - if pos % 200 == 0: - self.conn.commit() + with self.lock: + for pos, row in reversed(list(enumerate(results))): + self.delete_rec(row[0], delay_commit=True) - pos -= 1 + # Commit at every 200th removal, counting from the end + if pos % 200 == 0: + self.conn.commit() return True @@ -1953,9 +1984,10 @@ class BukuDb: """ try: - self.cur.execute('DELETE FROM bookmarks') - if not delay_commit: - self.conn.commit() + with self.lock: + self.cur.execute('DELETE FROM bookmarks') + if not delay_commit: + self.conn.commit() return True except Exception as e: LOGERR('delete_rec_all(): %s', e) @@ -1976,8 +2008,9 @@ class BukuDb: return False if self.delete_rec_all(): - self.cur.execute('VACUUM') - self.conn.commit() + with self.lock: + self.cur.execute('VACUUM') + self.conn.commit() print('All bookmarks deleted') return True @@ -2063,12 +2096,13 @@ class BukuDb: try: # If range starts from 0 print all records - if low == 0: - query = 'SELECT * from bookmarks' - resultset = self.cur.execute(query) - else: - query = 'SELECT * from bookmarks where id BETWEEN ? AND ?' 
- resultset = self.cur.execute(query, (low, high)) + with self.lock: + if low == 0: + query = 'SELECT * from bookmarks' + resultset = self.cur.execute(query) + else: + query = 'SELECT * from bookmarks where id BETWEEN ? AND ?' + resultset = self.cur.execute(query, (low, high)) except IndexError: LOGERR('Index out of range') return False @@ -2094,8 +2128,9 @@ class BukuDb: return True else: # Show all entries - self.cur.execute('SELECT * FROM bookmarks ORDER BY id') - resultset = self.cur.fetchall() + with self.lock: + self.cur.execute('SELECT * FROM bookmarks ORDER BY id') + resultset = self.cur.fetchall() if not resultset: LOGERR('0 records') @@ -2124,14 +2159,15 @@ class BukuDb: unique_tags = [] dic = {} qry = 'SELECT DISTINCT tags, COUNT(tags) FROM bookmarks GROUP BY tags' - for row in self.cur.execute(qry): - tagset = row[0].strip(DELIM).split(DELIM) - for tag in tagset: - if tag not in tags: - dic[tag] = row[1] - tags += (tag,) - else: - dic[tag] += row[1] + with self.lock: + for row in self.cur.execute(qry): + tagset = row[0].strip(DELIM).split(DELIM) + for tag in tagset: + if tag not in tags: + dic[tag] = row[1] + tags += (tag,) + else: + dic[tag] += row[1] if not tags: return tags, dic @@ -2167,8 +2203,9 @@ class BukuDb: if tag == '': continue - self.cur.execute(qry, ('%' + delim_wrap(tag) + '%',)) - results = self.cur.fetchall() + with self.lock: + self.cur.execute(qry, ('%' + delim_wrap(tag) + '%',)) + results = self.cur.fetchall() for row in results: # update tagset with unique tags in row tagset |= set(row[0].strip(DELIM).split(DELIM)) @@ -2234,18 +2271,19 @@ class BukuDb: raise RuntimeError("Tag deletion failed.") # Update bookmarks with original tag - query = 'SELECT id, tags FROM bookmarks WHERE tags LIKE ?' - self.cur.execute(query, ('%' + orig + '%',)) - results = self.cur.fetchall() - if results: - query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' - for row in results: - tags = row[1].replace(orig, newtags) - tags = parse_tags([tags]) - self.cur.execute(query, (tags, row[0],)) - print('Index %d updated' % row[0]) + with self.lock: + query = 'SELECT id, tags FROM bookmarks WHERE tags LIKE ?' + self.cur.execute(query, ('%' + orig + '%',)) + results = self.cur.fetchall() + if results: + query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' + for row in results: + tags = row[1].replace(orig, newtags) + tags = parse_tags([tags]) + self.cur.execute(query, (tags, row[0],)) + print('Index %d updated' % row[0]) - self.conn.commit() + self.conn.commit() def get_tagstr_from_taglist(self, id_list, taglist): """Get a string of delimiter-separated (and enclosed) string @@ -2327,50 +2365,51 @@ class BukuDb: if flag != 2: index += 1 - update_count = 0 - query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' - try: - db_id_list = cmdstr[index + 1:].split() - for id in db_id_list: - if is_int(id) and int(id) > 0: - if flag == 1: - if self.append_tag_at_index(id, tags, True): - update_count += 1 - elif flag == 2: - tags = parse_tags([tags]) - self.cur.execute(query, (tags, id,)) - update_count += self.cur.rowcount - else: - self.delete_tag_at_index(id, tags, True) - update_count += 1 - elif '-' in id: - vals = [int(x) for x in id.split('-')] - if vals[0] > vals[-1]: - vals[0], vals[-1] = vals[-1], vals[0] - - for _id in range(vals[0], vals[-1] + 1): + with self.lock: + update_count = 0 + query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' 
+ try: + db_id_list = cmdstr[index + 1:].split() + for id in db_id_list: + if is_int(id) and int(id) > 0: if flag == 1: - if self.append_tag_at_index(_id, tags, True): + if self.append_tag_at_index(id, tags, True): update_count += 1 elif flag == 2: tags = parse_tags([tags]) - self.cur.execute(query, (tags, _id,)) + self.cur.execute(query, (tags, id,)) update_count += self.cur.rowcount else: - if self.delete_tag_at_index(_id, tags, True): - update_count += 1 - else: - return -1 - except ValueError: - return -1 - except sqlite3.IntegrityError: - return -1 + self.delete_tag_at_index(id, tags, True) + update_count += 1 + elif '-' in id: + vals = [int(x) for x in id.split('-')] + if vals[0] > vals[-1]: + vals[0], vals[-1] = vals[-1], vals[0] + + for _id in range(vals[0], vals[-1] + 1): + if flag == 1: + if self.append_tag_at_index(_id, tags, True): + update_count += 1 + elif flag == 2: + tags = parse_tags([tags]) + self.cur.execute(query, (tags, _id,)) + update_count += self.cur.rowcount + else: + if self.delete_tag_at_index(_id, tags, True): + update_count += 1 + else: + return -1 + except ValueError: + return -1 + except sqlite3.IntegrityError: + return -1 - try: - self.conn.commit() - except Exception as e: - LOGERR(e) - return -1 + try: + self.conn.commit() + except Exception as e: + LOGERR(e) + return -1 return update_count @@ -2409,8 +2448,9 @@ class BukuDb: raise IndexError qry = 'SELECT URL from bookmarks where id BETWEEN ? AND ?' - for row in self.cur.execute(qry, (low, high)): - browse(row[0]) + with self.lock: + for row in self.cur.execute(qry, (low, high)): + browse(row[0]) return True except IndexError: LOGERR('Index out of range') @@ -2421,9 +2461,10 @@ class BukuDb: return False if index == 0: - qry = 'SELECT id from bookmarks ORDER BY RANDOM() LIMIT 1' - self.cur.execute(qry) - result = self.cur.fetchone() + with self.lock: + qry = 'SELECT id from bookmarks ORDER BY RANDOM() LIMIT 1' + self.cur.execute(qry) + result = self.cur.fetchone() # Return if no entries in DB if result is None: @@ -2435,9 +2476,10 @@ class BukuDb: qry = 'SELECT URL FROM bookmarks WHERE id = ? LIMIT 1' try: - for row in self.cur.execute(qry, (index,)): - browse(row[0]) - return True + with self.lock: + for row in self.cur.execute(qry, (index,)): + browse(row[0]) + return True LOGERR('No matching index %d', index) except IndexError: LOGERR('No matching index %d', index) @@ -2771,78 +2813,79 @@ class BukuDb: resp = 'y' add_parent_folder_as_tag = resp == 'y' - resp = 'y' + with self.lock: + resp = 'y' - try: - if os.path.isfile(os.path.expanduser(gc_bm_db_path)): - if self.chatty: - resp = input('Import bookmarks from google chrome? (y/n): ') - if resp == 'y': - bookmarks_database = os.path.expanduser(gc_bm_db_path) - if not os.path.exists(bookmarks_database): - raise FileNotFoundError - self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) - except Exception as e: - LOGERR(e) - print('Could not import bookmarks from google-chrome') + try: + if os.path.isfile(os.path.expanduser(gc_bm_db_path)): + if self.chatty: + resp = input('Import bookmarks from google chrome? 
(y/n): ') + if resp == 'y': + bookmarks_database = os.path.expanduser(gc_bm_db_path) + if not os.path.exists(bookmarks_database): + raise FileNotFoundError + self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) + except Exception as e: + LOGERR(e) + print('Could not import bookmarks from google-chrome') - try: - if os.path.isfile(os.path.expanduser(cb_bm_db_path)): - if self.chatty: - resp = input('Import bookmarks from chromium? (y/n): ') - if resp == 'y': - bookmarks_database = os.path.expanduser(cb_bm_db_path) - if not os.path.exists(bookmarks_database): - raise FileNotFoundError - self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) - except Exception as e: - LOGERR(e) - print('Could not import bookmarks from chromium') + try: + if os.path.isfile(os.path.expanduser(cb_bm_db_path)): + if self.chatty: + resp = input('Import bookmarks from chromium? (y/n): ') + if resp == 'y': + bookmarks_database = os.path.expanduser(cb_bm_db_path) + if not os.path.exists(bookmarks_database): + raise FileNotFoundError + self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) + except Exception as e: + LOGERR(e) + print('Could not import bookmarks from chromium') - try: - if os.path.isfile(os.path.expanduser(vi_bm_db_path)): - if self.chatty: - resp = input('Import bookmarks from Vivaldi? (y/n): ') - if resp == 'y': - bookmarks_database = os.path.expanduser(vi_bm_db_path) - if not os.path.exists(bookmarks_database): - raise FileNotFoundError - self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) - except Exception as e: - LOGERR(e) - print('Could not import bookmarks from Vivaldi') + try: + if os.path.isfile(os.path.expanduser(vi_bm_db_path)): + if self.chatty: + resp = input('Import bookmarks from Vivaldi? (y/n): ') + if resp == 'y': + bookmarks_database = os.path.expanduser(vi_bm_db_path) + if not os.path.exists(bookmarks_database): + raise FileNotFoundError + self.load_chrome_database(bookmarks_database, newtag, add_parent_folder_as_tag) + except Exception as e: + LOGERR(e) + print('Could not import bookmarks from Vivaldi') - try: - ff_bm_db_paths = {k: s for k, s in ff_bm_db_paths.items() if os.path.isfile(os.path.expanduser(s))} - for idx, (name, ff_bm_db_path) in enumerate(ff_bm_db_paths.items(), start=1): - if self.chatty: - profile = ('' if len(ff_bm_db_paths) < 2 else - f' profile {name} [{idx}/{len(ff_bm_db_paths)}]') - resp = input(f'Import bookmarks from Firefox{profile}? (y/n): ') - if resp == 'y': - bookmarks_database = os.path.expanduser(ff_bm_db_path) - if not os.path.exists(bookmarks_database): - raise FileNotFoundError - self.load_firefox_database(bookmarks_database, newtag, add_parent_folder_as_tag) - break - except Exception as e: - LOGERR(e) - print('Could not import bookmarks from Firefox.') + try: + ff_bm_db_paths = {k: s for k, s in ff_bm_db_paths.items() if os.path.isfile(os.path.expanduser(s))} + for idx, (name, ff_bm_db_path) in enumerate(ff_bm_db_paths.items(), start=1): + if self.chatty: + profile = ('' if len(ff_bm_db_paths) < 2 else + f' profile {name} [{idx}/{len(ff_bm_db_paths)}]') + resp = input(f'Import bookmarks from Firefox{profile}? 
(y/n): ') + if resp == 'y': + bookmarks_database = os.path.expanduser(ff_bm_db_path) + if not os.path.exists(bookmarks_database): + raise FileNotFoundError + self.load_firefox_database(bookmarks_database, newtag, add_parent_folder_as_tag) + break + except Exception as e: + LOGERR(e) + print('Could not import bookmarks from Firefox.') - try: - if os.path.isfile(os.path.expanduser(me_bm_db_path)): - if self.chatty: - resp = input('Import bookmarks from microsoft edge? (y/n): ') - if resp == 'y': - bookmarks_database = os.path.expanduser(me_bm_db_path) - if not os.path.exists(bookmarks_database): - raise FileNotFoundError - self.load_edge_database(bookmarks_database, newtag, add_parent_folder_as_tag) - except Exception as e: - LOGERR(e) - print('Could not import bookmarks from microsoft-edge') + try: + if os.path.isfile(os.path.expanduser(me_bm_db_path)): + if self.chatty: + resp = input('Import bookmarks from microsoft edge? (y/n): ') + if resp == 'y': + bookmarks_database = os.path.expanduser(me_bm_db_path) + if not os.path.exists(bookmarks_database): + raise FileNotFoundError + self.load_edge_database(bookmarks_database, newtag, add_parent_folder_as_tag) + except Exception as e: + LOGERR(e) + print('Could not import bookmarks from microsoft-edge') - self.conn.commit() + self.conn.commit() if newtag: print('\nAuto-generated tag: %s' % newtag) @@ -2962,13 +3005,14 @@ n: don't add parent folder as tag items = import_html(soup, add_parent_folder_as_tag, newtag, use_nested_folder_structure) infp.close() - for item in items: - add_rec_res = self.add_rec(*item) - if not add_rec_res and append_tags_resp == 'y': - rec_id = self.get_rec_id(item[0]) - self.append_tag_at_index(rec_id, item[2]) + with self.lock: + for item in items: + add_rec_res = self.add_rec(*item) + if not add_rec_res and append_tags_resp == 'y': + rec_id = self.get_rec_id(item[0]) + self.append_tag_at_index(rec_id, item[2]) - self.conn.commit() + self.conn.commit() if newtag: print('\nAuto-generated tag: %s' % newtag) @@ -3001,10 +3045,11 @@ n: don't add parent folder as tag resultset = indb_cur.fetchall() if resultset: - for row in bookmark_vars(resultset): - self.add_rec(row.url, row.title, row.tags_raw, row.desc, row.flags, True, False) + with self.lock: + for row in bookmark_vars(resultset): + self.add_rec(row.url, row.title, row.tags_raw, row.desc, row.flags, True, False) - self.conn.commit() + self.conn.commit() try: indb_cur.close() @@ -3043,8 +3088,9 @@ n: don't add parent folder as tag return None if index: - self.cur.execute('SELECT url FROM bookmarks WHERE id = ? LIMIT 1', (index,)) - results = self.cur.fetchall() + with self.lock: + self.cur.execute('SELECT url FROM bookmarks WHERE id = ? LIMIT 1', (index,)) + results = self.cur.fetchall() if not results: return None @@ -3152,23 +3198,24 @@ n: don't add parent folder as tag """ to_commit = False - self.cur.execute('SELECT id, tags FROM bookmarks ORDER BY id ASC') - resultset = self.cur.fetchall() - query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' - for row in resultset: - oldtags = row[1] - if oldtags == DELIM: - continue + with self.lock: + self.cur.execute('SELECT id, tags FROM bookmarks ORDER BY id ASC') + resultset = self.cur.fetchall() + query = 'UPDATE bookmarks SET tags = ? WHERE id = ?' 
+ for row in resultset: + oldtags = row[1] + if oldtags == DELIM: + continue - tags = parse_tags([oldtags]) - if tags == oldtags: - continue + tags = parse_tags([oldtags]) + if tags == oldtags: + continue - self.cur.execute(query, (tags, row[0],)) - to_commit = True + self.cur.execute(query, (tags, row[0],)) + to_commit = True - if to_commit: - self.conn.commit() + if to_commit: + self.conn.commit() def close(self): """Close a DB connection.""" diff --git a/setup.py b/setup.py index 4b4b4630..7d875aa3 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ 'pylint>=1.7.2', 'pytest-cov', 'pytest-recording>=0.12.1', + 'pytest-timeout', 'pytest>=6.2.1', 'PyYAML>=4.2b1', 'setuptools>=41.0.1', diff --git a/tests/pytest.ini b/tests/pytest.ini index 30780dbd..e3e514d2 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -1,4 +1,6 @@ [pytest] +timeout = 10 +timeout_method = thread markers = non_tox: not run on tox slow: slow tests diff --git a/tox.ini b/tox.ini index cb8a89c5..42698f32 100644 --- a/tox.ini +++ b/tox.ini @@ -32,6 +32,8 @@ ignore = E203, [pytest] +timeout = 10 +timeout_method = thread markers = non_tox: not run on tox slow: slow tests
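
The core pattern in the change above: every cursor/connection access in BukuDb is wrapped in "with self.lock:" (a reentrant threading.RLock), and read helpers such as _fetch(), get_rec_by_id() and get_max_id() take a keyword-only lock=True argument so a caller that already holds the lock can pass lock=False and run the query directly instead of re-entering the guarded path. A minimal self-contained sketch of that pattern follows; TinyDb, its in-memory table and the example URLs are illustrative stand-ins rather than code from buku.

import sqlite3
import threading

class TinyDb:
    """Toy analogue of BukuDb showing the RLock + lock-keyword pattern."""

    def __init__(self):
        # check_same_thread=False lets worker threads reuse one connection;
        # the reentrant lock below serialises all access to it.
        self.conn = sqlite3.connect(':memory:', check_same_thread=False)
        self.cur = self.conn.cursor()
        self.lock = threading.RLock()
        self.cur.execute('CREATE TABLE bookmarks (id INTEGER PRIMARY KEY, url TEXT)')

    def add_rec(self, url, *, lock=True):
        if not lock:  # caller already holds self.lock
            self.cur.execute('INSERT INTO bookmarks (url) VALUES (?)', (url,))
            self.conn.commit()
            return self.cur.lastrowid
        with self.lock:  # serialise concurrent writers
            return self.add_rec(url, lock=False)

    def get_max_id(self, *, lock=True):
        if not lock:
            self.cur.execute('SELECT MAX(id) FROM bookmarks')
            return self.cur.fetchall()[0][0]
        with self.lock:
            return self.get_max_id(lock=False)

if __name__ == '__main__':
    db = TinyDb()
    workers = [threading.Thread(target=db.add_rec, args=('https://example.com/%d' % i,))
               for i in range(8)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print(db.get_max_id())  # 8: all inserts went through the serialised path

Because the lock is reentrant, a guarded method may also call another guarded method without passing lock=False; the keyword mainly documents intent and skips a redundant acquisition on hot read paths.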
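
The refreshdb() hunk also reworks the worker handshake: instead of manual cond.acquire()/cond.release() pairs, each worker pops work and updates shared counters inside "with cond:" blocks, and the coordinator starts the threads while holding the condition, then waits until done['value'] reaches the thread count. Below is a rough standalone sketch of that handshake only; run_workers() and the plain work list are assumed placeholders (the real code fetches page titles and issues UPDATEs per item).

import threading

def run_workers(items, nthreads=4):
    results = list(items)                 # shared work queue, guarded by cond
    done = {'value': 0}
    processed = {'value': 0}
    cond = threading.Condition()

    def worker():
        count = 0
        while True:
            with cond:                    # pop the next item under the lock
                if not results:
                    break
                item = results.pop()
            count += 1                    # slow work on item (e.g. a network fetch) happens unlocked
        with cond:
            done['value'] += 1
            processed['value'] += count
            cond.notify()                 # wake the waiting coordinator

    with cond:                            # hold the lock while spawning workers
        for _ in range(nthreads):
            threading.Thread(target=worker).start()
        while done['value'] < nthreads:
            cond.wait()                   # releases cond while blocked
    return processed['value']

print(run_workers(range(10)))             # 10: every item is processed exactly once

In the patch this whole section additionally sits inside "with self.lock:", so external BukuDb calls are blocked while the worker pool runs, while the workers themselves coordinate only through cond.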