models

Classes:

AccessControlEntry dataclass

AccessControlEntry(scope: Literal['statement', 'clear_graph', 'plugin', 'system'], policy: Literal['allow', 'deny', 'abstain'], role: str)

Methods:

  • as_dict
  • from_dict

    Create an AccessControlEntry subclass instance from raw GraphDB data.

Attributes:

  • policy (Literal['allow', 'deny', 'abstain']) –
  • role (str) –
  • scope (Literal['statement', 'clear_graph', 'plugin', 'system']) –

policy instance-attribute

policy: Literal['allow', 'deny', 'abstain']

role instance-attribute

role: str

scope instance-attribute

scope: Literal['statement', 'clear_graph', 'plugin', 'system']

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    raise NotImplementedError("Use a concrete AccessControlEntry subclass.")

from_dict classmethod

Create an AccessControlEntry subclass instance from raw GraphDB data.

Note: we perform parse validation essentially twice (here and in the subclass’s __post_init__) to ensure mypy is satisfied with the value’s type.

Parameters:

  • data

    (dict) –

    A dict containing the access control entry data, typically parsed from a JSON response.

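Returns:

  • SystemAccessControlEntry | StatementAccessControlEntry | PluginAccessControlEntry | ClearGraphAccessControlEntry –

    An AccessControlEntry subclass instance, depending on the ‘scope’ field.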

Raises:

  • TypeError

    If data is not a dict.

  • ValueError

    If ‘scope’ is not a supported value (‘system’, ‘statement’, ‘plugin’, ‘clear_graph’), or if any field fails validation (e.g., invalid policy, operation, role, plugin, subject, predicate, object, or graph values).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(
    cls, data: dict
) -> (
    SystemAccessControlEntry
    | StatementAccessControlEntry
    | PluginAccessControlEntry
    | ClearGraphAccessControlEntry
):
    """Create an AccessControlEntry subclass instance from raw GraphDB data.

    Note: we perform parse validation essentially twice (here and in the
    subclass's __post_init__) to ensure mypy is satisfied with the value's type.

    Parameters:
        data: A dict containing the access control entry data, typically
            parsed from a JSON response.

    Returns:
        An AccessControlEntry subclass instance (SystemAccessControlEntry,
        StatementAccessControlEntry, PluginAccessControlEntry, or
        ClearGraphAccessControlEntry) depending on the 'scope' field.

    Raises:
        TypeError: If data is not a dict.
        ValueError: If 'scope' is not a supported value ('system', 'statement',
            'plugin', 'clear_graph'), or if any field fails validation
            (e.g., invalid policy, operation, role, plugin, subject,
            predicate, object, or graph values).
    """
    if not isinstance(data, dict):
        raise TypeError("ACL entry must be a mapping.")

    scope = data.get("scope")
    if scope == "system":
        policy = _parse_policy(data.get("policy"))
        role = _parse_role(data.get("role"))
        operation = _parse_operation(data.get("operation"))
        return SystemAccessControlEntry(
            scope="system",
            policy=policy,
            role=role,
            operation=operation,
        )
    if scope == "statement":
        policy = _parse_policy(data.get("policy"))
        role = _parse_role(data.get("role"))
        operation = _parse_operation(data.get("operation"))
        subject = _parse_subject(data.get("subject"))
        predicate = _parse_predicate(data.get("predicate"))
        obj = _parse_object(data.get("object"))
        graph = _parse_graph(data.get("context"))
        return StatementAccessControlEntry(
            scope="statement",
            policy=policy,
            role=role,
            operation=operation,
            subject=subject,
            predicate=predicate,
            object=obj,
            graph=graph,
        )
    if scope == "plugin":
        policy = _parse_policy(data.get("policy"))
        role = _parse_role(data.get("role"))
        operation = _parse_operation(data.get("operation"))
        plugin = _parse_plugin(data.get("plugin"))
        return PluginAccessControlEntry(
            scope="plugin",
            policy=policy,
            role=role,
            operation=operation,
            plugin=plugin,
        )
    if scope == "clear_graph":
        policy = _parse_policy(data.get("policy"))
        role = _parse_role(data.get("role"))
        graph = _parse_graph(data.get("context"))
        return ClearGraphAccessControlEntry(
            scope="clear_graph",
            policy=policy,
            role=role,
            graph=graph,
        )

    raise ValueError(f"Unsupported FGAC scope: {scope!r}")
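
The scope-based dispatch above can be illustrated with a self-contained sketch. The classes SystemEntry and ClearGraphEntry and the function entry_from_dict are hypothetical stand-ins, not the library's classes, and they skip the _parse_* validation. They only show how the 'scope' key selects the class and how the 'context' key maps onto the graph attribute. The payload values are illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SystemEntry:  # hypothetical stand-in for SystemAccessControlEntry
    policy: str
    role: str
    operation: str


@dataclass
class ClearGraphEntry:  # hypothetical stand-in for ClearGraphAccessControlEntry
    policy: str
    role: str
    graph: str


def entry_from_dict(data: Any) -> SystemEntry | ClearGraphEntry:
    """Pick the entry class from the 'scope' key, as from_dict does."""
    if not isinstance(data, dict):
        raise TypeError("ACL entry must be a mapping.")
    scope = data.get("scope")
    if scope == "system":
        return SystemEntry(data["policy"], data["role"], data["operation"])
    if scope == "clear_graph":
        # The raw data uses 'context'; the dataclass attribute is 'graph'.
        return ClearGraphEntry(data["policy"], data["role"], data["context"])
    raise ValueError(f"Unsupported FGAC scope: {scope!r}")


entry = entry_from_dict(
    {"scope": "clear_graph", "policy": "deny", "role": "ROLE_USER", "context": "default"}
)
print(entry)
```

A missing or unknown 'scope' falls through every branch and ends in the final ValueError, which is why the check for a dict comes first.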

AuthenticatedUser dataclass

AuthenticatedUser(username: str, authorities: list[str] = list(), appSettings: dict[str, Any] = dict(), external: bool = False, token: str = '')

Represents an authenticated user returned from POST /rest/login.

Attributes:

  • username (str) –

    The username of the authenticated user.

  • authorities (list[str]) –

    List of granted authorities/roles (e.g., [“ROLE_USER”, “ROLE_ADMIN”]).

  • appSettings (dict[str, Any]) –

    Application settings for the user.

  • external (bool) –

    Whether the user is external (e.g., from LDAP/OAuth).

  • token (str) –

    The full Authorization header value (e.g., “GDB <token>”). Can be passed directly to GraphDBClient’s auth parameter.

Methods:

Attributes:

appSettings class-attribute instance-attribute

appSettings: dict[str, Any] = field(default_factory=dict)

authorities class-attribute instance-attribute

authorities: list[str] = field(default_factory=list)

external class-attribute instance-attribute

external: bool = False

token class-attribute instance-attribute

token: str = ''

username instance-attribute

username: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    # Normalize None values from API responses to empty collections
    if self.authorities is None:
        object.__setattr__(self, "authorities", [])  # type: ignore[unreachable]
    if self.appSettings is None:
        object.__setattr__(self, "appSettings", {})  # type: ignore[unreachable]

    invalid: list[tuple[str, t.Any, type]] = []
    username = t.cast(t.Any, self.username)
    authorities = t.cast(t.Any, self.authorities)
    app_settings = t.cast(t.Any, self.appSettings)
    external = t.cast(t.Any, self.external)
    token = t.cast(t.Any, self.token)

    if not isinstance(username, str):
        invalid.append(("username", username, type(username)))
    if not isinstance(authorities, list):
        invalid.append(("authorities", authorities, type(authorities)))
    else:
        for index, value in enumerate(authorities):
            if not isinstance(value, str):
                invalid.append((f"authorities[{index}]", value, type(value)))
    if not isinstance(app_settings, dict):
        invalid.append(("appSettings", app_settings, type(app_settings)))
    else:
        for key in app_settings.keys():
            if not isinstance(key, str):
                invalid.append(("appSettings key", key, type(key)))
                break
    if type(external) is not bool:
        invalid.append(("external", external, type(external)))
    if not isinstance(token, str):
        invalid.append(("token", token, type(token)))

    if invalid:
        raise ValueError("Invalid AuthenticatedUser values: ", invalid)
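
The validation above collects every failing field before raising, and it passes the list as a second positional argument to ValueError, so callers can read it from exc.args. The sketch below shows that pattern on UserSketch, a hypothetical, reduced stand-in rather than the library class.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class UserSketch:  # hypothetical stand-in, not the library class
    username: str
    authorities: list[str] = field(default_factory=list)
    external: bool = False

    def __post_init__(self) -> None:
        invalid = []  # entries are (field name, offending value, its type)
        if not isinstance(self.username, str):
            invalid.append(("username", self.username, type(self.username)))
        if not isinstance(self.authorities, list):
            invalid.append(("authorities", self.authorities, type(self.authorities)))
        if type(self.external) is not bool:
            invalid.append(("external", self.external, type(self.external)))
        if invalid:
            # Two positional arguments: both end up in exc.args.
            raise ValueError("Invalid UserSketch values: ", invalid)


try:
    UserSketch(username=42, authorities="ROLE_USER", external=1)
except ValueError as exc:
    message, problems = exc.args
    failed_fields = [name for name, _value, _type in problems]
    print(failed_fields)  # ['username', 'authorities', 'external']
```

Reporting all problems at once means a caller can fix a malformed payload in one pass instead of one error at a time.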

from_response classmethod

from_response(data: dict[str, Any], token: str) -> AuthenticatedUser

Create an AuthenticatedUser from API response data and token.

Parameters:

  • data

    (dict[str, Any]) –

    The JSON response body from POST /rest/login.

  • token

    (str) –

    The GDB token extracted from the Authorization header.

Returns:

  • AuthenticatedUser –

    An AuthenticatedUser instance.

Raises:

  • ValueError

    If required fields are missing or invalid.

  • TypeError

    If data is not a dict.

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_response(cls, data: dict[str, t.Any], token: str) -> AuthenticatedUser:
    """Create an AuthenticatedUser from API response data and token.

    Parameters:
        data: The JSON response body from POST /rest/login.
        token: The GDB token extracted from the Authorization header.

    Returns:
        An AuthenticatedUser instance.

    Raises:
        ValueError: If required fields are missing or invalid.
        TypeError: If data is not a dict.
    """
    if not isinstance(data, dict):
        raise TypeError("Response data must be a dict")
    if "username" not in data:
        raise ValueError("Response data must contain 'username'")

    return cls(
        username=data["username"],
        authorities=data.get("authorities", []),
        appSettings=data.get("appSettings", {}),
        external=data.get("external", False),
        token=token,
    )
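
One detail worth illustrating: data.get("authorities", []) only falls back to the empty list when the key is absent, so a JSON null still arrives as None and is normalized in __post_init__. The sketch below reproduces that interaction with UserSketch, a hypothetical stand-in. It assumes a frozen dataclass, which would explain the use of object.__setattr__, although the excerpt above does not show the decorator. The token value is a made-up placeholder.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)  # assumption: frozen, hence object.__setattr__ below
class UserSketch:  # hypothetical stand-in, not the library class
    username: str
    authorities: list[str] = field(default_factory=list)
    token: str = ""

    def __post_init__(self) -> None:
        # A JSON null arrives as None; replace it with an empty list.
        if self.authorities is None:
            object.__setattr__(self, "authorities", [])

    @classmethod
    def from_response(cls, data: dict[str, Any], token: str) -> UserSketch:
        if not isinstance(data, dict):
            raise TypeError("Response data must be a dict")
        if "username" not in data:
            raise ValueError("Response data must contain 'username'")
        # .get() returns None here when the key is present but null.
        return cls(data["username"], data.get("authorities", []), token)


user = UserSketch.from_response({"username": "admin", "authorities": None}, "GDB abc123")
print(user.authorities)  # []
```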

BackupOperationBean dataclass

BackupOperationBean(id: str, username: str, operation: Literal['CREATE_BACKUP_IN_PROGRESS', 'RESTORE_BACKUP_IN_PROGRESS', 'CREATE_CLOUD_BACKUP_IN_PROGRESS', 'RESTORE_CLOUD_BACKUP_IN_PROGRESS'], affectedRepositories: list[str], msSinceCreated: int, snapshotOptions: SnapshotOptionsBean, nodePerformingClusterBackup: str | None = None)

Methods:

Attributes:

affectedRepositories instance-attribute

affectedRepositories: list[str]

id instance-attribute

id: str

msSinceCreated instance-attribute

msSinceCreated: int

nodePerformingClusterBackup class-attribute instance-attribute

nodePerformingClusterBackup: str | None = None

operation instance-attribute

operation: Literal['CREATE_BACKUP_IN_PROGRESS', 'RESTORE_BACKUP_IN_PROGRESS', 'CREATE_CLOUD_BACKUP_IN_PROGRESS', 'RESTORE_CLOUD_BACKUP_IN_PROGRESS']

snapshotOptions instance-attribute

snapshotOptions: SnapshotOptionsBean

username instance-attribute

username: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    _allowed_operations = {
        "CREATE_BACKUP_IN_PROGRESS",
        "RESTORE_BACKUP_IN_PROGRESS",
        "CREATE_CLOUD_BACKUP_IN_PROGRESS",
        "RESTORE_CLOUD_BACKUP_IN_PROGRESS",
    }
    invalid: list[tuple[str, t.Any, type]] = []
    id_ = t.cast(t.Any, self.id)
    username = t.cast(t.Any, self.username)
    operation = t.cast(t.Any, self.operation)
    affected_repositories = t.cast(t.Any, self.affectedRepositories)
    ms_since_created = t.cast(t.Any, self.msSinceCreated)
    snapshot_options = t.cast(t.Any, self.snapshotOptions)
    node_performing_cluster_backup = t.cast(t.Any, self.nodePerformingClusterBackup)

    if not isinstance(id_, str):
        invalid.append(("id", id_, type(id_)))
    if not isinstance(username, str):
        invalid.append(("username", username, type(username)))
    if not isinstance(operation, str) or operation not in _allowed_operations:
        invalid.append(("operation", operation, type(operation)))
    if not isinstance(affected_repositories, list):
        invalid.append(
            (
                "affectedRepositories",
                affected_repositories,
                type(affected_repositories),
            )
        )
    else:
        for index, value in enumerate(affected_repositories):
            if not isinstance(value, str):
                invalid.append(
                    (f"affectedRepositories[{index}]", value, type(value))
                )
    if type(ms_since_created) is not int:
        invalid.append(("msSinceCreated", ms_since_created, type(ms_since_created)))
    if not isinstance(snapshot_options, SnapshotOptionsBean):
        invalid.append(
            ("snapshotOptions", snapshot_options, type(snapshot_options))
        )
    if node_performing_cluster_backup is not None and not isinstance(
        node_performing_cluster_backup, str
    ):
        invalid.append(
            (
                "nodePerformingClusterBackup",
                node_performing_cluster_backup,
                type(node_performing_cluster_backup),
            )
        )

    if invalid:
        raise ValueError("Invalid BackupOperationBean values: ", invalid)

from_dict classmethod

from_dict(data: dict) -> BackupOperationBean

Create a BackupOperationBean instance from a dict.

This is useful for converting JSON response data into the dataclass structure. The nested ‘snapshotOptions’ dict is automatically converted to a SnapshotOptionsBean instance.

Parameters:

  • data

    (dict) –

    A dict containing the backup operation data, typically parsed from a JSON response.

Returns:

  • BackupOperationBean –

    A BackupOperationBean instance with nested dataclass objects.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If nested ‘snapshotOptions’ cannot be unpacked into SnapshotOptionsBean.

  • ValueError

    If field validation fails in SnapshotOptionsBean or BackupOperationBean (e.g., invalid types or operation values).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> BackupOperationBean:
    """Create a BackupOperationBean instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.
    The nested 'snapshotOptions' dict is automatically converted to
    a SnapshotOptionsBean instance.

    Parameters:
        data: A dict containing the backup operation data, typically
            parsed from a JSON response.

    Returns:
        A BackupOperationBean instance with nested dataclass objects.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If nested 'snapshotOptions' cannot be unpacked into
            SnapshotOptionsBean.
        ValueError: If field validation fails in SnapshotOptionsBean or
            BackupOperationBean (e.g., invalid types or operation values).
    """
    snapshot_options = SnapshotOptionsBean(**data["snapshotOptions"])
    return cls(
        id=data["id"],
        username=data["username"],
        operation=data["operation"],
        affectedRepositories=data["affectedRepositories"],
        msSinceCreated=data["msSinceCreated"],
        snapshotOptions=snapshot_options,
        nodePerformingClusterBackup=data.get("nodePerformingClusterBackup"),
    )
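
The KeyError and TypeError cases listed above follow directly from how the nested dict is unpacked. The sketch below shows both with OptionsSketch and OperationSketch, hypothetical stand-ins whose field names are invented for illustration and do not reflect the real SnapshotOptionsBean fields.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class OptionsSketch:  # hypothetical stand-in for SnapshotOptionsBean
    include_system_data: bool  # hypothetical field name


@dataclass
class OperationSketch:  # hypothetical stand-in for BackupOperationBean
    id: str
    options: OptionsSketch

    @classmethod
    def from_dict(cls, data: dict) -> OperationSketch:
        # Unpack the nested dict first, then build the outer object.
        options = OptionsSketch(**data["options"])
        return cls(id=data["id"], options=options)


def classify(data: dict) -> str:
    """Return the name of the exception from_dict raises, or 'ok'."""
    try:
        OperationSketch.from_dict(data)
    except (KeyError, TypeError) as exc:
        return type(exc).__name__
    return "ok"


print(classify({"id": "op-1", "options": {"include_system_data": True}}))  # ok
print(classify({"id": "op-1"}))  # KeyError: nested key missing
print(classify({"id": "op-1", "options": {"unknown": True}}))  # TypeError: unexpected key
```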

ClearGraphAccessControlEntry dataclass

ClearGraphAccessControlEntry(scope: Literal['clear_graph'], policy: Literal['allow', 'deny', 'abstain'], role: str, graph: Literal['*', 'named', 'default'] | URIRef)

Bases: AccessControlEntry

Methods:

Attributes:

  • graph (Literal['*', 'named', 'default'] | URIRef) –
  • policy (Literal['allow', 'deny', 'abstain']) –
  • role (str) –
  • scope (Literal['clear_graph']) –

graph instance-attribute

graph: Literal['*', 'named', 'default'] | URIRef

policy instance-attribute

policy: Literal['allow', 'deny', 'abstain']

role instance-attribute

role: str

scope instance-attribute

scope: Literal['clear_graph']

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    self.policy = _parse_policy(self.policy)
    self.role = _parse_role(self.role)
    self.graph = _parse_graph(self.graph)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return {
        "scope": self.scope,
        "policy": self.policy,
        "role": self.role,
        "context": _format_term(self.graph),
    }

ClusterRequest dataclass

ClusterRequest(electionMinTimeout: int, electionRangeTimeout: int, heartbeatInterval: int, messageSizeKB: int, verificationTimeout: int, transactionLogMaximumSizeGB: int, batchUpdateInterval: int, nodes: list[str])

Methods:

Attributes:

batchUpdateInterval instance-attribute

batchUpdateInterval: int

electionMinTimeout instance-attribute

electionMinTimeout: int

electionRangeTimeout instance-attribute

electionRangeTimeout: int

heartbeatInterval instance-attribute

heartbeatInterval: int

messageSizeKB instance-attribute

messageSizeKB: int

nodes instance-attribute

nodes: list[str]

transactionLogMaximumSizeGB instance-attribute

transactionLogMaximumSizeGB: int

verificationTimeout instance-attribute

verificationTimeout: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    election_min_timeout = t.cast(t.Any, self.electionMinTimeout)
    election_range_timeout = t.cast(t.Any, self.electionRangeTimeout)
    heartbeat_interval = t.cast(t.Any, self.heartbeatInterval)
    message_size_kb = t.cast(t.Any, self.messageSizeKB)
    verification_timeout = t.cast(t.Any, self.verificationTimeout)
    transaction_log_maximum_size_gb = t.cast(
        t.Any, self.transactionLogMaximumSizeGB
    )
    batch_update_interval = t.cast(t.Any, self.batchUpdateInterval)
    nodes = t.cast(t.Any, self.nodes)

    if type(election_min_timeout) is not int:
        invalid.append(
            ("electionMinTimeout", election_min_timeout, type(election_min_timeout))
        )
    if type(election_range_timeout) is not int:
        invalid.append(
            (
                "electionRangeTimeout",
                election_range_timeout,
                type(election_range_timeout),
            )
        )
    if type(heartbeat_interval) is not int:
        invalid.append(
            ("heartbeatInterval", heartbeat_interval, type(heartbeat_interval))
        )
    if type(message_size_kb) is not int:
        invalid.append(("messageSizeKB", message_size_kb, type(message_size_kb)))
    if type(verification_timeout) is not int:
        invalid.append(
            (
                "verificationTimeout",
                verification_timeout,
                type(verification_timeout),
            )
        )
    if type(transaction_log_maximum_size_gb) is not int:
        invalid.append(
            (
                "transactionLogMaximumSizeGB",
                transaction_log_maximum_size_gb,
                type(transaction_log_maximum_size_gb),
            )
        )
    if type(batch_update_interval) is not int:
        invalid.append(
            (
                "batchUpdateInterval",
                batch_update_interval,
                type(batch_update_interval),
            )
        )

    if not isinstance(nodes, list):
        invalid.append(("nodes", nodes, type(nodes)))
    else:
        for index, value in enumerate(nodes):
            if not isinstance(value, str):
                invalid.append((f"nodes[{index}]", value, type(value)))

    if invalid:
        raise ValueError("Invalid ClusterRequest values: ", invalid)
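
The integer checks above compare type(value) against int instead of using isinstance. Because bool is a subclass of int in Python, isinstance would accept True and False as timeouts. A minimal sketch of the difference, using a hypothetical helper named is_strict_int:

```python
def is_strict_int(value: object) -> bool:
    """True only for plain int; bool, float, and str are rejected."""
    return type(value) is int


# Columns: value, isinstance(value, int), is_strict_int(value)
for candidate in (5000, True, 5000.0, "5000"):
    print(repr(candidate), isinstance(candidate, int), is_strict_int(candidate))
```

The second row is the interesting one: True passes isinstance but fails the strict check.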

from_dict classmethod

from_dict(data: dict) -> ClusterRequest

Create a ClusterRequest instance from a dict.

This is useful for converting JSON response data into the dataclass structure.

Parameters:

  • data

    (dict) –

    A dict containing the cluster request data, typically parsed from a JSON response.

Returns:

  • ClusterRequest –

    A ClusterRequest instance.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If the dict cannot be unpacked into ClusterRequest.

  • ValueError

    If field validation fails in ClusterRequest (e.g., invalid types for integer fields or nodes).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> ClusterRequest:
    """Create a ClusterRequest instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.

    Parameters:
        data: A dict containing the cluster request data, typically
            parsed from a JSON response.

    Returns:
        A ClusterRequest instance.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If the dict cannot be unpacked into ClusterRequest.
        ValueError: If field validation fails in ClusterRequest
            (e.g., invalid types for integer fields or nodes).
    """
    return cls(
        electionMinTimeout=data["electionMinTimeout"],
        electionRangeTimeout=data["electionRangeTimeout"],
        heartbeatInterval=data["heartbeatInterval"],
        messageSizeKB=data["messageSizeKB"],
        verificationTimeout=data["verificationTimeout"],
        transactionLogMaximumSizeGB=data["transactionLogMaximumSizeGB"],
        batchUpdateInterval=data["batchUpdateInterval"],
        nodes=data["nodes"],
    )
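
Since from_dict reads every key with data[...], the first missing key raises KeyError and hides any others. A caller who wants the full list up front could pre-check the payload, as in the sketch below. The helper missing_cluster_keys is hypothetical and not part of the library; the key names are taken from the source above, and the payload values are made up.

```python
from __future__ import annotations

# The eight keys that ClusterRequest.from_dict reads with data[...].
REQUIRED_KEYS = (
    "electionMinTimeout",
    "electionRangeTimeout",
    "heartbeatInterval",
    "messageSizeKB",
    "verificationTimeout",
    "transactionLogMaximumSizeGB",
    "batchUpdateInterval",
    "nodes",
)


def missing_cluster_keys(data: dict) -> list[str]:
    """Return the required keys absent from data, in declaration order."""
    return [key for key in REQUIRED_KEYS if key not in data]


# Illustrative payload only; the values and node address are made up.
payload = {
    "electionMinTimeout": 8000,
    "heartbeatInterval": 2000,
    "nodes": ["graphdb-node-1:7300"],
}
print(missing_cluster_keys(payload))
```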

FreeAccessSettings dataclass

FreeAccessSettings(enabled: bool, authorities: list[str] = list(), appSettings: dict[str, Any] = dict())

Methods:

Attributes:

appSettings class-attribute instance-attribute

appSettings: dict[str, Any] = field(default_factory=dict)

authorities class-attribute instance-attribute

authorities: list[str] = field(default_factory=list)

enabled instance-attribute

enabled: bool

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    enabled = t.cast(t.Any, self.enabled)
    authorities = t.cast(t.Any, self.authorities)
    app_settings = t.cast(t.Any, self.appSettings)
    if type(enabled) is not bool:
        invalid.append(("enabled", enabled, type(enabled)))
    if not isinstance(authorities, list):
        invalid.append(("authorities", authorities, type(authorities)))
    else:
        for index, value in enumerate(authorities):
            if not isinstance(value, str):
                invalid.append((f"authorities[{index}]", value, type(value)))
    if not isinstance(app_settings, dict):
        invalid.append(("appSettings", app_settings, type(app_settings)))
    else:
        for key in app_settings.keys():
            if not isinstance(key, str):
                invalid.append(("appSettings key", key, type(key)))
                break

    if invalid:
        raise ValueError("Invalid FreeAccessSettings values: ", invalid)

as_dict

as_dict()
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self):
    return asdict(self)

GraphDBRepository dataclass

GraphDBRepository(id: Optional[str] = None, title: Optional[str] = None, uri: Optional[str] = None, externalUrl: Optional[str] = None, local: Optional[bool] = None, type: Optional[str] = None, sesameType: Optional[str] = None, location: Optional[str] = None, readable: Optional[bool] = None, writable: Optional[bool] = None, unsupported: Optional[bool] = None, state: Optional[RepositoryState] = None)

Methods:

  • from_dict

    Create a GraphDBRepository instance from a dict.

Attributes:

externalUrl class-attribute instance-attribute

externalUrl: Optional[str] = None

id class-attribute instance-attribute

id: Optional[str] = None

local class-attribute instance-attribute

local: Optional[bool] = None

location class-attribute instance-attribute

location: Optional[str] = None

readable class-attribute instance-attribute

readable: Optional[bool] = None

sesameType class-attribute instance-attribute

sesameType: Optional[str] = None

state class-attribute instance-attribute

state: Optional[RepositoryState] = None

title class-attribute instance-attribute

title: Optional[str] = None

type class-attribute instance-attribute

type: Optional[str] = None

unsupported class-attribute instance-attribute

unsupported: Optional[bool] = None

uri class-attribute instance-attribute

uri: Optional[str] = None

writable class-attribute instance-attribute

writable: Optional[bool] = None

from_dict classmethod

from_dict(data: dict) -> GraphDBRepository

Create a GraphDBRepository instance from a dict.

Parameters:

  • data

    (dict) –

    A dict containing the repository data, typically parsed from a JSON response.

Returns:

  • GraphDBRepository –

    A GraphDBRepository instance.

Raises:

  • TypeError

    If the dict contains keys that do not match GraphDBRepository fields.

  • ValueError

    If the ‘state’ value is not a valid RepositoryState.

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> GraphDBRepository:
    """Create a GraphDBRepository instance from a dict.

    Parameters:
        data: A dict containing the repository data, typically
            parsed from a JSON response.

    Returns:
        A GraphDBRepository instance.

    Raises:
        TypeError: If the dict contains keys that do not match
            GraphDBRepository fields.
        ValueError: If the 'state' value is not a valid RepositoryState.
    """
    data_copy = dict(data)
    if "state" in data_copy and data_copy["state"] is not None:
        data_copy["state"] = RepositoryState(data_copy["state"])
    return cls(**data_copy)
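
The copy-then-convert approach above can be sketched in isolation. StateSketch and RepoSketch are hypothetical stand-ins, and the enum members are invented for illustration; the real RepositoryState may define different values.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Optional


class StateSketch(Enum):  # hypothetical members; the real RepositoryState may differ
    RUNNING = "RUNNING"
    INACTIVE = "INACTIVE"


@dataclass
class RepoSketch:  # hypothetical stand-in for GraphDBRepository
    id: Optional[str] = None
    state: Optional[StateSketch] = None

    @classmethod
    def from_dict(cls, data: dict) -> RepoSketch:
        data_copy = dict(data)  # shallow copy so the caller's dict is untouched
        if data_copy.get("state") is not None:
            # Enum lookup by value raises ValueError for unknown states.
            data_copy["state"] = StateSketch(data_copy["state"])
        # Unknown keys surface as TypeError from the generated __init__.
        return cls(**data_copy)


raw = {"id": "repo-1", "state": "RUNNING"}
repo = RepoSketch.from_dict(raw)
print(repo.state, raw["state"])
```

Working on a copy keeps the string in raw intact, so the same parsed JSON can be reused after the conversion.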

ImportSettings dataclass

ImportSettings(name: str, status: Literal['PENDING', 'IMPORTING', 'DONE', 'ERROR', 'NONE', 'INTERRUPTING'], size: str, lastModified: int, imported: int, addedStatements: int, removedStatements: int, numReplacedGraphs: int, message: str = '', context: Any | None = None, replaceGraphs: List = list(), baseURI: Any | None = None, forceSerial: bool = False, type: str = 'file', format: Any | None = None, data: Any | None = None, parserSettings: ParserSettings = ParserSettings())

Methods:

Attributes:

addedStatements instance-attribute

addedStatements: int

baseURI class-attribute instance-attribute

baseURI: Any | None = None

context class-attribute instance-attribute

context: Any | None = None

data class-attribute instance-attribute

data: Any | None = None

forceSerial class-attribute instance-attribute

forceSerial: bool = False

format class-attribute instance-attribute

format: Any | None = None

imported instance-attribute

imported: int

lastModified instance-attribute

lastModified: int

message class-attribute instance-attribute

message: str = ''

name instance-attribute

name: str

numReplacedGraphs instance-attribute

numReplacedGraphs: int

parserSettings class-attribute instance-attribute

parserSettings: ParserSettings = field(default_factory=ParserSettings)

removedStatements instance-attribute

removedStatements: int

replaceGraphs class-attribute instance-attribute

replaceGraphs: List = field(default_factory=list)

size instance-attribute

size: str

status instance-attribute

status: Literal['PENDING', 'IMPORTING', 'DONE', 'ERROR', 'NONE', 'INTERRUPTING']

type class-attribute instance-attribute

type: str = 'file'

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    _allowed_status = {
        "PENDING",
        "IMPORTING",
        "DONE",
        "ERROR",
        "NONE",
        "INTERRUPTING",
    }
    invalid: list[tuple[str, t.Any, type]] = []

    name = t.cast(t.Any, self.name)
    status = t.cast(t.Any, self.status)
    message = t.cast(t.Any, self.message)
    replace_graphs = t.cast(t.Any, self.replaceGraphs)
    force_serial = t.cast(t.Any, self.forceSerial)
    type_ = t.cast(t.Any, self.type)
    parser_settings = t.cast(t.Any, self.parserSettings)
    size = t.cast(t.Any, self.size)
    last_modified = t.cast(t.Any, self.lastModified)
    imported = t.cast(t.Any, self.imported)
    added_statements = t.cast(t.Any, self.addedStatements)
    removed_statements = t.cast(t.Any, self.removedStatements)
    num_replaced_graphs = t.cast(t.Any, self.numReplacedGraphs)

    if not isinstance(name, str):
        invalid.append(("name", name, type(name)))
    if status not in _allowed_status:
        invalid.append(("status", status, type(status)))
    if not isinstance(message, str):
        invalid.append(("message", message, type(message)))
    if not isinstance(replace_graphs, list):
        invalid.append(("replaceGraphs", replace_graphs, type(replace_graphs)))
    if type(force_serial) is not bool:
        invalid.append(("forceSerial", force_serial, type(force_serial)))
    if not isinstance(type_, str):
        invalid.append(("type", type_, type(type_)))
    if not isinstance(parser_settings, ParserSettings):
        invalid.append(("parserSettings", parser_settings, type(parser_settings)))
    if not isinstance(size, str):
        invalid.append(("size", size, type(size)))
    if type(last_modified) is not int:
        invalid.append(("lastModified", last_modified, type(last_modified)))
    if type(imported) is not int:
        invalid.append(("imported", imported, type(imported)))
    if type(added_statements) is not int:
        invalid.append(
            ("addedStatements", added_statements, type(added_statements))
        )
    if type(removed_statements) is not int:
        invalid.append(
            ("removedStatements", removed_statements, type(removed_statements))
        )
    if type(num_replaced_graphs) is not int:
        invalid.append(
            ("numReplacedGraphs", num_replaced_graphs, type(num_replaced_graphs))
        )

    # Note: don't check context, baseURI, format, or data here since they are of type t.Any.

    if invalid:
        raise ValueError("Invalid ImportSettings values: ", invalid)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return asdict(self)
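
Because as_dict delegates to dataclasses.asdict, nested dataclass fields such as parserSettings are converted to plain dicts recursively, which makes the result ready for JSON serialization. A minimal sketch with hypothetical stand-ins (ParserSketch and its strict field are invented for illustration):

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class ParserSketch:  # hypothetical stand-in for ParserSettings
    strict: bool = True  # hypothetical field name


@dataclass
class ImportSketch:  # hypothetical stand-in for ImportSettings
    name: str
    parserSettings: ParserSketch = field(default_factory=ParserSketch)

    def as_dict(self) -> dict:
        return asdict(self)  # recurses into nested dataclasses


payload = ImportSketch(name="data.ttl").as_dict()
print(payload)  # {'name': 'data.ttl', 'parserSettings': {'strict': True}}
print(json.dumps(payload))
```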

InfrastructureMemoryUsage dataclass

InfrastructureMemoryUsage(max: int, committed: int, init: int, used: int)

Methods:

Attributes:

committed instance-attribute

committed: int

init instance-attribute

init: int

max instance-attribute

max: int

used instance-attribute

used: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    max_val = t.cast(t.Any, self.max)
    committed = t.cast(t.Any, self.committed)
    init = t.cast(t.Any, self.init)
    used = t.cast(t.Any, self.used)

    if type(max_val) is not int:
        invalid.append(("max", max_val, type(max_val)))
    if type(committed) is not int:
        invalid.append(("committed", committed, type(committed)))
    if type(init) is not int:
        invalid.append(("init", init, type(init)))
    if type(used) is not int:
        invalid.append(("used", used, type(used)))

    if invalid:
        raise ValueError("Invalid InfrastructureMemoryUsage values: ", invalid)

InfrastructureStatistics dataclass

InfrastructureStatistics(heapMemoryUsage: InfrastructureMemoryUsage, nonHeapMemoryUsage: InfrastructureMemoryUsage, storageMemory: InfrastructureStorageMemory, threadCount: int, cpuLoad: float, classCount: int, gcCount: int, openFileDescriptors: int, maxFileDescriptors: int)

Methods:

Attributes:

classCount instance-attribute

classCount: int

cpuLoad instance-attribute

cpuLoad: float

gcCount instance-attribute

gcCount: int

heapMemoryUsage instance-attribute

heapMemoryUsage: InfrastructureMemoryUsage

maxFileDescriptors instance-attribute

maxFileDescriptors: int

nonHeapMemoryUsage instance-attribute

nonHeapMemoryUsage: InfrastructureMemoryUsage

openFileDescriptors instance-attribute

openFileDescriptors: int

storageMemory instance-attribute

storageMemory: InfrastructureStorageMemory

threadCount instance-attribute

threadCount: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    heap_memory_usage = t.cast(t.Any, self.heapMemoryUsage)
    non_heap_memory_usage = t.cast(t.Any, self.nonHeapMemoryUsage)
    storage_memory = t.cast(t.Any, self.storageMemory)
    thread_count = t.cast(t.Any, self.threadCount)
    cpu_load = t.cast(t.Any, self.cpuLoad)
    class_count = t.cast(t.Any, self.classCount)
    gc_count = t.cast(t.Any, self.gcCount)
    open_file_descriptors = t.cast(t.Any, self.openFileDescriptors)
    max_file_descriptors = t.cast(t.Any, self.maxFileDescriptors)

    if not isinstance(heap_memory_usage, InfrastructureMemoryUsage):
        invalid.append(
            ("heapMemoryUsage", heap_memory_usage, type(heap_memory_usage))
        )
    if not isinstance(non_heap_memory_usage, InfrastructureMemoryUsage):
        invalid.append(
            (
                "nonHeapMemoryUsage",
                non_heap_memory_usage,
                type(non_heap_memory_usage),
            )
        )
    if not isinstance(storage_memory, InfrastructureStorageMemory):
        invalid.append(("storageMemory", storage_memory, type(storage_memory)))
    if type(thread_count) is not int:
        invalid.append(("threadCount", thread_count, type(thread_count)))
    if type(cpu_load) is not float and type(cpu_load) is not int:
        invalid.append(("cpuLoad", cpu_load, type(cpu_load)))
    if type(class_count) is not int:
        invalid.append(("classCount", class_count, type(class_count)))
    if type(gc_count) is not int:
        invalid.append(("gcCount", gc_count, type(gc_count)))
    if type(open_file_descriptors) is not int:
        invalid.append(
            (
                "openFileDescriptors",
                open_file_descriptors,
                type(open_file_descriptors),
            )
        )
    if type(max_file_descriptors) is not int:
        invalid.append(
            ("maxFileDescriptors", max_file_descriptors, type(max_file_descriptors))
        )

    if invalid:
        raise ValueError("Invalid InfrastructureStatistics values: ", invalid)
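
The integer fields above are checked with `type(value) is not int` rather than `isinstance`. This matters because `bool` is a subclass of `int` in Python, so an `isinstance` check would let `True` through. The following is a minimal, self-contained sketch of that difference. The helper names `is_strict_int` and `is_number` are hypothetical and used only for illustration; they are not functions from models.py.

```python
def is_strict_int(value: object) -> bool:
    """Mirror the `type(x) is int` test: bool and other int subclasses fail."""
    return type(value) is int


def is_number(value: object) -> bool:
    """Mirror the cpuLoad test: exactly float or exactly int is accepted."""
    return type(value) is float or type(value) is int


# isinstance() accepts True as an int; the exact-type test does not.
print(isinstance(True, int))
print(is_strict_int(True))
print(is_strict_int(42))
print(is_number(0.75), is_number(1), is_number("1"))
```

Under this pattern a JSON `true` in place of a count is reported as invalid instead of being silently treated as `1`.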

from_dict classmethod

from_dict(data: dict) -> InfrastructureStatistics

Create an InfrastructureStatistics instance from a dict.

This is useful for converting JSON response data into the dataclass structure. The nested memory and storage dicts are automatically converted to their respective dataclass instances.

Parameters:

  • data

    (dict) –

    A dict containing the infrastructure statistics data, typically parsed from a JSON response.

Returns:

  • InfrastructureStatistics

    An InfrastructureStatistics instance with nested dataclass objects.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If nested dicts cannot be unpacked into their respective dataclass instances.

  • ValueError

    If field validation fails in InfrastructureMemoryUsage, InfrastructureStorageMemory, or InfrastructureStatistics (e.g., invalid types).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> InfrastructureStatistics:
    """Create an InfrastructureStatistics instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.
    The nested memory and storage dicts are automatically converted to their
    respective dataclass instances.

    Parameters:
        data: A dict containing the infrastructure statistics data, typically
            parsed from a JSON response.

    Returns:
        An InfrastructureStatistics instance with nested dataclass objects.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If nested dicts cannot be unpacked into their respective
            dataclass instances.
        ValueError: If field validation fails in InfrastructureMemoryUsage,
            InfrastructureStorageMemory, or InfrastructureStatistics
            (e.g., invalid types).
    """
    heap_memory_usage = InfrastructureMemoryUsage(**data["heapMemoryUsage"])
    non_heap_memory_usage = InfrastructureMemoryUsage(**data["nonHeapMemoryUsage"])
    storage_memory = InfrastructureStorageMemory(**data["storageMemory"])
    return cls(
        heapMemoryUsage=heap_memory_usage,
        nonHeapMemoryUsage=non_heap_memory_usage,
        storageMemory=storage_memory,
        threadCount=data["threadCount"],
        cpuLoad=float(data["cpuLoad"]),
        classCount=data["classCount"],
        gcCount=data["gcCount"],
        openFileDescriptors=data["openFileDescriptors"],
        maxFileDescriptors=data["maxFileDescriptors"],
    )
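
The nested-unpacking pattern used by this method can be sketched in isolation. The example below uses simplified stand-in dataclasses (`Memory`, `Stats`) with hypothetical field names (`used`, `limit`, `heap`), not the real classes from models.py. It only illustrates how a nested dict is converted first and how a missing top-level key surfaces as a `KeyError`.

```python
from dataclasses import dataclass


@dataclass
class Memory:
    """Stand-in for a nested dataclass; field names are assumptions."""
    used: int
    limit: int


@dataclass
class Stats:
    """Stand-in for the outer dataclass, not the real InfrastructureStatistics."""
    heap: Memory
    cpuLoad: float

    @classmethod
    def from_dict(cls, data: dict) -> "Stats":
        # Convert the nested dict first, then build the outer instance.
        heap = Memory(**data["heap"])
        # float() coerces an integer JSON number such as 1 into 1.0.
        return cls(heap=heap, cpuLoad=float(data["cpuLoad"]))


payload = {"heap": {"used": 10, "limit": 100}, "cpuLoad": 1}
stats = Stats.from_dict(payload)
print(stats)

try:
    Stats.from_dict({"cpuLoad": 0.5})  # "heap" is missing
except KeyError as exc:
    print("missing key:", exc)
```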

InfrastructureStorageMemory dataclass

InfrastructureStorageMemory(dataDirUsed: int, workDirUsed: int, logsDirUsed: int, dataDirFree: int, workDirFree: int, logsDirFree: int)

Methods:

Attributes:

dataDirFree instance-attribute

dataDirFree: int

dataDirUsed instance-attribute

dataDirUsed: int

logsDirFree instance-attribute

logsDirFree: int

logsDirUsed instance-attribute

logsDirUsed: int

workDirFree instance-attribute

workDirFree: int

workDirUsed instance-attribute

workDirUsed: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    data_dir_used = t.cast(t.Any, self.dataDirUsed)
    work_dir_used = t.cast(t.Any, self.workDirUsed)
    logs_dir_used = t.cast(t.Any, self.logsDirUsed)
    data_dir_free = t.cast(t.Any, self.dataDirFree)
    work_dir_free = t.cast(t.Any, self.workDirFree)
    logs_dir_free = t.cast(t.Any, self.logsDirFree)

    if type(data_dir_used) is not int:
        invalid.append(("dataDirUsed", data_dir_used, type(data_dir_used)))
    if type(work_dir_used) is not int:
        invalid.append(("workDirUsed", work_dir_used, type(work_dir_used)))
    if type(logs_dir_used) is not int:
        invalid.append(("logsDirUsed", logs_dir_used, type(logs_dir_used)))
    if type(data_dir_free) is not int:
        invalid.append(("dataDirFree", data_dir_free, type(data_dir_free)))
    if type(work_dir_free) is not int:
        invalid.append(("workDirFree", work_dir_free, type(work_dir_free)))
    if type(logs_dir_free) is not int:
        invalid.append(("logsDirFree", logs_dir_free, type(logs_dir_free)))

    if invalid:
        raise ValueError("Invalid InfrastructureStorageMemory values: ", invalid)

NodeStatus dataclass

NodeStatus(address: str, nodeState: str, term: int, syncStatus: dict[str, str], lastLogTerm: int, lastLogIndex: int, endpoint: str, recoveryStatus: RecoveryStatus | None, topologyStatus: TopologyStatus | None, clusterEnabled: bool | None)

Methods:

Attributes:

address instance-attribute

address: str

clusterEnabled instance-attribute

clusterEnabled: bool | None

endpoint instance-attribute

endpoint: str

lastLogIndex instance-attribute

lastLogIndex: int

lastLogTerm instance-attribute

lastLogTerm: int

nodeState instance-attribute

nodeState: str

recoveryStatus instance-attribute

recoveryStatus: RecoveryStatus | None

syncStatus instance-attribute

syncStatus: dict[str, str]

term instance-attribute

term: int

topologyStatus instance-attribute

topologyStatus: TopologyStatus | None

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    address = t.cast(t.Any, self.address)
    node_state = t.cast(t.Any, self.nodeState)
    term = t.cast(t.Any, self.term)
    sync_status = t.cast(t.Any, self.syncStatus)
    last_log_term = t.cast(t.Any, self.lastLogTerm)
    last_log_index = t.cast(t.Any, self.lastLogIndex)
    endpoint = t.cast(t.Any, self.endpoint)
    recovery_status = t.cast(t.Any, self.recoveryStatus)
    topology_status = t.cast(t.Any, self.topologyStatus)
    cluster_enabled = t.cast(t.Any, self.clusterEnabled)

    if not isinstance(address, str):
        invalid.append(("address", address, type(address)))
    if not isinstance(node_state, str):
        invalid.append(("nodeState", node_state, type(node_state)))
    if type(term) is not int:
        invalid.append(("term", term, type(term)))
    if not isinstance(sync_status, dict):
        invalid.append(("syncStatus", sync_status, type(sync_status)))
    else:
        for key, value in sync_status.items():
            if not isinstance(key, str):
                invalid.append((f"syncStatus key '{key}'", key, type(key)))
            if not isinstance(value, str):
                invalid.append((f"syncStatus['{key}']", value, type(value)))
    if type(last_log_term) is not int:
        invalid.append(("lastLogTerm", last_log_term, type(last_log_term)))
    if type(last_log_index) is not int:
        invalid.append(("lastLogIndex", last_log_index, type(last_log_index)))
    if not isinstance(endpoint, str):
        invalid.append(("endpoint", endpoint, type(endpoint)))
    if recovery_status is not None and not isinstance(
        recovery_status, RecoveryStatus
    ):
        invalid.append(("recoveryStatus", recovery_status, type(recovery_status)))
    if topology_status is not None and not isinstance(
        topology_status, TopologyStatus
    ):
        invalid.append(("topologyStatus", topology_status, type(topology_status)))
    if cluster_enabled is not None and type(cluster_enabled) is not bool:
        invalid.append(("clusterEnabled", cluster_enabled, type(cluster_enabled)))

    if invalid:
        raise ValueError("Invalid NodeStatus values: ", invalid)

from_dict classmethod

from_dict(data: dict) -> NodeStatus

Create a NodeStatus instance from a dict.

This is useful for converting JSON response data into the dataclass structure. The nested ‘recoveryStatus’ and ‘topologyStatus’ dicts are automatically converted to their respective dataclass instances.

Parameters:

  • data

    (dict) –

    A dict containing the node status data, typically parsed from a JSON response.

Returns:

  • NodeStatus

    A NodeStatus instance with nested dataclass objects.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If nested dicts cannot be unpacked into their respective dataclass instances.

  • ValueError

    If field validation fails in TopologyStatus, RecoveryOperation, RecoveryStatus, or NodeStatus (e.g., invalid types).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> NodeStatus:
    """Create a NodeStatus instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.
    The nested 'recoveryStatus' and 'topologyStatus' dicts are automatically
    converted to their respective dataclass instances.

    Parameters:
        data: A dict containing the node status data, typically
            parsed from a JSON response.

    Returns:
        A NodeStatus instance with nested dataclass objects.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If nested dicts cannot be unpacked into their respective
            dataclass instances.
        ValueError: If field validation fails in TopologyStatus, RecoveryOperation,
            RecoveryStatus, or NodeStatus (e.g., invalid types).
    """
    # Handle recoveryStatus - can be empty dict or actual data
    recovery_status = None
    if "recoveryStatus" in data and data["recoveryStatus"]:
        recovery_status = RecoveryStatus.from_dict(data["recoveryStatus"])
    elif "recoveryStatus" in data:
        # Empty dict case
        recovery_status = RecoveryStatus.from_dict({})

    # Handle topologyStatus - optional field
    topology_status = None
    if "topologyStatus" in data and data["topologyStatus"]:
        topology_status = TopologyStatus(**data["topologyStatus"])

    return cls(
        address=data["address"],
        nodeState=data["nodeState"],
        term=data["term"],
        syncStatus=data["syncStatus"],
        lastLogTerm=data["lastLogTerm"],
        lastLogIndex=data["lastLogIndex"],
        endpoint=data["endpoint"],
        recoveryStatus=recovery_status,
        topologyStatus=topology_status,
        clusterEnabled=data.get("clusterEnabled"),
    )
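
The two optional fields are resolved differently in the source above: a present but empty 'recoveryStatus' still produces an instance, while an empty or absent 'topologyStatus' produces None. The sketch below reproduces only that branching with a stand-in `Recovery` dataclass and two hypothetical helper functions (`parse_recovery`, `parse_topology`). It is an illustration under those assumptions, not the library's code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Recovery:
    """Stand-in for RecoveryStatus: every field has a default."""
    message: Optional[str] = None


def parse_recovery(data: dict) -> Optional[Recovery]:
    # Key absent -> None. Key present, even as {} or None -> an instance.
    if "recoveryStatus" not in data:
        return None
    return Recovery(**(data["recoveryStatus"] or {}))


def parse_topology(data: dict) -> Optional[dict]:
    # Absent or empty -> None. A plain dict stands in for TopologyStatus here.
    raw = data.get("topologyStatus")
    return dict(raw) if raw else None


print(parse_recovery({}))
print(parse_recovery({"recoveryStatus": {}}))
print(parse_topology({"topologyStatus": {}}))
```

Callers therefore need to handle both `None` and a default-valued recovery status when inspecting a node.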

OWLimParameter dataclass

OWLimParameter(name: str, label: str, value: str)

Attributes:

label instance-attribute

label: str

name instance-attribute

name: str

value instance-attribute

value: str

ParserSettings dataclass

ParserSettings(preserveBNodeIds: bool = False, failOnUnknownDataTypes: bool = False, verifyDataTypeValues: bool = False, normalizeDataTypeValues: bool = False, failOnUnknownLanguageTags: bool = False, verifyLanguageTags: bool = True, normalizeLanguageTags: bool = False, stopOnError: bool = True, contextLink: Any | None = None)

Methods:

Attributes:

contextLink class-attribute instance-attribute

contextLink: Any | None = None

failOnUnknownDataTypes class-attribute instance-attribute

failOnUnknownDataTypes: bool = False

failOnUnknownLanguageTags class-attribute instance-attribute

failOnUnknownLanguageTags: bool = False

normalizeDataTypeValues class-attribute instance-attribute

normalizeDataTypeValues: bool = False

normalizeLanguageTags class-attribute instance-attribute

normalizeLanguageTags: bool = False

preserveBNodeIds class-attribute instance-attribute

preserveBNodeIds: bool = False

stopOnError class-attribute instance-attribute

stopOnError: bool = True

verifyDataTypeValues class-attribute instance-attribute

verifyDataTypeValues: bool = False

verifyLanguageTags class-attribute instance-attribute

verifyLanguageTags: bool = True

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    preserve_bnode_ids = t.cast(t.Any, self.preserveBNodeIds)
    fail_on_unknown_data_types = t.cast(t.Any, self.failOnUnknownDataTypes)
    verify_data_type_values = t.cast(t.Any, self.verifyDataTypeValues)
    normalize_data_type_values = t.cast(t.Any, self.normalizeDataTypeValues)
    fail_on_unknown_language_tags = t.cast(t.Any, self.failOnUnknownLanguageTags)
    verify_language_tags = t.cast(t.Any, self.verifyLanguageTags)
    normalize_language_tags = t.cast(t.Any, self.normalizeLanguageTags)
    stop_on_error = t.cast(t.Any, self.stopOnError)

    if type(preserve_bnode_ids) is not bool:
        invalid.append(
            ("preserveBNodeIds", preserve_bnode_ids, type(preserve_bnode_ids))
        )
    if type(fail_on_unknown_data_types) is not bool:
        invalid.append(
            (
                "failOnUnknownDataTypes",
                fail_on_unknown_data_types,
                type(fail_on_unknown_data_types),
            )
        )
    if type(verify_data_type_values) is not bool:
        invalid.append(
            (
                "verifyDataTypeValues",
                verify_data_type_values,
                type(verify_data_type_values),
            )
        )
    if type(normalize_data_type_values) is not bool:
        invalid.append(
            (
                "normalizeDataTypeValues",
                normalize_data_type_values,
                type(normalize_data_type_values),
            )
        )
    if type(fail_on_unknown_language_tags) is not bool:
        invalid.append(
            (
                "failOnUnknownLanguageTags",
                fail_on_unknown_language_tags,
                type(fail_on_unknown_language_tags),
            )
        )
    if type(verify_language_tags) is not bool:
        invalid.append(
            ("verifyLanguageTags", verify_language_tags, type(verify_language_tags))
        )
    if type(normalize_language_tags) is not bool:
        invalid.append(
            (
                "normalizeLanguageTags",
                normalize_language_tags,
                type(normalize_language_tags),
            )
        )
    if type(stop_on_error) is not bool:
        invalid.append(("stopOnError", stop_on_error, type(stop_on_error)))

    # Note: don't check contextLink here since it's of type t.Any.

    if invalid:
        raise ValueError("Invalid ParserSettings values: ", invalid)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return asdict(self)

PluginAccessControlEntry dataclass

PluginAccessControlEntry(scope: Literal['plugin'], policy: Literal['allow', 'deny', 'abstain'], role: str, operation: Literal['read', 'write', '*'], plugin: str)

Bases: AccessControlEntry

Methods:

Attributes:

  • operation (Literal['read', 'write', '*']) –
  • plugin (str) –
  • policy (Literal['allow', 'deny', 'abstain']) –
  • role (str) –
  • scope (Literal['plugin']) –

operation instance-attribute

operation: Literal['read', 'write', '*']

plugin instance-attribute

plugin: str

policy instance-attribute

policy: Literal['allow', 'deny', 'abstain']

role instance-attribute

role: str

scope instance-attribute

scope: Literal['plugin']

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    self.policy = _parse_policy(self.policy)
    self.role = _parse_role(self.role)
    self.operation = _parse_operation(self.operation)
    self.plugin = _parse_plugin(self.plugin)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return {
        "scope": self.scope,
        "policy": self.policy,
        "role": self.role,
        "operation": self.operation,
        "plugin": self.plugin,
    }

RecoveryOperation dataclass

RecoveryOperation(name: str, message: str)

Methods:

Attributes:

message instance-attribute

message: str

name instance-attribute

name: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    name = t.cast(t.Any, self.name)
    message = t.cast(t.Any, self.message)

    if not isinstance(name, str):
        invalid.append(("name", name, type(name)))
    if not isinstance(message, str):
        invalid.append(("message", message, type(message)))

    if invalid:
        raise ValueError("Invalid RecoveryOperation values: ", invalid)

RecoveryStatus dataclass

RecoveryStatus(state: RecoveryOperation | None = None, message: str | None = None, affectedNodes: list[str] = list())

Methods:

Attributes:

affectedNodes class-attribute instance-attribute

affectedNodes: list[str] = field(default_factory=list)

message class-attribute instance-attribute

message: str | None = None

state class-attribute instance-attribute

state: RecoveryOperation | None = None

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    state = t.cast(t.Any, self.state)
    message = t.cast(t.Any, self.message)
    affected_nodes = t.cast(t.Any, self.affectedNodes)

    if state is not None and not isinstance(state, RecoveryOperation):
        invalid.append(("state", state, type(state)))
    if message is not None and not isinstance(message, str):
        invalid.append(("message", message, type(message)))
    if not isinstance(affected_nodes, list):
        invalid.append(("affectedNodes", affected_nodes, type(affected_nodes)))
    else:
        for index, value in enumerate(affected_nodes):
            if not isinstance(value, str):
                invalid.append((f"affectedNodes[{index}]", value, type(value)))

    if invalid:
        raise ValueError("Invalid RecoveryStatus values: ", invalid)

from_dict classmethod

from_dict(data: dict) -> RecoveryStatus

Create a RecoveryStatus instance from a dict.

This is useful for converting JSON response data into the dataclass structure. Handles empty dict {} by returning a RecoveryStatus with all None/default values. The nested ‘state’ dict (if present) is automatically converted to a RecoveryOperation instance.

Parameters:

  • data

    (dict) –

    A dict containing the recovery status data, typically parsed from a JSON response. Can be an empty dict.

Returns:

  • RecoveryStatus

    A RecoveryStatus instance with nested dataclass objects.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If nested ‘state’ cannot be unpacked into RecoveryOperation.

  • ValueError

    If field validation fails in RecoveryOperation or RecoveryStatus (e.g., invalid types).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> RecoveryStatus:
    """Create a RecoveryStatus instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.
    Handles empty dict {} by returning a RecoveryStatus with all None/default values.
    The nested 'state' dict (if present) is automatically converted to
    a RecoveryOperation instance.

    Parameters:
        data: A dict containing the recovery status data, typically
            parsed from a JSON response. Can be an empty dict.

    Returns:
        A RecoveryStatus instance with nested dataclass objects.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If nested 'state' cannot be unpacked into RecoveryOperation.
        ValueError: If field validation fails in RecoveryOperation or
            RecoveryStatus (e.g., invalid types).
    """
    # Handle empty dict case
    if not data:
        return cls()

    state = None
    if "state" in data and data["state"] is not None:
        state = RecoveryOperation(**data["state"])

    return cls(
        state=state,
        message=data.get("message"),
        affectedNodes=data.get("affectedNodes", []),
    )
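
Because affectedNodes is declared with `field(default_factory=list)`, every instance built from an empty dict gets its own list. A minimal sketch with a stand-in `Status` dataclass (a reduced, hypothetical version of RecoveryStatus; the node address is made up) shows that mutating one instance does not affect another.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Status:
    """Stand-in for RecoveryStatus; only two of its fields are modelled."""
    message: Optional[str] = None
    affectedNodes: list = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict) -> "Status":
        if not data:  # {} -> all defaults
            return cls()
        return cls(
            message=data.get("message"),
            affectedNodes=data.get("affectedNodes", []),
        )


first = Status.from_dict({})
second = Status.from_dict({})
first.affectedNodes.append("node-1:7300")  # hypothetical node address
print(first.affectedNodes)
print(second.affectedNodes)  # still empty: the lists are not shared
```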

RepositoryConfigBean dataclass

RepositoryConfigBean(id: str, title: str, type: str, sesameType: str, location: str, params: dict[str, OWLimParameter] = dict())

Attributes:

id instance-attribute

id: str

location instance-attribute

location: str

params class-attribute instance-attribute

params: dict[str, OWLimParameter] = field(default_factory=dict)

sesameType instance-attribute

sesameType: str

title instance-attribute

title: str

type instance-attribute

type: str

RepositoryConfigBeanCreate dataclass

RepositoryConfigBeanCreate(id: str, title: str, type: str, sesameType: str, location: str, params: dict[str, OWLimParameter] = dict(), missingDefaults: dict[str, OWLimParameter] = dict())

Methods:

  • as_dict

    Serialize the dataclass to a Python dict.

Attributes:

id instance-attribute

id: str

location instance-attribute

location: str

missingDefaults class-attribute instance-attribute

missingDefaults: dict[str, OWLimParameter] = field(default_factory=dict)

params class-attribute instance-attribute

params: dict[str, OWLimParameter] = field(default_factory=dict)

sesameType instance-attribute

sesameType: str

title instance-attribute

title: str

type instance-attribute

type: str

as_dict

as_dict() -> dict

Serialize the dataclass to a Python dict.

Returns:

  • dict ( dict ) –

    A dictionary representation of the dataclass suitable for use with httpx POST requests (e.g., via the json parameter).

Examples:

>>> config = RepositoryConfigBeanCreate(
...     id="test-repo",
...     title="Test Repository",
...     type="graphdb:FreeSailRepository",
...     sesameType="graphdb:FreeSailRepository",
...     location="",
... )
>>> config_dict = config.as_dict()
>>> isinstance(config_dict, dict)
True
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict:
    """Serialize the dataclass to a Python dict.

    Returns:
        dict: A dictionary representation of the dataclass suitable for use
            with httpx POST requests (e.g., via the `json` parameter).

    Examples:
        >>> config = RepositoryConfigBeanCreate(
        ...     id="test-repo",
        ...     title="Test Repository",
        ...     type="graphdb:FreeSailRepository",
        ...     sesameType="graphdb:FreeSailRepository",
        ...     location="",
        ... )
        >>> config_dict = config.as_dict()
        >>> isinstance(config_dict, dict)
        True
    """
    return asdict(self)
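
`dataclasses.asdict` recurses into dict values, so dataclass instances stored in a mapping such as params are converted as well. The sketch below uses stand-in classes (`Param`, `RepoConfig`) with a reduced field set and an illustrative parameter value. It is meant to show why the result can be passed to a JSON encoder, not to reproduce the library classes.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Param:
    """Stand-in with the same three fields as OWLimParameter."""
    name: str
    label: str
    value: str


@dataclass
class RepoConfig:
    """Stand-in for RepositoryConfigBeanCreate with a reduced field set."""
    id: str
    title: str
    params: dict = field(default_factory=dict)

    def as_dict(self) -> dict:
        return asdict(self)


config = RepoConfig(
    id="test-repo",
    title="Test Repository",
    # The parameter below is illustrative only.
    params={"ruleset": Param(name="ruleset", label="Ruleset", value="rdfsplus-optimized")},
)
body = config.as_dict()
# asdict() converted the nested Param too, so the result is JSON-serializable.
print(json.dumps(body, indent=2))
```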

RepositorySizeInfo dataclass

RepositorySizeInfo(inferred: int, total: int, explicit: int)

Methods:

Attributes:

explicit instance-attribute

explicit: int

inferred instance-attribute

inferred: int

total instance-attribute

total: int

__post_init__

__post_init__()
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self):
    invalid = []
    if type(self.inferred) is not int:
        invalid.append("inferred")
    if type(self.total) is not int:
        invalid.append("total")
    if type(self.explicit) is not int:
        invalid.append("explicit")

    if invalid:
        raise ValueError(
            "Invalid RepositorySizeInfo values: ",
            [(x, self.__dict__[x], type(self.__dict__[x])) for x in invalid],
        )
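
The validation errors in this module are raised with two positional arguments, a message prefix and a list of (name, value, type) tuples, rather than one formatted string. A caller can therefore read the details from `exc.args`. The sketch below uses a stand-in `SizeInfo` dataclass that follows the same pattern; it is an assumption-labelled illustration, not the real RepositorySizeInfo.

```python
from dataclasses import dataclass


@dataclass
class SizeInfo:
    """Stand-in following the same validation pattern as RepositorySizeInfo."""
    inferred: int
    total: int

    def __post_init__(self) -> None:
        invalid = [
            name for name in ("inferred", "total")
            if type(getattr(self, name)) is not int
        ]
        if invalid:
            raise ValueError(
                "Invalid SizeInfo values: ",
                [(n, getattr(self, n), type(getattr(self, n))) for n in invalid],
            )


captured = ()
try:
    SizeInfo(inferred="3", total=True)
except ValueError as exc:
    captured = exc.args  # two positional args, not one formatted string

message, details = captured
for name, value, value_type in details:
    print(f"{name}: {value!r} ({value_type.__name__})")
```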

RepositoryState

Bases: str, Enum

Enumeration for repository state values.

Attributes:

INACTIVE class-attribute instance-attribute

INACTIVE = 'INACTIVE'

RESTARTING class-attribute instance-attribute

RESTARTING = 'RESTARTING'

RUNNING class-attribute instance-attribute

RUNNING = 'RUNNING'

STARTING class-attribute instance-attribute

STARTING = 'STARTING'

STOPPING class-attribute instance-attribute

STOPPING = 'STOPPING'
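
Since the enumeration derives from both str and Enum, its members compare equal to plain strings and can be looked up by value. The sketch below uses a two-member stand-in `State` enum with the same base classes, so it runs without the library installed. The value "PAUSED" is a made-up example of an undefined state.

```python
from enum import Enum


class State(str, Enum):
    """Stand-in with the same base classes as RepositoryState."""
    INACTIVE = "INACTIVE"
    RUNNING = "RUNNING"


state = State("RUNNING")   # look up a member by its string value
print(state is State.RUNNING)
print(state == "RUNNING")  # str mixin: equal to the plain string
print(state.value)

try:
    State("PAUSED")        # not a defined member
except ValueError as exc:
    print(exc)
```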

RepositoryStatistics dataclass

RepositoryStatistics(queries: RepositoryStatisticsQueries, entityPool: RepositoryStatisticsEntityPool, activeTransactions: int, openConnections: int)

Methods:

Attributes:

activeTransactions instance-attribute

activeTransactions: int

entityPool instance-attribute

entityPool: RepositoryStatisticsEntityPool

openConnections instance-attribute

openConnections: int

queries instance-attribute

queries: RepositoryStatisticsQueries

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    queries = t.cast(t.Any, self.queries)
    entity_pool = t.cast(t.Any, self.entityPool)
    active_transactions = t.cast(t.Any, self.activeTransactions)
    open_connections = t.cast(t.Any, self.openConnections)

    if not isinstance(queries, RepositoryStatisticsQueries):
        invalid.append(("queries", queries, type(queries)))
    if not isinstance(entity_pool, RepositoryStatisticsEntityPool):
        invalid.append(("entityPool", entity_pool, type(entity_pool)))
    if type(active_transactions) is not int:
        invalid.append(
            ("activeTransactions", active_transactions, type(active_transactions))
        )
    if type(open_connections) is not int:
        invalid.append(
            ("openConnections", open_connections, type(open_connections))
        )

    if invalid:
        raise ValueError("Invalid RepositoryStatistics values: ", invalid)

from_dict classmethod

from_dict(data: dict) -> RepositoryStatistics

Create a RepositoryStatistics instance from a dict.

This is useful for converting JSON response data into the dataclass structure. The nested ‘queries’ and ‘entityPool’ dicts are automatically converted to their respective dataclass instances.

Parameters:

  • data

    (dict) –

    A dict containing the repository statistics data, typically parsed from a JSON response.

Returns:

  • RepositoryStatistics

    A RepositoryStatistics instance with nested dataclass objects.

Raises:

  • KeyError

    If required keys are missing from the input dict.

  • TypeError

    If nested dicts cannot be unpacked into their respective dataclass instances.

  • ValueError

    If field validation fails in RepositoryStatisticsQueries, RepositoryStatisticsEntityPool, or RepositoryStatistics (e.g., invalid types).

Source code in rdflib/contrib/graphdb/models.py
@classmethod
def from_dict(cls, data: dict) -> RepositoryStatistics:
    """Create a RepositoryStatistics instance from a dict.

    This is useful for converting JSON response data into the dataclass structure.
    The nested 'queries' and 'entityPool' dicts are automatically converted to
    their respective dataclass instances.

    Parameters:
        data: A dict containing the repository statistics data, typically
            parsed from a JSON response.

    Returns:
        A RepositoryStatistics instance with nested dataclass objects.

    Raises:
        KeyError: If required keys are missing from the input dict.
        TypeError: If nested dicts cannot be unpacked into their respective
            dataclass instances.
        ValueError: If field validation fails in RepositoryStatisticsQueries,
            RepositoryStatisticsEntityPool, or RepositoryStatistics
            (e.g., invalid types).
    """
    queries = RepositoryStatisticsQueries(**data["queries"])
    entity_pool = RepositoryStatisticsEntityPool(**data["entityPool"])
    return cls(
        queries=queries,
        entityPool=entity_pool,
        activeTransactions=data["activeTransactions"],
        openConnections=data["openConnections"],
    )
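
The TypeError listed under Raises comes from unpacking a nested dict with `**` into a dataclass constructor: both a missing field and an unexpected extra key are rejected. The sketch below shows this with a stand-in `Queries` dataclass and a hypothetical `build` helper. The stand-in omits the `__post_init__` validation, so only the TypeError path is illustrated.

```python
from dataclasses import dataclass


@dataclass
class Queries:
    """Stand-in with the same two fields as RepositoryStatisticsQueries."""
    slow: int
    suboptimal: int


def build(raw: dict) -> str:
    """Report which outcome `Queries(**raw)` produces for a nested dict."""
    try:
        Queries(**raw)
    except TypeError:
        return "TypeError"
    return "ok"


print(build({"slow": 1, "suboptimal": 0}))              # both fields present
print(build({"slow": 1}))                               # missing field
print(build({"slow": 1, "suboptimal": 0, "extra": 2}))  # unexpected field
```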

RepositoryStatisticsEntityPool dataclass

RepositoryStatisticsEntityPool(epoolReads: int, epoolWrites: int, epoolSize: int)

Methods:

Attributes:

epoolReads instance-attribute

epoolReads: int

epoolSize instance-attribute

epoolSize: int

epoolWrites instance-attribute

epoolWrites: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    epool_reads = t.cast(t.Any, self.epoolReads)
    epool_writes = t.cast(t.Any, self.epoolWrites)
    epool_size = t.cast(t.Any, self.epoolSize)

    if type(epool_reads) is not int:
        invalid.append(("epoolReads", epool_reads, type(epool_reads)))
    if type(epool_writes) is not int:
        invalid.append(("epoolWrites", epool_writes, type(epool_writes)))
    if type(epool_size) is not int:
        invalid.append(("epoolSize", epool_size, type(epool_size)))

    if invalid:
        raise ValueError("Invalid RepositoryStatisticsEntityPool values: ", invalid)

RepositoryStatisticsQueries dataclass

RepositoryStatisticsQueries(slow: int, suboptimal: int)

Methods:

Attributes:

slow instance-attribute

slow: int

suboptimal instance-attribute

suboptimal: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    slow = t.cast(t.Any, self.slow)
    suboptimal = t.cast(t.Any, self.suboptimal)

    if type(slow) is not int:
        invalid.append(("slow", slow, type(slow)))
    if type(suboptimal) is not int:
        invalid.append(("suboptimal", suboptimal, type(suboptimal)))

    if invalid:
        raise ValueError("Invalid RepositoryStatisticsQueries values: ", invalid)

ServerImportBody dataclass

ServerImportBody(fileNames: list[str], importSettings: ImportSettings | None = None)

Methods:

Attributes:

fileNames instance-attribute

fileNames: list[str]

importSettings class-attribute instance-attribute

importSettings: ImportSettings | None = None

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    import_settings = t.cast(t.Any, self.importSettings)
    file_names = t.cast(t.Any, self.fileNames)

    if import_settings is not None and not isinstance(
        import_settings, ImportSettings
    ):
        invalid.append(("importSettings", import_settings, type(import_settings)))
    if not isinstance(file_names, list):
        invalid.append(("fileNames", file_names, type(file_names)))
    else:
        for index, value in enumerate(file_names):
            if not isinstance(value, str):
                invalid.append((f"fileNames[{index}]", value, type(value)))

    if invalid:
        raise ValueError("Invalid ServerImportBody values: ", invalid)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    result = asdict(self)
    if self.importSettings is None:
        result.pop("importSettings", None)
    return result
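
The method above omits importSettings from the result when it is None, so the key is absent rather than serialized as null. A minimal sketch of the same pattern follows, using a stand-in `ImportBody` dataclass in which a plain dict field named `settings` (an assumption) takes the place of ImportSettings.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ImportBody:
    """Stand-in for ServerImportBody; `settings` replaces importSettings."""
    fileNames: list
    settings: Optional[dict] = None

    def as_dict(self) -> dict:
        result = asdict(self)
        if self.settings is None:
            # Drop the key entirely instead of emitting an explicit null.
            result.pop("settings", None)
        return result


print(ImportBody(fileNames=["data.ttl"]).as_dict())
print(ImportBody(fileNames=["data.ttl"], settings={"name": "data.ttl"}).as_dict())
```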

SnapshotOptionsBean dataclass

SnapshotOptionsBean(withRepositoryData: bool, withSystemData: bool, cleanDataDir: bool, repositories: list[str] | None = None)

Methods:

Attributes:

cleanDataDir instance-attribute

cleanDataDir: bool

repositories class-attribute instance-attribute

repositories: list[str] | None = None

withRepositoryData instance-attribute

withRepositoryData: bool

withSystemData instance-attribute

withSystemData: bool

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    with_repository_data = t.cast(t.Any, self.withRepositoryData)
    with_system_data = t.cast(t.Any, self.withSystemData)
    clean_data_dir = t.cast(t.Any, self.cleanDataDir)
    repositories = t.cast(t.Any, self.repositories)

    if type(with_repository_data) is not bool:
        invalid.append(
            ("withRepositoryData", with_repository_data, type(with_repository_data))
        )
    if type(with_system_data) is not bool:
        invalid.append(("withSystemData", with_system_data, type(with_system_data)))
    if type(clean_data_dir) is not bool:
        invalid.append(("cleanDataDir", clean_data_dir, type(clean_data_dir)))

    if repositories is not None:
        if not isinstance(repositories, list):
            invalid.append(("repositories", repositories, type(repositories)))
        else:
            for index, value in enumerate(repositories):
                if not isinstance(value, str):
                    invalid.append((f"repositories[{index}]", value, type(value)))

    if invalid:
        raise ValueError("Invalid SnapshotOptionsBean values: ", invalid)
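
The validation above collects every failing field before raising, and passes the list as a second argument to `ValueError`, so callers can read it from `exc.args`. A sketch of the same checks as a free function (`validate_snapshot_options` is a hypothetical name used only for illustration):

```python
from __future__ import annotations

from typing import Any


def validate_snapshot_options(
    withRepositoryData: Any,
    withSystemData: Any,
    cleanDataDir: Any,
    repositories: Any = None,
) -> None:
    """Hypothetical free-function version of the checks shown above."""
    invalid: list[tuple[str, Any, type]] = []
    for name, value in (
        ("withRepositoryData", withRepositoryData),
        ("withSystemData", withSystemData),
        ("cleanDataDir", cleanDataDir),
    ):
        # Exact-type check: anything that is not literally a bool is rejected.
        if type(value) is not bool:
            invalid.append((name, value, type(value)))
    if repositories is not None:
        if not isinstance(repositories, list):
            invalid.append(("repositories", repositories, type(repositories)))
        else:
            for index, value in enumerate(repositories):
                if not isinstance(value, str):
                    invalid.append((f"repositories[{index}]", value, type(value)))
    if invalid:
        # Two positional arguments: the message and the list of problems.
        raise ValueError("Invalid SnapshotOptionsBean values: ", invalid)


try:
    validate_snapshot_options(1, True, "no", repositories=["repo-a", 7])
except ValueError as exc:
    message, problems = exc.args
    failed_fields = [name for name, _value, _type in problems]
```

Because the exception carries two arguments, `str(exc)` renders as a tuple. Unpacking `exc.args` is the more convenient way to inspect the individual failures.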

StatementAccessControlEntry dataclass

StatementAccessControlEntry(scope: Literal['statement'], policy: Literal['allow', 'deny', 'abstain'], role: str, operation: Literal['read', 'write', '*'], subject: Literal['*'] | URIRef, predicate: Literal['*'] | URIRef, object: Literal['*'] | URIRef | Literal, graph: Literal['*', 'named', 'default'] | URIRef)

Bases: AccessControlEntry

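Methods:

  • __post_init__
  • as_dict

Attributes:

  • graph (Literal['*', 'named', 'default'] | URIRef) –
  • object (Literal['*'] | URIRef | Literal) –
  • operation (Literal['read', 'write', '*']) –
  • policy (Literal['allow', 'deny', 'abstain']) –
  • predicate (Literal['*'] | URIRef) –
  • role (str) –
  • scope (Literal['statement']) –
  • subject (Literal['*'] | URIRef) –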

graph instance-attribute

graph: Literal['*', 'named', 'default'] | URIRef

object instance-attribute

object: Literal['*'] | URIRef | Literal

operation instance-attribute

operation: Literal['read', 'write', '*']

policy instance-attribute

policy: Literal['allow', 'deny', 'abstain']

predicate instance-attribute

predicate: Literal['*'] | URIRef

role instance-attribute

role: str

scope instance-attribute

scope: Literal['statement']

subject instance-attribute

subject: Literal['*'] | URIRef

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    self.policy = _parse_policy(self.policy)
    self.role = _parse_role(self.role)
    self.operation = _parse_operation(self.operation)
    self.subject = _parse_subject(self.subject)
    self.predicate = _parse_predicate(self.predicate)
    self.object = _parse_object(self.object)
    self.graph = _parse_graph(self.graph)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return {
        "scope": self.scope,
        "policy": self.policy,
        "role": self.role,
        "operation": self.operation,
        "subject": _format_term(self.subject),
        "predicate": _format_term(self.predicate),
        "object": _format_term(self.object),
        "context": _format_term(self.graph),
    }
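
Note that the `graph` attribute is emitted under the key `"context"`, not `"graph"`. The sketch below illustrates only that renaming. `StatementEntrySketch` is a hypothetical, unvalidated stand-in, all fields are plain strings, and `str()` replaces the private `_format_term` helper, whose behaviour is not shown in this section:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class StatementEntrySketch:
    """Hypothetical stand-in for StatementAccessControlEntry (no validation)."""

    policy: str
    role: str
    operation: str
    subject: str
    predicate: str
    object: str
    graph: str
    scope: str = "statement"

    def as_dict(self) -> dict[str, Any]:
        return {
            "scope": self.scope,
            "policy": self.policy,
            "role": self.role,
            "operation": self.operation,
            # Assumption: str() stands in for the real _format_term helper.
            "subject": str(self.subject),
            "predicate": str(self.predicate),
            "object": str(self.object),
            # The attribute is called `graph`, the serialised key is "context".
            "context": str(self.graph),
        }


entry = StatementEntrySketch(
    policy="allow",
    role="EXAMPLE_ROLE",  # illustrative role name
    operation="read",
    subject="*",
    predicate="*",
    object="*",
    graph="named",
)
payload = entry.as_dict()
```

Code that reads the serialised form back should therefore look for `"context"`.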

StructuresStatistics dataclass

StructuresStatistics(cacheHit: int, cacheMiss: int)

Methods:

  • __post_init__

Attributes:

  • cacheHit (int) –
  • cacheMiss (int) –

cacheHit instance-attribute

cacheHit: int

cacheMiss instance-attribute

cacheMiss: int

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    cache_hit = t.cast(t.Any, self.cacheHit)
    cache_miss = t.cast(t.Any, self.cacheMiss)

    if type(cache_hit) is not int:
        invalid.append(("cacheHit", cache_hit, type(cache_hit)))
    if type(cache_miss) is not int:
        invalid.append(("cacheMiss", cache_miss, type(cache_miss)))

    if invalid:
        raise ValueError("Invalid StructuresStatistics values: ", invalid)
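
The checks above compare with `type(...) is not int` rather than `isinstance`. Since `bool` is a subclass of `int` in Python, the exact-type comparison rejects `True` and `False`, which `isinstance` would accept. A small sketch of the difference (`is_strict_int` is a hypothetical helper name):

```python
def is_strict_int(value: object) -> bool:
    """Same test as the `type(...) is not int` checks above, inverted."""
    return type(value) is int


samples = [5, True, 5.0, "5"]
strict = [is_strict_int(v) for v in samples]  # exact-type check
lenient = [isinstance(v, int) for v in samples]  # subclass-aware check
```

The two lists differ only for `True`: the strict check rejects it and the `isinstance` check accepts it.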

SystemAccessControlEntry dataclass

SystemAccessControlEntry(scope: Literal['system'], policy: Literal['allow', 'deny', 'abstain'], role: str, operation: Literal['read', 'write', '*'])

Bases: AccessControlEntry

Methods:

  • __post_init__
  • as_dict

Attributes:

  • operation (Literal['read', 'write', '*']) –
  • policy (Literal['allow', 'deny', 'abstain']) –
  • role (str) –
  • scope (Literal['system']) –

operation instance-attribute

operation: Literal['read', 'write', '*']

policy instance-attribute

policy: Literal['allow', 'deny', 'abstain']

role instance-attribute

role: str

scope instance-attribute

scope: Literal['system']

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    self.policy = _parse_policy(self.policy)
    self.role = _parse_role(self.role)
    self.operation = _parse_operation(self.operation)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return {
        "scope": self.scope,
        "policy": self.policy,
        "role": self.role,
        "operation": self.operation,
    }

TopologyStatus dataclass

TopologyStatus(state: str, primaryTags: dict[str, Any])

Methods:

  • __post_init__

Attributes:

  • primaryTags (dict[str, Any]) –
  • state (str) –

primaryTags instance-attribute

primaryTags: dict[str, Any]

state instance-attribute

state: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    state = t.cast(t.Any, self.state)
    primary_tags = t.cast(t.Any, self.primaryTags)

    if not isinstance(state, str):
        invalid.append(("state", state, type(state)))
    if not isinstance(primary_tags, dict):
        invalid.append(("primaryTags", primary_tags, type(primary_tags)))
    else:
        for key, value in primary_tags.items():
            if not isinstance(key, str):
                invalid.append((f"primaryTags key '{key}'", key, type(key)))

    if invalid:
        raise ValueError("Invalid TopologyStatus values: ", invalid)

User dataclass

User(username: str, password: str, dateCreated: int, grantedAuthorities: list[str] = list(), appSettings: dict[str, Any] = dict(), gptThreads: list[Any] = list())

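Methods:

  • __post_init__
  • as_dict

Attributes:

  • appSettings (dict[str, Any]) –
  • dateCreated (int) –
  • gptThreads (list[Any]) –
  • grantedAuthorities (list[str]) –
  • password (str) –
  • username (str) –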

appSettings class-attribute instance-attribute

appSettings: dict[str, Any] = field(default_factory=dict)

dateCreated instance-attribute

dateCreated: int

gptThreads class-attribute instance-attribute

gptThreads: list[Any] = field(default_factory=list)

grantedAuthorities class-attribute instance-attribute

grantedAuthorities: list[str] = field(default_factory=list)

password instance-attribute

password: str

username instance-attribute

username: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    # Normalize None values from API responses to empty collections
    if self.grantedAuthorities is None:
        object.__setattr__(self, "grantedAuthorities", [])  # type: ignore[unreachable]
    if self.appSettings is None:
        object.__setattr__(self, "appSettings", {})  # type: ignore[unreachable]
    if self.gptThreads is None:
        object.__setattr__(self, "gptThreads", [])  # type: ignore[unreachable]

    invalid: list[tuple[str, t.Any, type]] = []
    username = t.cast(t.Any, self.username)
    password = t.cast(t.Any, self.password)
    date_created = t.cast(t.Any, self.dateCreated)
    granted_authorities = t.cast(t.Any, self.grantedAuthorities)
    app_settings = t.cast(t.Any, self.appSettings)
    gpt_threads = t.cast(t.Any, self.gptThreads)

    if not isinstance(username, str):
        invalid.append(("username", username, type(username)))
    if not isinstance(password, str):
        invalid.append(("password", password, type(password)))
    if not isinstance(date_created, int):
        invalid.append(("dateCreated", date_created, type(date_created)))

    if not isinstance(granted_authorities, list):
        invalid.append(
            ("grantedAuthorities", granted_authorities, type(granted_authorities))
        )
    else:
        for index, value in enumerate(granted_authorities):
            if not isinstance(value, str):
                invalid.append((f"grantedAuthorities[{index}]", value, type(value)))

    if not isinstance(app_settings, dict):
        invalid.append(("appSettings", app_settings, type(app_settings)))
    else:
        for key in app_settings.keys():
            if not isinstance(key, str):
                invalid.append(("appSettings key", key, type(key)))
                break

    if not isinstance(gpt_threads, list):
        invalid.append(("gptThreads", gpt_threads, type(gpt_threads)))

    if invalid:
        raise ValueError("Invalid User values: ", invalid)
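
The first step of `__post_init__` above replaces `None` values with empty collections before any type checks run. The use of `object.__setattr__` suggests the dataclass is frozen. The sketch below assumes that, and uses a hypothetical, reduced stand-in (`UserSketch`) rather than the real class:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


# Assumption: frozen, which is why object.__setattr__ is needed to assign.
@dataclass(frozen=True)
class UserSketch:
    """Hypothetical, reduced stand-in for User; not the rdflib class."""

    username: str
    grantedAuthorities: list[str] = field(default_factory=list)
    appSettings: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # A JSON null arrives as None; swap it for an empty collection.
        if self.grantedAuthorities is None:
            object.__setattr__(self, "grantedAuthorities", [])
        if self.appSettings is None:
            object.__setattr__(self, "appSettings", {})


# Illustrative response body in which the optional fields are null.
raw = {"username": "alice", "grantedAuthorities": None, "appSettings": None}
user = UserSketch(**raw)
```

After construction, `user.grantedAuthorities` is an empty list and `user.appSettings` is an empty dict, so the later `isinstance` checks see collections, never `None`.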

as_dict

as_dict()
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self):
    return asdict(self)

UserCreate dataclass

UserCreate(username: str, password: str, grantedAuthorities: list[str] = list(), appSettings: dict[str, Any] = dict(), gptThreads: list[Any] = list())

Dataclass for creating a new user in GraphDB.

Unlike User, this class omits dateCreated, because GraphDB assigns that value automatically when the user is created.

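Methods:

  • __post_init__
  • as_dict

Attributes:

  • appSettings (dict[str, Any]) –
  • gptThreads (list[Any]) –
  • grantedAuthorities (list[str]) –
  • password (str) –
  • username (str) –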

appSettings class-attribute instance-attribute

appSettings: dict[str, Any] = field(default_factory=dict)

gptThreads class-attribute instance-attribute

gptThreads: list[Any] = field(default_factory=list)

grantedAuthorities class-attribute instance-attribute

grantedAuthorities: list[str] = field(default_factory=list)

password instance-attribute

password: str

username instance-attribute

username: str

__post_init__

__post_init__() -> None
Source code in rdflib/contrib/graphdb/models.py
def __post_init__(self) -> None:
    invalid: list[tuple[str, t.Any, type]] = []
    username = t.cast(t.Any, self.username)
    password = t.cast(t.Any, self.password)
    granted_authorities = t.cast(t.Any, self.grantedAuthorities)
    app_settings = t.cast(t.Any, self.appSettings)
    gpt_threads = t.cast(t.Any, self.gptThreads)

    if not isinstance(username, str):
        invalid.append(("username", username, type(username)))
    if not isinstance(password, str):
        invalid.append(("password", password, type(password)))

    if not isinstance(granted_authorities, list):
        invalid.append(
            ("grantedAuthorities", granted_authorities, type(granted_authorities))
        )
    else:
        for index, value in enumerate(granted_authorities):
            if not isinstance(value, str):
                invalid.append((f"grantedAuthorities[{index}]", value, type(value)))

    if not isinstance(app_settings, dict):
        invalid.append(("appSettings", app_settings, type(app_settings)))
    else:
        for key in app_settings.keys():
            if not isinstance(key, str):
                invalid.append(("appSettings key", key, type(key)))
                break

    if not isinstance(gpt_threads, list):
        invalid.append(("gptThreads", gpt_threads, type(gpt_threads)))

    if invalid:
        raise ValueError("Invalid UserCreate values: ", invalid)

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return asdict(self)
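
As a rough illustration of the shape `as_dict` produces for a creation request, the sketch below mirrors the fields listed above in a hypothetical stand-in (`UserCreateSketch`, with validation omitted). It also shows that `field(default_factory=...)` gives each instance its own collections. The user names, passwords, and role string are illustrative values only.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class UserCreateSketch:
    """Hypothetical stand-in for UserCreate (validation omitted)."""

    username: str
    password: str
    grantedAuthorities: list[str] = field(default_factory=list)
    appSettings: dict[str, Any] = field(default_factory=dict)
    gptThreads: list[Any] = field(default_factory=list)

    def as_dict(self) -> dict[str, Any]:
        return asdict(self)


first = UserCreateSketch("alice", "s3cret")
second = UserCreateSketch("bob", "hunter2")

# Mutating one instance's default list must not leak into the other.
first.grantedAuthorities.append("ROLE_USER")
payload = first.as_dict()
```

The resulting `payload` has no `dateCreated` key, and `second.grantedAuthorities` stays empty because each instance received a fresh list.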

UserUpdate dataclass

UserUpdate(password: str = '', appSettings: dict[str, Any] = dict(), gptThreads: list[Any] = list())

Methods:

  • as_dict

Attributes:

  • appSettings (dict[str, Any]) –
  • gptThreads (list[Any]) –
  • password (str) –

appSettings class-attribute instance-attribute

appSettings: dict[str, Any] = field(default_factory=dict)

gptThreads class-attribute instance-attribute

gptThreads: list[Any] = field(default_factory=list)

password class-attribute instance-attribute

password: str = field(default='')

as_dict

as_dict() -> dict[str, Any]
Source code in rdflib/contrib/graphdb/models.py
def as_dict(self) -> dict[str, t.Any]:
    return asdict(self)
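
Because every `UserUpdate` field has a default and `as_dict` is a plain `asdict` call, the serialised dict always contains all three keys, including an empty `password` when none was given. A minimal sketch with a hypothetical stand-in (`UserUpdateSketch`); how GraphDB interprets an empty password is not stated in this section, and the `appSettings` key is illustrative:

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class UserUpdateSketch:
    """Hypothetical stand-in for UserUpdate; not the rdflib class."""

    password: str = field(default="")
    appSettings: dict[str, Any] = field(default_factory=dict)
    gptThreads: list[Any] = field(default_factory=list)

    def as_dict(self) -> dict[str, Any]:
        return asdict(self)


# Only appSettings is supplied; the other fields fall back to their defaults.
update = UserUpdateSketch(appSettings={"EXAMPLE_SETTING": True})
payload = update.as_dict()
```

The defaults for `password` and `gptThreads` appear in `payload` alongside the supplied `appSettings`.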