Skip to content

Workflows

albert.collections.workflows.WorkflowCollection

WorkflowCollection(*, session: AlbertSession)

Bases: BaseCollection

WorkflowCollection is a collection class for managing Workflow entities in the Albert platform.

Parameters:

Name Type Description Default
session AlbertSession

The Albert session instance.

required

Methods:

Name Description
create

Create or return matching workflows for the provided list of workflows.

get_by_id

Retrieve a Workflow by its ID.

get_by_ids

Returns a list of Workflow entities by their IDs.

get_all

Get all workflows. Unlikely to be used in production.

Attributes:

Name Type Description
base_path
Source code in src/albert/collections/workflows.py
def __init__(self, *, session: AlbertSession):
    """
    Initializes the WorkflowCollection with the provided session.

    Parameters
    ----------
    session : AlbertSession
        The Albert session instance.
    """
    super().__init__(session=session)
    self.base_path = f"/api/{WorkflowCollection._api_version}/workflows"

base_path

base_path = f'/api/{_api_version}/workflows'

create

create(*, workflows: list[Workflow]) -> list[Workflow]

Create or return matching workflows for the provided list of workflows. This endpoint automatically tries to find an existing workflow with the same parameter setpoints, and will either return the existing workflow or create a new one.

Parameters:

Name Type Description Default
workflows list[Workflow]

A list of Workflow entities to find or create.

required

Returns:

Type Description
list[Workflow]

A list of created or found Workflow entities.

Source code in src/albert/collections/workflows.py
def create(self, *, workflows: list[Workflow]) -> list[Workflow]:
    """Create or return matching workflows for the provided list of workflows.
    This endpoint automatically tries to find an existing workflow with the same parameter setpoints, and will either return the existing workflow or create a new one.

    Parameters
    ----------
    workflows : list[Workflow]
        A list of Workflow entities to find or create.

    Returns
    -------
    list[Workflow]
        A list of created or found Workflow entities.
    """
    if isinstance(workflows, Workflow):
        # in case the user forgets this should be a list
        workflows = [workflows]

    # Hydrate any parameter groups provided only by ID with their parameters
    for wf in workflows:
        self._hydrate_parameter_groups(workflow=wf)

    response = self.session.post(
        url=f"{self.base_path}/bulk",
        json=[
            x.model_dump(
                mode="json",
                by_alias=True,
                exclude_none=True,
                exclude={"created", "updated"},
            )
            for x in workflows
        ],
    )
    return [Workflow(**x) for x in response.json()]

get_by_id

get_by_id(*, id: WorkflowId) -> Workflow

Retrieve a Workflow by its ID.

Parameters:

Name Type Description Default
id str

The ID of the Workflow to retrieve.

required

Returns:

Type Description
Workflow

The Workflow object.

Source code in src/albert/collections/workflows.py
@validate_call
def get_by_id(self, *, id: WorkflowId) -> Workflow:
    """Retrieve a Workflow by its ID.

    Parameters
    ----------
    id : str
        The ID of the Workflow to retrieve.

    Returns
    -------
    Workflow
        The Workflow object.
    """
    response = self.session.get(f"{self.base_path}/{id}")
    return Workflow(**response.json())

get_by_ids

get_by_ids(*, ids: list[WorkflowId]) -> list[Workflow]

Returns a list of Workflow entities by their IDs.

Parameters:

Name Type Description Default
ids list[str]

The list of Workflow IDs to retrieve.

required

Returns:

Type Description
list[Workflow]

The list of Workflow entities matching the provided IDs.

Source code in src/albert/collections/workflows.py
@validate_call
def get_by_ids(self, *, ids: list[WorkflowId]) -> list[Workflow]:
    """Returns a list of Workflow entities by their IDs.

    Parameters
    ----------
    ids : list[str]
        The list of Workflow IDs to retrieve.

    Returns
    -------
    list[Workflow]
        The list of Workflow entities matching the provided IDs.
    """
    url = f"{self.base_path}/ids"
    batches = [ids[i : i + 100] for i in range(0, len(ids), 100)]
    return [
        Workflow(**item)
        for batch in batches
        for item in self.session.get(url, params={"id": batch}).json()["Items"]
    ]

get_all

get_all(max_items: int | None = None) -> Iterator[Workflow]

Get all workflows. Unlikely to be used in production.

Parameters:

Name Type Description Default
max_items int

Maximum number of items to return in total. If None, fetches all available items.

None

Yields:

Type Description
Iterator[Workflow]

An iterator of Workflow entities.

Source code in src/albert/collections/workflows.py
def get_all(
    self,
    max_items: int | None = None,
) -> Iterator[Workflow]:
    """
    Get all workflows. Unlikely to be used in production.

    Parameters
    ----------
    max_items : int, optional
        Maximum number of items to return in total. If None, fetches all available items.

    Yields
    ------
    Iterator[Workflow]
        An iterator of Workflow entities.
    """

    def deserialize(items: list[dict]) -> list[Workflow]:
        return self.get_by_ids(ids=[x["albertId"] for x in items])

    return AlbertPaginator(
        mode=PaginationMode.KEY,
        path=self.base_path,
        params={},
        session=self.session,
        deserialize=deserialize,
        max_items=max_items,
    )