Skip to content

Workflows

albert.collections.workflows

AlbertPaginator

AlbertPaginator(
    *,
    path: str,
    mode: PaginationMode,
    session: AlbertSession,
    deserialize: Callable[
        [Iterable[dict]], Iterable[ItemType]
    ],
    params: dict[str, str] | None = None,
)

Bases: Iterator[ItemType]

Helper class for pagination through Albert endpoints.

Two pagination modes are possible: (1) offset-based, via the `offset` query parameter; (2) key-based, via the `startKey` query parameter and the `lastKey` response field.

A custom deserialize function is provided when additional logic is required to load the raw items returned by the search listing, e.g., making additional Albert API calls.

Source code in src/albert/core/pagination.py
def __init__(
    self,
    *,
    path: str,
    mode: PaginationMode,
    session: AlbertSession,
    deserialize: Callable[[Iterable[dict]], Iterable[ItemType]],
    params: dict[str, str] | None = None,
):
    """Store the pagination settings and build the underlying iterator.

    Parameters
    ----------
    path : str
        Endpoint path, stored as ``self.path``.
    mode : PaginationMode
        Pagination mode, stored as ``self.mode``.
    session : AlbertSession
        Session object, stored as ``self.session``.
    deserialize : Callable[[Iterable[dict]], Iterable[ItemType]]
        Callable stored as ``self.deserialize``.
    params : dict[str, str] | None, optional
        Query parameters. Entries whose value is ``None`` are dropped, and a
        ``startKey`` entry, when present, is encoded with ``quote_plus``.
    """
    self.path, self.mode = path, mode
    self.session, self.deserialize = session, deserialize

    # Keep only the parameters that actually carry a value.
    cleaned = {}
    for key, value in (params or {}).items():
        if value is not None:
            cleaned[key] = value
    self.params = cleaned

    # `cleaned` is the same dict object as `self.params`, so this updates it in place.
    if "startKey" in cleaned:
        cleaned["startKey"] = quote_plus(cleaned["startKey"])

    self._iterator = self._create_iterator()

deserialize instance-attribute

deserialize = deserialize

mode instance-attribute

mode = mode

params instance-attribute

params = {k: v for k, v in params.items() if v is not None}

path instance-attribute

path = path

session instance-attribute

session = session

AlbertSession

AlbertSession(
    *,
    base_url: str,
    token: str | None = None,
    auth_manager: AlbertClientCredentials
    | AlbertSSOClient
    | None = None,
    retries: int | None = None,
)

Bases: Session

A session that has a base URL, which is prefixed to all request URLs.

Parameters:

Name Type Description Default
base_url str

The base URL to prefix to all relative request paths (e.g., "https://app.albertinvent.com").

required
token str | None

A static JWT token for authentication. Ignored if auth_manager is provided.

None
auth_manager AlbertClientCredentials | AlbertSSOClient

An authentication manager used to dynamically fetch and refresh tokens. If provided, it overrides token.

None
retries int

The number of automatic retries on failed requests (default is 3).

None

Methods:

Name Description
request
Source code in src/albert/core/session.py
def __init__(
    self,
    *,
    base_url: str,
    token: str | None = None,
    auth_manager: AlbertClientCredentials | AlbertSSOClient | None = None,
    retries: int | None = None,
):
    """Set up default headers, credentials and retrying transport adapters.

    Parameters
    ----------
    base_url : str
        Base URL stored on the session as ``self.base_url``.
    token : str | None, optional
        Static token, stored as provided.
    auth_manager : AlbertClientCredentials | AlbertSSOClient | None, optional
        Authentication manager, stored as provided.
    retries : int | None, optional
        Retry budget applied to total, read and connect retries; 3 is used
        when ``None`` is given.

    Raises
    ------
    ValueError
        If both ``token`` and ``auth_manager`` are ``None``.
    """
    super().__init__()
    self.base_url = base_url

    default_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": f"albert-SDK V.{albert.__version__}",
    }
    self.headers.update(default_headers)

    if token is None and auth_manager is None:
        raise ValueError("Either `token` or `auth_manager` must be specified.")

    self._auth_manager = auth_manager
    self._provided_token = token

    # Retry policy shared by both URL schemes.
    # NOTE(review): 403 is retried alongside the 5xx codes - confirm this is intentional.
    attempts = 3 if retries is None else retries
    retry_policy = Retry(
        total=attempts,
        read=attempts,
        connect=attempts,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 503, 504, 403),
        raise_on_status=False,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    for scheme in ("http://", "https://"):
        self.mount(scheme, retrying_adapter)

