Engram Client API Reference¶
This document is the API reference for the asynchronous engram.Engram client.
[!NOTE] All methods (except context managers) are asynchronous and must be awaited. The code snippets assume you have instantiated an engram client.
1. Client Lifecycle¶
Engram()¶
Engram(
settings: EngramSettings | None = None,
*,
database_url: str | None = None,
openai_api_key: str | None = None,
memory_policy: str | MemoryPolicy | None = None,
)
Construct the client, then call connect() before using it. Use memory_policy to configure how facts are typed and slotted (valid strings: "default", "legal", "coding_agent").
connect()¶
Establishes the database connection pool and initializes configured providers.
close()¶
Safely tears down the database connection pool and releases provider resources.
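The connect/close pairing can be sketched as follows. To stay runnable without a database, this sketch uses a hypothetical stand-in class, FakeEngram, that mimics only the two lifecycle methods; with the real client you would construct Engram(...) instead. The try/finally shape is a suggestion, not something this reference mandates.

```python
import asyncio


class FakeEngram:
    """Hypothetical stand-in that mimics only connect() and close()."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False


async def main() -> list[bool]:
    engram = FakeEngram()  # real code would construct Engram(...) here
    states = []
    await engram.connect()
    try:
        # Work with the client while the pool is open.
        states.append(engram.connected)
    finally:
        # Always release the pool, even if the work above raised.
        await engram.close()
    states.append(engram.connected)
    return states


states = asyncio.run(main())
```

Wrapping the work in try/finally keeps close() from being skipped when an operation fails partway through.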
2. Memory Operations¶
add()¶
await engram.add(
content: str,
agent_id: str,
*,
main_content: str | None = None,
user_id: str | None = None,
session_id: str | None = None,
memory_type: str = "semantic",
metadata: dict | None = None,
) -> Memory
content is the exact fact to embed and index (mapped to .fact internally). main_content is used for storing the raw context the fact was extracted from, but it is not embedded for search.
add_batch()¶
Efficiently adds multiple memories in a single transaction with batch-vectorization. The memories list should contain dictionaries with content, agent_id, and other optional arguments corresponding to add().
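As an illustration of the expected payload shape, the sketch below builds a memories list from plain strings. The agent and user identifiers are made-up values, and the exact add_batch() signature is not shown in this reference, so the call itself is left as a comment.

```python
facts = ["Prefers dark mode", "Lives in Lisbon"]

# One dictionary per memory, using the same keys that add() accepts.
memories = [
    {
        "content": fact,              # the exact fact to embed and index
        "agent_id": "assistant-1",    # hypothetical identifier
        "user_id": "user-42",         # hypothetical identifier
        "memory_type": "semantic",    # the default shown for add()
    }
    for fact in facts
]

# With a connected client, this list would be handed to add_batch(),
# for example: await engram.add_batch(memories)  (assumed call shape)
```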
add_conversation()¶
await engram.add_conversation(
user_message: str,
assistant_response: str,
agent_id: str,
*,
user_id: str | None = None,
session_id: str | None = None,
conversation_history: list[dict[str, str]] | None = None,
conversation_summary: str | None = None,
metadata: dict | None = None,
search_limit: int = 10,
update_summary: bool = True,
extract_assistant_response: bool = False,
) -> list[Memory]
Set extract_assistant_response=True only if you want the assistant's own statements converted into stored memories.
Use as the sole writer to a memory space.
add_conversation() searches the agent's existing memories to decide ADD/UPDATE/DELETE. If you also store raw turns in the same space with add_batch(), the freshly extracted fact is found already present verbatim in its own raw row, so the decision step judges it identical and NOOPs it; supersession never fires. Either let add_conversation() own a memory space, or keep the two writers in separate namespaces. (The examples/chatbot.py example deliberately uses add_batch() only for this reason.)
3. Memory Helpers & Lineage¶
get()¶
Fetches a single memory by ID and updates its internal access count.
update()¶
await engram.update(
memory_id: str,
*,
content: str | None = None,
importance: float | None = None,
metadata: dict | None = None
) -> Memory
revise()¶
await engram.revise(
memory_id: str,
*,
content: str | None = None,
importance: float | None = None,
metadata: dict | None = None,
reason: str | None = None
) -> Memory
get_current()¶
Fetches the active head of a memory's lineage. If passed an old (superseded) ID, it resolves forward to the current version.
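The forward resolution described above can be pictured as following supersession pointers until none remain. This is a toy model, not Engram's implementation: the superseded_by mapping and the resolve_current helper are hypothetical names chosen for illustration.

```python
def resolve_current(memory_id: str, superseded_by: dict[str, str]) -> str:
    """Follow old -> new pointers until reaching an ID nothing supersedes."""
    seen: set[str] = set()
    current = memory_id
    while current in superseded_by:
        if current in seen:
            # Guard against malformed data that loops back on itself.
            raise ValueError(f"cycle detected at {current}")
        seen.add(current)
        current = superseded_by[current]
    return current


# m1 was revised into m2, which was revised into m3 (the active head).
chain = {"m1": "m2", "m2": "m3"}
head = resolve_current("m1", chain)
```

Passing the head itself returns it unchanged, which matches the idea that any ID in a lineage resolves to the same current version.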
get_lineage()¶
Retrieves the full version history (all updates and supersessions) of a specific memory.
explain_memory()¶
Returns an LLM-generated explanation of a memory's intent and context based on its graph relations.
reinforce()¶
Increases the importance score of a memory, typically used when a recalled memory was proven highly useful.
forget()¶
Soft-deletes a memory from the active search index. Returns True if successfully deleted.
purge()¶
Hard-deletes all memories belonging to an agent (and optionally a specific user). Returns the number of deleted records.
list_recent()¶
await engram.list_recent(agent_id: str, user_id: str | None = None, limit: int = 10) -> list[Memory]
get_history()¶
await engram.get_history(
agent_id: str,
*,
user_id: str | None = None,
limit: int = 50,
include_superseded: bool = True,
memory_types: list[str] | None = None,
since: datetime | None = None,
until: datetime | None = None
) -> list[MemoryHistoryEvent]
get_memories()¶
await engram.get_memories(
agent_id: str,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata_filter: dict | None = None,
memory_types: list[str] | None = None,
limit: int = 200
) -> list[Memory]
4. Search and Recall¶
search()¶
await engram.search(
query: str,
agent_id: str,
*,
user_id: str | None = None,
limit: int = 10,
min_score: float = 0.0,
metadata_filter: dict | None = None,
memory_types: list[str] | None = None,
mode: str = "hybrid",
rerank: bool = False,
include_superseded: bool = False,
) -> list[SearchResult]
rerank=True activates local cross-encoder re-sorting (requires the sentence-transformers dependency).
deep_search()¶
await engram.deep_search(
query: str,
agent_id: str,
*,
user_id: str | None = None,
limit: int = 10,
min_score: float = 0.0,
metadata_filter: dict | None = None,
memory_types: list[str] | None = None,
mode: str = "hybrid",
n_queries: int = 4,
rerank: bool = False,
) -> list[SearchResult]
Expands the query into n_queries permutations, runs them concurrently, and fuses the results using Reciprocal Rank Fusion (RRF).
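For readers unfamiliar with RRF, the fusion step can be sketched in a few lines. Each ranked list contributes 1 / (k + rank) to a document's score, and documents are sorted by the summed score. The constant k = 60 is the value commonly used in the literature; whether Engram uses the same constant or tie-breaking is an assumption.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists into one list sorted by RRF score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest score first; ties broken by ID for a deterministic order.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# Three query permutations, each returning its own ranking of memory IDs.
rankings = [
    ["a", "b", "c"],
    ["b", "a", "d"],
    ["b", "c", "a"],
]
fused = reciprocal_rank_fusion(rankings)
fused_ids = [doc_id for doc_id, _ in fused]
```

Because "b" ranks first in two lists and second in the third, it ends up ahead of "a", which only ranks first once.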
recall()¶
await engram.recall(
question: str,
agent_id: str,
*,
user_id: str | None = None,
question_date: datetime | None = None,
limit: int = 10,
compose_answer: bool = True,
) -> RecallAnswer
Set compose_answer=False to skip the final LLM composition.
recall_critical()¶
await engram.recall_critical(
agent_id: str,
*,
user_id: str | None = None,
limit: int = 50,
memory_types: list[str] | None = None
) -> list[Memory]
trace_recall()¶
await engram.trace_recall(
query: str,
agent_id: str,
*,
user_id: str | None = None,
limit: int = 20,
min_score: float = 0.0,
max_tokens: int = 2000,
expected_terms: list[str] | None = None,
use_deep_search: bool = True,
memory_types: list[str] | None = None,
token_counter: Callable | None = None
) -> RecallTrace
The returned RecallTrace reports on expected_terms and truncation flags.
get_context_block()¶
await engram.get_context_block(
query: str,
agent_id: str,
*,
user_id: str | None = None,
session_id: str | None = None,
limit: int = 10,
min_score: float = 0.0,
max_tokens: int | None = None,
header: str = "## Relevant memories",
token_counter: Callable | None = None,
memory_types: list[str] | None = None,
group_by_type: bool = False,
rerank: bool = False
) -> str
If session_id is provided and has a rolling summary, the summary is prepended.
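The interaction between header, max_tokens, and token_counter can be illustrated with a toy renderer. This is a sketch under assumptions, not the library's formatting: the bullet layout, the "Summary:" prefix, the str -> int shape of token_counter, and the whitespace-based default counter are all hypothetical.

```python
from typing import Callable, Optional


def render_context_block(
    facts: list[str],
    *,
    header: str = "## Relevant memories",
    max_tokens: Optional[int] = None,
    token_counter: Optional[Callable[[str], int]] = None,
    summary: Optional[str] = None,
) -> str:
    """Render facts as bullets, stopping before the token budget is exceeded."""
    count = token_counter or (lambda text: len(text.split()))
    lines = [header]
    if summary:
        # A session's rolling summary goes ahead of the individual memories.
        lines.append(f"Summary: {summary}")
    used = sum(count(line) for line in lines)
    for fact in facts:
        line = f"- {fact}"
        cost = count(line)
        if max_tokens is not None and used + cost > max_tokens:
            break
        lines.append(line)
        used += cost
    return "\n".join(lines)


facts = ["User prefers dark mode", "User lives in Lisbon", "User owns a cat"]
block = render_context_block(facts, max_tokens=12)
```

With the word-count fallback, the header costs 3 tokens and each bullet 5, so a budget of 12 admits only the first fact.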
5. Graph API¶
relate()¶
await engram.relate(
source_id: str,
target_id: str,
relation_type: str = "related_to",
weight: float = 1.0,
metadata: dict | None = None
) -> None
Example relation_type values include related_to, supports, and causes.
traverse()¶
await engram.traverse(
start_memory_id: str,
max_depth: int = 3,
direction: str = "outbound",
relation_types: list[str] | None = None,
min_weight: float = 0.0,
limit: int = 50
) -> list[TraversalResult]
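How max_depth, relation_types, min_weight, and limit might interact can be sketched with a breadth-first walk over an in-memory adjacency list. This is a toy model for outbound traversal only; the graph layout, the traverse_outbound name, and the (memory_id, depth) result shape are assumptions rather than the library's TraversalResult.

```python
from collections import deque
from typing import Optional

# Each edge is (target_id, relation_type, weight).
Graph = dict[str, list[tuple[str, str, float]]]


def traverse_outbound(
    graph: Graph,
    start: str,
    *,
    max_depth: int = 3,
    relation_types: Optional[list[str]] = None,
    min_weight: float = 0.0,
    limit: int = 50,
) -> list[tuple[str, int]]:
    """Breadth-first walk returning (memory_id, depth) pairs."""
    visited = {start}
    queue = deque([(start, 0)])
    results: list[tuple[str, int]] = []
    while queue and len(results) < limit:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand nodes already at the depth limit
        for target, relation, weight in graph.get(node, []):
            if target in visited or weight < min_weight:
                continue
            if relation_types is not None and relation not in relation_types:
                continue
            visited.add(target)
            results.append((target, depth + 1))
            queue.append((target, depth + 1))
            if len(results) >= limit:
                break
    return results


graph: Graph = {
    "a": [("b", "supports", 1.0), ("c", "related_to", 0.2)],
    "b": [("d", "causes", 0.9)],
    "d": [("e", "causes", 1.0)],
}
reached = traverse_outbound(graph, "a", max_depth=2, min_weight=0.5)
```

Here "c" is dropped by the weight threshold and "e" lies beyond the depth limit, so only "b" and "d" are returned.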
traverse_many()¶
await engram.traverse_many(
start_memory_ids: list[str],
*,
max_depth: int = 2,
direction: str = "any",
relation_types: list[str] | None = None,
min_weight: float = 0.0,
limit_per_seed: int = 25,
total_limit: int = 100,
skip_missing: bool = True
) -> list[TraversalResult]
render_graph_context()¶
engram.render_graph_context(
results: list[TraversalResult],
*,
max_tokens: int | None = None,
token_counter: Callable | None = None,
include_paths: bool = False,
header: str = "## Related memory graph"
) -> str
6. Sessions¶
session()¶
async with engram.session(
agent_id: str,
user_id: str | None = None,
metadata: dict | None = None
) -> Session
7. Task Memory¶
start_task()¶
await engram.start_task(
goal: str,
agent_id: str,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict | None = None
) -> TaskRun
get_task() / list_tasks()¶
await engram.get_task(task_run_id: str, *, include_deleted: bool = False) -> TaskRun
await engram.list_tasks(*, agent_id: str | None = None, user_id: str | None = None, status: str | None = None, limit: int = 100, include_deleted: bool = False) -> list[TaskRun]
State Transitions¶
await engram.pause_task(task_run_id: str, *, outcome: str | None = None) -> TaskRun
await engram.complete_task(task_run_id: str, *, outcome: str | None = None) -> TaskRun
await engram.fail_task(task_run_id: str, *, outcome: str | None = None) -> TaskRun
await engram.cancel_task(task_run_id: str, *, outcome: str | None = None) -> TaskRun
await engram.soft_delete_task(task_run_id: str) -> TaskRun
record_event()¶
await engram.record_event(
*,
agent_id: str,
role: str,
event_type: str,
content: str = "",
task_run_id: str | None = None,
session_id: str | None = None,
user_id: str | None = None,
payload: dict | None = None,
metadata: dict | None = None
) -> AgentEvent
record_turn()¶
await engram.record_turn(
task_run_id: str,
user_message: str,
assistant_response: str,
*,
agent_id: str | None = None,
user_id: str | None = None,
session_id: str | None = None,
tool_calls: list[dict] | None = None,
artifacts: list[dict] | None = None,
metadata: dict | None = None,
enqueue_processing: bool = True
) -> list[AgentEvent]
search_events()¶
await engram.search_events(
query: str,
*,
agent_id: str | None = None,
task_run_id: str | None = None,
session_id: str | None = None,
user_id: str | None = None,
event_types: list[str] | None = None,
roles: list[str] | None = None,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 50,
include_deleted: bool = False,
mode: str = "hybrid"
) -> list[AgentEvent]
create_checkpoint()¶
await engram.create_checkpoint(
task_run_id: str,
summary: str,
*,
completed_steps: list[str] | None = None,
pending_steps: list[str] | None = None,
decisions: list[str] | None = None,
blockers: list[str] | None = None,
artifacts: list[dict] | None = None,
source_event_ids: list[str] | None = None,
metadata: dict | None = None
) -> TaskCheckpoint
build_context()¶
await engram.build_context(
task_run_id: str,
*,
query: str = "",
max_tokens: int = 200000,
token_counter: Callable | None = None,
recent_event_limit: int = 40,
memory_limit: int = 25,
checkpoint_limit: int = 3,
include_graph: bool = True
) -> ContextBuildResult
8. Long Input Processing¶
record_long_input()¶
await engram.record_long_input(
task_run_id: str,
text: str,
*,
title: str | None = None,
agent_id: str | None = None,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict | None = None,
max_chunk_tokens: int = 700,
extract_with_llm: bool = True,
max_facts_per_chunk: int = 6
) -> LongInputIngestionReport
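To give a feel for what max_chunk_tokens controls, the sketch below splits text into chunks that never exceed a token budget. It treats whitespace-separated words as tokens, which is an assumption; the tokenizer and chunk-boundary rules Engram actually applies are not described in this reference, and chunk_text is a hypothetical helper.

```python
def chunk_text(text: str, max_chunk_tokens: int = 700) -> list[str]:
    """Split text into consecutive chunks of at most max_chunk_tokens words."""
    if max_chunk_tokens < 1:
        raise ValueError("max_chunk_tokens must be at least 1")
    words = text.split()
    return [
        " ".join(words[start:start + max_chunk_tokens])
        for start in range(0, len(words), max_chunk_tokens)
    ]


chunks = chunk_text("one two three four five", max_chunk_tokens=2)
```

A smaller budget yields more, shorter chunks, and with max_facts_per_chunk capping extraction per chunk, it also raises the ceiling on how many facts a long input can produce.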
build_long_input_context()¶
await engram.build_long_input_context(
task_run_id: str,
*,
query: str,
max_tokens: int = 4000,
source_chunk_limit: int = 6,
expected_terms: list[str] | None = None,
token_counter: Callable | None = None
) -> LongInputContextResult
9. Background Jobs¶
process_memory_jobs()¶
Manually processes a batch of deferred derivation jobs queued by operations like record_turn.
run_memory_worker()¶
await engram.run_memory_worker(
*,
batch_size: int = 10,
interval_seconds: float = 1.0,
stop_event: asyncio.Event | None = None,
max_iterations: int | None = None
) -> int
Can be run in the background as an asyncio.create_task() daemon.
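The background-task pattern can be sketched with a stand-in loop that honours stop_event and max_iterations in the way the signature suggests. The run_worker function and the process_batch callable are hypothetical, and treating the integer return value as a count of processed jobs is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_worker(
    process_batch: Callable[[], Awaitable[int]],
    *,
    interval_seconds: float = 1.0,
    stop_event: Optional[asyncio.Event] = None,
    max_iterations: Optional[int] = None,
) -> int:
    """Poll for jobs until stopped; return how many were processed."""
    total = 0
    iterations = 0
    while True:
        if stop_event is not None and stop_event.is_set():
            break
        if max_iterations is not None and iterations >= max_iterations:
            break
        total += await process_batch()
        iterations += 1
        await asyncio.sleep(interval_seconds)
    return total


async def main() -> int:
    stop_event = asyncio.Event()
    pending = [3, 2]  # jobs completed by the first two polls

    async def process_batch() -> int:
        return pending.pop(0) if pending else 0

    # Start the worker in the background, then signal it to stop.
    worker = asyncio.create_task(
        run_worker(process_batch, interval_seconds=0.01, stop_event=stop_event)
    )
    await asyncio.sleep(0.1)
    stop_event.set()
    return await worker


processed = asyncio.run(main())
```

Setting the event and then awaiting the task lets the loop finish its current iteration before shutdown, which is gentler than cancelling the task outright.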