feat: add isolated memory spaces
This commit is contained in:
@@ -1,39 +1,27 @@
|
||||
import {
|
||||
type AddFactInput,
|
||||
type ConnectedTopic,
|
||||
type Fact,
|
||||
type FactTopic,
|
||||
type FindSimilarFactsInput,
|
||||
type IndexFactEmbeddingsInput,
|
||||
type LinkTopicsInput,
|
||||
type ListTopicsOptions,
|
||||
type ScoredFact,
|
||||
type SearchFactsInput,
|
||||
type Space,
|
||||
type SpaceScopedInput,
|
||||
type Topic,
|
||||
type TopicLookupOptions,
|
||||
type TopicWithFacts,
|
||||
type UpsertSpaceInput,
|
||||
type UpsertTopicInput,
|
||||
type AddFactInput,
|
||||
type LinkTopicsInput,
|
||||
} from '../types/api';
|
||||
import type { IngestStatementOptions } from '../ingestion/types';
|
||||
import type { DatabaseConnection, IdentityDBConnectionConfig } from '../adapters/dialect';
|
||||
import type { IdentityDatabaseSchema } from '../types/database';
|
||||
import type { FactRecord, TopicRecord } from '../types/domain';
|
||||
import type { FactRecord, SpaceRecord, TopicRecord } from '../types/domain';
|
||||
import { createDatabase } from '../adapters/dialect';
|
||||
import { IdentityDBError } from './errors';
|
||||
import { initializeSchema } from './migrations';
|
||||
import {
|
||||
canonicalizeTopicName,
|
||||
cosineSimilarity,
|
||||
createContentHash,
|
||||
createId,
|
||||
deserializeEmbedding,
|
||||
mapFactRow,
|
||||
mapTopicRow,
|
||||
normalizeTopicName,
|
||||
nowIsoString,
|
||||
serializeEmbedding,
|
||||
serializeMetadata,
|
||||
} from './utils';
|
||||
import { extractFact } from '../ingestion/extractor';
|
||||
import {
|
||||
findFactRowsConnectingTopicIds,
|
||||
@@ -41,16 +29,37 @@ import {
|
||||
findTopicLinksForFactIds,
|
||||
} from '../queries/facts';
|
||||
import {
|
||||
type DatabaseExecutor,
|
||||
findChildTopicRows,
|
||||
findConnectedTopicRows,
|
||||
findParentTopicRows,
|
||||
findSpaceRowByNormalizedName,
|
||||
findTopicRowByNameOrAlias,
|
||||
findTopicRowByNormalizedAlias,
|
||||
findTopicRowByNormalizedName,
|
||||
listTopicAliasRowsForTopicId,
|
||||
listTopicRows,
|
||||
findChildTopicRows,
|
||||
findParentTopicRows,
|
||||
type DatabaseExecutor,
|
||||
} from '../queries/topics';
|
||||
import { IdentityDBError } from './errors';
|
||||
import { initializeSchema } from './migrations';
|
||||
import {
|
||||
canonicalizeSpaceName,
|
||||
canonicalizeTopicName,
|
||||
cosineSimilarity,
|
||||
createContentHash,
|
||||
createId,
|
||||
deserializeEmbedding,
|
||||
mapFactRow,
|
||||
mapSpaceRow,
|
||||
mapTopicRow,
|
||||
normalizeSpaceName,
|
||||
normalizeTopicName,
|
||||
nowIsoString,
|
||||
serializeEmbedding,
|
||||
serializeMetadata,
|
||||
} from './utils';
|
||||
|
||||
/** Space name that write paths fall back to when no `spaceName` is supplied, and that read paths look up as the fallback space. */
const DEFAULT_SPACE_NAME = 'default';
|
||||
|
||||
export class IdentityDB {
|
||||
// Private constructor: stores the database connection as a readonly parameter property.
// NOTE(review): presumably instances are built by a static factory outside this excerpt — confirm.
private constructor(private readonly connection: DatabaseConnection) {}
|
||||
@@ -68,6 +77,72 @@ export class IdentityDB {
|
||||
await this.connection.destroy();
|
||||
}
|
||||
|
||||
/**
 * Creates a space, or updates the space already stored under the same normalized name.
 *
 * The lookup and the write run inside one transaction. On update, `description` and
 * `metadata` are only replaced when the caller supplied them (i.e. they are not `undefined`);
 * the display name and `updated_at` are always refreshed.
 *
 * @param input - Space name plus optional description and metadata.
 * @returns The created or updated space.
 * @throws IdentityDBError when the name normalizes to an empty string.
 */
async upsertSpace(input: UpsertSpaceInput): Promise<Space> {
  return this.connection.db.transaction().execute(async (trx) => {
    const lookupKey = normalizeSpaceName(input.name);
    if (lookupKey.length === 0) {
      throw new IdentityDBError('Space name cannot be empty.');
    }

    const timestamp = nowIsoString();
    const current = await findSpaceRowByNormalizedName(trx, lookupKey);

    // No row yet: insert a fresh record and return it without re-reading.
    if (!current) {
      const freshRow: SpaceRecord = {
        id: createId(),
        name: canonicalizeSpaceName(input.name),
        normalized_name: lookupKey,
        description: input.description ?? null,
        metadata: serializeMetadata(input.metadata),
        created_at: timestamp,
        updated_at: timestamp,
      };

      await trx.insertInto('spaces').values(freshRow).execute();
      return mapSpaceRow(freshRow);
    }

    // Existing row: keep stored description/metadata unless the caller provided replacements.
    const displayName = canonicalizeSpaceName(input.name);
    const nextDescription = input.description === undefined ? current.description : input.description;
    const nextMetadata = input.metadata === undefined ? current.metadata : serializeMetadata(input.metadata);

    await trx
      .updateTable('spaces')
      .set({
        name: displayName,
        description: nextDescription,
        metadata: nextMetadata,
        updated_at: timestamp,
      })
      .where('id', '=', current.id)
      .execute();

    // Re-read so the returned value reflects exactly what is stored.
    const refreshed = await trx
      .selectFrom('spaces')
      .selectAll()
      .where('id', '=', current.id)
      .executeTakeFirstOrThrow();

    return mapSpaceRow(refreshed);
  });
}
|
||||
|
||||
/**
 * Looks up a space by name.
 *
 * @param name - Space name; it is normalized before the lookup.
 * @returns The matching space, or `null` when the name normalizes to an empty string
 *          or no row is found.
 */
async getSpaceByName(name: string): Promise<Space | null> {
  const lookupKey = normalizeSpaceName(name);

  if (lookupKey.length > 0) {
    const match = await findSpaceRowByNormalizedName(this.connection.db, lookupKey);
    if (match) {
      return mapSpaceRow(match);
    }
  }

  return null;
}
|
||||
|
||||
/**
 * Returns every stored space, ordered ascending by normalized name.
 */
async listSpaces(): Promise<Space[]> {
  const orderedSpacesQuery = this.connection.db
    .selectFrom('spaces')
    .selectAll()
    .orderBy('normalized_name', 'asc');

  const spaceRows = await orderedSpacesQuery.execute();
  return spaceRows.map(mapSpaceRow);
}
|
||||
|
||||
/**
 * Creates or updates a topic by delegating to `upsertTopicInExecutor` on the
 * connection's database handle.
 *
 * @param input - Topic definition to upsert.
 * @returns The created or updated topic.
 */
async upsertTopic(input: UpsertTopicInput): Promise<Topic> {
  const executor = this.connection.db;
  return this.upsertTopicInExecutor(executor, input);
}
|
||||
@@ -82,6 +157,7 @@ export class IdentityDB {
|
||||
}
|
||||
|
||||
return this.connection.db.transaction().execute(async (trx) => {
|
||||
const space = await this.getOrCreateSpaceInExecutor(trx, input.spaceName);
|
||||
const createdAt = nowIsoString();
|
||||
const factId = createId();
|
||||
|
||||
@@ -89,6 +165,7 @@ export class IdentityDB {
|
||||
.insertInto('facts')
|
||||
.values({
|
||||
id: factId,
|
||||
space_id: space.id,
|
||||
statement: input.statement.trim(),
|
||||
summary: input.summary ?? null,
|
||||
source: input.source ?? null,
|
||||
@@ -103,7 +180,11 @@ export class IdentityDB {
|
||||
|
||||
for (let index = 0; index < input.topics.length; index += 1) {
|
||||
const topicInput = input.topics[index]!;
|
||||
const topic = await this.upsertTopicInExecutor(trx, topicInput);
|
||||
this.assertScopedTopicInput(space, topicInput.spaceName);
|
||||
const topic = await this.upsertTopicInExecutor(trx, {
|
||||
...topicInput,
|
||||
spaceName: space.name,
|
||||
});
|
||||
|
||||
await trx
|
||||
.insertInto('fact_topics')
|
||||
@@ -125,6 +206,7 @@ export class IdentityDB {
|
||||
|
||||
return {
|
||||
id: factId,
|
||||
spaceId: space.id,
|
||||
statement: input.statement.trim(),
|
||||
summary: input.summary ?? null,
|
||||
source: input.source ?? null,
|
||||
@@ -137,14 +219,12 @@ export class IdentityDB {
|
||||
});
|
||||
}
|
||||
|
||||
async ingestStatement(
|
||||
statement: string,
|
||||
options: IngestStatementOptions,
|
||||
): Promise<Fact> {
|
||||
async ingestStatement(statement: string, options: IngestStatementOptions): Promise<Fact> {
|
||||
const extracted = await extractFact(statement, options.extractor);
|
||||
const factInput: AddFactInput = {
|
||||
statement: extracted.statement ?? statement,
|
||||
topics: extracted.topics,
|
||||
spaceName: options.spaceName,
|
||||
};
|
||||
|
||||
if (extracted.summary !== undefined) {
|
||||
@@ -170,6 +250,7 @@ export class IdentityDB {
|
||||
topicNames: factInput.topics.map((topic) => topic.name),
|
||||
limit: 1,
|
||||
minimumScore: options.duplicateThreshold ?? 0.97,
|
||||
spaceName: options.spaceName,
|
||||
});
|
||||
|
||||
if (similarFacts[0]) {
|
||||
@@ -180,15 +261,27 @@ export class IdentityDB {
|
||||
const fact = await this.addFact(factInput);
|
||||
|
||||
if (options.embeddingProvider) {
|
||||
await this.indexFactEmbedding(fact.id, { provider: options.embeddingProvider });
|
||||
await this.indexFactEmbedding(fact.id, {
|
||||
provider: options.embeddingProvider,
|
||||
spaceName: options.spaceName,
|
||||
});
|
||||
}
|
||||
|
||||
return fact;
|
||||
}
|
||||
|
||||
async indexFactEmbeddings(input: IndexFactEmbeddingsInput): Promise<void> {
|
||||
const factRows = await this.connection.db.selectFrom('facts').selectAll().orderBy('created_at', 'asc').execute();
|
||||
const space = await this.getSpaceForRead(input.spaceName);
|
||||
if (input.spaceName && !space) {
|
||||
return;
|
||||
}
|
||||
|
||||
let factQuery = this.connection.db.selectFrom('facts').selectAll().orderBy('created_at', 'asc');
|
||||
if (space) {
|
||||
factQuery = factQuery.where('space_id', '=', space.id);
|
||||
}
|
||||
|
||||
const factRows = await factQuery.execute();
|
||||
if (factRows.length === 0) {
|
||||
return;
|
||||
}
|
||||
@@ -222,6 +315,13 @@ export class IdentityDB {
|
||||
throw new IdentityDBError(`Fact not found: ${factId}`);
|
||||
}
|
||||
|
||||
if (input.spaceName) {
|
||||
const space = await this.getSpaceForRead(input.spaceName);
|
||||
if (!space || space.id !== factRow.space_id) {
|
||||
throw new IdentityDBError(`Fact ${factId} does not belong to space ${canonicalizeSpaceName(input.spaceName)}.`);
|
||||
}
|
||||
}
|
||||
|
||||
const embedding = await input.provider.embed(factRow.statement);
|
||||
this.assertEmbeddingShape(embedding, input.provider.dimensions);
|
||||
|
||||
@@ -236,6 +336,11 @@ export class IdentityDB {
|
||||
return [];
|
||||
}
|
||||
|
||||
const space = await this.getSpaceForRead(input.spaceName);
|
||||
if (input.spaceName && !space) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const queryEmbedding = await input.provider.embed(queryText);
|
||||
this.assertEmbeddingShape(queryEmbedding, input.provider.dimensions);
|
||||
|
||||
@@ -245,6 +350,7 @@ export class IdentityDB {
|
||||
topicNames: input.topicNames,
|
||||
limit: input.limit,
|
||||
minimumScore: input.minimumScore,
|
||||
spaceId: space?.id,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -254,6 +360,11 @@ export class IdentityDB {
|
||||
return [];
|
||||
}
|
||||
|
||||
const space = await this.getSpaceForRead(input.spaceName);
|
||||
if (input.spaceName && !space) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const queryEmbedding = await input.provider.embed(statement);
|
||||
this.assertEmbeddingShape(queryEmbedding, input.provider.dimensions);
|
||||
|
||||
@@ -263,6 +374,7 @@ export class IdentityDB {
|
||||
topicNames: input.topicNames,
|
||||
limit: input.limit,
|
||||
minimumScore: input.minimumScore,
|
||||
spaceId: space?.id,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -279,12 +391,15 @@ export class IdentityDB {
|
||||
}
|
||||
|
||||
await this.connection.db.transaction().execute(async (trx) => {
|
||||
const space = await this.getOrCreateSpaceInExecutor(trx, input.spaceName);
|
||||
const parentTopic = await this.upsertTopicInExecutor(trx, {
|
||||
name: input.parentName,
|
||||
granularity: 'abstract',
|
||||
spaceName: space.name,
|
||||
});
|
||||
const childTopic = await this.upsertTopicInExecutor(trx, {
|
||||
name: input.childName,
|
||||
spaceName: space.name,
|
||||
});
|
||||
|
||||
const existing = await trx
|
||||
@@ -309,7 +424,7 @@ export class IdentityDB {
|
||||
});
|
||||
}
|
||||
|
||||
async addTopicAlias(canonicalName: string, alias: string): Promise<void> {
|
||||
async addTopicAlias(canonicalName: string, alias: string, options?: SpaceScopedInput): Promise<void> {
|
||||
const normalizedAlias = normalizeTopicName(alias);
|
||||
|
||||
if (normalizedAlias.length === 0) {
|
||||
@@ -317,18 +432,22 @@ export class IdentityDB {
|
||||
}
|
||||
|
||||
await this.connection.db.transaction().execute(async (trx) => {
|
||||
const canonicalTopic = await this.upsertTopicInExecutor(trx, { name: canonicalName });
|
||||
const space = await this.getOrCreateSpaceInExecutor(trx, options?.spaceName);
|
||||
const canonicalTopic = await this.upsertTopicInExecutor(trx, {
|
||||
name: canonicalName,
|
||||
spaceName: space.name,
|
||||
});
|
||||
|
||||
if (normalizedAlias === canonicalTopic.normalizedName) {
|
||||
return;
|
||||
}
|
||||
|
||||
const exactTopicMatch = await findTopicRowByNormalizedName(trx, normalizedAlias);
|
||||
const exactTopicMatch = await findTopicRowByNormalizedName(trx, space.id, normalizedAlias);
|
||||
if (exactTopicMatch && exactTopicMatch.id !== canonicalTopic.id) {
|
||||
throw new IdentityDBError('Cannot assign an alias that already belongs to another canonical topic.');
|
||||
}
|
||||
|
||||
const aliasMatch = await findTopicRowByNormalizedAlias(trx, normalizedAlias);
|
||||
const aliasMatch = await findTopicRowByNormalizedAlias(trx, space.id, normalizedAlias);
|
||||
if (aliasMatch) {
|
||||
if (aliasMatch.id !== canonicalTopic.id) {
|
||||
throw new IdentityDBError('Cannot assign an alias that already resolves to another topic.');
|
||||
@@ -341,6 +460,7 @@ export class IdentityDB {
|
||||
.insertInto('topic_aliases')
|
||||
.values({
|
||||
id: createId(),
|
||||
space_id: space.id,
|
||||
topic_id: canonicalTopic.id,
|
||||
alias: canonicalizeTopicName(alias),
|
||||
normalized_alias: normalizedAlias,
|
||||
@@ -352,47 +472,43 @@ export class IdentityDB {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Resolves a topic through `getRequiredTopicRow` and maps it to the public `Topic` shape.
 *
 * @param name - Topic name to look up.
 * @param options - Optional space scope; `options.spaceName` is forwarded to the lookup.
 * @returns The resolved topic, or `null` when the lookup finds no row.
 */
async resolveTopic(name: string, options?: SpaceScopedInput): Promise<Topic | null> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  return topicRow ? mapTopicRow(topicRow) : null;
}
|
||||
|
||||
/**
 * Lists the alias strings recorded for a topic.
 *
 * @param name - Topic name to look up.
 * @param options - Optional space scope; `options.spaceName` is forwarded to the lookup.
 * @returns The `alias` value of each alias row, or an empty array when the topic is not found.
 */
async getTopicAliases(name: string, options?: SpaceScopedInput): Promise<string[]> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  if (!topicRow) {
    return [];
  }

  // Scope the alias query by the resolved topic's own space.
  const aliasRows = await listTopicAliasRowsForTopicId(this.connection.db, topicRow.space_id, topicRow.id);
  return aliasRows.map((aliasRow) => aliasRow.alias);
}
|
||||
|
||||
/**
 * Returns the child topics of a topic.
 *
 * @param name - Topic name to look up.
 * @param options - Optional space scope; `options.spaceName` is forwarded to the lookup.
 * @returns The mapped child topics, or an empty array when the topic is not found.
 */
async getTopicChildren(name: string, options?: SpaceScopedInput): Promise<Topic[]> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  if (!topicRow) {
    return [];
  }

  // Scope the child query by the resolved topic's own space.
  const childRows = await findChildTopicRows(this.connection.db, topicRow.space_id, topicRow.id);
  return childRows.map(mapTopicRow);
}
|
||||
|
||||
/**
 * Returns the parent topics of a topic.
 *
 * @param name - Topic name to look up.
 * @param options - Optional space scope; `options.spaceName` is forwarded to the lookup.
 * @returns The mapped parent topics, or an empty array when the topic is not found.
 */
async getTopicParents(name: string, options?: SpaceScopedInput): Promise<Topic[]> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  if (!topicRow) {
    return [];
  }

  // Scope the parent query by the resolved topic's own space.
  const parentRows = await findParentTopicRows(this.connection.db, topicRow.space_id, topicRow.id);
  return parentRows.map(mapTopicRow);
}
|
||||
|
||||
async getTopicLineage(name: string): Promise<Topic[]> {
|
||||
const topicRow = await this.getRequiredTopicRow(name);
|
||||
|
||||
async getTopicLineage(name: string, options?: SpaceScopedInput): Promise<Topic[]> {
|
||||
const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
|
||||
if (!topicRow) {
|
||||
return [];
|
||||
}
|
||||
@@ -405,8 +521,7 @@ export class IdentityDB {
|
||||
const nextLevelIds: string[] = [];
|
||||
|
||||
for (const currentId of currentLevelIds) {
|
||||
const parentRows = await findParentTopicRows(this.connection.db, currentId);
|
||||
|
||||
const parentRows = await findParentTopicRows(this.connection.db, topicRow.space_id, currentId);
|
||||
for (const parentRow of parentRows) {
|
||||
if (visitedTopicIds.has(parentRow.id)) {
|
||||
continue;
|
||||
@@ -424,97 +539,97 @@ export class IdentityDB {
|
||||
return lineage;
|
||||
}
|
||||
|
||||
/**
 * Returns the facts linked to a topic, hydrated through `hydrateFacts`.
 *
 * @param name - Topic name to look up.
 * @param options - Optional space scope; `options.spaceName` is forwarded to the lookup.
 * @returns The hydrated facts, or an empty array when the topic is not found.
 */
async getTopicFacts(name: string, options?: SpaceScopedInput): Promise<Fact[]> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  if (!topicRow) {
    return [];
  }

  // Both the fact query and the hydration are scoped by the resolved topic's own space.
  const factRows = await findFactRowsForTopicId(this.connection.db, topicRow.space_id, topicRow.id);
  return this.hydrateFacts(factRows, topicRow.space_id);
}
|
||||
|
||||
/**
 * Returns the facts that connect two topics; a two-name shorthand for
 * `findFactsConnectingTopics`.
 *
 * @param name - First topic name.
 * @param linkedTopicName - Second topic name.
 * @param options - Optional space scope, forwarded unchanged.
 */
async getTopicFactsLinkedTo(name: string, linkedTopicName: string, options?: SpaceScopedInput): Promise<Fact[]> {
  return this.findFactsConnectingTopics([name, linkedTopicName], options);
}
|
||||
|
||||
/**
 * Finds facts via `findFactRowsConnectingTopicIds` for the given topic names.
 *
 * @param names - Topic names to resolve; an empty list yields an empty result.
 * @param options - Optional space scope. A named space that does not exist yields an empty result.
 * @returns The hydrated facts, or an empty array when any name fails to resolve.
 */
async findFactsConnectingTopics(names: string[], options?: SpaceScopedInput): Promise<Fact[]> {
  if (names.length === 0) {
    return [];
  }

  const space = await this.getSpaceForRead(options?.spaceName);
  if (options?.spaceName && !space) {
    return [];
  }

  const lookedUpRows = await Promise.all(names.map((name) => this.getRequiredTopicRow(name, options?.spaceName)));

  // Narrow to resolved rows; a single unresolved name means no fact can connect all topics.
  const topicRows: TopicRecord[] = [];
  for (const lookedUpRow of lookedUpRows) {
    if (lookedUpRow === undefined) {
      return [];
    }
    topicRows.push(lookedUpRow);
  }

  const firstTopicRow = topicRows[0];
  if (firstTopicRow === undefined) {
    return [];
  }

  const topicIds = topicRows.map((topicRow) => topicRow.id);
  // All rows were resolved in the same space, so the first row's space scopes the query.
  const spaceId = firstTopicRow.space_id ?? space?.id;
  const factRows = await findFactRowsConnectingTopicIds(this.connection.db, spaceId, topicIds);

  return this.hydrateFacts(factRows, spaceId);
}
|
||||
|
||||
/**
 * Looks up a topic by name, optionally attaching its facts.
 *
 * @param name - Topic name to look up.
 * @param options - `spaceName` scopes the lookup; `includeFacts` adds a `facts` array
 *                  obtained from `getTopicFacts` for the same name and space.
 * @returns The topic (with facts when requested), or `null` when the lookup finds no row.
 */
async getTopicByName(name: string, options: { includeFacts: true; spaceName?: string }): Promise<TopicWithFacts | null>;
async getTopicByName(name: string, options?: TopicLookupOptions): Promise<Topic | null>;
async getTopicByName(name: string, options?: TopicLookupOptions): Promise<Topic | TopicWithFacts | null> {
  const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
  if (!topicRow) {
    return null;
  }

  const topic = mapTopicRow(topicRow);

  if (options?.includeFacts) {
    return {
      ...topic,
      facts: await this.getTopicFacts(name, { spaceName: options.spaceName }),
    };
  }

  return topic;
}
|
||||
|
||||
/**
 * Lists the topics of one space, optionally attaching each topic's facts.
 *
 * @param options - `spaceName` selects the space (the default space is used when omitted),
 *                  `limit` is forwarded to `listTopicRows`, and `includeFacts` adds a `facts`
 *                  array per topic.
 * @returns The topics, or an empty array when the named space does not exist or, with no
 *          name given, the default space has not been created yet.
 */
async listTopics(options: { includeFacts: true; limit?: number; spaceName?: string }): Promise<TopicWithFacts[]>;
async listTopics(options?: ListTopicsOptions): Promise<Topic[]>;
async listTopics(options?: ListTopicsOptions): Promise<Topic[] | TopicWithFacts[]> {
  const space = await this.getSpaceForRead(options?.spaceName);
  if (options?.spaceName && !space) {
    return [];
  }

  const spaceId = space?.id ?? (await this.getDefaultSpaceIdForRead());
  if (!spaceId) {
    return [];
  }

  const rows = await listTopicRows(this.connection.db, spaceId, options?.limit);
  if (!options?.includeFacts) {
    return rows.map(mapTopicRow);
  }

  const topicsWithFacts: TopicWithFacts[] = [];

  // Facts are fetched one topic at a time, in row order.
  for (const row of rows) {
    topicsWithFacts.push({
      ...mapTopicRow(row),
      facts: await this.getTopicFacts(row.name, { spaceName: options?.spaceName }),
    });
  }

  return topicsWithFacts;
}
|
||||
|
||||
async findConnectedTopics(name: string): Promise<ConnectedTopic[]> {
|
||||
const topicRow = await this.getRequiredTopicRow(name);
|
||||
|
||||
async findConnectedTopics(name: string, options?: SpaceScopedInput): Promise<ConnectedTopic[]> {
|
||||
const topicRow = await this.getRequiredTopicRow(name, options?.spaceName);
|
||||
if (!topicRow) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const rows = await findConnectedTopicRows(this.connection.db, topicRow.id);
|
||||
|
||||
const rows = await findConnectedTopicRows(this.connection.db, topicRow.space_id, topicRow.id);
|
||||
return rows.map((row) => ({
|
||||
...mapTopicRow(row),
|
||||
sharedFactCount: row.shared_fact_count,
|
||||
@@ -527,18 +642,25 @@ export class IdentityDB {
|
||||
topicNames?: string[] | undefined;
|
||||
limit?: number | undefined;
|
||||
minimumScore?: number | undefined;
|
||||
spaceId?: string | undefined;
|
||||
}): Promise<ScoredFact[]> {
|
||||
const topicIds = await this.resolveTopicIds(input.topicNames);
|
||||
const effectiveSpaceId = input.spaceId ?? await this.getDefaultSpaceIdForRead();
|
||||
if (!effectiveSpaceId) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const topicIds = await this.resolveTopicIds(input.topicNames, effectiveSpaceId);
|
||||
if (topicIds === null) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const factRows = topicIds.length > 0
|
||||
? await findFactRowsConnectingTopicIds(this.connection.db, topicIds)
|
||||
? await findFactRowsConnectingTopicIds(this.connection.db, effectiveSpaceId, topicIds)
|
||||
: await this.connection.db
|
||||
.selectFrom('facts')
|
||||
.innerJoin('fact_embeddings', 'fact_embeddings.fact_id', 'facts.id')
|
||||
.selectAll('facts')
|
||||
.where('facts.space_id', '=', effectiveSpaceId)
|
||||
.where('fact_embeddings.model', '=', input.providerModel)
|
||||
.orderBy('facts.created_at', 'asc')
|
||||
.execute();
|
||||
@@ -547,14 +669,14 @@ export class IdentityDB {
|
||||
return [];
|
||||
}
|
||||
|
||||
const embeddingRowsQuery = this.connection.db
|
||||
const embeddingRows = await this.connection.db
|
||||
.selectFrom('fact_embeddings')
|
||||
.selectAll()
|
||||
.where('model', '=', input.providerModel);
|
||||
|
||||
const embeddingRows = factRows.length > 0
|
||||
? await embeddingRowsQuery.where('fact_id', 'in', factRows.map((factRow) => factRow.id)).execute()
|
||||
: [];
|
||||
.innerJoin('facts', 'facts.id', 'fact_embeddings.fact_id')
|
||||
.selectAll('fact_embeddings')
|
||||
.where('facts.space_id', '=', effectiveSpaceId)
|
||||
.where('fact_embeddings.model', '=', input.providerModel)
|
||||
.where('fact_embeddings.fact_id', 'in', factRows.map((factRow) => factRow.id))
|
||||
.execute();
|
||||
|
||||
const embeddingsByFactId = new Map(
|
||||
embeddingRows.map((embeddingRow) => [embeddingRow.fact_id, deserializeEmbedding(embeddingRow.embedding)]),
|
||||
@@ -578,7 +700,7 @@ export class IdentityDB {
|
||||
return [];
|
||||
}
|
||||
|
||||
const hydratedFacts = await this.hydrateFacts(scoredRows.map((entry) => entry.factRow));
|
||||
const hydratedFacts = await this.hydrateFacts(scoredRows.map((entry) => entry.factRow), effectiveSpaceId);
|
||||
const factsById = new Map(hydratedFacts.map((fact) => [fact.id, fact]));
|
||||
|
||||
return scoredRows.map((entry) => ({
|
||||
@@ -587,12 +709,12 @@ export class IdentityDB {
|
||||
}));
|
||||
}
|
||||
|
||||
private async resolveTopicIds(topicNames?: string[]): Promise<string[] | null> {
|
||||
private async resolveTopicIds(topicNames: string[] | undefined, spaceId: string): Promise<string[] | null> {
|
||||
if (!topicNames || topicNames.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const topicRows = await Promise.all(topicNames.map((topicName) => this.getRequiredTopicRow(topicName)));
|
||||
const topicRows = await Promise.all(topicNames.map((topicName) => this.getRequiredTopicRowInSpaceId(topicName, spaceId)));
|
||||
if (topicRows.some((topicRow) => !topicRow)) {
|
||||
return null;
|
||||
}
|
||||
@@ -637,30 +759,28 @@ export class IdentityDB {
|
||||
}
|
||||
}
|
||||
|
||||
private async upsertTopicInExecutor(
|
||||
executor: DatabaseExecutor,
|
||||
input: UpsertTopicInput,
|
||||
): Promise<Topic> {
|
||||
private async upsertTopicInExecutor(executor: DatabaseExecutor, input: UpsertTopicInput): Promise<Topic> {
|
||||
const normalizedName = normalizeTopicName(input.name);
|
||||
|
||||
if (normalizedName.length === 0) {
|
||||
throw new IdentityDBError('Topic name cannot be empty.');
|
||||
}
|
||||
|
||||
const existing = await findTopicRowByNormalizedName(executor, normalizedName);
|
||||
const space = await this.getOrCreateSpaceInExecutor(executor, input.spaceName);
|
||||
const existing = await findTopicRowByNormalizedName(executor, space.id, normalizedName);
|
||||
const now = nowIsoString();
|
||||
|
||||
if (existing) {
|
||||
return this.updateTopicRowInExecutor(executor, existing, input, now, true);
|
||||
}
|
||||
|
||||
const aliasedTopic = await findTopicRowByNormalizedAlias(executor, normalizedName);
|
||||
const aliasedTopic = await findTopicRowByNormalizedAlias(executor, space.id, normalizedName);
|
||||
if (aliasedTopic) {
|
||||
return this.updateTopicRowInExecutor(executor, aliasedTopic, input, now, false);
|
||||
}
|
||||
|
||||
const createdRow: TopicRecord = {
|
||||
id: createId(),
|
||||
space_id: space.id,
|
||||
name: canonicalizeTopicName(input.name),
|
||||
normalized_name: normalizedName,
|
||||
category: input.category ?? 'custom',
|
||||
@@ -672,7 +792,6 @@ export class IdentityDB {
|
||||
};
|
||||
|
||||
await executor.insertInto('topics').values(createdRow).execute();
|
||||
|
||||
return mapTopicRow(createdRow);
|
||||
}
|
||||
|
||||
@@ -705,22 +824,39 @@ export class IdentityDB {
|
||||
return mapTopicRow(updated);
|
||||
}
|
||||
|
||||
/**
 * Resolves a topic row by name within a space identified by name.
 *
 * @param name - Topic name to look up.
 * @param spaceName - Optional space name. When omitted (or empty) the default space is used.
 * @returns The topic row, or `undefined` when the named space does not exist, the default
 *          space has not been created yet, or no topic matches.
 */
private async getRequiredTopicRow(name: string, spaceName?: string): Promise<TopicRecord | undefined> {
  const space = await this.getSpaceForRead(spaceName);
  if (spaceName && !space) {
    return undefined;
  }

  const spaceId = space?.id ?? (await this.getDefaultSpaceIdForRead());
  if (!spaceId) {
    return undefined;
  }

  return this.getRequiredTopicRowInSpaceId(name, spaceId);
}

/**
 * Resolves a topic row by name within a space identified by id, using
 * `findTopicRowByNameOrAlias` on the normalized name.
 *
 * @param name - Topic name to look up.
 * @param spaceId - Id of the space to search.
 * @returns The topic row, or `undefined` when the name normalizes to an empty string
 *          or the lookup finds nothing.
 */
private async getRequiredTopicRowInSpaceId(name: string, spaceId: string): Promise<TopicRecord | undefined> {
  const normalizedName = normalizeTopicName(name);
  if (normalizedName.length === 0) {
    return undefined;
  }

  return findTopicRowByNameOrAlias(this.connection.db, spaceId, normalizedName);
}
|
||||
|
||||
private async hydrateFacts(factRows: FactRecord[]): Promise<Fact[]> {
|
||||
private async hydrateFacts(factRows: FactRecord[], spaceId?: string): Promise<Fact[]> {
|
||||
if (factRows.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const effectiveSpaceId = spaceId ?? factRows[0]!.space_id;
|
||||
const factIds = factRows.map((fact) => fact.id);
|
||||
const topicLinks = await findTopicLinksForFactIds(this.connection.db, factIds);
|
||||
const topicLinks = await findTopicLinksForFactIds(this.connection.db, effectiveSpaceId, factIds);
|
||||
|
||||
const topicsByFactId = new Map<string, FactTopic[]>();
|
||||
|
||||
for (const topicLink of topicLinks) {
|
||||
const topics = topicsByFactId.get(topicLink.fact_id) ?? [];
|
||||
topics.push({
|
||||
@@ -733,4 +869,57 @@ export class IdentityDB {
|
||||
|
||||
return factRows.map((factRow) => mapFactRow(factRow, topicsByFactId.get(factRow.id) ?? []));
|
||||
}
|
||||
|
||||
/**
 * Returns the space row for the requested name, inserting it first when it does not exist.
 *
 * An omitted or empty name selects the default space. This matches `getSpaceForRead`,
 * which also treats an empty `spaceName` as "no space requested"; without it, a write with
 * `spaceName: ''` would land in a space that reads with the same value never look at.
 *
 * @param executor - Database handle or open transaction to run the queries on.
 * @param requestedSpaceName - Optional space name supplied by the caller.
 * @returns The existing or newly inserted space row.
 * @throws IdentityDBError when a non-empty name normalizes to an empty string, consistent
 *         with the validation in `upsertSpace`.
 */
private async getOrCreateSpaceInExecutor(executor: DatabaseExecutor, requestedSpaceName?: string): Promise<SpaceRecord> {
  const effectiveName =
    requestedSpaceName === undefined || requestedSpaceName === '' ? DEFAULT_SPACE_NAME : requestedSpaceName;

  const normalizedName = normalizeSpaceName(effectiveName);
  if (normalizedName.length === 0) {
    // Never persist a space whose normalized name is empty; it could not be looked up again.
    throw new IdentityDBError('Space name cannot be empty.');
  }

  const existing = await findSpaceRowByNormalizedName(executor, normalizedName);
  if (existing) {
    return existing;
  }

  const now = nowIsoString();
  const createdRow: SpaceRecord = {
    id: createId(),
    name: canonicalizeSpaceName(effectiveName),
    normalized_name: normalizedName,
    description: null,
    metadata: null,
    created_at: now,
    updated_at: now,
  };

  await executor.insertInto('spaces').values(createdRow).execute();
  return createdRow;
}
|
||||
|
||||
/**
 * Looks up a space row for a read operation without creating anything.
 *
 * @param spaceName - Optional space name.
 * @returns The matching row, or `undefined` when no name was given, the name normalizes
 *          to an empty string, or no row is found.
 */
private async getSpaceForRead(spaceName?: string): Promise<SpaceRecord | undefined> {
  // Only normalize when a name was actually supplied.
  const lookupKey = spaceName ? normalizeSpaceName(spaceName) : '';

  if (lookupKey.length === 0) {
    return undefined;
  }

  return findSpaceRowByNormalizedName(this.connection.db, lookupKey);
}
|
||||
|
||||
/**
 * Returns the id of the default space for read operations.
 *
 * @returns The id, or `undefined` when the default space row does not exist.
 */
private async getDefaultSpaceIdForRead(): Promise<string | undefined> {
  const defaultKey = normalizeSpaceName(DEFAULT_SPACE_NAME);
  const defaultRow = await findSpaceRowByNormalizedName(this.connection.db, defaultKey);

  return defaultRow ? defaultRow.id : undefined;
}
|
||||
|
||||
/**
 * Verifies that a topic input does not name a different space than the given one.
 *
 * @param space - The space the topic is expected to belong to.
 * @param topicSpaceName - Space name carried by the topic input, if any; an omitted or
 *                         empty value is accepted.
 * @throws IdentityDBError when the topic names a space whose normalized name differs
 *         from `space.normalized_name`.
 */
private assertScopedTopicInput(space: SpaceRecord, topicSpaceName?: string): void {
  const staysInFactSpace = !topicSpaceName || normalizeSpaceName(topicSpaceName) === space.normalized_name;

  if (staysInFactSpace) {
    return;
  }

  throw new IdentityDBError(
    `Fact topics cannot point to a different space than the fact itself (${space.name}).`,
  );
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user