API Reference#

Main module API#

async_job#

@regta.async_job(interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod], *args, **kwargs)#

Make AsyncJob from async function.

Parameters
  • interval – Interval between every call.

  • *args – Will be passed into the function.

  • **kwargs – Will be passed into the function.

thread_job#

@regta.thread_job(interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod], *args, **kwargs)#

Make ThreadJob from function.

Parameters
  • interval – Interval between every call.

  • *args – Will be passed into the function.

  • **kwargs – Will be passed into the function.

process_job#

@regta.process_job(interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod], *args, **kwargs)#

Make ProcessJob from function.

Parameters
  • interval – Interval between every call.

  • *args – Will be passed into the function.

  • **kwargs – Will be passed into the function.

AsyncJob#

class regta.AsyncJob(*args, logger: Optional[logging.Logger] = None, use_ansi: bool = True, **kwargs)#

Bases: regta.jobs.BaseJob

Async job class. Will be executed in an event loop.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

logger: Optional[Union[logging.Logger, logging.LoggerAdapter]] = None#

Logger writes all results of func function and its exceptions. If the logger isn’t specified, regta will use std output.

args: Iterable = []#

Args will be passed into func.

kwargs: dict = {}#

Key-word args will be passed into func.

func: Callable[[...], Awaitable[Optional[str]]]#

The function on which the job will be based. Must be rewritten or passed. It’ll be called every interval.

async execute()#

Execute func and log result/error. It will be called by the job every interval.

async run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

async stop()#

Method will be called by the scheduler when regta gets stop signal.

ThreadJob#

class regta.ThreadJob(*args, logger: Optional[logging.Logger] = None, use_ansi: bool = True, **kwargs)#

Bases: regta.jobs.BaseSyncJob, threading.Thread

Sync job class based on a thread.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

func: Callable[[...], Optional[str]]#

The function on which job will be based. Must be rewritten or passed. It’ll be called every interval.

logger: Optional[Union[logging.Logger, logging.LoggerAdapter]] = None#

Logger writes all results of func function and its exceptions. If the logger isn’t specified, regta will use std output.

args: Iterable = []#

Args will be passed into func.

kwargs: dict = {}#

Key-word args will be passed into func.

execute()#

Execute func and log result/error. It will be called by the job every interval.

run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

stop()#

Stops job’s thread. It will be called by the scheduler when regta gets a stop signal.

ProcessJob#

class regta.ProcessJob(*args, logger: Optional[logging.Logger] = None, use_ansi: bool = True, **kwargs)#

Bases: regta.jobs.BaseSyncJob, multiprocessing.context.Process

Sync job class based on a process.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

func: Callable[[...], Optional[str]]#

The function on which job will be based. Must be rewritten or passed. It’ll be called every interval.

logger: Optional[Union[logging.Logger, logging.LoggerAdapter]] = None#

Logger writes all results of func function and its exceptions. If the logger isn’t specified, regta will use std output.

args: Iterable = []#

Args will be passed into func.

kwargs: dict = {}#

Key-word args will be passed into func.

execute()#

Execute func and log result/error. It will be called by the job every interval.

run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

stop()#

Stops and terminates job’s process. It will be called by the scheduler when regta gets a stop signal.

run#

regta.run(jobs: Iterable[Union[regta.jobs.AsyncJob, regta.jobs.ThreadJob, regta.jobs.ProcessJob]] = (), classes: Iterable[Type[Union[regta.jobs.AsyncJob, regta.jobs.ThreadJob, regta.jobs.ProcessJob]]] = (), logger: Optional[logging.Logger] = None, use_ansi: bool = True)#

Initializes regta.Scheduler and starts passed jobs.