base_url instance-attribute

base_url = base_url

request

request(
    method: str, path: str, *args, **kwargs
) -> Response
Source code in src/albert/core/session.py
def request(self, method: str, path: str, *args, **kwargs) -> requests.Response:
    """Send a request, resolving ``path`` against the session's base URL.

    The ``Authorization`` header is refreshed from ``self._access_token`` on
    every call. A ``path`` that starts with ``"http"`` is used unchanged;
    anything else is joined onto ``self.base_url`` with ``urljoin``.

    Parameters
    ----------
    method : str
        HTTP method passed through to the parent ``request``.
    path : str
        Relative path or full URL.
    *args, **kwargs
        Forwarded unchanged to the parent ``request``.

    Returns
    -------
    requests.Response
        The response, returned after ``raise_for_status()`` has been called on it.
    """
    self.headers["Authorization"] = f"Bearer {self._access_token}"

    if path.startswith("http"):
        target_url = path
    else:
        target_url = urljoin(self.base_url, path)

    # Both the request and the status check run inside the error-handling context.
    with handle_http_errors():
        resp = super().request(method, target_url, *args, **kwargs)
        resp.raise_for_status()
        return resp

BaseCollection

BaseCollection(*, session: AlbertSession)

BaseCollection is the base class for all collection classes.

Parameters:

Name Type Description Default
session AlbertSession

The Albert API Session instance.

required
Source code in src/albert/collections/base.py
def __init__(self, *, session: AlbertSession):
    """Store the session on the collection.

    Parameters
    ----------
    session : AlbertSession
        The Albert API session instance, kept as ``self.session``.
    """
    self.session = session

session instance-attribute

session = session

PaginationMode

Bases: str, Enum

KEY class-attribute instance-attribute

KEY = 'key'

OFFSET class-attribute instance-attribute

OFFSET = 'offset'

Workflow

Bases: BaseResource

A Pydantic Class representing a workflow in Albert.

Workflows are combinations of Data Templates and Parameter groups and their associated setpoints.

Attributes:

Name Type Description
name str

The name of the workflow.

parameter_group_setpoints list[ParameterGroupSetpoints]

The setpoints to apply to the parameter groups in the workflow.

id str | None

The AlbertID of the workflow. This is set when a workflow is retrieved from the platform.

Methods:

Name Description
get_interval_id

Get the interval ID for a set of parameter values.

model_post_init

id class-attribute instance-attribute

id: str | None = Field(
    alias="albertId",
    default=None,
    validation_alias=AliasChoices(
        "albertId", "existingAlbertId"
    ),
    exclude=True,
)

interval_combinations class-attribute instance-attribute

interval_combinations: list[IntervalCombination] | None = (
    Field(default=None, alias="IntervalCombinations")
)

name instance-attribute

name: str

parameter_group_setpoints class-attribute instance-attribute

parameter_group_setpoints: list[ParameterGroupSetpoints] = (
    Field(alias="ParameterGroups")
)

get_interval_id

get_interval_id(parameter_values: dict[str, any]) -> str

Get the interval ID for a set of parameter values.

This method matches parameter values to intervals defined in the workflow and constructs a composite interval ID. For multiple parameters, the interval IDs are joined with 'X'.

Parameters:

Name Type Description Default
parameter_values dict[str, any]

Dictionary mapping parameter names to their values. Values can be numbers or strings that match the interval values defined in the workflow.

required

Returns:

Type Description
str

The composite interval ID string. For a single parameter this is just the interval ID. For multiple parameters, interval IDs are joined with 'X' (e.g. "ROW1XROW2").

Raises:

Type Description
AlbertException

If any parameter value does not match a defined interval in the workflow.

Examples:

