// ============================================================================
// File: src/core/identity-db.ts (new file)
// ============================================================================
import {
  type ConnectedTopic,
  type Fact,
  type FactTopic,
  type ListTopicsOptions,
  type Topic,
  type TopicLookupOptions,
  type TopicWithFacts,
  type UpsertTopicInput,
  type AddFactInput,
} from '../types/api';
import type { DatabaseConnection, IdentityDBConnectionConfig } from '../adapters/dialect';
import type { IdentityDatabaseSchema } from '../types/database';
import type { FactRecord, TopicRecord } from '../types/domain';
import { createDatabase } from '../adapters/dialect';
import { IdentityDBError } from './errors';
import { initializeSchema } from './migrations';
import {
  canonicalizeTopicName,
  createId,
  mapFactRow,
  mapTopicRow,
  normalizeTopicName,
  nowIsoString,
  serializeMetadata,
} from './utils';
import {
  findFactRowsConnectingTopicIds,
  findFactRowsForTopicId,
  findTopicLinksForFactIds,
} from '../queries/facts';
import {
  findConnectedTopicRows,
  findTopicRowByNormalizedName,
  listTopicRows,
  type DatabaseExecutor,
} from '../queries/topics';

/**
 * Facade over the identity database: topics, facts, and the links between them.
 *
 * Instances are obtained through {@link IdentityDB.connect}; the constructor is
 * private. Topic names are always matched through `normalizeTopicName`, so
 * lookups ignore case and surrounding/repeated whitespace.
 */
export class IdentityDB {
  private constructor(private readonly connection: DatabaseConnection) {}

  /** Opens a database connection for `config` and wraps it in an `IdentityDB`. */
  static async connect(config: IdentityDBConnectionConfig): Promise<IdentityDB> {
    const connection = await createDatabase(config);
    return new IdentityDB(connection);
  }

  /** Runs `initializeSchema` against the underlying database. */
  async initialize(): Promise<void> {
    await initializeSchema(this.connection.db);
  }

  /** Destroys the underlying connection. */
  async close(): Promise<void> {
    await this.connection.destroy();
  }

  /**
   * Creates the topic, or updates the existing topic with the same normalized name.
   *
   * @throws IdentityDBError when the name is empty after normalization.
   */
  async upsertTopic(input: UpsertTopicInput): Promise<Topic> {
    return this.upsertTopicInExecutor(this.connection.db, input);
  }

  /**
   * Stores a fact and links it to every topic in `input.topics`, upserting the
   * topics as needed. All writes run inside one transaction.
   *
   * @returns The stored fact with its topics in input order (`position` is the
   *   index within `input.topics`).
   * @throws IdentityDBError when the statement is blank, no topics are given,
   *   or a topic name is empty.
   */
  async addFact(input: AddFactInput): Promise<Fact> {
    // Trim once so validation, the stored row and the returned object agree.
    const statement = input.statement.trim();

    if (statement.length === 0) {
      throw new IdentityDBError('Fact statement cannot be empty.');
    }

    if (input.topics.length === 0) {
      throw new IdentityDBError('A fact must be linked to at least one topic.');
    }

    return this.connection.db.transaction().execute(async (trx) => {
      const createdAt = nowIsoString();
      const factId = createId();

      await trx
        .insertInto('facts')
        .values({
          id: factId,
          statement,
          summary: input.summary ?? null,
          source: input.source ?? null,
          confidence: input.confidence ?? null,
          metadata: serializeMetadata(input.metadata),
          created_at: createdAt,
          updated_at: createdAt,
        })
        .execute();

      const topics: FactTopic[] = [];

      // Sequential on purpose: every statement shares the one transaction.
      for (const [index, topicInput] of input.topics.entries()) {
        const topic = await this.upsertTopicInExecutor(trx, topicInput);
        const role = topicInput.role ?? null;

        await trx
          .insertInto('fact_topics')
          .values({
            fact_id: factId,
            topic_id: topic.id,
            role,
            position: index,
            created_at: createdAt,
          })
          .execute();

        topics.push({ ...topic, role, position: index });
      }

      return {
        id: factId,
        statement,
        summary: input.summary ?? null,
        source: input.source ?? null,
        confidence: input.confidence ?? null,
        metadata: input.metadata ?? null,
        createdAt,
        updatedAt: createdAt,
        topics,
      };
    });
  }

  /** Returns every fact linked to the named topic, or `[]` when the topic is unknown. */
  async getTopicFacts(name: string): Promise<Fact[]> {
    const topicRow = await this.findTopicRow(name);

    if (!topicRow) {
      return [];
    }

    return this.getFactsForTopicId(topicRow.id);
  }

