EventIndexer: ensure we add initial checkpoints when the db is first opened (#31448)

* EventIndex: add some documentation

... because comments are not subject to rationing.

* EventIndex: rationalise addInitialCheckpoints logic

* EventIndex: improve logging

* use a single `logger` instance to do the prefixing
* use `JSON.stringify` on the checkpoints so that the rageshakes are useful
* distinguish between logger.warn and logger.debug
* emit some logs if the crawler loop fails

* Move check for empty database to `init`

The problem with checking if the database is empty in `onSync` is that, by the
time we get there, it won't be.

Instead let's remember if the db was empty in `init`, and then once a sync
completes, we can add the checkpoints.

* Some tests for EventIndexer
This commit is contained in:
Richard van der Hoff
2025-12-08 14:08:31 +00:00
committed by GitHub
parent 3b0bc0bb4a
commit 2fb0bf6152
3 changed files with 308 additions and 75 deletions

View File

@@ -15,10 +15,23 @@ import {
// The following interfaces take their names and member names from seshat and the spec
/* eslint-disable camelcase */
/** A record of a place to resume crawling events in a given room. */
export interface ICrawlerCheckpoint {
/** The room to be indexed */
roomId: string;
token: string | null;
/** The pagination index to resume crawling from. */
token: string;
/**
* If `fullCrawl` is false (or absent) and we find that we have already indexed the events we find, then we stop crawling.
*
* If `fullCrawl` is true, then we keep going until we reach the end of the room history.
*/
fullCrawl?: boolean;
/** Whether we should crawl in the forward or backward direction. */
direction: Direction;
}

View File

@@ -26,7 +26,7 @@ import {
type IMatrixProfile,
type IResultRoomEvents,
type SyncStateData,
type SyncState,
SyncState,
type TimelineIndex,
type TimelineWindow,
} from "matrix-js-sdk/src/matrix";
@@ -46,6 +46,7 @@ import {
type ISearchArgs,
} from "./BaseEventIndexManager";
import { asyncFilter } from "../utils/arrays.ts";
import { logErrorAndShowErrorDialog } from "../utils/ErrorUtils.tsx";
// The time in ms that the crawler will wait between loop iterations if there
// have not been any checkpoints to consume in the last iteration.
@@ -58,20 +59,49 @@ interface ICrawler {
cancel(): void;
}
/*
/**
* Event indexing class that wraps the platform specific event indexing.
*/
export default class EventIndex extends EventEmitter {
private crawlerCheckpoints: ICrawlerCheckpoint[] = [];
private crawler: ICrawler | null = null;
/**
* A list of checkpoints which are awaiting processing by the crawler, once it has done with `currentCheckpoint`.
*/
private crawlerCheckpoints: ICrawlerCheckpoint[] = [];
/**
* The current checkpoint that the crawler is working on.
*/
private currentCheckpoint: ICrawlerCheckpoint | null = null;
/**
* True if we need to add the initial checkpoints for encrypted rooms, once we've completed a sync.
* This is set if the database is empty when the indexer is first initialized.
*/
private needsInitialCheckpoints = false;
private readonly logger;
public constructor() {
super();
this.logger = logger.getChild("EventIndex");
}
public async init(): Promise<void> {
const indexManager = PlatformPeg.get()?.getEventIndexingManager();
if (!indexManager) return;
// If the index is empty, set a flag so that we add the initial checkpoints once we sync.
// We do this check here rather than in `onSync` because, by the time `onSync` is called, there will
// have been a few events added to the index.
if (await indexManager.isEventIndexEmpty()) {
this.needsInitialCheckpoints = true;
}
this.crawlerCheckpoints = await indexManager.loadCheckpoints();
logger.log("EventIndex: Loaded checkpoints", this.crawlerCheckpoints);
this.logger.debug("Loaded checkpoints", JSON.stringify(this.crawlerCheckpoints));
this.registerListeners();
}
@@ -102,9 +132,11 @@ export default class EventIndex extends EventEmitter {
}
/**
* Get crawler checkpoints for the encrypted rooms and store them in the index.
* Add crawler checkpoints for all of the encrypted rooms the user is in.
*/
public async addInitialCheckpoints(): Promise<void> {
this.needsInitialCheckpoints = false;
const indexManager = PlatformPeg.get()?.getEventIndexingManager();
if (!indexManager) return;
const client = MatrixClientPeg.safeGet();
@@ -116,7 +148,7 @@ export default class EventIndex extends EventEmitter {
Boolean(await client.getCrypto()?.isEncryptionEnabledInRoom(room.roomId)),
);
logger.log("EventIndex: Adding initial crawler checkpoints");
this.logger.debug("addInitialCheckpoints: starting");
// Gather the prev_batch tokens and create checkpoints for
// our message crawler.
@@ -125,6 +157,12 @@ export default class EventIndex extends EventEmitter {
const timeline = room.getLiveTimeline();
const token = timeline.getPaginationToken(Direction.Backward);
if (!token) {
this.logger.debug(`addInitialCheckpoints: No back-pagination token for room ${room.roomId}"`);
return;
}
this.logger.debug(`addInitialCheckpoints: Adding initial checkpoints for room ${room.roomId}`);
const backCheckpoint: ICrawlerCheckpoint = {
roomId: room.roomId,
token: token,
@@ -139,57 +177,47 @@ export default class EventIndex extends EventEmitter {
};
try {
if (backCheckpoint.token) {
await indexManager.addCrawlerCheckpoint(backCheckpoint);
this.crawlerCheckpoints.push(backCheckpoint);
}
await indexManager.addCrawlerCheckpoint(backCheckpoint);
this.crawlerCheckpoints.push(backCheckpoint);
if (forwardCheckpoint.token) {
await indexManager.addCrawlerCheckpoint(forwardCheckpoint);
this.crawlerCheckpoints.push(forwardCheckpoint);
}
await indexManager.addCrawlerCheckpoint(forwardCheckpoint);
this.crawlerCheckpoints.push(forwardCheckpoint);
} catch (e) {
logger.log(
"EventIndex: Error adding initial checkpoints for room",
room.roomId,
backCheckpoint,
forwardCheckpoint,
this.logger.warn(
`addInitialCheckpoints: Error adding initial checkpoints for room ${room.roomId}`,
e,
);
}
}),
);
this.logger.debug("addInitialCheckpoints: done");
}
/*
/**
* The sync event listener.
*
* The listener has two cases:
* - First sync after start up, check if the index is empty, add
* initial checkpoints, if so. Start the crawler background task.
* - Every other sync, tell the event index to commit all the queued up
* live events
*/
private onSync = async (state: SyncState, prevState: SyncState | null, data?: SyncStateData): Promise<void> => {
const indexManager = PlatformPeg.get()?.getEventIndexingManager();
if (!indexManager) return;
private onSync = (state: SyncState, prevState: SyncState | null, data?: SyncStateData): void => {
if (state != SyncState.Syncing) return;
if (prevState === "PREPARED" && state === "SYNCING") {
// If our indexer is empty we're most likely running Element the
// first time with indexing support or running it with an
// initial sync. Add checkpoints to crawl our encrypted rooms.
const eventIndexWasEmpty = await indexManager.isEventIndexEmpty();
if (eventIndexWasEmpty) await this.addInitialCheckpoints();
const onSyncInner = async (): Promise<void> => {
const indexManager = PlatformPeg.get()?.getEventIndexingManager();
if (!indexManager) return;
// If the index was empty when we first started up, add the initial checkpoints, to back-populate the index.
if (this.needsInitialCheckpoints) {
await this.addInitialCheckpoints();
}
// Start the crawler if it's not already running.
this.startCrawler();
return;
}
if (prevState === "SYNCING" && state === "SYNCING") {
// A sync was done, presumably we queued up some live events,
// commit them now.
// Commit any queued up live events
await indexManager.commitLiveEvents();
}
};
onSyncInner().catch((e) => {
logErrorAndShowErrorDialog("Event indexer threw an unexpected error", e);
});
};
/*
@@ -232,7 +260,7 @@ export default class EventIndex extends EventEmitter {
if (!MatrixClientPeg.safeGet().isRoomEncrypted(state.roomId)) return;
if (ev.getType() === EventType.RoomEncryption && !(await this.isRoomIndexed(state.roomId))) {
logger.log("EventIndex: Adding a checkpoint for a newly encrypted room", state.roomId);
this.logger.debug("Adding a checkpoint for a newly encrypted room", state.roomId);
this.addRoomCheckpoint(state.roomId, true);
}
};
@@ -251,7 +279,7 @@ export default class EventIndex extends EventEmitter {
try {
await indexManager.deleteEvent(associatedId);
} catch (e) {
logger.log("EventIndex: Error deleting event from index", e);
this.logger.warn("Error deleting event from index", e);
}
};
@@ -265,7 +293,7 @@ export default class EventIndex extends EventEmitter {
if (!room) return;
if (!MatrixClientPeg.safeGet().isRoomEncrypted(room.roomId)) return;
logger.log("EventIndex: Adding a checkpoint because of a limited timeline", room.roomId);
this.logger.debug("Adding a checkpoint because of a limited timeline", room.roomId);
this.addRoomCheckpoint(room.roomId, false);
};
@@ -394,12 +422,12 @@ export default class EventIndex extends EventEmitter {
direction: Direction.Backward,
};
logger.log("EventIndex: Adding checkpoint", checkpoint);
this.logger.debug("Adding checkpoint", JSON.stringify(checkpoint));
try {
await indexManager.addCrawlerCheckpoint(checkpoint);
} catch (e) {
logger.log("EventIndex: Error adding new checkpoint for room", room.roomId, checkpoint, e);
this.logger.warn(`Error adding new checkpoint for room ${room.roomId}`, e);
}
this.crawlerCheckpoints.push(checkpoint);
@@ -460,6 +488,7 @@ export default class EventIndex extends EventEmitter {
continue;
}
this.logger.debug(`Processing checkpoint ${JSON.stringify(checkpoint)}`);
this.currentCheckpoint = checkpoint;
this.emitNewCheckpoint();
@@ -481,15 +510,15 @@ export default class EventIndex extends EventEmitter {
);
} catch (e) {
if (e instanceof HTTPError && e.httpStatus === 403) {
logger.log(
"EventIndex: Removing checkpoint as we don't have ",
this.logger.debug(
"Removing checkpoint as we don't have ",
"permissions to fetch messages from this room.",
checkpoint,
JSON.stringify(checkpoint),
);
try {
await indexManager.removeCrawlerCheckpoint(checkpoint);
} catch (e) {
logger.log("EventIndex: Error removing checkpoint", checkpoint, e);
this.logger.warn(`Error removing checkpoint ${JSON.stringify(checkpoint)}:`, e);
// We don't push the checkpoint here back, it will
// hopefully be removed after a restart. But let us
// ignore it for now as we don't want to hammer the
@@ -498,7 +527,7 @@ export default class EventIndex extends EventEmitter {
continue;
}
logger.log("EventIndex: Error crawling using checkpoint:", checkpoint, ",", e);
this.logger.warn(`Error crawling using checkpoint ${JSON.stringify(checkpoint)}:`, e);
this.crawlerCheckpoints.push(checkpoint);
continue;
}
@@ -509,13 +538,13 @@ export default class EventIndex extends EventEmitter {
}
if (res.chunk.length === 0) {
logger.log("EventIndex: Done with the checkpoint", checkpoint);
this.logger.debug("Done with the checkpoint", JSON.stringify(checkpoint));
// We got to the start/end of our timeline, lets just
// delete our checkpoint and go back to sleep.
try {
await indexManager.removeCrawlerCheckpoint(checkpoint);
} catch (e) {
logger.log("EventIndex: Error removing checkpoint", checkpoint, e);
this.logger.warn("Error removing checkpoint", JSON.stringify(checkpoint), e);
}
continue;
}
@@ -593,7 +622,7 @@ export default class EventIndex extends EventEmitter {
if (eventId) {
await indexManager.deleteEvent(eventId);
} else {
logger.warn("EventIndex: Redaction event doesn't contain a valid associated event id", ev);
this.logger.warn("Redaction event doesn't contain a valid associated event id", ev);
}
}
@@ -602,10 +631,9 @@ export default class EventIndex extends EventEmitter {
// We didn't get a valid new checkpoint from the server, nothing
// to do here anymore.
if (!newCheckpoint) {
logger.log(
"EventIndex: The server didn't return a valid ",
"new checkpoint, not continuing the crawl.",
checkpoint,
this.logger.debug(
"The server didn't return a valid new checkpoint, not continuing the crawl.",
JSON.stringify(checkpoint),
);
continue;
}
@@ -615,31 +643,29 @@ export default class EventIndex extends EventEmitter {
// Let us delete the checkpoint in that case, otherwise push
// the new checkpoint to be used by the crawler.
if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) {
logger.log(
"EventIndex: Checkpoint had already all events",
this.logger.debug(
"Checkpoint had already all events",
"added, stopping the crawl",
checkpoint,
JSON.stringify(checkpoint),
);
await indexManager.removeCrawlerCheckpoint(newCheckpoint);
} else {
if (eventsAlreadyAdded === true) {
logger.log(
"EventIndex: Checkpoint had already all events",
this.logger.debug(
"Checkpoint had already all events",
"added, but continuing due to a full crawl",
checkpoint,
JSON.stringify(checkpoint),
);
}
this.crawlerCheckpoints.push(newCheckpoint);
}
} catch (e) {
logger.log("EventIndex: Error during a crawl", e);
this.logger.warn("Error during a crawl", e);
// An error occurred, put the checkpoint back so we
// can retry.
this.crawlerCheckpoints.push(checkpoint);
}
}
this.crawler = null;
}
/**
@@ -647,7 +673,14 @@ export default class EventIndex extends EventEmitter {
*/
public startCrawler(): void {
if (this.crawler !== null) return;
this.crawlerFunc();
this.logger.debug("Starting crawler");
this.crawlerFunc()
.finally(() => {
this.crawler = null;
})
.catch((e) => {
this.logger.error("Error in crawler function", e);
});
}
/**
@@ -655,6 +688,7 @@ export default class EventIndex extends EventEmitter {
*/
public stopCrawler(): void {
if (this.crawler === null) return;
this.logger.debug("Stopping crawler");
this.crawler.cancel();
}
@@ -732,7 +766,7 @@ export default class EventIndex extends EventEmitter {
try {
events = await indexManager.loadFileEvents(loadArgs);
} catch (e) {
logger.log("EventIndex: Error getting file events", e);
this.logger.debug("Error getting file events", e);
return [];
}
@@ -842,11 +876,8 @@ export default class EventIndex extends EventEmitter {
ret = true;
}
logger.log(
"EventIndex: Populating file panel with",
matrixEvents.length,
"events and setting the pagination token to",
paginationToken,
this.logger.debug(
`Populating file panel with ${matrixEvents.length} events and setting the pagination token to ${paginationToken}`,
);
timeline.setPaginationToken(paginationToken, EventTimeline.BACKWARDS);
@@ -961,7 +992,10 @@ export default class EventIndex extends EventEmitter {
}
public crawlingRooms(): {
/** The rooms that we are currently crawling. */
crawlingRooms: Set<string>;
/** All the encrypted rooms known by the MatrixClient. */
totalRooms: Set<string>;
} {
const totalRooms = new Set<string>();

View File

@@ -0,0 +1,186 @@
/*
Copyright 2025 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { type Mocked } from "jest-mock";
import {
Direction,
type MatrixClient,
type IEvent,
MatrixEvent,
type Room,
ClientEvent,
SyncState,
} from "matrix-js-sdk/src/matrix";
import EventIndex from "../../../src/indexing/EventIndex.ts";
import { emitPromise, getMockClientWithEventEmitter, mockClientMethodsRooms, mockPlatformPeg } from "../../test-utils";
import type BaseEventIndexManager from "../../../src/indexing/BaseEventIndexManager.ts";
import { type ICrawlerCheckpoint } from "../../../src/indexing/BaseEventIndexManager.ts";
import SettingsStore from "../../../src/settings/SettingsStore.ts";
// Put back everything that was replaced via `jest.spyOn` once each test has finished.
afterEach(function restoreMocks(): void {
    jest.restoreAllMocks();
});
// Tests for the EventIndex crawler: resuming from stored checkpoints, and seeding the
// initial checkpoints after the first sync when the index started out empty.
describe("EventIndex", () => {
// Checks that checkpoints returned by `loadCheckpoints` are processed one after another,
// each one resulting in a `createMessagesRequest` call with that checkpoint's room, token and direction.
it("crawls through the loaded checkpoints", async () => {
// The index is reported as non-empty here, so this test does not exercise the initial-checkpoint path.
const mockIndexingManager = {
loadCheckpoints: jest.fn(),
removeCrawlerCheckpoint: jest.fn(),
isEventIndexEmpty: jest.fn().mockResolvedValue(false),
} as any as Mocked<BaseEventIndexManager>;
mockPlatformPeg({ getEventIndexingManager: () => mockIndexingManager });
// Minimal room stubs: only `roomId` is provided.
const room1 = { roomId: "!room1:id" } as any as Room;
const room2 = { roomId: "!room2:id" } as any as Room;
const mockClient = getMockClientWithEventEmitter({
getEventMapper: () => (obj: Partial<IEvent>) => new MatrixEvent(obj),
createMessagesRequest: jest.fn(),
...mockClientMethodsRooms([room1, room2]),
});
// Report a `crawlerSleepTime` of 0 (and `undefined` for every other setting).
// NOTE(review): presumably this stops the crawler pausing between iterations — confirm against the crawler loop.
jest.spyOn(SettingsStore, "getValueAt").mockImplementation((_level, settingName): any => {
if (settingName === "crawlerSleepTime") return 0;
return undefined;
});
// Two stored checkpoints, one per room, crawling in opposite directions.
mockIndexingManager.loadCheckpoints.mockResolvedValue([
{ roomId: "!room1:id", token: "token1", direction: Direction.Backward } as ICrawlerCheckpoint,
{ roomId: "!room2:id", token: "token2", direction: Direction.Forward } as ICrawlerCheckpoint,
]);
const indexer = new EventIndex();
await indexer.init();
// Start listening for "changedCheckpoint" before starting the crawler, so the first emission is not missed.
let changedCheckpointPromise = emitPromise(indexer, "changedCheckpoint") as Promise<Room>;
indexer.startCrawler();
// Mock out the /messages request, and wait for the crawler to hit the first room
const mock1 = mockCreateMessagesRequest(mockClient);
let changedCheckpoint = await changedCheckpointPromise;
expect(changedCheckpoint.roomId).toEqual("!room1:id");
await mock1.called;
expect(mockClient.createMessagesRequest).toHaveBeenCalledWith("!room1:id", "token1", 100, "b");
// Continue, and wait for the crawler to hit the second room
changedCheckpointPromise = emitPromise(indexer, "changedCheckpoint") as Promise<Room>;
// An empty chunk completes the first request, after which the crawler moves on to the next checkpoint.
mock1.resolve({ chunk: [] });
changedCheckpoint = await changedCheckpointPromise;
expect(changedCheckpoint.roomId).toEqual("!room2:id");
// Mock out the /messages request again, and wait for it to be called
const mock2 = mockCreateMessagesRequest(mockClient);
await mock2.called;
expect(mockClient.createMessagesRequest).toHaveBeenCalledWith("!room2:id", "token2", 100, "f");
});
// Checks that, when the index is empty at `init` time, a backward and a forward checkpoint
// are added for each encrypted room once a sync completes.
it("adds checkpoints for the encrypted rooms after the first sync", async () => {
// No stored checkpoints, and the index is reported as empty while `init` runs.
const mockIndexingManager = {
loadCheckpoints: jest.fn().mockResolvedValue([]),
isEventIndexEmpty: jest.fn().mockResolvedValue(true),
addCrawlerCheckpoint: jest.fn(),
removeCrawlerCheckpoint: jest.fn(),
commitLiveEvents: jest.fn(),
} as any as Mocked<BaseEventIndexManager>;
mockPlatformPeg({ getEventIndexingManager: () => mockIndexingManager });
// Room stubs whose live timeline always hands back a fixed pagination token.
const room1 = {
roomId: "!room1:id",
getLiveTimeline: () => ({
getPaginationToken: () => "token1",
}),
} as any as Room;
const room2 = {
roomId: "!room2:id",
getLiveTimeline: () => ({
getPaginationToken: () => "token2",
}),
} as any as Room;
// Every room is reported as having encryption enabled.
const mockCrypto = {
isEncryptionEnabledInRoom: jest.fn().mockResolvedValue(true),
};
const mockClient = getMockClientWithEventEmitter({
getEventMapper: () => (obj: Partial<IEvent>) => new MatrixEvent(obj),
createMessagesRequest: jest.fn(),
getCrypto: () => mockCrypto as any,
...mockClientMethodsRooms([room1, room2]),
});
// Lets the test wait until the indexer calls `commitLiveEvents`.
const commitLiveEventsCalled = Promise.withResolvers<void>();
mockIndexingManager.commitLiveEvents.mockImplementation(async () => {
commitLiveEventsCalled.resolve();
});
const indexer = new EventIndex();
await indexer.init();
// During the first sync, some events are added to the index, meaning that `isEventIndexEmpty` will now be false.
mockIndexingManager.isEventIndexEmpty.mockResolvedValue(false);
// The first sync completes:
mockClient.emit(ClientEvent.Sync, SyncState.Syncing, null, {});
// Wait for `commitLiveEvents` to be called, by which time the checkpoints should have been added.
await commitLiveEventsCalled.promise;
// Four calls in total: one backward and one forward checkpoint for each of the two rooms.
expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledTimes(4);
expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({
roomId: "!room1:id",
token: "token1",
direction: Direction.Backward,
fullCrawl: true,
});
expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({
roomId: "!room1:id",
token: "token1",
direction: Direction.Forward,
});
expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({
roomId: "!room2:id",
token: "token2",
direction: Direction.Backward,
fullCrawl: true,
});
expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({
roomId: "!room2:id",
token: "token2",
direction: Direction.Forward,
});
});
});
/**
 * Stub the next call to `createMessagesRequest` on the mock client so that the test controls when it completes.
 *
 * The stub applies to a single call only: that call returns a promise which stays pending until `resolve` is invoked.
 *
 * @param mockClient - The mocked client whose `createMessagesRequest` should be stubbed.
 * @returns An object with two handles:
 *  * `called`: A promise that resolves once `createMessagesRequest` has been invoked.
 *  * `resolve`: A function which completes the pending `createMessagesRequest` call with the given result.
 */
function mockCreateMessagesRequest(mockClient: Mocked<MatrixClient>): {
    called: Promise<void>;
    resolve: (result: any) => void;
} {
    // Signals to the test that the request has been issued.
    const { promise: called, resolve: markCalled } = Promise.withResolvers<void>();
    // Holds the request open until the test supplies a result.
    const { promise: pendingResult, resolve } = Promise.withResolvers();

    mockClient.createMessagesRequest.mockImplementationOnce(() => {
        markCalled();
        return pendingResult as any;
    });

    return { called, resolve };
}