Module: callable_task

Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import re
from abc import ABC
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import Dict
from typing import cast
from inflection import underscore
from typing_extensions import Self
from cl.runtime.context.context import Context
from cl.runtime.records.dataclasses_extensions import missing
from cl.runtime.records.protocols import KeyProtocol
from cl.runtime.schema.schema import Schema
from cl.runtime.serialization.dict_serializer import DictSerializer
from cl.runtime.serialization.string_serializer import StringSerializer
from cl.runtime.tasks.task import Task
from cl.runtime.tasks.task_key import TaskKey

# Module-level serializer instances, each constructed once when this module is imported.
# Neither is referenced in the visible part of this module; presumably they are
# used by subclasses or other callers — TODO confirm.
key_serializer = StringSerializer()
param_dict_serializer = DictSerializer()  # TODO: Support complex params


@dataclass(slots=True, kw_only=True)
class CallableTask(Task, ABC):
    """Base class for tasks that invoke callables (class methods, functions, etc.)."""

    @classmethod
    def normalize_method_name(cls, method_name: str) -> str:
        """
        Return the method name in snake_case.

        If the name contains at least one uppercase letter, it is assumed to be PascalCase
        and is converted using the inflection library, after which every run of digits is
        prefixed with an underscore (for example, 'Method2' becomes 'method_2').
        A name without uppercase letters is assumed to be snake_case already and is
        returned unchanged.

        Args:
            method_name: Method name in PascalCase or snake_case.

        Returns:
            The normalized method name.
        """

        if not any(c.isupper() for c in method_name):
            # Already in snake_case, return unchanged argument
            return method_name

        # Use inflection library
        result = underscore(method_name)
        # In addition, add underscore before numbers. The backreference \1 re-inserts the
        # matched digits; a plain "_1" replacement would overwrite every number with "_1".
        return re.sub(r"([0-9]+)", r"_\1", result)

Classes

class CallableTask (*, task_id: str = None, label: str | None = None, queue: TaskQueueKey = None, status: TaskStatusEnum = None, progress_pct: float = None, elapsed_sec: float | None = None, remaining_sec: float | None = None, error_message: str | None = None)

Base class for tasks that invoke callables (class methods, functions, etc.).

Expand source code
@dataclass(slots=True, kw_only=True)
class CallableTask(Task, ABC):
    """Base class for tasks that invoke callables (class methods, functions, etc.)."""

    @classmethod
    def normalize_method_name(cls, method_name: str) -> str:
        """
        Return the method name in snake_case.

        If the name contains at least one uppercase letter, it is assumed to be PascalCase
        and is converted using the inflection library, after which every run of digits is
        prefixed with an underscore (for example, 'Method2' becomes 'method_2').
        A name without uppercase letters is assumed to be snake_case already and is
        returned unchanged.

        Args:
            method_name: Method name in PascalCase or snake_case.

        Returns:
            The normalized method name.
        """

        if not any(c.isupper() for c in method_name):
            # Already in snake_case, return unchanged argument
            return method_name

        # Use inflection library
        result = underscore(method_name)
        # In addition, add underscore before numbers. The backreference \1 re-inserts the
        # matched digits; a plain "_1" replacement would overwrite every number with "_1".
        return re.sub(r"([0-9]+)", r"_\1", result)

Ancestors

Subclasses

Static methods

def get_key_type() -> Type

Inherited from: Task.get_key_type

Return key type even when called from a record.

def normalize_method_name(method_name: str) -> str

If method name has uppercase letters, assume it is PascalCase and convert to snake_case.

def wait_for_completion(task_key: TaskKey, timeout_sec: int = 10) -> None

Inherited from: Task.wait_for_completion

Wait for completion of the specified task run before exiting from this method (not async/await).

Fields

var elapsed_sec -> float | None

Inherited from: Task.elapsed_sec

Elapsed time in seconds if available.

var error_message -> str | None

Inherited from: Task.error_message

Error message for Failed status if available.

var label -> str | None

Inherited from: Task.label

Label for information purposes only (should not be used in processing).

var progress_pct -> float

Inherited from: Task.progress_pct

Task progress in percent from 0 to 100.

var queue -> TaskQueueKey

Inherited from: Task.queue

The queue that will run the task once it is saved.

var remaining_sec -> float | None

Inherited from: Task.remaining_sec

Remaining time in seconds if available.

var status -> TaskStatusEnum

Inherited from: Task.status

Begins from Pending, continues to Running or Paused, and ends with Completed, Failed, or Cancelled.

var task_id -> str

Inherited from: Task.task_id

Unique task identifier.

Methods

def get_key(self) -> TaskKey

Inherited from: Task.get_key

Return a new key object whose fields are populated from self; do not return self.

def init_all(self) -> None

Inherited from: Task.init_all

Invoke ‘init’ for each class in the order from base to derived, then validate against schema.

def run_task(self) -> None

Inherited from: Task.run_task

Invoke execute with task status updates and exception handling.