  /** Returns the facts linked to both `name` and `linkedTopicName`. */
  async getTopicFactsLinkedTo(name: string, linkedTopicName: string): Promise<Fact[]> {
    return this.findFactsConnectingTopics([name, linkedTopicName]);
  }

  /**
   * Returns the facts linked to every named topic.
   *
   * Yields `[]` when `names` is empty or any name does not resolve to a topic.
   * Names that normalize to the same topic are collapsed first; otherwise the
   * query would require more matching links than a fact can have and never match.
   */
  async findFactsConnectingTopics(names: string[]): Promise<Fact[]> {
    if (names.length === 0) {
      return [];
    }

    const normalizedNames = [...new Set(names.map((name) => normalizeTopicName(name)))];
    const topicRows = await Promise.all(
      normalizedNames.map((normalizedName) =>
        findTopicRowByNormalizedName(this.connection.db, normalizedName),
      ),
    );

    const topicIds: string[] = [];

    for (const topicRow of topicRows) {
      if (!topicRow) {
        return [];
      }

      topicIds.push(topicRow.id);
    }

    const factRows = await findFactRowsConnectingTopicIds(this.connection.db, topicIds);

    return this.hydrateFacts(factRows);
  }

  /**
   * Looks a topic up by name; resolves to `null` when it does not exist.
   * With `includeFacts: true` the result also carries the topic's facts.
   */
  async getTopicByName(
    name: string,
    options: { includeFacts: true },
  ): Promise<TopicWithFacts | null>;
  async getTopicByName(name: string, options?: TopicLookupOptions): Promise<Topic | null>;
  async getTopicByName(
    name: string,
    options?: TopicLookupOptions,
  ): Promise<Topic | TopicWithFacts | null> {
    const topicRow = await this.findTopicRow(name);

    if (!topicRow) {
      return null;
    }

    const topic = mapTopicRow(topicRow);

    if (!options?.includeFacts) {
      return topic;
    }

    // The row is already loaded: fetch facts by id instead of resolving the name again.
    return {
      ...topic,
      facts: await this.getFactsForTopicId(topicRow.id),
    };
  }

  /**
   * Lists topics, optionally capped at `options.limit`.
   * With `includeFacts: true` each topic also carries its facts.
   */
  async listTopics(options: { includeFacts: true; limit?: number }): Promise<TopicWithFacts[]>;
  async listTopics(options?: ListTopicsOptions): Promise<Topic[]>;
  async listTopics(options?: ListTopicsOptions): Promise<Topic[] | TopicWithFacts[]> {
    const rows = await listTopicRows(this.connection.db, options?.limit);

    if (!options?.includeFacts) {
      return rows.map((row) => mapTopicRow(row));
    }

    const topicsWithFacts: TopicWithFacts[] = [];

    for (const row of rows) {
      topicsWithFacts.push({
        ...mapTopicRow(row),
        // Fetch by id: the row is already in hand, so no per-topic name lookup is needed.
        facts: await this.getFactsForTopicId(row.id),
      });
    }

    return topicsWithFacts;
  }

  /**
   * Returns the topics that share at least one fact with the named topic,
   * or `[]` when the topic is unknown.
   */
  async findConnectedTopics(name: string): Promise<ConnectedTopic[]> {
    const topicRow = await this.findTopicRow(name);

    if (!topicRow) {
      return [];
    }

    const rows = await findConnectedTopicRows(this.connection.db, topicRow.id);

    return rows.map((row) => ({
      ...mapTopicRow(row),
      // SQL COUNT may arrive as a string or bigint depending on the driver.
      sharedFactCount: Number(row.shared_fact_count),
    }));
  }

