Skip to content

Tasks

albert.collections.tasks.TaskCollection

TaskCollection(*, session: AlbertSession)

Bases: BaseCollection

TaskCollection is a collection class for managing Task entities in the Albert platform.

Parameters:

Name Type Description Default
session AlbertSession

The Albert Session information

required

Methods:

Name Description
create

Create a new task. Tasks can be of different types, such as PropertyTask, and are created using the provided task object.

add_block

Add a block to a Property task.

update_block_workflow

Update the workflow of a specific block within a task.

remove_block

Remove a block from a Property task.

delete

Delete a task.

get_by_id

Retrieve a task by its ID.

search

Search for tasks matching the provided criteria.

get_all

Retrieve fully hydrated Task entities with optional filters.

update

Update a task.

get_history

Retrieve the history of a task.

Attributes:

Name Type Description
base_path
Source code in src/albert/collections/tasks.py
def __init__(self, *, session: AlbertSession):
    """Set up the collection and derive the tasks endpoint path.

    Parameters
    ----------
    session : AlbertSession
        The Albert Session information; it is forwarded to the base collection.
    """
    super().__init__(session=session)
    # The API version is read from the class, not the instance.
    api_version = TaskCollection._api_version
    self.base_path = f"/api/{api_version}/tasks"

base_path

base_path = f'/api/{_api_version}/tasks'

create

create(
    *, task: PropertyTask | GeneralTask | BatchTask
) -> BaseTask

Create a new task. Tasks can be of different types, such as PropertyTask, and are created using the provided task object.

Parameters:

Name Type Description Default
task PropertyTask | GeneralTask | BatchTask

The task object to create.

required

Returns:

Type Description
BaseTask

The registered task object.

Source code in src/albert/collections/tasks.py
def create(self, *, task: PropertyTask | GeneralTask | BatchTask) -> BaseTask:
    """Register a new task on the platform.

    The task is serialized to JSON (aliases applied, ``None`` fields dropped) and
    posted as a one-element list to the ``multi`` endpoint. The task category, and
    the parent ID when one is set, are sent as query-string parameters.

    Parameters
    ----------
    task : PropertyTask | GeneralTask | BatchTask
        The task object to create.

    Returns
    -------
    BaseTask
        The registered task, parsed from the first element of the response.
    """
    body = task.model_dump(mode="json", by_alias=True, exclude_none=True)

    endpoint = f"{self.base_path}/multi?category={task.category.value}"
    if task.parent_id is not None:
        endpoint = f"{endpoint}&parentId={task.parent_id}"

    created = self.session.post(url=endpoint, json=[body]).json()
    # The endpoint answers with a list; only the first entry is used.
    return TaskAdapter.validate_python(created[0])

add_block

add_block(
    *,
    task_id: TaskId,
    data_template_id: DataTemplateId,
    workflow_id: WorkflowId,
) -> None

Add a block to a Property task.

Parameters:

Name Type Description Default
task_id TaskId

The ID of the task to add the block to.

required
data_template_id DataTemplateId

The ID of the data template to use for the block.

required
workflow_id WorkflowId

The ID of the workflow to assign to the block.

required

Returns:

Type Description
None

This method does not return any value.

Source code in src/albert/collections/tasks.py
@validate_call
def add_block(
    self, *, task_id: TaskId, data_template_id: DataTemplateId, workflow_id: WorkflowId
) -> None:
    """Attach a new block to a Property task.

    Sends a single ``add`` patch operation on the ``Block`` attribute of the task.

    Parameters
    ----------
    task_id : TaskId
        The ID of the task to add the block to.
    data_template_id : DataTemplateId
        The ID of the data template to use for the block.
    workflow_id : WorkflowId
        The ID of the workflow to assign to the block.

    Returns
    -------
    None
        This method does not return any value.
    """
    new_block = {"datId": data_template_id, "Workflow": {"id": workflow_id}}
    add_operation = {
        "operation": "add",
        "attribute": "Block",
        "newValue": [new_block],
    }
    self.session.patch(
        url=f"{self.base_path}/{task_id}",
        json=[{"id": task_id, "data": [add_operation]}],
    )