>>> workflow = Workflow(...)
>>> # Single parameter
>>> workflow.get_interval_id({"Temperature": 25})
'ROW1'
>>> # Multiple parameters
>>> workflow.get_interval_id({"Temperature": 25, "Time": 60})
'ROW1XROW2'
>>> # Non-matching value raises error
>>> workflow.get_interval_id({"Temperature": 999})
AlbertException: No matching interval found for parameter 'Temperature' with value '999'
Source code in src/albert/resources/workflows.py
def get_interval_id(self, parameter_values: dict[str, any]) -> str:
    """Get the interval ID for a set of parameter values.

    Each entry of ``parameter_values`` is matched against the workflow's interval
    parameters. Parameter names are compared case-insensitively, and a value matches
    when it equals the interval value either directly or after conversion with
    ``str()``. The first matching interval is used. The matched interval IDs are
    joined with 'X', in the iteration order of ``parameter_values``.

    Parameters
    ----------
    parameter_values : dict[str, any]
        Dictionary mapping parameter names to their values. Values can be numbers or strings
        that match the interval values defined in the workflow.

    Returns
    -------
    str
        The composite interval ID string. For a single parameter this is just the interval ID.
        For multiple parameters, interval IDs are joined with 'X' (e.g. "ROW1XROW2").
        An empty mapping yields an empty string.

    Raises
    ------
    AlbertException
        If any parameter value does not match a defined interval in the workflow.

    Examples
    --------
    >>> workflow = Workflow(...)
    >>> # Single parameter
    >>> workflow.get_interval_id({"Temperature": 25})
    'ROW1'
    >>> # Multiple parameters
    >>> workflow.get_interval_id({"Temperature": 25, "Time": 60})
    'ROW1XROW2'
    >>> # Non-matching value raises error
    >>> workflow.get_interval_id({"Temperature": 999})
    AlbertException: No matching interval found for parameter 'Temperature' with value '999'
    """
    # NOTE(review): the annotation uses the builtin `any`; `typing.Any` is presumably intended.
    matched_ids = []
    for param_name, param_value in parameter_values.items():
        # Normalise once per parameter rather than once per candidate interval.
        wanted_name = param_name.lower()
        value_as_text = str(param_value)

        matching_interval = next(
            (
                candidate
                for candidate in self._interval_parameters
                if candidate.interval_param_name.lower() == wanted_name
                and (
                    param_value == candidate.interval_value
                    or value_as_text == candidate.interval_value
                )
            ),
            None,
        )

        if matching_interval is None:
            raise AlbertException(
                f"No matching interval found for parameter '{param_name}' with value '{param_value}'"
            )

        matched_ids.append(matching_interval.interval_id)

    return "X".join(matched_ids)

model_post_init

model_post_init(__context) -> None
Source code in src/albert/resources/workflows.py
def model_post_init(self, __context) -> None:
    """Post-initialisation hook that delegates to ``_populate_interval_parameters``.

    ``__context`` is accepted but not used here.
    NOTE(review): presumably this is what fills ``self._interval_parameters``
    read by ``get_interval_id`` - confirm against ``_populate_interval_parameters``.
    """
    self._populate_interval_parameters()

WorkflowCollection

WorkflowCollection(*, session: AlbertSession)

Bases: BaseCollection

WorkflowCollection is a collection class for managing Workflow entities in the Albert platform.

Parameters:

Name Type Description Default
session AlbertSession

The Albert session instance.

required

Methods:

Name Description
create

Create or return matching workflows for the provided list of workflows.

get_all

Get all workflows. Unlikely to be used in production.

get_by_id

Retrieve a Workflow by its ID.

get_by_ids

Returns a list of Workflow objects by their IDs.

Source code in src/albert/collections/workflows.py
def __init__(self, *, session: AlbertSession):
    """
    Set up the collection and compute the workflows endpoint path.

    Parameters
    ----------
    session : AlbertSession
        The Albert session instance, forwarded to the base class initialiser.
    """
    super().__init__(session=session)
    api_version = WorkflowCollection._api_version
    self.base_path = f"/api/{api_version}/workflows"

base_path instance-attribute

base_path = f'/api/{_api_version}/workflows'

create

create(*, workflows: list[Workflow]) -> list[Workflow]

Create or return matching workflows for the provided list of workflows. This endpoint automatically tries to find an existing workflow with the same parameter setpoints, and will either return the existing workflow or create a new one.

Parameters:

Name Type Description Default
workflows list[Workflow]

A list of Workflow objects to find or create.

required

Returns:

Type Description
list[Workflow]