  /**
   * Upsert implementation shared by `upsertTopic` and `addFact`; `executor` may
   * be the database itself or an open transaction.
   *
   * On update, `category` and `granularity` keep their stored value when the
   * input omits them (or passes `null`); `description` and `metadata` keep the
   * stored value only when the input leaves them `undefined`.
   *
   * NOTE(review): the lookup-then-insert is not atomic; presumably a unique
   * constraint on `normalized_name` guards concurrent inserts — confirm in the
   * migrations.
   */
  private async upsertTopicInExecutor(
    executor: DatabaseExecutor,
    input: UpsertTopicInput,
  ): Promise<Topic> {
    const normalizedName = normalizeTopicName(input.name);

    if (normalizedName.length === 0) {
      throw new IdentityDBError('Topic name cannot be empty.');
    }

    const canonicalName = canonicalizeTopicName(input.name);
    const existing = await findTopicRowByNormalizedName(executor, normalizedName);
    const now = nowIsoString();

    if (existing) {
      await executor
        .updateTable('topics')
        .set({
          name: canonicalName,
          category: input.category ?? existing.category,
          granularity: input.granularity ?? existing.granularity,
          description: input.description !== undefined ? input.description : existing.description,
          metadata:
            input.metadata !== undefined ? serializeMetadata(input.metadata) : existing.metadata,
          updated_at: now,
        })
        .where('id', '=', existing.id)
        .execute();

      const updated = await executor
        .selectFrom('topics')
        .selectAll()
        .where('id', '=', existing.id)
        .executeTakeFirstOrThrow();

      return mapTopicRow(updated);
    }

    const createdRow: TopicRecord = {
      id: createId(),
      name: canonicalName,
      normalized_name: normalizedName,
      category: input.category ?? 'custom',
      granularity: input.granularity ?? 'mixed',
      description: input.description ?? null,
      metadata: serializeMetadata(input.metadata),
      created_at: now,
      updated_at: now,
    };

    await executor.insertInto('topics').values(createdRow).execute();

    return mapTopicRow(createdRow);
  }

  /** Finds the topic row for `name`; resolves to `undefined` when there is none. */
  private async findTopicRow(name: string): Promise<TopicRecord | undefined> {
    return findTopicRowByNormalizedName(this.connection.db, normalizeTopicName(name));
  }

  /** Loads and hydrates the facts linked to the topic with the given id. */
  private async getFactsForTopicId(topicId: string): Promise<Fact[]> {
    const factRows = await findFactRowsForTopicId(this.connection.db, topicId);
    return this.hydrateFacts(factRows);
  }

  /** Attaches each fact's linked topics (in the order the link query returns them). */
  private async hydrateFacts(factRows: FactRecord[]): Promise<Fact[]> {
    const factIds = factRows.map((fact) => fact.id);
    const topicLinks = await findTopicLinksForFactIds(this.connection.db, factIds);

    const topicsByFactId = new Map<string, FactTopic[]>();

    for (const topicLink of topicLinks) {
      const topics = topicsByFactId.get(topicLink.fact_id) ?? [];
      topics.push({
        ...mapTopicRow(topicLink),
        role: topicLink.role,
        position: topicLink.position,
      });
      topicsByFactId.set(topicLink.fact_id, topics);
    }

    return factRows.map((factRow) => mapFactRow(factRow, topicsByFactId.get(factRow.id) ?? []));
  }
}

// ============================================================================
// File: src/core/utils.ts (new file)
// ============================================================================
import { randomUUID } from 'node:crypto';

import type { Fact, FactTopic, Topic } from '../types/api';
import type { FactRecord, TopicRecord } from '../types/domain';

/** Lookup key for a topic name: the canonical form, lower-cased. */
export function normalizeTopicName(name: string): string {
  return canonicalizeTopicName(name).toLowerCase();
}

/** Display form of a topic name: trimmed, with whitespace runs collapsed to one space. */
export function canonicalizeTopicName(name: string): string {
  return name.trim().replace(/\s+/g, ' ');
}

/** Current time as an ISO-8601 string (`Date.prototype.toISOString`). */
export function nowIsoString(): string {
  return new Date().toISOString();
}

/** Generates a new random UUID to use as a row id. */
export function createId(): string {
  return randomUUID();
}

/**
 * Serializes metadata to JSON text for storage.
 *
 * @returns `null` for `null`/`undefined` input and for values that have no
 *   JSON representation (e.g. a bare function or symbol).
 */
export function serializeMetadata(metadata: unknown): string | null {
  if (metadata === undefined || metadata === null) {
    return null;
  }

  // JSON.stringify yields undefined for values with no JSON form, although its
  // declared return type is `string`; map that to null to honor the return type.
  return (JSON.stringify(metadata) as string | undefined) ?? null;
}

/**
 * Parses stored metadata JSON; `null` passes through unchanged.
 *
 * @throws SyntaxError (from `JSON.parse`) when the stored text is not valid JSON.
 */
export function deserializeMetadata(metadata: string | null): unknown {
  if (metadata === null) {
    return null;
  }

  return JSON.parse(metadata) as unknown;
}

