Skip to content

Workflows

albert.collections.workflows

AlbertSession

AlbertSession(
    *,
    base_url: str,
    token: str | None = None,
    client_credentials: ClientCredentials | None = None,
    retries: int | None = None,
)

Bases: Session

A session that has a base URL, which is prefixed to all request URLs.

Parameters:

Name Type Description Default
base_url str

The base URL to prefix to all requests. (e.g., "https://sandbox.albertinvent.com")

required
retries int

The number of retries for failed requests. Defaults to 3.

None
client_credentials ClientCredentials | None

The client credentials for programmatic authentication. Optional if token is provided.

None
token str | None

The JWT token for authentication. Optional if client credentials are provided.

None

Methods:

Name Description
request
Source code in src/albert/session.py
def __init__(
    self,
    *,
    base_url: str,
    token: str | None = None,
    client_credentials: ClientCredentials | None = None,
    retries: int | None = None,
):
    """Create a session bound to ``base_url`` with auth state and retries configured.

    Parameters
    ----------
    base_url : str
        The base URL stored on the session and passed to the token manager.
    token : str | None, optional
        A pre-issued token. May be omitted when ``client_credentials`` is given.
    client_credentials : ClientCredentials | None, optional
        Credentials used to build a ``TokenManager``. May be omitted when ``token`` is given.
    retries : int | None, optional
        Retry budget used for the total, read and connect counters; ``None`` means 3.

    Raises
    ------
    ValueError
        If neither ``token`` nor ``client_credentials`` is supplied.
    """
    super().__init__()
    self.base_url = base_url

    # Headers sent with every request made through this session.
    default_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": f"albert-SDK V.{albert.__version__}",
    }
    self.headers.update(default_headers)

    if token is None and client_credentials is None:
        raise ValueError("Either client credentials or token must be specified.")

    self._provided_token = token
    # A token manager only exists when client credentials were supplied.
    if client_credentials is None:
        self._token_manager = None
    else:
        self._token_manager = TokenManager(base_url, client_credentials)

    if retries is None:
        retries = 3

    # NOTE(review): 403 is retried alongside the 5xx codes — confirm this is intentional.
    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 503, 504, 403),
        raise_on_status=False,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    # The same adapter instance serves both URL schemes.
    for scheme in ("http://", "https://"):
        self.mount(scheme, retrying_adapter)

base_url instance-attribute

base_url = base_url

request

request(
    method: str, path: str, *args, **kwargs
) -> Response
Source code in src/albert/session.py
def request(self, method: str, path: str, *args, **kwargs) -> requests.Response:
    """Send a request, resolving ``path`` against the session's base URL.

    The ``Authorization`` header is refreshed from ``self._access_token`` before
    every call. A ``path`` that starts with ``"http"`` is used as-is; anything
    else is joined onto ``self.base_url``.

    Parameters
    ----------
    method : str
        The HTTP method, forwarded to the parent ``request``.
    path : str
        Either a path to join onto the base URL or a full URL.
    *args, **kwargs
        Forwarded unchanged to the parent ``request``.

    Returns
    -------
    requests.Response
        The response, after ``raise_for_status()`` has been called on it.
    """
    self.headers["Authorization"] = f"Bearer {self._access_token}"

    if path.startswith("http"):
        target_url = path
    else:
        target_url = urljoin(self.base_url, path)

    # Both the call and the status check run inside the error-handling context.
    with handle_http_errors():
        result = super().request(method, target_url, *args, **kwargs)
        result.raise_for_status()
        return result

BaseCollection

BaseCollection(*, session: AlbertSession)

BaseCollection is the base class for all collection classes.

Parameters:

Name Type Description Default
session AlbertSession

The Albert API Session instance.

required
Source code in src/albert/collections/base.py
def __init__(self, *, session: AlbertSession):
    """Keep a reference to the session on the collection.

    Parameters
    ----------
    session : AlbertSession
        The session instance, stored as ``self.session``.
    """
    self.session = session

session instance-attribute

session = session

Workflow

Bases: BaseResource

A Pydantic Class representing a workflow in Albert.

Workflows are combinations of Data Templates and Parameter groups and their associated setpoints.

Attributes:

Name Type Description
name str

The name of the workflow.

parameter_group_setpoints list[ParameterGroupSetpoints]

The setpoints to apply to the parameter groups in the workflow.

id str | None

The AlbertID of the workflow. This is set when a workflow is retrieved from the platform.

Methods:

Name Description
get_interval_id

Get the interval ID for a set of parameter values.

model_post_init

id class-attribute instance-attribute

id: str | None = Field(
    alias="albertId",
    default=None,
    validation_alias=AliasChoices(
        "albertId", "existingAlbertId"
    ),
    exclude=True,
)

interval_combinations class-attribute instance-attribute

