// File: IdentityDB/src/core/identity-db.ts
// (TypeScript; the original listing reported 513 lines, 15 KiB.)

import {
type ConnectedTopic,
type Fact,
type FactTopic,
type ListTopicsOptions,
type Topic,
type TopicLookupOptions,
type TopicWithFacts,
type UpsertTopicInput,
type AddFactInput,
type LinkTopicsInput,
} from '../types/api';
import type { IngestStatementOptions } from '../ingestion/types';
import type { DatabaseConnection, IdentityDBConnectionConfig } from '../adapters/dialect';
import type { IdentityDatabaseSchema } from '../types/database';
import type { FactRecord, TopicRecord } from '../types/domain';
import { createDatabase } from '../adapters/dialect';
import { IdentityDBError } from './errors';
import { initializeSchema } from './migrations';
import {
canonicalizeTopicName,
createId,
mapFactRow,
mapTopicRow,
normalizeTopicName,
nowIsoString,
serializeMetadata,
} from './utils';
import { extractFact } from '../ingestion/extractor';
import {
findFactRowsConnectingTopicIds,
findFactRowsForTopicId,
findTopicLinksForFactIds,
} from '../queries/facts';
import {
findConnectedTopicRows,
findTopicRowByNameOrAlias,
findTopicRowByNormalizedAlias,
findTopicRowByNormalizedName,
listTopicAliasRowsForTopicId,
listTopicRows,
findChildTopicRows,
findParentTopicRows,
type DatabaseExecutor,
} from '../queries/topics';
/**
 * High-level facade over the identity database.
 *
 * Manages topics (with aliases and parent/child relations) and the facts that
 * are linked to them. Instances are obtained through {@link IdentityDB.connect};
 * the constructor is private.
 */
export class IdentityDB {
  private constructor(private readonly connection: DatabaseConnection) {}

  /**
   * Opens a connection via `createDatabase(config)` and wraps it.
   *
   * This method does not call `initializeSchema`; use {@link IdentityDB.initialize}.
   */
  static async connect(config: IdentityDBConnectionConfig): Promise<IdentityDB> {
    const connection = await createDatabase(config);
    return new IdentityDB(connection);
  }

  /** Runs `initializeSchema` against the underlying database handle. */
  async initialize(): Promise<void> {
    await initializeSchema(this.connection.db);
  }

  /** Destroys the underlying connection. */
  async close(): Promise<void> {
    await this.connection.destroy();
  }

  /**
   * Creates the topic, or updates the existing topic that matches `input.name`
   * by normalized name or by alias.
   *
   * Runs directly on the connection, outside of an explicit transaction.
   *
   * @throws IdentityDBError when the normalized topic name is empty.
   */
  async upsertTopic(input: UpsertTopicInput): Promise<Topic> {
    return this.upsertTopicInExecutor(this.connection.db, input);
  }

  /**
   * Stores a fact and links it to every topic in `input.topics`, all inside a
   * single transaction.
   *
   * Each topic is upserted first; its `position` is its index in `input.topics`.
   * The returned `Fact` is assembled from the input values rather than re-read
   * from the database.
   *
   * @throws IdentityDBError when the statement is blank after trimming or when
   *   no topics are supplied.
   */
  async addFact(input: AddFactInput): Promise<Fact> {
    // Trim once and reuse for validation, storage and the returned object.
    const statement = input.statement.trim();
    if (statement.length === 0) {
      throw new IdentityDBError('Fact statement cannot be empty.');
    }
    if (input.topics.length === 0) {
      throw new IdentityDBError('A fact must be linked to at least one topic.');
    }

    return this.connection.db.transaction().execute(async (trx) => {
      const createdAt = nowIsoString();
      const factId = createId();

      await trx
        .insertInto('facts')
        .values({
          id: factId,
          statement,
          summary: input.summary ?? null,
          source: input.source ?? null,
          confidence: input.confidence ?? null,
          metadata: serializeMetadata(input.metadata),
          created_at: createdAt,
          updated_at: createdAt,
        })
        .execute();

      const topics: FactTopic[] = [];
      // NOTE(review): two inputs that resolve to the same topic (e.g. a name and
      // its alias) produce two fact_topics rows for the same (fact, topic) pair.
      // Whether the schema allows that is not visible here - confirm.
      for (const [index, topicInput] of input.topics.entries()) {
        const topic = await this.upsertTopicInExecutor(trx, topicInput);
        const role = topicInput.role ?? null;

        await trx
          .insertInto('fact_topics')
          .values({
            fact_id: factId,
            topic_id: topic.id,
            role,
            position: index,
            created_at: createdAt,
          })
          .execute();

        topics.push({
          ...topic,
          role,
          position: index,
        });
      }

      return {
        id: factId,
        statement,
        summary: input.summary ?? null,
        source: input.source ?? null,
        confidence: input.confidence ?? null,
        metadata: input.metadata ?? null,
        createdAt,
        updatedAt: createdAt,
        topics,
      };
    });
  }

