Extending
AgentGraph can be extended with custom connectors for your own tools and integrations. This page includes the BaseConnector interface, hook signatures, and an example implementation.
#Why extend AgentGraph
Connectors are how AgentGraph learns about new data sources. Common reasons to write one:
- Build connectors for internal tools, private APIs, or niche SaaS products that are specific to your team.
- Keep your own integration logic outside the core package by shipping it as a separate connector package.
- Reuse the same fetch, poll, auth, and graph-upsert model that the built-in Slack, Discord, Google Docs, Drive, Sheets, Gmail, and RSS connectors use.
#Included connectors
| Source | Entities | Auth | Refresh model |
|---|---|---|---|
| Slack | Channel, Message | Browser-derived cookie credentials | Browser dwell plus 5 minute polling |
| Discord | Channel, Message | Bot token | Browser dwell plus 5 minute polling |
| Google Docs | Document | Google OAuth | Browser dwell plus Drive-backed refresh |
| Google Sheets | Spreadsheet | Google OAuth | Browser dwell plus Drive-backed refresh |
| Google Drive | Folder, Document | Google OAuth | Browser dwell for folders and files, plus Drive changes polling |
| Gmail | Thread | Google OAuth | Browser dwell plus background poll and ingest |
| RSS | Folder, Document | Feed URLs | Background poll and ingest; validated feed add and OPML import |
#Connector behavior
- Browser dwell: supported URLs trigger targeted fetches after the configured dwell threshold.
- Polling: connectors that support it store cursor state and fetch changes on a schedule.
- Ingest: some connectors expose a broader one-shot historical ingest beyond poll behavior.
- Download metadata: file-backed entities can expose `metadata.download_url` and `metadata.mime_type` when an agent needs the source bytes.
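To make "dwell" concrete, here is a hypothetical sketch of a tracker that fires one targeted fetch per visit once a page has stayed open past a threshold. How AgentGraph actually receives browser visits and where the threshold is configured are not described on this page; `DwellTracker`, `enter`, `leave`, and `due` are illustrative names only.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DwellTracker:
    """Hypothetical dwell logic: report a URL as due for a targeted fetch
    once the user has stayed on it for at least `threshold_seconds`."""

    threshold_seconds: float
    _entered_at: dict[str, float] = field(default_factory=dict, init=False)
    _fired: set[str] = field(default_factory=set, init=False)

    def enter(self, url: str, now: float) -> None:
        # Landing on (or returning to) a page starts a fresh dwell window.
        self._entered_at[url] = now
        self._fired.discard(url)

    def leave(self, url: str) -> None:
        # Leaving before the threshold cancels the pending fetch.
        self._entered_at.pop(url, None)

    def due(self, url: str, now: float) -> bool:
        # True exactly once per visit, after the threshold has elapsed.
        entered = self._entered_at.get(url)
        if entered is None or url in self._fired:
            return False
        if now - entered >= self.threshold_seconds:
            self._fired.add(url)
            return True
        return False
```

Firing at most once per visit keeps a long-open tab from triggering repeated fetches; any further refresh of that resource is left to polling.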
#Connector interface
BaseConnector is the author-facing contract for custom integrations. A connector subclass declares its identity and URL ownership, returns graph-shaped batches from fetch(), and can optionally participate in auth, background polling, and historical ingest.
#Authoring a connector
A connector is a Python package that subclasses BaseConnector, implements the required fetch path, and registers itself through the agentgraph.connectors entry point group. This is the extension point you use to teach AgentGraph about your own systems.
#Base contract
The required shape is small: identify which URLs the connector owns, implement fetch(), and optionally implement polling, ingest, auth, and user identity hooks. The signatures below match agentgraph.connectors.base.BaseConnector.
- `source` is the stable connector identifier used by the CLI, MCP server, and registry.
- `url_patterns` declares which browser URLs the connector should claim for dwell-based fetches.
- `fetch_policy` controls when a targeted fetch should be skipped because a resource is still fresh.
- `can_handle(self, url) -> bool` is required and should confirm whether a specific URL belongs to the connector.
- `fetch(self, resource_type, resource_id, meta=None, account_id=None) -> EntityBatch` is the required runtime fetch hook.
- `normalise_fetch_id(self, resource_id, entity_type) -> tuple[str, ResourceType]` lets a connector translate stored IDs into fetchable IDs when they differ.
- `poll_interval`, `poll_delegates`, `poll_account_ids()`, `poll()`, and `ingest()` are the optional background refresh hooks.
- `run_auth_flow()`, `list_accounts()`, `get_authenticated_user()`, `verify_auth()`, `current_user_id()`, and `current_user_ids()` are the auth and operator-facing hooks.
The contract is intentionally generic: core AgentGraph code calls these hooks without knowing platform-specific field names or APIs.
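Because the contract is generic, core code can make decisions from class attributes alone, such as keying connectors by `source` or deciding which ones get a background job. The sketch below is hypothetical: `StubConnector`, `build_registry`, and `pollable` are illustrative names rather than AgentGraph APIs, and only `source` and `poll_interval` are modelled.

```python
from __future__ import annotations

from datetime import timedelta
from typing import ClassVar


class StubConnector:
    # Hypothetical stand-in for a BaseConnector subclass: only the class
    # attributes that generic core code reads are modelled here.
    source: ClassVar[str] = "stub"
    poll_interval: ClassVar[timedelta | None] = None


class ChatLike(StubConnector):
    source = "chatlike"
    poll_interval = timedelta(minutes=5)


class DocsLike(StubConnector):
    source = "docslike"  # no poll_interval: a fetch-only connector


def build_registry(classes: list[type[StubConnector]]) -> dict[str, type[StubConnector]]:
    # `source` is the stable key; a duplicate would make lookups ambiguous.
    registry: dict[str, type[StubConnector]] = {}
    for cls in classes:
        if cls.source in registry:
            raise ValueError(f"duplicate connector source: {cls.source!r}")
        registry[cls.source] = cls
    return registry


def pollable(registry: dict[str, type[StubConnector]]) -> dict[str, timedelta]:
    # Any connector that sets poll_interval gets scheduled, regardless of
    # which platform it talks to.
    return {
        source: cls.poll_interval
        for source, cls in registry.items()
        if cls.poll_interval is not None
    }
```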
#Interface reference
| Member | Required | Purpose |
| --- | --- | --- |
| source: ClassVar[str] | Yes | Stable connector key used everywhere the platform is identified. |
| fetch_policy: ClassVar[FetchPolicy] | Yes | Declares how long fetched resources stay fresh before an incremental refresh is needed. |
| url_patterns: ClassVar[list[str]] | No | URL match patterns that declare which browser URLs this connector can observe. |
| can_handle(self, url: str) -> bool | Yes | Fine-grained URL matcher used after url_patterns to decide whether the connector owns a specific URL. |
| fetch(...) -> EntityBatch | Yes | Fetch one resource and return the entities, people, and edges needed to represent it in the graph. |
| normalise_fetch_id(...) -> tuple[str, ResourceType] | No | Override when stored entity IDs differ from the IDs required by the upstream fetch path. |
| poll_interval: ClassVar[timedelta \| None] | No | Enables scheduled background polling when set to a duration. |
| poll(self, cursor, account_id=None) -> tuple[EntityBatch, dict[str, Any]] | No | Incremental background refresh hook; return both the batch and the next cursor. |
| ingest(self, account_id=None) -> EntityBatch | No | One-shot historical backfill hook for loading data beyond normal polling. |
| run_auth_flow(cls, account_id=None, add=False) -> None | No | Interactive auth entry point used by agentgraph auth and onboarding flows. |
| list_accounts(cls) -> list[ConnectorAccount] | No | Enumerates authenticated accounts when a connector supports multiple identities. |
| verify_auth(cls, account_id=None) -> tuple[str, str \| None] | No | Reports whether stored credentials are ok, missing, or invalid. |
| current_user_id(cls) -> str \| None | No | Returns the canonical Person identifier for --mine filtering. |
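The table splits URL ownership into a coarse pass (`url_patterns`) and a fine-grained pass (`can_handle`). A minimal sketch of that two-stage check, assuming glob-style patterns matched with the standard library's `fnmatch`; the pattern syntax AgentGraph actually uses is not documented here, and `URL_PATTERNS`, `matches_patterns`, and `claims` are hypothetical names.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from urllib.parse import urlparse

# Hypothetical pattern list for an app served at app.example.com.
URL_PATTERNS = ["https://app.example.com/*"]


def matches_patterns(url: str, patterns: list[str]) -> bool:
    # Coarse pass: fnmatch's "*" also matches "/", so this is deliberately
    # loose and only narrows the candidates.
    return any(fnmatchcase(url, pattern) for pattern in patterns)


def can_handle(url: str) -> bool:
    # Fine-grained pass: parse the URL and claim only document pages.
    parsed = urlparse(url)
    return (
        parsed.scheme == "https"
        and parsed.hostname == "app.example.com"
        and parsed.path.startswith("/docs/")
    )


def claims(url: str) -> bool:
    return matches_patterns(url, URL_PATTERNS) and can_handle(url)
```

For example, `https://app.example.com/settings` passes the pattern but fails `can_handle`, so the connector ignores it.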
FetchPolicy.decide(last_synced_at) returns one of three states:
- `FIRST_VISIT`: the resource has never been synced, so do a full fetch.
- `INCREMENTAL`: the resource was synced before but is now stale, so fetch changes since the last sync.
- `FRESH`: the resource was synced recently enough that external fetch work can be skipped.
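FetchPolicy's internals are not shown on this page. As a hypothetical sketch, assuming the policy compares the age of `last_synced_at` against a staleness window like the example connector's `stale_after_seconds`, the three-way decision could look like this. `SketchFetchPolicy` and `FetchDecision` are illustrative names, and treating an age exactly equal to the window as stale (`>=`) is a guess.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class FetchDecision(Enum):
    FIRST_VISIT = "first_visit"
    INCREMENTAL = "incremental"
    FRESH = "fresh"


@dataclass(frozen=True)
class SketchFetchPolicy:
    # Hypothetical re-implementation; only the three outcomes come from the docs.
    stale_after_seconds: int

    def decide(
        self, last_synced_at: datetime | None, now: datetime | None = None
    ) -> FetchDecision:
        if last_synced_at is None:
            return FetchDecision.FIRST_VISIT  # never synced: full fetch
        now = now or datetime.now(timezone.utc)
        if now - last_synced_at >= timedelta(seconds=self.stale_after_seconds):
            return FetchDecision.INCREMENTAL  # stale: fetch changes since last sync
        return FetchDecision.FRESH  # recent enough: skip external work
```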
#Example implementation
This example shows one connector implementing the full contract, including auth, targeted fetch, polling, and historical ingest.
```python
from __future__ import annotations

from datetime import timedelta
from typing import Any
from urllib.parse import urlparse

import httpx

from agentgraph.connectors.base import (
    BaseConnector,
    EntityBatch,
    EntityRecord,
    FetchPolicy,
    PersonRecord,
    ResourceType,
)
from agentgraph.graph.upsert import upsert_batch


class ExampleConnector(BaseConnector):
    source = "example"
    fetch_policy = FetchPolicy(stale_after_seconds=15 * 60)
    poll_interval: timedelta | None = timedelta(minutes=10)  # type: ignore[assignment]
    url_patterns = ["https://app.example.com/*"]
    auth_label = "example"
    auth_description = "Example platform resources"
    onboard_prompt = "Set up Example?"

    @classmethod
    def run_auth_flow(cls, account_id: str | None = None, add: bool = False) -> None:
        # Launch the interactive auth flow for `agentgraph auth example`
        # and `agentgraph onboard`.
        from agentgraph_connector_example.auth import run_oauth_flow

        run_oauth_flow()

    @classmethod
    def get_authenticated_user(cls) -> str | None:
        # Return a short operator-facing string for `agentgraph connectors`,
        # or None if credentials are missing.
        from agentgraph_connector_example.auth import load_credentials

        try:
            return load_credentials().email
        except Exception:
            return None

    @classmethod
    async def verify_auth(cls, account_id: str | None = None) -> tuple[str, str | None]:
        # Optional but recommended: make a lightweight API call so auth
        # failures show up as "invalid" instead of only "missing".
        from agentgraph_connector_example.auth import load_credentials

        try:
            creds = load_credentials()
        except Exception:
            return ("missing", None)

        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(
                "https://api.example.com/v1/me",
                headers={"Authorization": f"Bearer {creds.access_token}"},
            )

        if resp.status_code == 200:
            data: dict[str, Any] = resp.json()
            identity = data.get("email") or data.get("id")
            return ("ok", str(identity) if identity is not None else None)
        if resp.status_code == 401:
            return ("invalid", "token rejected (401); run: agentgraph auth example")
        return ("invalid", f"HTTP {resp.status_code}")

    @classmethod
    def current_user_id(cls) -> str | None:
        # Return the canonical identifier stored on the authenticated user's
        # Person entity. This powers `agentgraph query --mine`.
        from agentgraph_connector_example.auth import load_credentials

        try:
            return load_credentials().email
        except Exception:
            return None

    def can_handle(self, url: str) -> bool:
        # Compare the parsed hostname instead of substring-matching, so a URL
        # that merely mentions app.example.com (e.g. in a query string) is not claimed.
        return urlparse(url).hostname == "app.example.com"

    async def fetch(
        self,
        resource_type: ResourceType,
        resource_id: str,
        meta: dict[str, str] | None = None,
        account_id: str | None = None,
    ) -> EntityBatch:
        # `fetch()` is the required connector path. It should return the
        # entities, people, and edges needed to represent one resource.
        last_sync = await self.last_synced_at(resource_id)
        decision = self.fetch_policy.decide(last_sync)
        if decision == FetchPolicy.FRESH:
            # Nothing changed recently; skip external API work.
            return EntityBatch()

        since = last_sync.isoformat() if last_sync is not None else None
        batch = await _fetch_example_resource(resource_type, resource_id, since=since)
        await upsert_batch(batch)
        return batch

    async def ingest(self, account_id: str | None = None) -> EntityBatch:
        # `ingest()` is a one-shot historical backfill. Use it when the
        # connector can fetch more than normal polling covers.
        batch = await _fetch_full_history()
        await upsert_batch(batch)
        return batch

    async def poll(
        self,
        cursor: dict[str, Any],
        account_id: str | None = None,
    ) -> tuple[EntityBatch, dict[str, Any]]:
        # `poll()` runs in the background from `agentgraph serve` or
        # `agentgraph poll`. `cursor` is {} on first run; return the next cursor.
        start_cursor = cursor.get("cursor")
        batch, next_cursor = await _fetch_changes_since(start_cursor)
        return batch, {"cursor": next_cursor}


async def _fetch_example_resource(
    resource_type: ResourceType,
    resource_id: str,
    since: str | None,
) -> EntityBatch:
    # Placeholder helper. Real connectors usually call the upstream API,
    # build EntityRecord/PersonRecord/EdgeRecord values, and return them in a batch.
    entity = EntityRecord(
        entity_type="Document",
        platform="example",
        platform_entity_id=resource_id,
        title=f"Example resource {resource_id}",
        metadata={"resource_type": resource_type, "since": since},
    )
    return EntityBatch(entities=[entity], persons=[], edges=[])


async def _fetch_full_history() -> EntityBatch:
    # Placeholder: a real backfill would page through the upstream history.
    return EntityBatch()


async def _fetch_changes_since(cursor: str | None) -> tuple[EntityBatch, str]:
    # Placeholder: a real implementation would request changes after `cursor`.
    return EntityBatch(), cursor or "initial-cursor"
```
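The `poll()` contract (receive `{}` on the first run, return the next cursor) implies that the caller persists the cursor between runs. The scheduler side is not documented here; this is a hypothetical sketch in which `CountingPoller` stands in for a connector, `poll_once` stands in for the background runner, and a plain dict stands in for cursor storage.

```python
from __future__ import annotations

import asyncio
from typing import Any


class CountingPoller:
    # Hypothetical connector stand-in: returns every item after the stored
    # position, plus the new position as the next cursor.
    def __init__(self, items: list[str]) -> None:
        self.items = items

    async def poll(
        self, cursor: dict[str, Any], account_id: str | None = None
    ) -> tuple[list[str], dict[str, Any]]:
        start = cursor.get("cursor", 0)
        return self.items[start:], {"cursor": len(self.items)}


async def poll_once(
    connector: CountingPoller, store: dict[str, dict[str, Any]], key: str
) -> list[str]:
    cursor = store.get(key, {})  # {} on the first run
    batch, next_cursor = await connector.poll(cursor)
    store[key] = next_cursor  # persist only after poll() has returned
    return batch


if __name__ == "__main__":
    store: dict[str, dict[str, Any]] = {}
    poller = CountingPoller(["a", "b"])
    print(asyncio.run(poll_once(poller, store, "example")))  # ['a', 'b']
    poller.items.append("c")
    print(asyncio.run(poll_once(poller, store, "example")))  # ['c']
```

Saving the cursor only after `poll()` succeeds means a failed run repeats work on the next schedule rather than silently skipping changes.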
#Output model
An `EntityBatch` is built from three record types:
- `EntityRecord` for messages, documents, channels, folders, spreadsheets, and threads.
- `PersonRecord` for authors and participants.
- `EdgeRecord` for authored, posted-in, replied-to, mentions, references, and similar relationships.
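To show how one upstream object fans out into all three record types, here is a hypothetical sketch that maps a chat message to a Channel entity, a Message entity, its author, and two edges. The stand-in dataclasses are not AgentGraph's classes: `Entity` borrows the `EntityRecord` field names used in the example connector, while the `Person` and `Edge` fields and the input dict's keys are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Entity:  # stand-in for EntityRecord (fields from the example connector)
    entity_type: str
    platform: str
    platform_entity_id: str
    title: str
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class Person:  # stand-in for PersonRecord (fields assumed)
    platform: str
    platform_user_id: str
    display_name: str


@dataclass
class Edge:  # stand-in for EdgeRecord (fields assumed)
    kind: str  # e.g. "authored", "posted-in"
    from_id: str
    to_id: str


@dataclass
class Batch:  # stand-in for EntityBatch
    entities: list[Entity] = field(default_factory=list)
    persons: list[Person] = field(default_factory=list)
    edges: list[Edge] = field(default_factory=list)


def message_to_batch(msg: dict[str, str]) -> Batch:
    # One message yields its channel, itself, its author, and the links between them.
    channel = Entity("Channel", "example", msg["channel_id"], msg["channel_name"])
    message = Entity("Message", "example", msg["id"], msg["text"][:80])
    author = Person("example", msg["user_id"], msg["user_name"])
    edges = [
        Edge("authored", author.platform_user_id, message.platform_entity_id),
        Edge("posted-in", message.platform_entity_id, channel.platform_entity_id),
    ]
    return Batch(entities=[channel, message], persons=[author], edges=edges)
```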
#Packaging
Register the connector class under the agentgraph.connectors entry point group in your package's pyproject.toml:

```toml
[project.entry-points."agentgraph.connectors"]
myplatform = "agentgraph_connector_myplatform:MyConnector"
```