update_block_workflow

update_block_workflow(
    *,
    task_id: TaskId,
    block_id: BlockId,
    workflow_id: WorkflowId,
) -> None

Update the workflow of a specific block within a task.

This method updates the workflow of a specified block within a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
block_id str

The ID of the block within the task.

required
workflow_id str

The ID of the new workflow to be assigned to the block.

required

Returns:

Type Description
None

This method does not return any value.

Notes
  • The method raises a TypeError if the retrieved task is not an instance of PropertyTask.
  • If the block's current workflow matches the new workflow ID, no update is performed.
  • The method handles the case where the block has a default workflow named "No Parameter Group".
Source code in src/albert/collections/tasks.py
@validate_call
def update_block_workflow(
    self, *, task_id: TaskId, block_id: BlockId, workflow_id: WorkflowId
) -> None:
    """
    Update the workflow of a specific block within a task.

    The task is fetched first to discover the block's current workflow, which is
    sent as the ``oldValue`` of the patch operation.

    Parameters
    ----------
    task_id : TaskId
        The ID of the task.
    block_id : BlockId
        The ID of the block within the task.
    workflow_id : WorkflowId
        The ID of the new workflow to be assigned to the block.

    Returns
    -------
    None
        This method does not return any value.

    Raises
    ------
    TypeError
        If the retrieved task is not an instance of `PropertyTask`.
    ValueError
        If the task has no block with the given ID, or the matching block has no
        workflow that could be replaced.

    Notes
    -----
    - If the block's current workflow matches the new workflow ID, no update is performed.
    - A workflow named "No Parameter Group" is treated as a default placeholder and is
      skipped whenever the block carries more than one workflow.
    """
    url = f"{self.base_path}/{task_id}"
    task = self.get_by_id(id=task_id)
    if not isinstance(task, PropertyTask):
        logger.error(f"Task {task_id} is not an instance of PropertyTask")
        raise TypeError(f"Task {task_id} is not an instance of PropertyTask")

    # Track explicitly whether a current workflow was found. Without this, an
    # unknown block ID (or a block with no workflow) left the variable unbound
    # and the comparison below failed with an UnboundLocalError.
    existing_workflow_id = None
    workflow_found = False
    for b in task.blocks or []:
        if b.id != block_id:
            continue
        workflows = b.workflow or []
        for w in workflows:
            if w.name == "No Parameter Group" and len(workflows) > 1:
                # hardcoded default workflow
                continue
            # No break: when several candidates exist, the last one wins.
            existing_workflow_id = w.id
            workflow_found = True

    if not workflow_found:
        message = (
            f"Block {block_id} was not found on task {task_id} "
            "or has no workflow to update"
        )
        logger.error(message)
        raise ValueError(message)

    if existing_workflow_id == workflow_id:
        logger.info(f"Block {block_id} already has workflow {workflow_id}")
        return None
    patch = [
        {
            "data": [
                {
                    "operation": "update",
                    "attribute": "workflow",
                    "oldValue": existing_workflow_id,
                    "newValue": workflow_id,
                    "blockId": block_id,
                }
            ],
            "id": task_id,
        }
    ]
    self.session.patch(url=url, json=patch)

remove_block

remove_block(*, task_id: TaskId, block_id: BlockId) -> None

Remove a block from a Property task.

Parameters:

Name Type Description Default
task_id str

ID of the Task to remove the block from (e.g., TASFOR1234)

required
block_id str

ID of the Block to remove (e.g., BLK1)

required

Returns:

Type Description
None
Source code in src/albert/collections/tasks.py
@validate_call
def remove_block(self, *, task_id: TaskId, block_id: BlockId) -> None:
    """Detach a block from a Property task.

    Sends a single ``delete`` patch operation on the ``Block`` attribute of the task.

    Parameters
    ----------
    task_id : str
        ID of the Task to remove the block from (e.g., TASFOR1234)
    block_id : str
        ID of the Block to remove (e.g., BLK1)

    Returns
    -------
    None
    """
    delete_operation = {
        "operation": "delete",
        "attribute": "Block",
        "oldValue": [block_id],
    }
    self.session.patch(
        url=f"{self.base_path}/{task_id}",
        json=[{"id": task_id, "data": [delete_operation]}],
    )