  /**
   * Runs `extractFact` on a raw statement and stores the result via
   * {@link IdentityDB.addFact}.
   *
   * If the extractor yields no statement, the raw `statement` is used. Optional
   * fields are copied only when the extractor defined them, so absent fields
   * stay absent instead of being set to `undefined`.
   *
   * @throws IdentityDBError under the same conditions as `addFact`.
   */
  async ingestStatement(
    statement: string,
    options: IngestStatementOptions,
  ): Promise<Fact> {
    const extracted = await extractFact(statement, options.extractor);
    const factInput: AddFactInput = {
      statement: extracted.statement ?? statement,
      topics: extracted.topics,
    };
    if (extracted.summary !== undefined) {
      factInput.summary = extracted.summary;
    }
    if (extracted.source !== undefined) {
      factInput.source = extracted.source;
    }
    if (extracted.confidence !== undefined) {
      factInput.confidence = extracted.confidence;
    }
    if (extracted.metadata !== undefined) {
      factInput.metadata = extracted.metadata;
    }
    return this.addFact(factInput);
  }

  /**
   * Records a `parent_of` relation between two topics, upserting both.
   *
   * The parent is upserted with granularity `'abstract'`. If the relation
   * already exists nothing is inserted.
   *
   * @throws IdentityDBError when either name normalizes to an empty string, or
   *   when parent and child are the same topic - either by normalized name or
   *   because both names resolve to the same topic (for example via an alias).
   */
  async linkTopics(input: LinkTopicsInput): Promise<void> {
    const parentNormalizedName = normalizeTopicName(input.parentName);
    const childNormalizedName = normalizeTopicName(input.childName);
    if (parentNormalizedName.length === 0 || childNormalizedName.length === 0) {
      throw new IdentityDBError('Topic hierarchy links require both a parent and child topic name.');
    }
    // Cheap check before a transaction is opened.
    if (parentNormalizedName === childNormalizedName) {
      throw new IdentityDBError('A topic cannot be linked as its own parent.');
    }

    await this.connection.db.transaction().execute(async (trx) => {
      const parentTopic = await this.upsertTopicInExecutor(trx, {
        name: input.parentName,
        granularity: 'abstract',
      });
      const childTopic = await this.upsertTopicInExecutor(trx, {
        name: input.childName,
      });

      // Different names can still resolve to one topic because upserts follow
      // aliases. Throwing here is expected to abort the transaction, which also
      // discards the upserts above (assumes the adapter rolls back on a thrown
      // error - confirm).
      if (parentTopic.id === childTopic.id) {
        throw new IdentityDBError('A topic cannot be linked as its own parent.');
      }

      const existing = await trx
        .selectFrom('topic_relations')
        .select(['parent_topic_id'])
        .where('parent_topic_id', '=', parentTopic.id)
        .where('child_topic_id', '=', childTopic.id)
        .where('relation', '=', 'parent_of')
        .executeTakeFirst();

      if (!existing) {
        await trx
          .insertInto('topic_relations')
          .values({
            parent_topic_id: parentTopic.id,
            child_topic_id: childTopic.id,
            relation: 'parent_of',
            created_at: nowIsoString(),
          })
          .execute();
      }
    });
  }

