Module: process_queue

Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime as dt
import time
from dataclasses import dataclass
from cl.runtime import Context
from cl.runtime.primitive.datetime_util import DatetimeUtil
from cl.runtime.tasks.task import Task
from cl.runtime.tasks.task_queue import TaskQueue
from cl.runtime.tasks.task_status_enum import TaskStatusEnum


@dataclass(slots=True, kw_only=True)
class ProcessQueue(TaskQueue):
    """Execute tasks sequentially within the queue process."""

    def init(self) -> None:
        # Set default queue timeout with no tasks to 10 min
        if self.timeout_sec is None:
            self.timeout_sec = 10

    def run_start_queue(self) -> None:
        context = Context.current()
        queue_id = self.queue_id

        # Set timeout
        timeout_delta = dt.timedelta(seconds=self.timeout_sec) if self.timeout_sec is not None else None
        timeout_at = DatetimeUtil.now() + timeout_delta if timeout_delta is not None else None

        # Set the counter of while loop cycles with no tasks
        no_task_cycles = 0
        while True:
            # Get pending tasks
            # TODO: Use DB queries with filter by queue field
            all_tasks = context.load_all(Task)
            awaiting_tasks = [
                task for task in all_tasks
                if task.queue.queue_id == queue_id
                and task.status == TaskStatusEnum.AWAITING
            ]
            pending_tasks = [
                task for task in all_tasks
                if task.queue.queue_id == queue_id
                and task.status == TaskStatusEnum.PENDING
            ]

            # Awaiting tasks have priority over pending tasks
            queued_tasks = awaiting_tasks + pending_tasks

            if queued_tasks:
                # Run found tasks sequentially
                for task in queued_tasks:
                    task.run_task()
                # Reset timeout and no task cycles counter
                timeout_at = DatetimeUtil.now() + timeout_delta if timeout_delta is not None else None
                no_task_cycles = 0
            else:
                if timeout_at is not None and DatetimeUtil.now() > timeout_at:
                    break
                else:
                    no_task_cycles = no_task_cycles + 1

            # Pause for 1 sec more for each no_task_cycle up to 10 sec
            sleep_sec = min(round(pow(2, no_task_cycles)), 8)
            time.sleep(sleep_sec)

    def run_stop_queue(self) -> None:
        raise NotImplementedError()

Classes

class ProcessQueue (*, queue_id: str = None, timeout_sec: int = 10)

Execute tasks sequentially within the queue process.

Expand source code
@dataclass(slots=True, kw_only=True)
class ProcessQueue(TaskQueue):
    """Execute tasks sequentially within the queue process."""

    def init(self) -> None:
        # Set default queue timeout with no tasks to 10 min
        if self.timeout_sec is None:
            self.timeout_sec = 10

    def run_start_queue(self) -> None:
        context = Context.current()
        queue_id = self.queue_id

        # Set timeout
        timeout_delta = dt.timedelta(seconds=self.timeout_sec) if self.timeout_sec is not None else None
        timeout_at = DatetimeUtil.now() + timeout_delta if timeout_delta is not None else None

        # Set the counter of while loop cycles with no tasks
        no_task_cycles = 0
        while True:
            # Get pending tasks
            # TODO: Use DB queries with filter by queue field
            all_tasks = context.load_all(Task)
            awaiting_tasks = [
                task for task in all_tasks
                if task.queue.queue_id == queue_id
                and task.status == TaskStatusEnum.AWAITING
            ]
            pending_tasks = [
                task for task in all_tasks
                if task.queue.queue_id == queue_id
                and task.status == TaskStatusEnum.PENDING
            ]

            # Awaiting tasks have priority over pending tasks
            queued_tasks = awaiting_tasks + pending_tasks

            if queued_tasks:
                # Run found tasks sequentially
                for task in queued_tasks:
                    task.run_task()
                # Reset timeout and no task cycles counter
                timeout_at = DatetimeUtil.now() + timeout_delta if timeout_delta is not None else None
                no_task_cycles = 0
            else:
                if timeout_at is not None and DatetimeUtil.now() > timeout_at:
                    break
                else:
                    no_task_cycles = no_task_cycles + 1

            # Pause for 1 sec more for each no_task_cycle up to 10 sec
            sleep_sec = min(round(pow(2, no_task_cycles)), 8)
            time.sleep(sleep_sec)

    def run_stop_queue(self) -> None:
        raise NotImplementedError()

Ancestors

Static methods

def get_key_type() -> Type

Inherited from: TaskQueue.get_key_type

Return key type even when called from a record.

Fields

var queue_id -> str

Inherited from: TaskQueue.queue_id

Unique task queue identifier.

var timeout_sec -> int

Inherited from: TaskQueue.timeout_sec

Optional timeout in seconds, queue will stop after reaching this timeout.

Methods

def init(self) -> None
def run_start_queue(self) -> None

Inherited from: TaskQueue.run_start_queue

Run a query on tasks, run all returned tasks sequentially or in parallel, then repeat.

def run_stop_queue(self) -> None

Inherited from: TaskQueue.run_stop_queue

Exit after completing all currently executing tasks.