delete

delete(*, id: TaskId) -> None

Delete a task.

Parameters:

Name Type Description Default
id TaskId

The ID of the task to delete.

required
Source code in src/albert/collections/tasks.py
@validate_call
def delete(self, *, id: TaskId) -> None:
    """Delete the task with the given ID.

    Parameters
    ----------
    id : TaskId
        The ID of the task to delete.
    """
    self.session.delete(f"{self.base_path}/{id}")

get_by_id

get_by_id(*, id: TaskId) -> BaseTask

Retrieve a task by its ID.

Parameters:

Name Type Description Default
id TaskId

The ID of the task to retrieve.

required

Returns:

Type Description
BaseTask

The task object with the provided ID.

Source code in src/albert/collections/tasks.py
@validate_call
def get_by_id(self, *, id: TaskId) -> BaseTask:
    """Fetch a single task by its ID.

    Parameters
    ----------
    id : TaskId
        The ID of the task to retrieve.

    Returns
    -------
    BaseTask
        The task object with the provided ID, validated through ``TaskAdapter``.
    """
    task_json = self.session.get(f"{self.base_path}/multi/{id}").json()
    return TaskAdapter.validate_python(task_json)

search

search(
    *,
    text: str | None = None,
    tags: list[str] | None = None,
    task_id: list[TaskId] | None = None,
    linked_task: list[TaskId] | None = None,
    category: TaskCategory | None = None,
    albert_id: list[str] | None = None,
    data_template: list[DataTemplateId] | None = None,
    assigned_to: list[str] | None = None,
    location: list[str] | None = None,
    priority: list[str] | None = None,
    status: list[str] | None = None,
    parameter_group: list[ParameterGroupId] | None = None,
    created_by: list[str] | None = None,
    project_id: ProjectId | None = None,
    order_by: OrderBy = DESCENDING,
    sort_by: str | None = None,
    max_items: int | None = None,
    offset: int = 0,
) -> Iterator[TaskSearchItem]

Search for tasks matching the provided criteria.

⚠️ This method returns partial (unhydrated) entities to optimize performance. To retrieve fully detailed entities, use get_all instead.

Parameters:

Name Type Description Default
text str

Text search across multiple task fields.

None
tags list[str]

Filter by tags associated with tasks.

None
task_id list[str]

Specific task IDs to search for.

None
linked_task list[str]

Task IDs linked to the ones being searched.

None
category TaskCategory

Task category filter (e.g., Experiment, Analysis).

None
albert_id list[str]

Albert-specific task identifiers.

None
data_template list[str]

Data template IDs associated with tasks.

None
assigned_to list[str]

User names assigned to the tasks.

None
location list[str]

Locations where tasks are carried out.

None
priority list[str]

Priority levels for filtering tasks.

None
status list[str]

Task status values (e.g., Open, Done).

None
parameter_group list[str]

Parameter Group IDs associated with tasks.

None
created_by list[str]

User names who created the tasks.

None
project_id str

ID of the parent project for filtering tasks.

None
order_by OrderBy

The order in which to return results (asc or desc), default DESCENDING.

DESCENDING
sort_by str

Attribute to sort tasks by (e.g., createdAt, name).

None
max_items int

Maximum number of items to return in total. If None, fetches all available items.

None
offset int

Number of results to skip for pagination, default 0.

0

Returns:

Type Description
Iterator[TaskSearchItem]

An iterator of matching, lightweight TaskSearchItem entities.

