Module: callable_task
Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import re
from abc import ABC
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import Dict
from typing import cast
from inflection import underscore
from typing_extensions import Self
from cl.runtime.context.context import Context
from cl.runtime.records.dataclasses_extensions import missing
from cl.runtime.records.protocols import KeyProtocol
from cl.runtime.schema.schema import Schema
from cl.runtime.serialization.dict_serializer import DictSerializer
from cl.runtime.serialization.string_serializer import StringSerializer
from cl.runtime.tasks.task import Task
from cl.runtime.tasks.task_key import TaskKey
key_serializer = StringSerializer()
param_dict_serializer = DictSerializer() # TODO: Support complex params
@dataclass(slots=True, kw_only=True)
class CallableTask(Task, ABC):
    """Base class for tasks that invoke callables (class methods, functions, etc.)."""

    @classmethod
    def normalize_method_name(cls, method_name: str) -> str:
        """Return the method name in snake_case.

        If the name has any uppercase letters, assume it is PascalCase and convert it
        to snake_case, also inserting an underscore before each run of digits
        (e.g. ``GetValue2`` becomes ``get_value_2``). A name with no uppercase
        letters is assumed to be snake_case already and is returned unchanged.

        Args:
            method_name: Method name in PascalCase or snake_case.

        Returns:
            The normalized method name.
        """
        if not any(c.isupper() for c in method_name):
            # Already in snake_case, return unchanged argument
            return method_name

        # Use inflection library for the PascalCase -> snake_case conversion
        result = underscore(method_name)

        # In addition, add underscore before numbers. The replacement must use the
        # backreference \1 so the matched digits are kept; a literal "_1" would
        # overwrite every number with the text "_1".
        return re.sub(r"([0-9]+)", r"_\1", result)
Classes
class CallableTask (*, task_id: str = None, label: str | None = None, queue: TaskQueueKey = None, status: TaskStatusEnum = None, progress_pct: float = None, elapsed_sec: float | None = None, remaining_sec: float | None = None, error_message: str | None = None)
-
Base class for tasks that invoke callables (class methods, functions, etc.).
Expand source code
@dataclass(slots=True, kw_only=True)
class CallableTask(Task, ABC):
    """Base class for tasks that invoke callables (class methods, functions, etc.)."""

    @classmethod
    def normalize_method_name(cls, method_name: str) -> str:
        """Return the method name in snake_case.

        If the name has any uppercase letters, assume it is PascalCase and convert it
        to snake_case, also inserting an underscore before each run of digits
        (e.g. ``GetValue2`` becomes ``get_value_2``). A name with no uppercase
        letters is assumed to be snake_case already and is returned unchanged.

        Args:
            method_name: Method name in PascalCase or snake_case.

        Returns:
            The normalized method name.
        """
        if not any(c.isupper() for c in method_name):
            # Already in snake_case, return unchanged argument
            return method_name

        # Use inflection library for the PascalCase -> snake_case conversion
        result = underscore(method_name)

        # In addition, add underscore before numbers. The replacement must use the
        # backreference \1 so the matched digits are kept; a literal "_1" would
        # overwrite every number with the text "_1".
        return re.sub(r"([0-9]+)", r"_\1", result)
Ancestors
- Task
- TaskKey
- KeyMixin
- RecordMixin
- abc.ABC
- typing.Generic
Subclasses
Static methods
def get_key_type() -> Type
-
Inherited from:
Task
.get_key_type
Return key type even when called from a record.
def normalize_method_name(method_name: str) -> str
-
If method name has uppercase letters, assume it is PascalCase and convert to snake_case.
def wait_for_completion(task_key: TaskKey, timeout_sec: int = 10) -> None
-
Inherited from:
Task
.wait_for_completion
Wait for completion of the specified task run before exiting from this method (not async/await).
Fields
var elapsed_sec -> float | None
-
Inherited from:
Task
.elapsed_sec
Elapsed time in seconds if available.
var error_message -> str | None
-
Inherited from:
Task
.error_message
Error message for Failed status if available.
var label -> str | None
-
Label for information purposes only (should not be used in processing).
var progress_pct -> float
-
Inherited from:
Task
.progress_pct
Task progress in percent from 0 to 100.
var queue -> TaskQueueKey
-
The queue that will run the task once it is saved.
var remaining_sec -> float | None
-
Inherited from:
Task
.remaining_sec
Remaining time in seconds if available.
var status -> TaskStatusEnum
-
Begins from Pending, continues to Running or Paused, and ends with Completed, Failed, or Cancelled.
var task_id -> str
-
Unique task identifier.
Methods
def get_key(self) -> TaskKey
-
Return a new key object whose fields are populated from self; do not return self.
def init_all(self) -> None
-
Invoke ‘init’ for each class in the order from base to derived, then validate against schema.
def run_task(self) -> None
-
Invoke execute with task status updates and exception handling.