/** Converts a snake_case `topics` row into the camelCase API `Topic`. */
export function mapTopicRow(record: TopicRecord): Topic {
  return {
    id: record.id,
    name: record.name,
    normalizedName: record.normalized_name,
    category: record.category,
    granularity: record.granularity,
    description: record.description,
    // The parsed JSON is not validated here; the cast trusts what was stored.
    metadata: deserializeMetadata(record.metadata) as Topic['metadata'],
    createdAt: record.created_at,
    updatedAt: record.updated_at,
  };
}

/** Converts a snake_case `facts` row plus its already-resolved topics into the API `Fact`. */
export function mapFactRow(record: FactRecord, topics: FactTopic[]): Fact {
  return {
    id: record.id,
    statement: record.statement,
    summary: record.summary,
    source: record.source,
    confidence: record.confidence,
    // The parsed JSON is not validated here; the cast trusts what was stored.
    metadata: deserializeMetadata(record.metadata) as Fact['metadata'],
    createdAt: record.created_at,
    updatedAt: record.updated_at,
    topics,
  };
}

// ============================================================================
// File: src/index.ts (replaces the previous `export {};` placeholder)
// ============================================================================
export * from './adapters';
export * from './core/identity-db';
export * from './core/migrations';
export * from './types/api';
export * from './types/database';
export * from './types/domain';

// ============================================================================
// File: src/queries/facts.ts (new file)
// ============================================================================
import type { Kysely, Transaction } from 'kysely';

import type { IdentityDatabaseSchema } from '../types/database';
import type { FactRecord, TopicRecord } from '../types/domain';

/** Anything queries can run against: the database itself or an open transaction. */
export type DatabaseExecutor = Kysely<IdentityDatabaseSchema> | Transaction<IdentityDatabaseSchema>;

/** A `topics` row joined with the `fact_topics` link that attaches it to a fact. */
export interface FactTopicJoinRow extends TopicRecord {
  fact_id: string;
  role: string | null;
  position: number;
}

/** Returns the facts linked to `topicId`, oldest first (`facts.created_at` ascending). */
export async function findFactRowsForTopicId(
  executor: DatabaseExecutor,
  topicId: string,
): Promise<FactRecord[]> {
  return executor
    .selectFrom('facts')
    .innerJoin('fact_topics', 'fact_topics.fact_id', 'facts.id')
    .selectAll('facts')
    .where('fact_topics.topic_id', '=', topicId)
    .orderBy('facts.created_at', 'asc')
    .execute();
}
// ============================================================================
// File: src/queries/facts.ts (continued)
// ============================================================================

/**
 * Returns the facts that are linked to every topic in `topicIds`, oldest first.
 *
 * Duplicate ids in `topicIds` are ignored, and each topic is counted once per
 * fact (`COUNT(DISTINCT …)`), so neither repeated input ids nor repeated link
 * rows can distort the "linked to all of them" comparison.
 */
export async function findFactRowsConnectingTopicIds(
  executor: DatabaseExecutor,
  topicIds: string[],
): Promise<FactRecord[]> {
  const uniqueTopicIds = [...new Set(topicIds)];

  if (uniqueTopicIds.length === 0) {
    return [];
  }

  return executor
    .selectFrom('facts')
    .innerJoin('fact_topics', 'fact_topics.fact_id', 'facts.id')
    .selectAll('facts')
    .where('fact_topics.topic_id', 'in', uniqueTopicIds)
    .groupBy('facts.id')
    // A fact qualifies only when it matched as many distinct topics as were requested.
    .having((eb) => eb.fn.count('fact_topics.topic_id').distinct(), '=', uniqueTopicIds.length)
    .orderBy('facts.created_at', 'asc')
    .execute();
}

/**
 * Returns, for the given facts, every linked topic row together with the link's
 * `fact_id`, `role` and `position`, ordered by `position` ascending.
 * Resolves to `[]` without querying when `factIds` is empty.
 */
export async function findTopicLinksForFactIds(
  executor: DatabaseExecutor,
  factIds: string[],
): Promise<FactTopicJoinRow[]> {
  if (factIds.length === 0) {
    return [];
  }

  return executor
    .selectFrom('fact_topics')
    .innerJoin('topics', 'topics.id', 'fact_topics.topic_id')
    .selectAll('topics')
    .select([
      'fact_topics.fact_id as fact_id',
      'fact_topics.role as role',
      'fact_topics.position as position',
    ])
    .where('fact_topics.fact_id', 'in', factIds)
    .orderBy('fact_topics.position', 'asc')
    .execute() as Promise<FactTopicJoinRow[]>;
}

// ============================================================================
// File: src/queries/topics.ts (new file)
// ============================================================================
import type { Kysely, Transaction } from 'kysely';

