Module: task_key

Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Type

from cl.runtime.primitive.timestamp import Timestamp
from cl.runtime.records.dataclasses_extensions import missing
from cl.runtime.records.key_mixin import KeyMixin
from cl.runtime.records.protocols import is_key


@dataclass(slots=True, kw_only=True)
class TaskKey(KeyMixin):
    """
    The task 'run_task' method is invoked by the queue to which the task is submitted.

    Notes:
        - The task may run sequentially or in parallel with other tasks
        - The task may run in a different process, thread or machine than the submitting code
          and must be able to acquire the resources required by its 'run_task' method in all of these cases
        - The queue updates 'status' field of the task as it progresses from its initial Pending state through
          the Running and optionally Paused state and ending in one of Completed, Failed, or Cancelled states
    """

    task_id: str = missing()
    """Unique task identifier."""

    def init(self) -> None:
        # Check only if inside a key, will be set automatically if inside a record
        if is_key(self):
            Timestamp.validate(
                self.task_id,
                value_name="task_id",
                data_type="TaskKey"
            )

    @classmethod
    def get_key_type(cls) -> Type:
        return TaskKey

Classes

class TaskKey (*, task_id: str = None)

The task ‘run_task’ method is invoked by the queue to which the task is submitted.

Notes

  • The task may run sequentially or in parallel with other tasks
  • The task may run in a different process, thread or machine than the submitting code and must be able to acquire the resources required by its ‘run_task’ method in all of these cases
  • The queue updates ‘status’ field of the task as it progresses from its initial Pending state through the Running and optionally Paused state and ending in one of Completed, Failed, or Cancelled states
Expand source code
@dataclass(slots=True, kw_only=True)
class TaskKey(KeyMixin):
    """
    The task 'run_task' method is invoked by the queue to which the task is submitted.

    Notes:
        - The task may run sequentially or in parallel with other tasks
        - The task may run in a different process, thread or machine than the submitting code
          and must be able to acquire the resources required by its 'run_task' method in all of these cases
        - The queue updates 'status' field of the task as it progresses from its initial Pending state through
          the Running and optionally Paused state and ending in one of Completed, Failed, or Cancelled states
    """

    task_id: str = missing()
    """Unique task identifier."""

    def init(self) -> None:
        # Check only if inside a key, will be set automatically if inside a record
        if is_key(self):
            Timestamp.validate(
                self.task_id,
                value_name="task_id",
                data_type="TaskKey"
            )

    @classmethod
    def get_key_type(cls) -> Type:
        return TaskKey

Ancestors

Subclasses

Static methods

def get_key_type() -> Type

Inherited from: KeyMixin.get_key_type

Return key type even when called from a record.

Fields

var task_id -> str

Unique task identifier.

Methods

def init(self) -> None