A list of created or found Workflow objects.

Source code in src/albert/collections/workflows.py
def create(self, *, workflows: list[Workflow]) -> list[Workflow]:
    """Find or create the given workflows through the bulk endpoint.

    The workflows are serialised with ``model_dump`` (JSON mode, aliases on,
    ``None`` fields omitted) and posted to ``{base_path}/bulk``. Every item of
    the JSON response is turned back into a ``Workflow``.

    Parameters
    ----------
    workflows : list[Workflow]
        Workflow objects to find or create. A single ``Workflow`` passed by
        mistake is wrapped into a one-element list.

    Returns
    -------
    list[Workflow]
        One ``Workflow`` per item in the response.
    """
    # Tolerate a bare Workflow where a list was expected.
    batch = [workflows] if isinstance(workflows, Workflow) else workflows

    bulk_url = f"{self.base_path}/bulk"
    payload = [
        workflow.model_dump(mode="json", by_alias=True, exclude_none=True)
        for workflow in batch
    ]
    response = self.session.post(url=bulk_url, json=payload)

    created = []
    for raw in response.json():
        created.append(Workflow(**raw))
    return created

get_all

get_all(limit: int = 50) -> Iterator[Workflow]

Get all workflows. Unlikely to be used in production.

Parameters:

Name Type Description Default
limit int

The number of workflows to return, by default 50.

50

Yields:

Type Description
Iterator[Workflow]

An iterator of Workflow objects.

Source code in src/albert/collections/workflows.py
def get_all(self, limit: int = 50) -> Iterator[Workflow]:
    """Get all workflows. Unlikely to be used in production.

    Builds a key-mode ``AlbertPaginator`` over ``self.base_path``. The items of
    each listing page are reduced to their ``albertId`` values and re-fetched
    through ``get_by_ids``.

    Parameters
    ----------
    limit : int, optional
        Value sent as the ``limit`` query parameter, by default 50.

    Returns
    -------
    Iterator[Workflow]
        An iterator of Workflow objects.
    """

    def _load_by_listing(items: list[dict]) -> list[Workflow]:
        # The listing entries are only used for their IDs.
        albert_ids = [entry["albertId"] for entry in items]
        return self.get_by_ids(ids=albert_ids)

    return AlbertPaginator(
        mode=PaginationMode.KEY,
        path=self.base_path,
        params={"limit": limit},
        session=self.session,
        deserialize=_load_by_listing,
    )

get_by_id

get_by_id(*, id: str) -> Workflow

Retrieve a Workflow by its ID.

Parameters:

Name Type Description Default
id str

The ID of the Workflow to retrieve.

required

Returns:

Type Description
Workflow

The Workflow object.

Source code in src/albert/collections/workflows.py
def get_by_id(self, *, id: str) -> Workflow:
    """Fetch a single Workflow from ``{base_path}/{id}``.

    Parameters
    ----------
    id : str
        The ID of the Workflow to retrieve.

    Returns
    -------
    Workflow
        The Workflow built from the JSON body of the response.
    """
    workflow_url = f"{self.base_path}/{id}"
    payload = self.session.get(workflow_url).json()
    return Workflow(**payload)

get_by_ids

get_by_ids(*, ids: list[str]) -> list[Workflow]

Returns a list of Workflow objects by their IDs.

Parameters:

Name Type Description Default
ids list[str]

The list of Workflow IDs to retrieve.

required

Returns:

Type Description
list[Workflow]

The list of Workflow objects matching the provided IDs.

Source code in src/albert/collections/workflows.py
def get_by_ids(self, *, ids: list[str]) -> list[Workflow]:
    """Fetch Workflow objects for the given IDs, 100 IDs per request.

    Parameters
    ----------
    ids : list[str]
        The list of Workflow IDs to retrieve.

    Returns
    -------
    list[Workflow]
        Workflows built from the ``"Items"`` of every batch response, in
        request order. An empty ``ids`` list yields an empty result without
        issuing any request.
    """
    endpoint = f"{self.base_path}/ids"
    id_batches = [ids[start : start + 100] for start in range(0, len(ids), 100)]

    found = []
    for id_batch in id_batches:
        body = self.session.get(endpoint, params={"id": id_batch}).json()
        for raw in body["Items"]:
            found.append(Workflow(**raw))
    return found