Parameters
  • jobs – List of job instances.

  • classes – List of job classes. This func will make instances from them.

  • logger – If a logger isn`t passed, regta will use std output.

  • use_ansi – Enable / Disable ANSI colors.

Raises

ValueError – If jobs or classes weren’t passed.

Scheduler#

class regta.Scheduler#

Bases: regta.schedulers.SyncBlocking, regta.schedulers.AbstractScheduler

add_job(job: Union[regta.jobs.AsyncJob, regta.jobs.ThreadJob, regta.jobs.ProcessJob])#

Add job to scheduler jobs list which will be started.

Parameters

job – The job will be added.

Raises

IncorrectJobType – If incorrect job type will be passed.

run(block: bool = True)#

Run scheduler’s jobs.

Parameters

block – If True, blocks the current thread. If False, the thread will not be blocked and your script will continue.

stop()#

Stops scheduler’s jobs.

Will be called by regta.schedulers.SyncBlocking when regta gets stop signal (SystemExit, KeyboardInterrupt, etc.) or if the scheduler’s thread is not blocked, you can call it yourself.

AbstractPeriod#

class regta.AbstractPeriod#

The minimum interface every period object has.

abstract get_interval(dt: datetime.datetime) datetime.timedelta#

Get time to the next moment as timedelta since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

Interval to the next moment (\(f(t)\))

Return type

timedelta

abstract get_next(dt: datetime.datetime) datetime.datetime#

Get the next moment since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

The next moment (\(t + f(t)\))

Return type

datetime

abstract property is_timezone_in_use: bool#

If timezone is specified, return True, else False.

Period#

class regta.Period(days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, milliseconds: int = 0, time: Optional[str] = None, timezone: Optional[Union[datetime.tzinfo, str, int, float]] = None, weekdays: Optional[Iterable[regta_period.enums.Weekdays]] = None)#

The core logic of this module.

Parameters
  • days (int) – Amount of days for the regular offset.

  • hours (int) – Amount of hours for the regular offset.

  • minutes (int) – Amount of minutes for the regular offset.

  • seconds (int) – Amount of seconds for the regular offset.

  • milliseconds (int) – Amount of milliseconds for the regular offset.

  • time (str) – Exact time of moments (time offset). Format: “HH:MM” or “HH:MM:SS”.

  • timezone (Union[tzinfo, str, int, float]) – Time zone for exact time. If str, then it will be converted into tzinfo via zoneinfo.ZoneInfo. If int or float, then it will be used directly as an offset for the time offset.

  • weekdays (Iterable[Weekdays]) – Time windows of weekdays.

every(n: int) regta_period.periods.Period#

Specify a factor for regular offset properties.

Parameters

n (int) – The factor.

property seconds: regta_period.periods.Period#

Seconds regular offset property. Must be used only after every multiplier.

property minutes: regta_period.periods.Period#

Minutes regular offset property. Must be used only after every multiplier.

property hours: regta_period.periods.Period#

Hours regular offset property. Must be used only after every multiplier.

property days: regta_period.periods.Period#

Days regular offset property. Must be used only after every multiplier.

property hourly: regta_period.periods.Period#

Regular offset = every hour. The same as .every(1).hours. Can’t be combined with another regular offset.

property daily: regta_period.periods.Period#

Regular offset = every day. The same as .every(1).days. Can’t be combined with another regular offset.

property on: regta_period.periods.Period#

This property does nothing. It’s designed only to write better human-readable code, e.g. .on.monday.

property monday: regta_period.periods.Period#

Add Monday to the time windows list.

property tuesday: regta_period.periods.Period#

Add Tuesday to the time windows list.

property wednesday: regta_period.periods.Period#

Add Wednesday to the time windows list.

property thursday: regta_period.periods.Period#

Add Thursday to the time windows list.

property friday: regta_period.periods.Period#

Add Friday to the time windows list.

property saturday: regta_period.periods.Period#

Add Saturday to the time windows list.

property sunday: regta_period.periods.Period#

Add Sunday to the time windows list.

property weekdays: regta_period.periods.Period#

Add weekdays (Monday-Friday) to the time windows list.

property weekends: regta_period.periods.Period#

Add weekends (Saturday-Sunday) to the time windows list.

at(time: str) regta_period.periods.Period#

Specify the moment exact time (time offset, \(\Delta t_{time}\)).

Parameters

time (str) – Exact time. Format: “HH:MM” or “HH:MM:SS”.

by(timezone: Union[datetime.tzinfo, str, int, float]) regta_period.periods.Period#

Specify the time zone for the exact time.

Parameters

timezone (Union[tzinfo, str, int, float]) – Time zone. If str, then it will be converted into tzinfo via zoneinfo.ZoneInfo. If int or float, then it will be used directly as an offset for the time offset.

get_next(dt: datetime.datetime) datetime.datetime#

Get the next moment since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

The next moment (\(t + f(t)\))

Return type

datetime

get_interval(dt: datetime.datetime) datetime.timedelta#

Get time to the next moment as timedelta since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

Interval to the next moment (\(f(t)\))

Return type

timedelta

property is_timezone_in_use: bool#

If timezone is specified, return True, else False.

property AND: regta_period.periods.Period#

This property does nothing. It’s designed only to write better human-readable code, e.g. .on.monday.AND.tuesday. It’s uppercase because and is a reserved word in python.

property OR: regta_period.periods.PeriodAggregation#

Create PeriodAggregation from this period and a new empty period.

It’s designed to write better human-readable code, e.g. .on.weekdays.at("18:00").OR.on.weekends.at("21:00"). It’s uppercase because or is a reserved word in python.

PeriodAggregation#

class regta.PeriodAggregation(*periods: regta_period.periods.Period)#

Aggregation class for Period.

It contains the logic of how to get the nearest time moment and add data into the last period object.

Parameters

*periods (Tuple[Period]) – Periods to aggregate.

periods#

Aggregated periods.

Type

Tuple[Period]

get_next(dt: datetime.datetime) datetime.datetime#

Get the next moment since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

The next moment (\(t + f(t)\))

Return type

datetime

get_interval(dt: datetime.datetime) datetime.timedelta#

Get time to the next moment as timedelta since passed moment.

Parameters

dt (datetime) – Current moment (\(t\))

Returns

Interval to the next moment (\(f(t)\))

Return type

timedelta

property is_timezone_in_use: bool#

If timezone is specified, return True, else False.

property OR: regta_period.periods.PeriodAggregation#

Create a new PeriodAggregation from these periods of aggregation and a new empty period.

It’s designed to write better human-readable code, e.g. .on.weekdays.at("18:00").OR.on.weekends.at("21:00"). It’s uppercase because or is a reserved word in python.

Weekdays#

class regta.Weekdays(value)#

An enumeration of weekdays for the time windows logic.

classmethod get(dt: datetime.datetime) regta_period.enums.Weekdays#

Create a Weekdays object by a datetime object.

regta.jobs#

Contains bases of different job types.

You can use the following entities to build your jobs:

jobs.AbstractJob#

class regta.jobs.AbstractJob#

Bases: abc.ABC

Interface which every job base implement.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

func: Callable[[...], Optional[str]]#

The function on which the job will be based. Must be rewritten or passed. It’ll be called every interval.

abstract execute()#

Execute func and log result/error. It will be called by the job every interval.

abstract run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

abstract stop()#

Method will be called by the scheduler when regta gets stop signal.

jobs.BaseJob#

class regta.jobs.BaseJob(*args, logger: Optional[logging.Logger] = None, use_ansi: bool = True, **kwargs)#

Bases: regta.jobs.AbstractJob

Base job class which implements common logic.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

func: Callable[[...], Optional[str]]#

The function on which job will be based. Must be rewritten or passed. It’ll be called every interval.

args: Iterable = []#

Args will be passed into func.

kwargs: dict = {}#

Key-word args will be passed into func.

logger: Optional[Union[logging.Logger, logging.LoggerAdapter]] = None#

Logger writes all results of func function and its exceptions. If the logger isn’t specified, regta will use std output.

abstract execute()#

Execute func and log result/error. It will be called by the job every interval.

abstract run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

abstract stop()#

Method will be called by the scheduler when regta gets stop signal.

jobs.BaseSyncJob#

class regta.jobs.BaseSyncJob(*args, logger: Optional[logging.Logger] = None, use_ansi: bool = True, **kwargs)#

Bases: regta.jobs.BaseJob

Class contains common for ThreadJob and ProcessJob blocking.

interval: Union[datetime.timedelta, regta_period.periods.AbstractPeriod]#

Interval between every func() call.

func: Callable[[...], Optional[str]]#

The function on which job will be based. Must be rewritten or passed. It’ll be called every interval.

logger: Optional[Union[logging.Logger, logging.LoggerAdapter]] = None#

Logger writes all results of func function and its exceptions. If the logger isn’t specified, regta will use std output.

args: Iterable = []#

Args will be passed into func.

kwargs: dict = {}#

Key-word args will be passed into func.

execute()#

Execute func and log result/error. It will be called by the job every interval.

run()#

Method will be called by scheduler when regta starts. It’ll call func every interval.

abstract stop()#

Method will be called by the scheduler when regta gets stop signal.

regta.schedulers#

Contains family of different schedulers.

Use following schedulers to build your system:

schedulers.AbstractScheduler#

class regta.schedulers.AbstractScheduler#

Bases: abc.ABC

Interface which every scheduler implement.

abstract add_job(job: regta.jobs.AbstractJob)#

Add job to scheduler jobs list which will be started.

Parameters

job – The job will be added.

Raises

IncorrectJobType – If incorrect job type will be passed.

abstract run(block: bool = True)#

Run scheduler’s jobs.

Parameters

block – If True, blocks the current thread. If False, the thread will not be blocked and your script will continue.

abstract stop()#

Stops scheduler’s jobs.

Will be called by regta.schedulers.SyncBlocking when regta gets stop signal (SystemExit, KeyboardInterrupt, etc.) or if the scheduler’s thread is not blocked, you can call it yourself.

schedulers.SyncBlocking#

class regta.schedulers.SyncBlocking#

Bases: regta.schedulers.AbstractScheduler, abc.ABC

Contains sync blocking code to add block bool var to AbstractScheduler.run() to block thread.

schedulers.SyncScheduler#

class regta.schedulers.SyncScheduler#

Bases: regta.schedulers.SyncBlocking, regta.schedulers.AbstractScheduler

add_job(job: Union[regta.jobs.ThreadJob, regta.jobs.ProcessJob])#

Add job to scheduler jobs list which will be started.

Parameters

job – The job will be added.

Raises

IncorrectJobType – If incorrect job type will be passed.

run(block: bool = True)#

Run scheduler’s jobs.

Parameters

block – If True, blocks the current thread. If False, the thread will not be blocked and your script will continue.

stop()#

Stops scheduler’s jobs.

Will be called by regta.schedulers.SyncBlocking when regta gets stop signal (SystemExit, KeyboardInterrupt, etc.) or if the scheduler’s thread is not blocked, you can call it yourself.

schedulers.AsyncScheduler#

class regta.schedulers.AsyncScheduler#

Bases: regta.schedulers.AbstractScheduler, threading.Thread

add_job(job: regta.jobs.AsyncJob)#

Add job to scheduler jobs list which will be started.

Parameters

job – The job will be added.

Raises

IncorrectJobType – If incorrect job type will be passed.

run(block: bool = True)#

Run scheduler’s jobs.

Parameters

block – If True, blocks the current thread. If False, the thread will not be blocked and your script will continue.

stop()#

Stop scheduler’s jobs.

regta.exceptions#

exception regta.exceptions.RegtaException#

Bases: Exception

exception regta.exceptions.StopService#

Bases: regta.exceptions.RegtaException

exception regta.exceptions.IncorrectJobType(job, scheduler)#

Bases: regta.exceptions.RegtaException