Source code in src/albert/collections/tasks.py
@validate_call
def search(
    self,
    *,
    text: str | None = None,
    tags: list[str] | None = None,
    task_id: list[TaskId] | None = None,
    linked_task: list[TaskId] | None = None,
    category: TaskCategory | None = None,
    albert_id: list[str] | None = None,
    data_template: list[DataTemplateId] | None = None,
    assigned_to: list[str] | None = None,
    location: list[str] | None = None,
    priority: list[str] | None = None,
    status: list[str] | None = None,
    parameter_group: list[ParameterGroupId] | None = None,
    created_by: list[str] | None = None,
    project_id: ProjectId | None = None,
    order_by: OrderBy = OrderBy.DESCENDING,
    sort_by: str | None = None,
    max_items: int | None = None,
    offset: int = 0,
) -> Iterator[TaskSearchItem]:
    """
    Search for tasks matching the provided criteria.

    ⚠️ The results are partial (unhydrated) entities, which keeps the search fast.
    Use :meth:`get_all` when fully detailed entities are required.

    Parameters
    ----------
    text : str, optional
        Free-text search across multiple task fields.
    tags : list[str], optional
        Tags the tasks must be associated with.
    task_id : list[str], optional
        Specific task IDs to search for.
    linked_task : list[str], optional
        Task IDs linked to the ones being searched.
    category : TaskCategory, optional
        Task category to filter on.
    albert_id : list[str], optional
        Albert-specific task identifiers.
    data_template : list[str], optional
        Data template IDs associated with the tasks.
    assigned_to : list[str], optional
        User names the tasks are assigned to.
    location : list[str], optional
        Locations where the tasks are carried out.
    priority : list[str], optional
        Priority levels to filter on.
    status : list[str], optional
        Task status values to filter on.
    parameter_group : list[str], optional
        Parameter Group IDs associated with the tasks.
    created_by : list[str], optional
        User names of the task creators.
    project_id : str, optional
        ID of the parent project to filter on.
    order_by : OrderBy, optional
        Result ordering (asc or desc); defaults to DESCENDING.
    sort_by : str, optional
        Attribute to sort the tasks by (e.g., createdAt, name).
    max_items : int, optional
        Upper bound on the total number of items returned. If None, all
        available items are fetched.
    offset : int, optional
        Number of results to skip for pagination; defaults to 0.

    Returns
    -------
    Iterator[TaskSearchItem]
        An iterator of matching, lightweight TaskSearchItem entities.
    """

    def _to_search_items(raw_items):
        # Each item is bound to this collection via `_bind_collection`.
        return [TaskSearchItem(**raw)._bind_collection(self) for raw in raw_items]

    # Query parameters use the API's camelCase names.
    query = {
        "offset": offset,
        "order": order_by.value,
        "text": text,
        "sortBy": sort_by,
        "tags": tags,
        "taskId": task_id,
        "linkedTask": linked_task,
        "category": category,
        "albertId": albert_id,
        "dataTemplate": data_template,
        "assignedTo": assigned_to,
        "location": location,
        "priority": priority,
        "status": status,
        "parameterGroup": parameter_group,
        "createdBy": created_by,
        "projectId": project_id,
    }

    return AlbertPaginator(
        mode=PaginationMode.OFFSET,
        path=f"{self.base_path}/search",
        session=self.session,
        params=query,
        max_items=max_items,
        deserialize=_to_search_items,
    )

get_all

get_all(
    *,
    text: str | None = None,
    tags: list[str] | None = None,
    task_id: list[TaskId] | None = None,
    linked_task: list[TaskId] | None = None,
    category: TaskCategory | None = None,
    albert_id: list[str] | None = None,
    data_template: list[DataTemplateId] | None = None,
    assigned_to: list[str] | None = None,
    location: list[str] | None = None,
    priority: list[str] | None = None,
    status: list[str] | None = None,
    parameter_group: list[ParameterGroupId] | None = None,
    created_by: list[str] | None = None,
    project_id: ProjectId | None = None,
    order_by: OrderBy = DESCENDING,
    sort_by: str | None = None,
    max_items: int | None = None,
    offset: int = 0,
) -> Iterator[BaseTask]

Retrieve fully hydrated Task entities with optional filters.

