Skip to content

Commit

Permalink
Fixed:
Browse files Browse the repository at this point in the history
- Journal Sync will not hang up during big replication, especially the initial one.
- All changes which have been replicated while rebuilding will not be postponed (Previous behaviour).
Improved:
- Now Journal Sync works efficiently in download and parse, or pack and upload.
- Less server storage and faster packing/unpacking usage by the new chunk format.
  • Loading branch information
vrtmrz committed Apr 30, 2024
1 parent 7b5f7d0 commit daa3fee
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 24 deletions.
6 changes: 3 additions & 3 deletions src/CmdSetupLiveSync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type EntryDoc, type ObsidianLiveSyncSettings, DEFAULT_SETTINGS, LOG_LEVEL_NOTICE, REMOTE_COUCHDB } from "./lib/src/types";
import { type EntryDoc, type ObsidianLiveSyncSettings, DEFAULT_SETTINGS, LOG_LEVEL_NOTICE, REMOTE_COUCHDB, REMOTE_MINIO } from "./lib/src/types";
import { configURIBase } from "./types";
import { Logger } from "./lib/src/logger";
import { PouchDB } from "./lib/src/pouchdb-browser.js";
Expand Down Expand Up @@ -312,15 +312,15 @@ Of course, we are able to disable these features.`
}
async suspendReflectingDatabase() {
if (this.plugin.settings.doNotSuspendOnFetching) return;
// if (this.plugin.settings.remoteType == REMOTE_MINIO) return;
if (this.plugin.settings.remoteType == REMOTE_MINIO) return;
Logger(`Suspending reflection: Database and storage changes will not be reflected in each other until completely finished the fetching.`, LOG_LEVEL_NOTICE);
this.plugin.settings.suspendParseReplicationResult = true;
this.plugin.settings.suspendFileWatching = true;
await this.plugin.saveSettings();
}
async resumeReflectingDatabase() {
if (this.plugin.settings.doNotSuspendOnFetching) return;
// if (this.plugin.settings.remoteType == REMOTE_MINIO) return;
if (this.plugin.settings.remoteType == REMOTE_MINIO) return;
Logger(`Database and storage reflection has been resumed!`, LOG_LEVEL_NOTICE);
this.plugin.settings.suspendParseReplicationResult = false;
this.plugin.settings.suspendFileWatching = false;
Expand Down
26 changes: 13 additions & 13 deletions src/KeyValueDB.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { deleteDB, type IDBPDatabase, openDB } from "idb";
export interface KeyValueDatabase {
get<T>(key: string): Promise<T>;
set<T>(key: string, value: T): Promise<IDBValidKey>;
del(key: string): Promise<void>;
get<T>(key: IDBValidKey): Promise<T>;
set<T>(key: IDBValidKey, value: T): Promise<IDBValidKey>;
del(key: IDBValidKey): Promise<void>;
clear(): Promise<void>;
keys(query?: IDBValidKey | IDBKeyRange, count?: number): Promise<IDBValidKey[]>;
close(): void;
Expand All @@ -23,20 +23,20 @@ export const OpenKeyValueDatabase = async (dbKey: string): Promise<KeyValueDatab
const db = await dbPromise;
databaseCache[dbKey] = db;
return {
get<T>(key: string): Promise<T> {
return db.get(storeKey, key);
async get<T>(key: IDBValidKey): Promise<T> {
return await db.get(storeKey, key);
},
set<T>(key: string, value: T) {
return db.put(storeKey, value, key);
async set<T>(key: IDBValidKey, value: T) {
return await db.put(storeKey, value, key);
},
del(key: string) {
return db.delete(storeKey, key);
async del(key: IDBValidKey) {
return await db.delete(storeKey, key);
},
clear() {
return db.clear(storeKey);
async clear() {
return await db.clear(storeKey);
},
keys(query?: IDBValidKey | IDBKeyRange, count?: number) {
return db.getAllKeys(storeKey, query, count);
async keys(query?: IDBValidKey | IDBKeyRange, count?: number) {
return await db.getAllKeys(storeKey, query, count);
},
close() {
delete databaseCache[dbKey];
Expand Down
22 changes: 18 additions & 4 deletions src/ObsidianLiveSyncSettingTab.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2331,7 +2331,7 @@ ${stringifyYaml(pluginConfig)}`;
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, receivedFiles: [], knownIDs: [] }));
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, receivedFiles: new Set(), knownIDs: new Set() }));
Logger(`Journal received history has been cleared.`, LOG_LEVEL_NOTICE);
})
)
Expand All @@ -2344,7 +2344,7 @@ ${stringifyYaml(pluginConfig)}`;
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, lastLocalSeq: 0, sentIDs: [], sentFiles: [] }));
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, lastLocalSeq: 0, sentIDs: new Set(), sentFiles: new Set() }));
Logger(`Journal sent history has been cleared.`, LOG_LEVEL_NOTICE);
})
)
Expand All @@ -2357,10 +2357,24 @@ ${stringifyYaml(pluginConfig)}`;
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, receivedFiles: [], knownIDs: [], lastLocalSeq: 0, sentIDs: [], sentFiles: [] }));
await this.plugin.getMinioJournalSyncClient().resetCheckpointInfo();
Logger(`Journal exchange history has been cleared.`, LOG_LEVEL_NOTICE);
})
)
new Setting(containerMaintenanceEl)
.setName("Purge all journal counter")
.setDesc("Purge all sending and downloading cache.")
.addButton((button) =>
button
.setButtonText("Reset all")
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin.getMinioJournalSyncClient().resetAllCaches();
Logger(`Journal sending and downloading cache has been cleared.`, LOG_LEVEL_NOTICE);
})
)

