Skip to content

Commit

Permalink
refactor(app): Update cache to handle bulk message downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
castaway committed Dec 17, 2024
1 parent 2714c8c commit d624a89
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 48 deletions.
6 changes: 3 additions & 3 deletions e2e/cypress/integration/message-caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ describe('Message caching', () => {

});

it('should cache all messages on first time page load', () => {
cy.intercept('/rest/v1/email/12').as('message12requested');
it('should cache all messages on first time page load', () => {
cy.intercept('/rest/v1/email/download/*').as('message12requested');

cy.visit('/');
cy.wait('@message12requested', {'timeout':10000});
Expand All @@ -27,7 +27,7 @@ describe('Message caching', () => {
// Now don't fetch it again:
cy.visit('/#Inbox:12');
let called = false;
cy.intercept('/rest/v1/email/12', (_req) => {
cy.intercept('/rest/v1/email/download/*', (_req) => {
called = true;
});

Expand Down
28 changes: 26 additions & 2 deletions src/app/rmmapi/messagecache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ export class MessageCache {

try {
this.db = new Dexie(`messageCache-${userId}`);
this.db.version(2).stores({
messages: '', // use out-of-line keys
this.db.version(3).stores({
// use out-of-line keys, but index "id"
// yes, empty first arg is deliberate
messages: ',&id',
});
} catch (err) {
console.log(`Error initializing messagecache: ${err}`);
Expand All @@ -45,6 +47,28 @@ export class MessageCache {
);
}

// verify which ids we already have
async checkIds(ids: number[]): Promise<number[]> {
return this.db?.table('messages')
.where('id')
.anyOf(ids)
.primaryKeys()
.then(
(keys) => keys.map((key) => key as number)
);
}

async getMany(ids: number[]): Promise<MessageContents[]> {
return this.db?.table('messages')
.where('id')
.anyOf(ids)
.toArray()
.then(
result => result,
_error => null,
);
}

set(id: number, contents: MessageContents): void {
contents.version = this.message_version;
this.db?.table('messages').put(contents, id).catch(
Expand Down
96 changes: 53 additions & 43 deletions src/app/rmmapi/rbwebmail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Injectable, NgZone } from '@angular/core';
import { Observable, from, Subject, AsyncSubject, firstValueFrom } from 'rxjs';
import { share } from 'rxjs/operators';
import { catchError, concatMap, share, map, mergeMap, tap } from 'rxjs/operators';
import { MessageInfo } from '../common/messageinfo';
import { MailAddressInfo } from '../common/mailaddressinfo';
import { FolderListEntry } from '../common/folderlistentry';
Expand Down Expand Up @@ -174,6 +174,10 @@ export class RunboxWebmailAPI {
private messageCache: AsyncSubject<MessageCache> = new AsyncSubject();
private messageContentsRequestCache = new LRUMessageCache<Promise<MessageContents>>();

// Track which messageids we're already fetching
// Else we attempt to refetch the same ones a fair bit on start-up
private downloadingMessages: number[] = [];

constructor(
private http: HttpClient,
private ngZone: NgZone,
Expand Down Expand Up @@ -266,51 +270,57 @@ export class RunboxWebmailAPI {
}

public downloadMessages(messageIds: number[]): Promise<MessageContents[]> {
const missingMessages = [];
for (const msgid of messageIds) {
if (!this.messageContentsCache[msgid]) {
this.messageContentsCache[msgid] = new AsyncSubject<MessageContents>();
missingMessages.push(msgid);
}
}

const messagePromises = messageIds.map(id => this.messageContentsCache[id].toPromise());

if (missingMessages.length > 0) {
this.http.get(`/rest/v1/email/download/${missingMessages.join(',')}`).pipe(
catchError((err: HttpErrorResponse) => throwError(err.message)),
concatMap((res: any) => {
if (res.status === 'success') {
return of(res.result);
} else {
return throwError(res.errors[0]);
}
}),
).subscribe(
(result: any) => {
for (const resultKey of Object.keys(result)) {
const msgid = parseInt(resultKey, 10);
const contents = result[msgid]?.json;
if (contents) {
this.messageContentsCache[msgid].next(contents);
this.messageContentsCache[msgid].complete();
const cached = this.messageCache.checkIds([...messageIds]);
return cached.then((inCache) => {
const missingMessages = messageIds.filter(
(msgId) => !inCache.includes(msgId)
&& !this.downloadingMessages.includes(msgId)
);
const messagePromises = missingMessages.map(id => this.messageCache.get(id));

if (missingMessages.length > 0) {
this.downloadingMessages = this.downloadingMessages.concat(missingMessages);
this.http.get(`/rest/v1/email/download/${missingMessages.join(',')}`).pipe(
catchError((err: HttpErrorResponse) => throwError(err.message)),
concatMap((res: any) => {
if (res.status === 'success') {
return of(res.result);
} else {
this.messageContentsCache[msgid].error(result[msgid]?.error);
delete this.messageContentsCache[msgid];
return throwError(res.errors[0]);
}
}),
).subscribe(
(result: any) => {
for (const resultKey of Object.keys(result)) {
const msgid = parseInt(resultKey, 10);
const contents = result[msgid]?.json;
if (contents) {
this.messageCache.set(msgid, contents);
} else {
this.deleteCachedMessageContents(msgid);
}
const msgIndex = this.downloadingMessages
.findIndex((id) => msgid === id);
if (msgIndex > -1) {
this.downloadingMessages.splice(msgIndex, 1);
}
}
},
(err: Error) => {
for (const msgid of missingMessages) {
this.deleteCachedMessageContents(msgid);
const msgIndex = this.downloadingMessages
.findIndex((id) => msgid === id);
if (msgIndex > -1) {
this.downloadingMessages.splice(msgIndex, 1);
}
}
}
},
(err: Error) => {
for (const msgid of missingMessages) {
this.messageContentsCache[msgid].error(err.toString());
delete this.messageContentsCache[msgid];
}
}
);
}

// return Promise.allSettled(messagePromises);
return Promise.all(messagePromises);
);
}
// return Promise.allSettled(messagePromises);
return Promise.all(messagePromises);
});
}

public updateLastOn(): Observable<any> {
Expand Down

0 comments on commit d624a89

Please sign in to comment.