import type { IdentityDatabaseSchema } from '../types/database';
import type { TopicRecord } from '../types/domain';

/** Anything queries can run against: the database itself or an open transaction. */
export type DatabaseExecutor = Kysely<IdentityDatabaseSchema> | Transaction<IdentityDatabaseSchema>;

/** A `topics` row plus the number of facts it shares with the source topic. */
export interface ConnectedTopicRow extends TopicRecord {
  shared_fact_count: number;
}

/** Finds the topic whose `normalized_name` equals `normalizedName`, if any. */
export async function findTopicRowByNormalizedName(
  executor: DatabaseExecutor,
  normalizedName: string,
): Promise<TopicRecord | undefined> {
  return executor
    .selectFrom('topics')
    .selectAll()
    .where('normalized_name', '=', normalizedName)
    .executeTakeFirst();
}

/**
 * Lists topics ordered by `normalized_name` ascending.
 *
 * @param limit Optional row cap; it is handed to the query builder as-is,
 *   without validation.
 */
export async function listTopicRows(
  executor: DatabaseExecutor,
  limit?: number,
): Promise<TopicRecord[]> {
  const ordered = executor.selectFrom('topics').selectAll().orderBy('normalized_name', 'asc');
  const query = limit === undefined ? ordered : ordered.limit(limit);

  return query.execute();
}

/**
 * Returns the topics that co-occur with `topicId` on at least one fact, ordered
 * by shared-fact count (descending) and then `normalized_name`.
 *
 * The source topic itself is excluded.
 */
export async function findConnectedTopicRows(
  executor: DatabaseExecutor,
  topicId: string,
): Promise<ConnectedTopicRow[]> {
  const rows = await executor
    .selectFrom('fact_topics as source_link')
    .innerJoin('fact_topics as related_link', 'related_link.fact_id', 'source_link.fact_id')
    .innerJoin('topics', 'topics.id', 'related_link.topic_id')
    .selectAll('topics')
    // DISTINCT keeps a fact from being counted twice if a link row is repeated.
    .select((eb) => eb.fn.count('related_link.fact_id').distinct().as('shared_fact_count'))
    .where('source_link.topic_id', '=', topicId)
    .whereRef('related_link.topic_id', '!=', 'source_link.topic_id')
    .groupBy('topics.id')
    .orderBy('shared_fact_count', 'desc')
    .orderBy('topics.normalized_name', 'asc')
    .execute();

  // SQL COUNT may arrive as a string or bigint depending on the driver, so
  // coerce it at runtime rather than only asserting the type.
  return rows.map(
    (row) => ({ ...row, shared_fact_count: Number(row.shared_fact_count) }) as ConnectedTopicRow,
  );
}

// ============================================================================
// File: src/types/api.ts (additions appended after the existing `AddFactInput`
// interface)
// ============================================================================

/** A topic as exposed by the public API. */
export interface Topic {
  id: string;
  /** Display name. */
  name: string;
  /** Lookup key derived from the name. */
  normalizedName: string;
  category: TopicCategory;
  granularity: TopicGranularity;
  description: string | null;
  metadata: JsonValue | null;
  /** Timestamp string. */
  createdAt: string;
  /** Timestamp string. */
  updatedAt: string;
}

/** A topic as it appears on a specific fact. */
export interface FactTopic extends Topic {
  /** Role the topic plays in the fact, if one was given. */
  role: string | null;
  /** Position of the topic within the fact's topic list. */
  position: number;
}

/** A stored fact together with the topics it is linked to. */
export interface Fact {
  id: string;
  statement: string;
  summary: string | null;
  source: string | null;
  confidence: number | null;
  metadata: JsonValue | null;
  /** Timestamp string. */
  createdAt: string;
  /** Timestamp string. */
  updatedAt: string;
  topics: FactTopic[];
}

/** A topic together with the facts linked to it. */
export interface TopicWithFacts extends Topic {
  facts: Fact[];
}

/** A topic related to another topic through shared facts. */
export interface ConnectedTopic extends Topic {
  /** Number of facts shared with the topic that was queried. */
  sharedFactCount: number;
}

/** Options for looking up a single topic. */
export interface TopicLookupOptions {
  /** When true, the result also carries the topic's facts. */
  includeFacts?: boolean;
}

/** Options for listing topics. */
export interface ListTopicsOptions {
  /** When true, each result also carries the topic's facts. */
  includeFacts?: boolean;
  /** Maximum number of topics to return. */
  limit?: number;
}