  /**
   * Registers `alias` as a non-primary alias of the topic named `canonicalName`,
   * upserting that topic first.
   *
   * Does nothing when the alias normalizes to the topic's own normalized name or
   * already resolves to that topic.
   *
   * @throws IdentityDBError when the alias is empty after normalization, matches
   *   the normalized name of a different topic, or is already an alias of a
   *   different topic.
   */
  async addTopicAlias(canonicalName: string, alias: string): Promise<void> {
    const normalizedAlias = normalizeTopicName(alias);
    if (normalizedAlias.length === 0) {
      throw new IdentityDBError('Topic alias cannot be empty.');
    }

    await this.connection.db.transaction().execute(async (trx) => {
      const canonicalTopic = await this.upsertTopicInExecutor(trx, { name: canonicalName });
      if (normalizedAlias === canonicalTopic.normalizedName) {
        return;
      }

      const exactTopicMatch = await findTopicRowByNormalizedName(trx, normalizedAlias);
      if (exactTopicMatch && exactTopicMatch.id !== canonicalTopic.id) {
        throw new IdentityDBError('Cannot assign an alias that already belongs to another canonical topic.');
      }

      const aliasMatch = await findTopicRowByNormalizedAlias(trx, normalizedAlias);
      if (aliasMatch) {
        if (aliasMatch.id !== canonicalTopic.id) {
          throw new IdentityDBError('Cannot assign an alias that already resolves to another topic.');
        }
        // Already an alias of this topic: nothing to insert.
        return;
      }

      const createdAt = nowIsoString();
      await trx
        .insertInto('topic_aliases')
        .values({
          id: createId(),
          topic_id: canonicalTopic.id,
          alias: canonicalizeTopicName(alias),
          normalized_alias: normalizedAlias,
          is_primary: 0,
          created_at: createdAt,
          updated_at: createdAt,
        })
        .execute();
    });
  }

  /** Looks a topic up by name or alias; returns `null` when nothing matches. */
  async resolveTopic(name: string): Promise<Topic | null> {
    const topicRow = await this.getRequiredTopicRow(name);
    return topicRow ? mapTopicRow(topicRow) : null;
  }