new Setting(containerMaintenanceEl)
.setName("Make empty the bucket")
.setDesc("Delete all data on the remote.")
Expand All @@ -2370,7 +2384,7 @@ ${stringifyYaml(pluginConfig)}`;
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, receivedFiles: [], knownIDs: [], lastLocalSeq: 0, sentIDs: [], sentFiles: [] }));
await this.plugin.getMinioJournalSyncClient().updateCheckPointInfo((info) => ({ ...info, receivedFiles: new Set(), knownIDs: new Set(), lastLocalSeq: 0, sentIDs: new Set(), sentFiles: new Set() }));
await this.plugin.resetRemoteBucket();
Logger(`the bucket has been cleared.`, LOG_LEVEL_NOTICE);
})
Expand Down
2 changes: 1 addition & 1 deletion src/lib
6 changes: 3 additions & 3 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { type Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch, stri
import { Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl, type MarkdownFileInfo } from "./deps";
import { type EntryDoc, type LoadedEntry, type ObsidianLiveSyncSettings, type diff_check_result, type diff_result_leaf, type EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, type diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, type ConfigPassphraseStore, type CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, type DatabaseConnectingStatus, type EntryHasPath, type DocumentID, type FilePathWithPrefix, type FilePath, type AnyEntry, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, LOG_LEVEL_URGENT, LOG_LEVEL_VERBOSE, type SavingEntry, MISSING_OR_ERROR, NOT_CONFLICTED, AUTO_MERGED, CANCELLED, LEAVE_TO_SUBSEQUENT, FLAGMD_REDFLAG2_HR, FLAGMD_REDFLAG3_HR, REMOTE_MINIO, REMOTE_COUCHDB, type BucketSyncSetting, } from "./lib/src/types";
import { type InternalFileInfo, type CacheData, type FileEventItem, FileWatchEventQueueMax } from "./types";
import { arrayToChunkedArray, createBlob, delay, determineTypeFromBlob, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue, throttle } from "./lib/src/utils";
import { arrayToChunkedArray, createBlob, delay, determineTypeFromBlob, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue, throttle, type SimpleStore } from "./lib/src/utils";
import { Logger, setGlobalLogFunction } from "./lib/src/logger";
import { PouchDB } from "./lib/src/pouchdb-browser.js";
import { ConflictResolveModal } from "./ConflictResolveModal";
Expand Down Expand Up @@ -37,7 +37,7 @@ import { initializeStores } from "./stores.js";
import { JournalSyncMinio } from "./lib/src/JournalSyncMinio.js";
import { LiveSyncJournalReplicator, type LiveSyncJournalReplicatorEnv } from "./lib/src/LiveSyncJournalReplicator.js";
import { LiveSyncCouchDBReplicator, type LiveSyncCouchDBReplicatorEnv } from "./lib/src/LiveSyncReplicator.js";
import type { CheckPointInfo, SimpleStore } from "./lib/src/JournalSyncTypes.js";
import type { CheckPointInfo } from "./lib/src/JournalSyncTypes.js";
import { ObsHttpHandler } from "./ObsHttpHandler.js";

setNoticeClass(Notice);
Expand Down Expand Up @@ -477,7 +477,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin
},
keys: async (from: string | undefined, to: string | undefined, count?: number | undefined): Promise<string[]> => {
const ret = this.kvDB.keys(IDBKeyRange.bound(`os-${from || ""}`, `os-${to || ""}`), count);
return (await ret).map(e => e.toString());
return (await ret).map(e => e.toString()).filter(e => e.startsWith("os-")).map(e => e.substring(3));
}
}
getMinioJournalSyncClient() {
Expand Down

0 comments on commit daa3fee

Please sign in to comment.