This method returns complete entity data using get_by_id. Use search for faster retrieval when you only need lightweight, partial (unhydrated) entities.

Parameters:

Name Type Description Default
text str

Text search across multiple task fields.

None
tags list[str]

Filter by tags associated with tasks.

None
task_id list[str]

Specific task IDs to search for.

None
linked_task list[str]

Task IDs linked to the ones being searched.

None
category TaskCategory

Task category filter (e.g., Experiment, Analysis).

None
albert_id list[str]

Albert-specific task identifiers.

None
data_template list[str]

Data template IDs associated with tasks.

None
assigned_to list[str]

User names assigned to the tasks.

None
location list[str]

Locations where tasks are carried out.

None
priority list[str]

Priority levels for filtering tasks.

None
status list[str]

Task status values (e.g., Open, Done).

None
parameter_group list[str]

Parameter Group IDs associated with tasks.

None
created_by list[str]

User names who created the tasks.

None
project_id str

ID of the parent project for filtering tasks.

None
order_by OrderBy

The order in which to return results (asc or desc), default DESCENDING.

DESCENDING
sort_by str

Attribute to sort tasks by (e.g., createdAt, name).

None
max_items int

Maximum number of items to return in total. If None, fetches all available items.

None
offset int

Number of results to skip for pagination, default 0.

0

Yields:

Type Description
Iterator[BaseTask]

A stream of fully hydrated Task entities (PropertyTask, BatchTask, or GeneralTask).

Source code in src/albert/collections/tasks.py
@validate_call
def get_all(
    self,
    *,
    text: str | None = None,
    tags: list[str] | None = None,
    task_id: list[TaskId] | None = None,
    linked_task: list[TaskId] | None = None,
    category: TaskCategory | None = None,
    albert_id: list[str] | None = None,
    data_template: list[DataTemplateId] | None = None,
    assigned_to: list[str] | None = None,
    location: list[str] | None = None,
    priority: list[str] | None = None,
    status: list[str] | None = None,
    parameter_group: list[ParameterGroupId] | None = None,
    created_by: list[str] | None = None,
    project_id: ProjectId | None = None,
    order_by: OrderBy = OrderBy.DESCENDING,
    sort_by: str | None = None,
    max_items: int | None = None,
    offset: int = 0,
) -> Iterator[BaseTask]:
    """
    Retrieve fully hydrated Task entities with optional filters.

    Every search hit is re-fetched through `get_by_id`, so each yielded entity is
    complete. Prefer :meth:`search` when lightweight, partial (unhydrated) entities
    are enough. Hits without an ID are skipped, and a hit whose fetch fails with
    an `AlbertHTTPError` or `RetryError` is logged as a warning and skipped.

    Parameters
    ----------
    text : str, optional
        Free-text search across multiple task fields.
    tags : list[str], optional
        Tags the tasks must be associated with.
    task_id : list[str], optional
        Specific task IDs to search for.
    linked_task : list[str], optional
        Task IDs linked to the ones being searched.
    category : TaskCategory, optional
        Task category to filter on.
    albert_id : list[str], optional
        Albert-specific task identifiers.
    data_template : list[str], optional
        Data template IDs associated with the tasks.
    assigned_to : list[str], optional
        User names the tasks are assigned to.
    location : list[str], optional
        Locations where the tasks are carried out.
    priority : list[str], optional
        Priority levels to filter on.
    status : list[str], optional
        Task status values to filter on.
    parameter_group : list[str], optional
        Parameter Group IDs associated with the tasks.
    created_by : list[str], optional
        User names of the task creators.
    project_id : str, optional
        ID of the parent project to filter on.
    order_by : OrderBy, optional
        Result ordering (asc or desc); defaults to DESCENDING.
    sort_by : str, optional
        Attribute to sort the tasks by (e.g., createdAt, name).
    max_items : int, optional
        Upper bound on the total number of search items processed. If None, all
        available items are fetched.
    offset : int, optional
        Number of results to skip for pagination; defaults to 0.

    Yields
    ------
    Iterator[BaseTask]
        A stream of fully hydrated Task entities (PropertyTask, BatchTask, or GeneralTask).
    """
    search_hits = self.search(
        text=text,
        tags=tags,
        task_id=task_id,
        linked_task=linked_task,
        category=category,
        albert_id=albert_id,
        data_template=data_template,
        assigned_to=assigned_to,
        location=location,
        priority=priority,
        status=status,
        parameter_group=parameter_group,
        created_by=created_by,
        project_id=project_id,
        order_by=order_by,
        sort_by=sort_by,
        max_items=max_items,
        offset=offset,
    )
    for hit in search_hits:
        hit_id = getattr(hit, "id", None)
        if hit_id:
            try:
                yield self.get_by_id(id=hit_id)
            except (AlbertHTTPError, RetryError) as e:
                # Best effort: one failing task must not abort the whole stream.
                logger.warning(f"Error fetching task '{hit_id}': {e}")