interval_combinations: list[IntervalCombination] | None = (
    Field(default=None, alias="IntervalCombinations")
)

name instance-attribute

name: str

parameter_group_setpoints class-attribute instance-attribute

parameter_group_setpoints: list[ParameterGroupSetpoints] = (
    Field(alias="ParameterGroups")
)

get_interval_id

get_interval_id(parameter_values: dict[str, any]) -> str

Get the interval ID for a set of parameter values.

This method matches parameter values to intervals defined in the workflow and constructs a composite interval ID. For multiple parameters, the interval IDs are joined with 'X'.

Parameters:

Name Type Description Default
parameter_values dict[str, any]

Dictionary mapping parameter names to their values. Values can be numbers or strings that match the interval values defined in the workflow.

required

Returns:

Type Description
str

The composite interval ID string. For a single parameter this is just the interval ID. For multiple parameters, interval IDs are joined with 'X' (e.g. "ROW1XROW2").

Raises:

Type Description
AlbertException

If any parameter value does not match a defined interval in the workflow.

Examples:

>>> workflow = Workflow(...)
>>> # Single parameter
>>> workflow.get_interval_id({"Temperature": 25})
'ROW1'
>>> # Multiple parameters
>>> workflow.get_interval_id({"Temperature": 25, "Time": 60})
'ROW1XROW2'
>>> # Non-matching value raises error
>>> workflow.get_interval_id({"Temperature": 999})
AlbertException: No matching interval found for parameter 'Temperature' with value '999'
Source code in src/albert/resources/workflows.py
def get_interval_id(self, parameter_values: dict[str, any]) -> str:
    """Build the composite interval ID for the given parameter values.

    Each entry of ``parameter_values`` is looked up in ``self._interval_parameters``:
    the parameter name is compared case-insensitively, and the value matches when it
    equals the interval value either directly or after conversion with ``str()``.
    The IDs of the matched intervals are concatenated in the iteration order of
    ``parameter_values``, separated by ``'X'``.

    Parameters
    ----------
    parameter_values : dict[str, any]
        Dictionary mapping parameter names to their values.

    Returns
    -------
    str
        The composite interval ID. A single parameter yields just its interval ID;
        several parameters yield IDs joined with 'X' (e.g. "ROW1XROW2"). An empty
        dictionary yields an empty string.

    Raises
    ------
    AlbertException
        If any parameter value does not match an interval in the workflow.

    Examples
    --------
    >>> workflow = Workflow(...)
    >>> # Single parameter
    >>> workflow.get_interval_id({"Temperature": 25})
    'ROW1'
    >>> # Multiple parameters
    >>> workflow.get_interval_id({"Temperature": 25, "Time": 60})
    'ROW1XROW2'
    >>> # Non-matching value raises error
    >>> workflow.get_interval_id({"Temperature": 999})
    AlbertException: No matching interval found for parameter 'Temperature' with value '999'
    """
    composite_id = ""
    for param_name, param_value in parameter_values.items():
        # First interval whose name and value both match, or None when there is none.
        hit = next(
            (
                candidate
                for candidate in self._interval_parameters
                if candidate.interval_param_name.lower() == param_name.lower()
                and (
                    param_value == candidate.interval_value
                    or str(param_value) == candidate.interval_value
                )
            ),
            None,
        )
        if hit is None:
            raise AlbertException(
                f"No matching interval found for parameter '{param_name}' with value '{param_value}'"
            )

        # The separator is only added once something has already been accumulated.
        if composite_id != "":
            composite_id += f"X{hit.interval_id}"
        else:
            composite_id += hit.interval_id

    return composite_id

model_post_init

model_post_init(__context) -> None
Source code in src/albert/resources/workflows.py
def model_post_init(self, __context) -> None:
    """Run post-initialization setup by calling ``_populate_interval_parameters``.

    ``__context`` is accepted but not used here.
    """
    # NOTE(review): presumably this fills ``_interval_parameters``, which
    # ``get_interval_id`` reads — the helper is not visible here; confirm.
    self._populate_interval_parameters()

WorkflowCollection

WorkflowCollection(*, session: AlbertSession)

Bases: BaseCollection

WorkflowCollection is a collection class for managing Workflow entities in the Albert platform.

Parameters:

Name Type Description Default
session AlbertSession

The Albert session instance.

required

Methods:

Name Description
create

Create or return matching workflows for the provided list of workflows.

get_by_id

Retrieve a Workflow by its ID.

get_by_ids

Returns a list of Workflow objects by their IDs.

list

List all workflows. Unlikely to be used in production.

