Module: local_cache
Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from typing import ClassVar
from typing import Dict
from typing import Iterable
from typing import Type
from typing import cast
from typing_extensions import Self
from cl.runtime.db.dataset_util import DatasetUtil
from cl.runtime.db.protocols import TKey
from cl.runtime.db.protocols import TRecord
from cl.runtime.log.exceptions.user_error import UserError
from cl.runtime.records.protocols import KeyProtocol
from cl.runtime.records.protocols import RecordProtocol
from cl.runtime.records.protocols import TQuery
from cl.runtime.serialization.string_serializer import StringSerializer
key_serializer = StringSerializer()
"""Serializer for keys used in cache lookup."""
_local_cache_instance: LocalCache | None = None
"""Singleton instance is created on first access."""
@dataclass(slots=True, kw_only=True)
class LocalCache:
"""In-memory cache for objects without serialization."""
__cache: Dict[KeyProtocol, RecordProtocol] = field(default_factory=lambda: {})
"""Record instance is stored in cache without serialization."""
def load_one(
self,
record_type: Type[TRecord],
record_or_key: TRecord | KeyProtocol | None,
*,
dataset: str | None = None,
identity: str | None = None,
is_key_optional: bool = False,
is_record_optional: bool = False,
) -> TRecord | None:
# Check for an empty key
if record_or_key is None:
if is_key_optional:
return None
else:
raise UserError(f"Key is None when trying to load record type {record_type.__name__} from DB.")
if record_or_key is None or getattr(record_or_key, "get_key", None) is not None:
# Key instance is Record or None, return without lookup
return cast(RecordProtocol, record_or_key)
elif getattr(record_or_key, "get_key_type"):
# Key, look up the record in cache
key_type = record_or_key.get_key_type()
serialized_key = key_serializer.serialize_key(record_or_key)
# Try to retrieve dataset dictionary, insert if it does not yet exist
dataset_cache = self.__cache.setdefault(dataset, {})
# Try to retrieve table dictionary
if (table_cache := dataset_cache.setdefault(key_type, None)) is not None:
# Look up the record, defaults to None
result = table_cache.get(serialized_key, None)
else:
# Return None if not found
return None
# Check if the record was not found
if not is_record_optional and result is None:
raise UserError(f"{record_type.__name__} record is not found for key {record_or_key}")
return result
else:
raise RuntimeError(f"Type {record_or_key.__class__.__name__} is not a record or key.")
def load_many(
self,
record_type: Type[TRecord],
records_or_keys: Iterable[TRecord | KeyProtocol | tuple | str | None] | None,
*,
dataset: str | None = None,
identity: str | None = None,
) -> Iterable[TRecord | None] | None:
# TODO: Implement directly for better performance
result = [
self.load_one(
record_type,
x,
dataset=dataset,
identity=identity,
is_key_optional=True, # TODO: Keep the existing defaults for load_many
is_record_optional=True, # TODO: Keep the existing defaults for load_many
)
for x in records_or_keys
]
return result
def load_all(
self,
record_type: Type[TRecord],
*,
dataset: str | None = None,
identity: str | None = None,
) -> Iterable[TRecord | None] | None:
raise NotImplementedError()
def load_filter(
self,
record_type: Type[TRecord],
filter_obj: TRecord,
*,
dataset: str | None = None,
identity: str | None = None,
) -> Iterable[TRecord]:
raise NotImplementedError()
def save_one(
self,
record: RecordProtocol | None,
*,
dataset: str | None = None,
identity: str | None = None,
) -> None:
# If record is None, do nothing
if record is None:
return
# Try to retrieve dataset dictionary, insert if it does not yet exist
dataset_cache = self.__cache.setdefault(dataset, {})
# Try to retrieve table dictionary using 'key_type' as key, insert if it does not yet exist
key_type = record.get_key_type()
table_cache = dataset_cache.setdefault(key_type, {})
# Serialize both key and record
serialized_key = key_serializer.serialize_key(record)
# Add record to cache, overwriting an existing record if present
table_cache[serialized_key] = record
def save_many(
self,
records: Iterable[RecordProtocol],
*,
dataset: str | None = None,
identity: str | None = None,
) -> None:
# TODO: Review performance compared to a custom implementation for save_many
[self.save_one(x) for x in records]
def delete_one(
self,
key_type: Type[TKey],
key: TKey | KeyProtocol | tuple | str | None,
*,
dataset: str | None = None,
identity: str | None = None,
) -> None:
raise NotImplementedError()
def delete_many(
self,
keys: Iterable[KeyProtocol] | None,
*,
dataset: str | None = None,
identity: str | None = None,
) -> None:
# Validate the dataset and if necessary convert to delimited string
raise NotImplementedError()
@classmethod
def instance(cls) -> Self:
"""Return singleton instance."""
# Check if cached value exists, load if not found
global _local_cache_instance
if _local_cache_instance is None:
# Create if does not yet exist
_local_cache_instance = LocalCache()
return _local_cache_instance
Global variables
var key_serializer
-
Serializer for keys used in cache lookup.
Classes
class LocalCache
-
In-memory cache for objects without serialization.
Expand source code
@dataclass(slots=True, kw_only=True) class LocalCache: """In-memory cache for objects without serialization.""" __cache: Dict[KeyProtocol, RecordProtocol] = field(default_factory=lambda: {}) """Record instance is stored in cache without serialization.""" def load_one( self, record_type: Type[TRecord], record_or_key: TRecord | KeyProtocol | None, *, dataset: str | None = None, identity: str | None = None, is_key_optional: bool = False, is_record_optional: bool = False, ) -> TRecord | None: # Check for an empty key if record_or_key is None: if is_key_optional: return None else: raise UserError(f"Key is None when trying to load record type {record_type.__name__} from DB.") if record_or_key is None or getattr(record_or_key, "get_key", None) is not None: # Key instance is Record or None, return without lookup return cast(RecordProtocol, record_or_key) elif getattr(record_or_key, "get_key_type"): # Key, look up the record in cache key_type = record_or_key.get_key_type() serialized_key = key_serializer.serialize_key(record_or_key) # Try to retrieve dataset dictionary, insert if it does not yet exist dataset_cache = self.__cache.setdefault(dataset, {}) # Try to retrieve table dictionary if (table_cache := dataset_cache.setdefault(key_type, None)) is not None: # Look up the record, defaults to None result = table_cache.get(serialized_key, None) else: # Return None if not found return None # Check if the record was not found if not is_record_optional and result is None: raise UserError(f"{record_type.__name__} record is not found for key {record_or_key}") return result else: raise RuntimeError(f"Type {record_or_key.__class__.__name__} is not a record or key.") def load_many( self, record_type: Type[TRecord], records_or_keys: Iterable[TRecord | KeyProtocol | tuple | str | None] | None, *, dataset: str | None = None, identity: str | None = None, ) -> Iterable[TRecord | None] | None: # TODO: Implement directly for better performance result = [ self.load_one( record_type, x, dataset=dataset, identity=identity, is_key_optional=True, # TODO: Keep the existing defaults for load_many is_record_optional=True, # TODO: Keep the existing defaults for load_many ) for x in records_or_keys ] return result def load_all( self, record_type: Type[TRecord], *, dataset: str | None = None, identity: str | None = None, ) -> Iterable[TRecord | None] | None: raise NotImplementedError() def load_filter( self, record_type: Type[TRecord], filter_obj: TRecord, *, dataset: str | None = None, identity: str | None = None, ) -> Iterable[TRecord]: raise NotImplementedError() def save_one( self, record: RecordProtocol | None, *, dataset: str | None = None, identity: str | None = None, ) -> None: # If record is None, do nothing if record is None: return # Try to retrieve dataset dictionary, insert if it does not yet exist dataset_cache = self.__cache.setdefault(dataset, {}) # Try to retrieve table dictionary using 'key_type' as key, insert if it does not yet exist key_type = record.get_key_type() table_cache = dataset_cache.setdefault(key_type, {}) # Serialize both key and record serialized_key = key_serializer.serialize_key(record) # Add record to cache, overwriting an existing record if present table_cache[serialized_key] = record def save_many( self, records: Iterable[RecordProtocol], *, dataset: str | None = None, identity: str | None = None, ) -> None: # TODO: Review performance compared to a custom implementation for save_many [self.save_one(x) for x in records] def delete_one( self, key_type: Type[TKey], key: TKey | KeyProtocol | tuple | str | None, *, dataset: str | None = None, identity: str | None = None, ) -> None: raise NotImplementedError() def delete_many( self, keys: Iterable[KeyProtocol] | None, *, dataset: str | None = None, identity: str | None = None, ) -> None: # Validate the dataset and if necessary convert to delimited string raise NotImplementedError() @classmethod def instance(cls) -> Self: """Return singleton instance.""" # Check if cached value exists, load if not found global _local_cache_instance if _local_cache_instance is None: # Create if does not yet exist _local_cache_instance = LocalCache() return _local_cache_instance
Static methods
def instance() -> Self
-
Return singleton instance.
Methods
def delete_many(self, keys: Iterable[KeyProtocol] | None, *, dataset: str | None = None, identity: str | None = None) -> None
def delete_one(self, key_type: Type[TKey], key: TKey | KeyProtocol | tuple | str | None, *, dataset: str | None = None, identity: str | None = None) -> None
def load_all(self, record_type: Type[TRecord], *, dataset: str | None = None, identity: str | None = None) -> Optional[Iterable[Optional[~TRecord]]]
def load_filter(self, record_type: Type[TRecord], filter_obj: TRecord, *, dataset: str | None = None, identity: str | None = None) -> Iterable[~TRecord]
def load_many(self, record_type: Type[TRecord], records_or_keys: Iterable[TRecord | KeyProtocol | tuple | str | None] | None, *, dataset: str | None = None, identity: str | None = None) -> Optional[Iterable[Optional[~TRecord]]]
def load_one(self, record_type: Type[TRecord], record_or_key: TRecord | KeyProtocol | None, *, dataset: str | None = None, identity: str | None = None, is_key_optional: bool = False, is_record_optional: bool = False) -> Optional[~TRecord]
def save_many(self, records: Iterable[RecordProtocol], *, dataset: str | None = None, identity: str | None = None) -> None
def save_one(self, record: RecordProtocol | None, *, dataset: str | None = None, identity: str | None = None) -> None