  /** Returns the alias strings stored for the topic, or `[]` if the topic is unknown. */
  async getTopicAliases(name: string): Promise<string[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }
    const aliasRows = await listTopicAliasRowsForTopicId(this.connection.db, topicRow.id);
    return aliasRows.map((aliasRow) => aliasRow.alias);
  }

  /** Returns the topic's child topics, or `[]` if the topic is unknown. */
  async getTopicChildren(name: string): Promise<Topic[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }
    const childRows = await findChildTopicRows(this.connection.db, topicRow.id);
    return childRows.map(mapTopicRow);
  }

  /** Returns the topic's parent topics, or `[]` if the topic is unknown. */
  async getTopicParents(name: string): Promise<Topic[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }
    const parentRows = await findParentTopicRows(this.connection.db, topicRow.id);
    return parentRows.map(mapTopicRow);
  }

  /**
   * Returns all ancestors of the topic, walking parent links level by level.
   *
   * The starting topic is not included. Ancestors found at a nearer level come
   * before those found at a farther level, and each ancestor appears once.
   * Returns `[]` if the topic is unknown.
   */
  async getTopicLineage(name: string): Promise<Topic[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }

    const lineage: Topic[] = [];
    // Seeded with the start topic so cycles in the relation table terminate.
    const visitedTopicIds = new Set<string>([topicRow.id]);
    let currentLevelIds = [topicRow.id];

    while (currentLevelIds.length > 0) {
      const nextLevelIds: string[] = [];
      for (const currentId of currentLevelIds) {
        const parentRows = await findParentTopicRows(this.connection.db, currentId);
        for (const parentRow of parentRows) {
          if (visitedTopicIds.has(parentRow.id)) {
            continue;
          }
          visitedTopicIds.add(parentRow.id);
          nextLevelIds.push(parentRow.id);
          lineage.push(mapTopicRow(parentRow));
        }
      }
      currentLevelIds = nextLevelIds;
    }

    return lineage;
  }

  /** Returns the facts linked to the topic, or `[]` if the topic is unknown. */
  async getTopicFacts(name: string): Promise<Fact[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }
    return this.loadFactsForTopicId(topicRow.id);
  }

  /**
   * Returns the facts that connect the two named topics.
   *
   * Shorthand for `findFactsConnectingTopics([name, linkedTopicName])`.
   */
  async getTopicFactsLinkedTo(name: string, linkedTopicName: string): Promise<Fact[]> {
    return this.findFactsConnectingTopics([name, linkedTopicName]);
  }

  /**
   * Returns the facts that connect all of the named topics.
   *
   * Returns `[]` when `names` is empty or when any name fails to resolve.
   * Names that resolve to the same topic (for example a name and its alias)
   * are counted once.
   */
  async findFactsConnectingTopics(names: string[]): Promise<Fact[]> {
    if (names.length === 0) {
      return [];
    }

    const topicRows = await Promise.all(names.map((name) => this.getRequiredTopicRow(name)));

    // A Set keeps first-seen order and drops repeated ids, so the query helper
    // never receives the same topic twice.
    const uniqueTopicIds = new Set<string>();
    for (const topicRow of topicRows) {
      if (!topicRow) {
        return [];
      }
      uniqueTopicIds.add(topicRow.id);
    }

    const factRows = await findFactRowsConnectingTopicIds(
      this.connection.db,
      Array.from(uniqueTopicIds),
    );
    return this.hydrateFacts(factRows);
  }

  /**
   * Looks a topic up by name or alias.
   *
   * With `options.includeFacts` the result also carries the topic's facts.
   * Returns `null` when nothing matches.
   */
  async getTopicByName(
    name: string,
    options: { includeFacts: true },
  ): Promise<TopicWithFacts | null>;
  async getTopicByName(name: string, options?: TopicLookupOptions): Promise<Topic | null>;
  async getTopicByName(
    name: string,
    options?: TopicLookupOptions,
  ): Promise<Topic | TopicWithFacts | null> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return null;
    }

    const topic = mapTopicRow(topicRow);
    if (options?.includeFacts) {
      // Load by the id already in hand rather than resolving the name again.
      return {
        ...topic,
        facts: await this.loadFactsForTopicId(topicRow.id),
      };
    }
    return topic;
  }

  /**
   * Lists topics, passing `options.limit` through to `listTopicRows`.
   *
   * With `options.includeFacts` each topic also carries its facts; those are
   * loaded one topic at a time, in list order.
   */
  async listTopics(options: { includeFacts: true; limit?: number }): Promise<TopicWithFacts[]>;
  async listTopics(options?: ListTopicsOptions): Promise<Topic[]>;
  async listTopics(
    options?: ListTopicsOptions,
  ): Promise<Topic[] | TopicWithFacts[]> {
    const rows = await listTopicRows(this.connection.db, options?.limit);
    if (!options?.includeFacts) {
      return rows.map(mapTopicRow);
    }

    const topicsWithFacts: TopicWithFacts[] = [];
    for (const row of rows) {
      const topic = mapTopicRow(row);
      // Facts are fetched by id, so no name/alias lookup is repeated per row.
      topicsWithFacts.push({
        ...topic,
        facts: await this.loadFactsForTopicId(topic.id),
      });
    }
    return topicsWithFacts;
  }

  /**
   * Returns the rows from `findConnectedTopicRows` for the topic, each with its
   * `shared_fact_count` exposed as `sharedFactCount`. Returns `[]` if the topic
   * is unknown.
   */
  async findConnectedTopics(name: string): Promise<ConnectedTopic[]> {
    const topicRow = await this.getRequiredTopicRow(name);
    if (!topicRow) {
      return [];
    }
    const rows = await findConnectedTopicRows(this.connection.db, topicRow.id);
    return rows.map((row) => ({
      ...mapTopicRow(row),
      sharedFactCount: row.shared_fact_count,
    }));
  }

  /**
   * Upsert implementation shared by the public methods; `executor` may be the
   * plain database handle or a transaction.
   *
   * Resolution order:
   * 1. a topic whose normalized name matches - updated and renamed to the
   *    canonicalized input name;
   * 2. a topic reached through a matching alias - updated, name kept;
   * 3. otherwise a new topic is inserted with category `'custom'` and
   *    granularity `'mixed'` as defaults.
   *
   * @throws IdentityDBError when the normalized topic name is empty.
   */
  private async upsertTopicInExecutor(
    executor: DatabaseExecutor,
    input: UpsertTopicInput,
  ): Promise<Topic> {
    const normalizedName = normalizeTopicName(input.name);
    if (normalizedName.length === 0) {
      throw new IdentityDBError('Topic name cannot be empty.');
    }

    const existing = await findTopicRowByNormalizedName(executor, normalizedName);
    const now = nowIsoString();
    if (existing) {
      return this.updateTopicRowInExecutor(executor, existing, input, now, true);
    }

    const aliasedTopic = await findTopicRowByNormalizedAlias(executor, normalizedName);
    if (aliasedTopic) {
      // Matched through an alias: keep the topic's own name.
      return this.updateTopicRowInExecutor(executor, aliasedTopic, input, now, false);
    }

    const createdRow: TopicRecord = {
      id: createId(),
      name: canonicalizeTopicName(input.name),
      normalized_name: normalizedName,
      category: input.category ?? 'custom',
      granularity: input.granularity ?? 'mixed',
      description: input.description ?? null,
      metadata: serializeMetadata(input.metadata),
      created_at: now,
      updated_at: now,
    };
    await executor.insertInto('topics').values(createdRow).execute();
    return mapTopicRow(createdRow);
  }

  /**
   * Applies `input` on top of an existing topic row and returns the re-read row.
   *
   * `category` and `granularity` fall back to the stored value when the input
   * is `null` or `undefined`. `description` and `metadata` fall back only when
   * the input is `undefined`, so any other provided value is written.
   *
   * @param shouldRename when true the name is replaced with the canonicalized
   *   input name; when false the stored name is kept.
   */
  private async updateTopicRowInExecutor(
    executor: DatabaseExecutor,
    existing: TopicRecord,
    input: UpsertTopicInput,
    now: string,
    shouldRename: boolean,
  ): Promise<Topic> {
    await executor
      .updateTable('topics')
      .set({
        name: shouldRename ? canonicalizeTopicName(input.name) : existing.name,
        category: input.category ?? existing.category,
        granularity: input.granularity ?? existing.granularity,
        description: input.description !== undefined ? input.description : existing.description,
        metadata: input.metadata !== undefined ? serializeMetadata(input.metadata) : existing.metadata,
        updated_at: now,
      })
      .where('id', '=', existing.id)
      .execute();

    const updated = await executor
      .selectFrom('topics')
      .selectAll()
      .where('id', '=', existing.id)
      .executeTakeFirstOrThrow();
    return mapTopicRow(updated);
  }

  /**
   * Resolves a topic row by name or alias on the plain connection.
   *
   * Despite the name, a miss is not an error: `undefined` is returned when the
   * name normalizes to an empty string or `findTopicRowByNameOrAlias` finds
   * nothing.
   */
  private async getRequiredTopicRow(name: string): Promise<TopicRecord | undefined> {
    const normalizedName = normalizeTopicName(name);
    if (normalizedName.length === 0) {
      return undefined;
    }
    return findTopicRowByNameOrAlias(this.connection.db, normalizedName);
  }

  /** Loads and hydrates the facts linked to an already-resolved topic id. */
  private async loadFactsForTopicId(topicId: string): Promise<Fact[]> {
    const factRows = await findFactRowsForTopicId(this.connection.db, topicId);
    return this.hydrateFacts(factRows);
  }

  /**
   * Turns fact rows into `Fact` objects by attaching each fact's linked topics
   * (with `role` and `position`), preserving the order of `factRows`.
   */
  private async hydrateFacts(factRows: FactRecord[]): Promise<Fact[]> {
    // Nothing to hydrate: skip the link query instead of calling it with an
    // empty id list.
    if (factRows.length === 0) {
      return [];
    }

    const factIds = factRows.map((fact) => fact.id);
    const topicLinks = await findTopicLinksForFactIds(this.connection.db, factIds);

    const topicsByFactId = new Map<string, FactTopic[]>();
    for (const topicLink of topicLinks) {
      const topics = topicsByFactId.get(topicLink.fact_id) ?? [];
      topics.push({
        ...mapTopicRow(topicLink),
        role: topicLink.role,
        position: topicLink.position,
      });
      topicsByFactId.set(topicLink.fact_id, topics);
    }

    return factRows.map((factRow) => mapFactRow(factRow, topicsByFactId.get(factRow.id) ?? []));
  }
}