Source code in src/albert/collections/workflows.py
def __init__(self, *, session: AlbertSession):
    """
    Set up the collection and derive the workflows endpoint path.

    Parameters
    ----------
    session : AlbertSession
        The Albert session instance, forwarded to the base collection.
    """
    super().__init__(session=session)
    # The API version comes from the class-level ``_api_version`` attribute.
    api_version = WorkflowCollection._api_version
    self.base_path = f"/api/{api_version}/workflows"

base_path instance-attribute

base_path = f'/api/{_api_version}/workflows'

create

create(*, workflows: list[Workflow]) -> list[Workflow]

Create or return matching workflows for the provided list of workflows. This endpoint automatically tries to find an existing workflow with the same parameter setpoints, and will either return the existing workflow or create a new one.

Parameters:

Name Type Description Default
workflows list[Workflow]

A list of Workflow objects to find or create.

required

Returns:

Type Description
list[Workflow]

A list of created or found Workflow objects.

Source code in src/albert/collections/workflows.py
def create(self, *, workflows: list[Workflow]) -> list[Workflow]:
    """Create or return matching workflows for the provided list of workflows.

    The workflows are serialized and posted to the ``/bulk`` endpoint in a single
    request. This endpoint automatically tries to find an existing workflow with the
    same parameter setpoints, and will either return the existing workflow or create
    a new one.

    Parameters
    ----------
    workflows : list[Workflow]
        A list of Workflow objects to find or create. A single ``Workflow`` is
        also accepted and treated as a one-element list.

    Returns
    -------
    list[Workflow]
        A list of created or found Workflow objects, built from the response body.
    """
    # Tolerate a bare Workflow passed where a list was expected.
    to_submit = [workflows] if isinstance(workflows, Workflow) else workflows

    bulk_url = f"{self.base_path}/bulk"
    payload = [
        workflow.model_dump(mode="json", by_alias=True, exclude_none=True)
        for workflow in to_submit
    ]
    response = self.session.post(url=bulk_url, json=payload)

    results = []
    for record in response.json():
        results.append(Workflow(**record))
    return results

get_by_id

get_by_id(*, id: str) -> Workflow

Retrieve a Workflow by its ID.

Parameters:

Name Type Description Default
id str

The ID of the Workflow to retrieve.

required

Returns:

Type Description
Workflow

The Workflow object.

Source code in src/albert/collections/workflows.py
def get_by_id(self, *, id: str) -> Workflow:
    """Fetch a single Workflow by its ID.

    Parameters
    ----------
    id : str
        The ID of the Workflow to retrieve; appended to the collection's base path.

    Returns
    -------
    Workflow
        The Workflow built from the response body.
    """
    workflow_url = f"{self.base_path}/{id}"
    payload = self.session.get(workflow_url).json()
    return Workflow(**payload)

get_by_ids

get_by_ids(*, ids: list[str]) -> list[Workflow]

Returns a list of Workflow objects by their IDs.

Parameters:

Name Type Description Default
ids list[str]

The list of Workflow IDs to retrieve.

required

Returns:

Type Description
list[Workflow]

The list of Workflow objects matching the provided IDs.

Source code in src/albert/collections/workflows.py
def get_by_ids(self, *, ids: list[str]) -> list[Workflow]:
    """Returns a list of Workflow objects by their IDs.

    Parameters
    ----------
    ids : list[str]
        The list of Workflow IDs to retrieve.

    Returns
    -------
    list[Workflow]
        The list of Workflow objects matching the provided IDs.
    """
    url = f"{self.base_path}/ids"
    batches = [ids[i : i + 100] for i in range(0, len(ids), 100)]
    return [
        Workflow(**item)
        for batch in batches
        for item in self.session.get(url, params={"id": batch}).json()["Items"]
    ]

list

list(limit: int = 50) -> Iterator[Workflow]

List all workflows. Unlikely to be used in production.

Parameters:

Name Type Description Default
limit int

The number of workflows to return, by default 50.

50

Yields:

Type Description
Iterator[Workflow]

An iterator of Workflow objects.

Source code in src/albert/collections/workflows.py
def list(self, limit: int = 50) -> Iterator[Workflow]:
    """List all workflows. Unlikely to be used in production.

    Parameters
    ----------
    limit : int, optional
        Value sent as the ``limit`` request parameter, by default 50.

    Returns
    -------
    Iterator[Workflow]
        An ``AlbertPaginator`` over the collection's base path whose items are
        produced by looking each listed entry up again through ``get_by_ids``.
    """

    def hydrate(items: list[dict]) -> list[Workflow]:
        # Listed entries are re-fetched by their "albertId" via get_by_ids.
        albert_ids = [entry["albertId"] for entry in items]
        return self.get_by_ids(ids=albert_ids)

    return AlbertPaginator(
        mode=PaginationMode.KEY,
        path=self.base_path,
        params={"limit": limit},
        session=self.session,
        deserialize=hydrate,
    )