SchedulerConfig¶
Up to version 3.2.3, Esmerald supported only Asyncz as its internal scheduler. From version 3.2.3 onwards, Asyncz is still supported, but the scheduler is now modular, like everything else in Esmerald.
What does this mean?
It means that if you don't want to use Asyncz, for personal or application-specific reasons, you can build your own configuration and plug your scheduler into Esmerald.
This is possible because Esmerald now implements the SchedulerConfig.
How to import it¶
You can import the configuration from the following:
from esmerald.contrib.schedulers import SchedulerConfig
The SchedulerConfig class¶
When implementing a scheduler configuration, you must implement two functions.
This is what makes the SchedulerConfig modular: there are plenty of schedulers out there, each with many different options and configurations, but the one thing they all have in common is that they must start and shut down at some point. The only thing Esmerald "cares" about is that this functionality is encapsulated in two simple functions.
The start function¶
The start function, as the name suggests, is the function that Esmerald calls internally to start the scheduler for you.
This is important because when the enable_scheduler flag is set, Esmerald looks for the scheduler config and calls start on startup.
The shutdown function¶
The shutdown function, as the name suggests, is the function that Esmerald calls internally to shut down the scheduler for you.
This is important because when the enable_scheduler flag is set, Esmerald looks for the scheduler config and calls shutdown on shutdown (usually when the application stops).
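To make the two-function contract concrete, here is a minimal sketch of a custom scheduler configuration. Everything in it is an assumption made for illustration: the local `SchedulerConfig` class is only a stand-in that mirrors the start/shutdown contract described above, so the example runs without Esmerald installed, and `IntervalSchedulerConfig` is a hypothetical name, not something Esmerald ships. In a real application you would subclass the `SchedulerConfig` imported from `esmerald.contrib.schedulers` instead.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Optional


class SchedulerConfig(ABC):
    """Local stand-in (assumption) for Esmerald's SchedulerConfig contract."""

    def __init__(self, **kwargs: Any) -> None:
        # Keep any extra options around; the real base class may differ.
        self.extra_options = kwargs

    @abstractmethod
    async def start(self, **kwargs: Any) -> None: ...

    @abstractmethod
    async def shutdown(self, **kwargs: Any) -> None: ...


class IntervalSchedulerConfig(SchedulerConfig):
    """Hypothetical config that awaits `job` every `interval` seconds."""

    def __init__(
        self,
        job: Callable[[], Awaitable[None]],
        interval: float,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.job = job
        self.interval = interval
        self._runner: Optional[asyncio.Task] = None

    @property
    def running(self) -> bool:
        return self._runner is not None

    async def _loop(self) -> None:
        # Run the job, wait, repeat until the task is cancelled.
        while True:
            await self.job()
            await asyncio.sleep(self.interval)

    async def start(self, **kwargs: Any) -> None:
        # Called once on application startup.
        if self._runner is None:
            self._runner = asyncio.create_task(self._loop())

    async def shutdown(self, **kwargs: Any) -> None:
        # Called once on application shutdown; cancel and wait for the loop.
        if self._runner is not None:
            self._runner.cancel()
            try:
                await self._runner
            except asyncio.CancelledError:
                pass
            self._runner = None
```

Because the framework only needs `start` and `shutdown`, the scheduling strategy itself (here a plain asyncio loop) stays an implementation detail of the configuration class.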
How to use it¶
Esmerald already implements this interface with the built-in AsynczConfig. This is very handy since Asyncz has a lot of configuration options that can be passed and used within an Esmerald application.
Let us see what the implementation looks like.
import warnings
from uuid import uuid4
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any, Callable, Dict, Union, cast, Type

from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.tasks.base import Task as AsynczTask
from asyncz.typing import undefined, UndefinedType

from esmerald.conf import settings
from esmerald.contrib.schedulers.base import SchedulerConfig
from esmerald.exceptions import ImproperlyConfigured
from esmerald.utils.module_loading import import_string


class AsynczConfig(SchedulerConfig):
    """
    Implements an integration with Asyncz, allowing to
    customise the scheduler with the provided configurations.
    """

    def __init__(
        self,
        scheduler_class: Type[SchedulerType] = AsyncIOScheduler,
        tasks: Union[Dict[str, str]] = None,
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Dict[str, str]], None] = None,
        **kwargs: Dict[str, Any],
    ):
        """
        Initializes the AsynczConfig object.

        Args:
            scheduler_class: The class of the scheduler to be used.
            tasks: A dictionary of tasks to be registered in the scheduler.
            timezone: The timezone to be used by the scheduler.
            configurations: Extra configurations to be passed to the scheduler.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.scheduler_class = scheduler_class
        self.tasks = tasks
        self.timezone = timezone
        self.configurations = configurations
        self.options = kwargs

        for task, module in self.tasks.items():
            if not isinstance(task, str) or not isinstance(module, str):
                raise ImproperlyConfigured("The dict of tasks must be Dict[str, str].")

        if not self.tasks:
            warnings.warn(
                "Esmerald is starting the scheduler, yet there are no tasks declared.",
                UserWarning,
                stacklevel=2,
            )

        # Load the scheduler object
        self.handler = self.get_scheduler(
            scheduler=self.scheduler_class,
            timezone=self.timezone,
            configurations=self.configurations,
            **self.options,
        )

        self.register_tasks(tasks=self.tasks)

    def register_tasks(self, tasks: Dict[str, str]) -> None:
        """
        Registers the tasks in the Scheduler.

        Args:
            tasks: A dictionary of tasks to be registered in the scheduler.
        """
        for task, _module in tasks.items():
            imported_task = f"{_module}.{task}"
            scheduled_task: "Task" = import_string(imported_task)

            if not scheduled_task.is_enabled:
                continue

            try:
                scheduled_task.add_task(self.handler)
            except Exception as e:
                raise ImproperlyConfigured(str(e)) from e

    def get_scheduler(
        self,
        scheduler: Type[SchedulerType],
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Any], None] = None,
        **options: Dict[str, Any],
    ) -> SchedulerType:
        """
        Initiates the scheduler from the given time.
        If no value is provided, it will default to AsyncIOScheduler.

        The value of `scheduler_class` can be overwritten by any esmerald custom settings.

        Args:
            scheduler: The class of the scheduler to be used.
            timezone: The timezone instance.
            configurations: A dictionary with extra configurations to be passed to the scheduler.
            **options: Additional options.

        Returns:
            SchedulerType: An instance of a Scheduler.
        """
        if not timezone:
            timezone = settings.timezone

        if not configurations:
            return scheduler(timezone=timezone, **options)

        return scheduler(global_config=configurations, timezone=timezone, **options)

    async def start(self, **kwargs: Dict[str, Any]) -> None:
        """
        Starts the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.start(**kwargs)

    async def shutdown(self, **kwargs: Dict[str, Any]) -> None:
        """
        Shuts down the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.shutdown(**kwargs)

class Task:
    """
    Base for the scheduler decorator that will auto discover the
    tasks in the application and add them to the internal scheduler.
    """

    def __init__(
        self,
        *,
        name: Union[str, None] = None,
        trigger: Union[TriggerType, None] = None,
        id: Union[str, None] = None,
        mistrigger_grace_time: Union[int, UndefinedType, None] = undefined,
        coalesce: Union[bool, UndefinedType] = undefined,
        max_instances: Union[int, UndefinedType, None] = undefined,
        next_run_time: Union[datetime, str, UndefinedType, None] = undefined,
        store: str = "default",
        executor: str = "default",
        replace_existing: bool = False,
        args: Union[Any, None] = None,
        kwargs: Union[Dict[str, Any], None] = None,
        is_enabled: bool = True,
    ) -> None:
        """
        Initializes a new instance of the `Task` class for the Scheduler.

        Args:
            name (str, optional): Textual description of the task.
            trigger (TriggerType, optional): An instance of a trigger class.
            id (str, optional): Explicit identifier for the task.
            mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
                (or None to allow the task to run no matter how late it is).
            coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
            max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
            next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
            store (str, optional): Alias of the task store to store the task in.
            executor (str, optional): Alias of the executor to run the task with.
            replace_existing (bool, optional): True to replace an existing task with the same id
                (but retain the number of runs from the existing one).
            args (Any, optional): List of positional arguments to call func with.
            kwargs (Dict[str, Any], optional): Dict of keyword arguments to call func with.
            is_enabled (bool, optional): True if the task is to be added to the scheduler.
        """
        self.name = name
        self.trigger = trigger
        self.id = id
        self.mistrigger_grace_time = mistrigger_grace_time
        self.coalesce = coalesce
        self.max_instances = max_instances
        self.next_run_time = next_run_time
        self.store = store
        self.executor = executor
        self.replace_existing = replace_existing
        self.args = args
        self.kwargs = kwargs
        self.is_enabled = is_enabled
        self.fn = None

    def add_task(self, scheduler: SchedulerType) -> None:
        try:
            scheduler.add_task(
                self.fn,
                trigger=self.trigger,
                args=self.args,
                kwargs=self.kwargs,
                id=self.id,
                name=self.name,
                mistrigger_grace_time=self.mistrigger_grace_time,
                coalesce=self.coalesce,
                max_instances=self.max_instances,
                next_run_time=self.next_run_time,
                store=self.store,
                executor=self.executor,
                replace_existing=self.replace_existing,
            )
        except Exception as e:
            raise ImproperlyConfigured(str(e)) from e

We won't be dwelling on the technicalities of this configuration because it is unique to the Asyncz integration provided by Esmerald. It is not mandatory to use it: you can build your own and pass it to the Esmerald scheduler_config parameter.
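One detail of the AsynczConfig implementation that is easy to miss is the shape of the `tasks` dictionary: `register_tasks` builds the import path as `f"{_module}.{task}"`, so each key is the task name and each value is the module that contains it. The sketch below illustrates that lookup in isolation. The `import_string` defined here is a simplified stand-in written with `importlib` (an assumption about its behaviour, not Esmerald's actual helper), `resolve_tasks` is a hypothetical name, and standard-library objects are used in place of real scheduler tasks so the example runs anywhere.

```python
import importlib
from typing import Any, Dict


def import_string(dotted_path: str) -> Any:
    """Simplified stand-in (assumption) for esmerald.utils.module_loading.import_string."""
    module_path, _, attribute = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attribute)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attribute!r}") from exc


def resolve_tasks(tasks: Dict[str, str]) -> Dict[str, Any]:
    """Mirror the f"{module}.{task}" lookup: keys are names, values are modules."""
    return {name: import_string(f"{module}.{name}") for name, module in tasks.items()}


# Standard-library names are used only to keep the sketch runnable;
# in an application the values would be modules holding scheduler tasks.
resolved = resolve_tasks({"dumps": "json", "sqrt": "math"})
```

Getting the direction of the mapping wrong (module as key, task as value) would produce a path that cannot be imported, which in AsynczConfig would surface when the configuration is created.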
SchedulerConfig and application¶
To use the SchedulerConfig in an application, like the one shown above with Asyncz, you can simply do this:
Note
We use the existing AsynczConfig as an example, but feel free to use your own if you require something else.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(routes=[...], scheduler_config=get_scheduler_config())
If you want to know more about how to use the AsynczConfig, check out its dedicated section of the documentation.
Application lifecycle¶
The Esmerald scheduler is tied to the application lifecycle, which means the on_startup/on_shutdown events and the lifespan.
You can read more about this in the appropriate section of the documentation.
By default, the scheduler is linked to the on_startup/on_shutdown events, which are automatically managed for you, but if you require the lifespan instead, then you must make the appropriate adjustments.
The following example serves as a suggestion, but feel free to use your own design. Let us check how we could manage this using the lifespan instead.
from contextlib import asynccontextmanager
from functools import lru_cache

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


@asynccontextmanager
async def lifespan(app: Esmerald):
    # What happens on startup
    await get_scheduler_config().start()
    yield
    # What happens on shutdown
    await get_scheduler_config().shutdown()


@lru_cache
def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(
    routes=[...],
    lifespan=lifespan,
    scheduler_config=get_scheduler_config(),
)
Pretty easy, right? Esmerald then understands what needs to be done as normal.
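The `@lru_cache` decorator in the lifespan example matters: without it, every call to `get_scheduler_config()` would build a new configuration, so the object that is shut down would not be the one that was started. The following sketch isolates that behaviour. `RecordingConfig` is a hypothetical stand-in that only records calls (no Esmerald or Asyncz involved), and the `try`/`finally` around `yield` is an optional hardening choice of this sketch, not part of the example above.

```python
import asyncio
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import AsyncIterator, List

events: List[str] = []


class RecordingConfig:
    """Hypothetical stand-in for a scheduler config; it only records calls."""

    async def start(self) -> None:
        events.append(f"start:{id(self)}")

    async def shutdown(self) -> None:
        events.append(f"shutdown:{id(self)}")


@lru_cache
def get_scheduler_config() -> RecordingConfig:
    # Cached, so every call returns the single instance built on first use.
    return RecordingConfig()


@asynccontextmanager
async def lifespan(app: object) -> AsyncIterator[None]:
    # Startup: start the cached config.
    await get_scheduler_config().start()
    try:
        yield
    finally:
        # Shutdown: the cache guarantees this is the same instance.
        await get_scheduler_config().shutdown()


async def main() -> None:
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(main())
```

After running, `events` holds a start entry, then `serving`, then a shutdown entry, and both scheduler entries carry the same object id.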
The SchedulerConfig and the settings¶
Like everything in Esmerald, the SchedulerConfig can also be made available via settings.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from esmerald import EsmeraldAPISettings
from esmerald.contrib.schedulers import SchedulerConfig
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


class CustomSettings(EsmeraldAPISettings):
    @property
    def scheduler_config(self) -> SchedulerConfig:
        # Define the stores
        # Override the default MemoryStore with a MongoDBStore
        stores = {"default": MongoDBStore()}

        # Define the executors
        # Override the default to be the AsyncIOExecutor
        executors = {
            "default": AsyncIOExecutor(),
            "threadpool": ThreadPoolExecutor(max_workers=20),
        }

        # Set the defaults
        task_defaults = {"coalesce": False, "max_instances": 4}

        return AsynczConfig(
            tasks=...,
            timezone="UTC",
            stores=stores,
            executors=executors,
            task_defaults=task_defaults,
        )
Important Notes¶
- You can create your own custom scheduler config.
- You must implement the start/shutdown functions in any scheduler configuration.
- You can use either the on_startup/on_shutdown or the lifespan events. The first is automatically managed for you.