update

update(*, task: BaseTask) -> BaseTask

Update a task.

Parameters:

Name Type Description Default
task BaseTask

The updated Task object.

required

Returns:

Type Description
BaseTask

The updated Task object as it exists in the Albert platform.

Source code in src/albert/collections/tasks.py
def update(self, *, task: BaseTask) -> BaseTask:
    """Push local changes of a task to the platform.

    The stored task is fetched and diffed against ``task``. Each resulting change
    is sent as its own PATCH request, after which the task is fetched again.

    Parameters
    ----------
    task : BaseTask
        The updated Task object.

    Returns
    -------
    BaseTask
        The updated Task object as it exists in the Albert platform. When no
        changes are detected, the ``task`` argument itself is returned.
    """
    current = self.get_by_id(id=task.id)
    diff = self._generate_adv_patch_payload(updated=task, existing=current)

    if len(diff.data) == 0:
        logger.info(f"Task {task.id} is already up to date")
        return task

    endpoint = f"{self.base_path}/{task.id}"
    # One request per change: each datum is wrapped in its own payload.
    for change in diff.data:
        single_change = TaskPatchPayload(data=[change], id=task.id)
        body = single_change.model_dump(mode="json", by_alias=True, exclude_none=True)
        self.session.patch(url=endpoint, json=[body])

    return self.get_by_id(id=task.id)

get_history

get_history(
    *,
    id: TaskId,
    order: OrderBy = DESCENDING,
    limit: int = 1000,
    entity: HistoryEntity | None = None,
    blockId: str | None = None,
    startKey: str | None = None,
) -> TaskHistory
Source code in src/albert/collections/tasks.py
def get_history(
    self,
    *,
    id: TaskId,
    order: OrderBy = OrderBy.DESCENDING,
    limit: int = 1000,
    entity: HistoryEntity | None = None,
    blockId: str | None = None,
    startKey: str | None = None,
) -> TaskHistory:
    """Fetch the history of a task.

    Issues a GET request to the task's ``history`` endpoint and wraps the JSON
    response in a ``TaskHistory``.

    Parameters
    ----------
    id : TaskId
        The ID of the task whose history is requested.
    order : OrderBy, optional
        Sent as the ``orderBy`` query parameter; defaults to DESCENDING. A falsy
        value sends ``None`` instead.
    limit : int, optional
        Sent as the ``limit`` query parameter; defaults to 1000.
    entity : HistoryEntity, optional
        Sent as the ``entity`` query parameter.
    blockId : str, optional
        Sent as the ``blockId`` query parameter.
    startKey : str, optional
        Sent as the ``startKey`` query parameter (presumably a pagination
        cursor; confirm against the API reference).

    Returns
    -------
    TaskHistory
        The history built from the response body.
    """
    order_value = OrderBy(order).value if order else None
    query = {
        "limit": limit,
        "orderBy": order_value,
        "entity": entity,
        "blockId": blockId,
        "startKey": startKey,
    }
    history_json = self.session.get(f"{self.base_path}/{id}/history", params=query).json()
    return TaskHistory(**history_json)