Make Jobs#
Regta provides various job types to solve various tasks:
To build fast jobs with IO operations e.g. internet requests, database query or work with files use Async Job.
If for some reason you can’t use asynchronous programming, but you are still faced with a task of building a lightweight job, use Thread-Based Job.
To build heavy jobs with a lot of computing e.g. ML or data analytics use Process-Based Job.
All job output will be logged. If logger factory is not specified, standard output will be used.
See also
What is the logger factory and how to add logging into your project here.
Async Job#
Use regta.async_job
to build jobs with this type. See an example of
how to send regular notifications to a Slack channel below:
from aiohttp import ClientSession
from datetime import datetime, timedelta
import regta
class Slack:
url: str # secret URL from env
def __init__(self, channel: str):
self.channel = channel
async def send_message(self, message: str):
async with ClientSession(headers={"Content-type": "application/json"}) as session:
await session.post(self.url, json={"text": message, "channel": self.channel})
@regta.async_job(timedelta(days=7))
async def weekly_report():
data = "Some report data..."
await Slack("my-channel").send_message(data)
return f"Weekly report was sent at {datetime.utcnow()}."
Thread-Based Job#
Use regta.thread_job
to build jobs with this type. See an example of
how to mark users as inactive for the synchronous version of sqlalchemy:
from datetime import datetime, timedelta
import regta
from db import Session
from models import User
@regta.thread_job(timedelta(hours=1))
def mark_users_as_inactive():
with Session() as session:
marked_users_ids = session.execute(
User
.update()
.where(User.active=True, User.last_online < datetime.utcnow() - timedelta(days=365))
.values(active=False)
.returning(User.id)
)
return f"{len(marked_users_ids)} users was marked as inactive."
Process-Based Job#
Use regta.process_job
to build jobs with this type. See an example of
how to predict the weather by temperature (this is not a good example of a ML
task, but is ok to demonstrate how to use a process-based job) using prepared
ML model and send the result on a Telegram channel:
from datetime import datetime, timedelta
from typing import List
import regta
import telebot # module for Telegram
from ml_models import my_model # Stub model for this example
def get_last_temperature_for_a_week() -> List[float]:
"""This is just a stub function for demonstration."""
class TelegramChannel:
CHANNEL_ID: str
TOKEN: str
def __init__(self):
self.bot = telebot.TeleBot(self.TOKEN)
def post(text: str):
self.bot.send_message(self.CHANNEL_ID, text)
@regta.process_job(timedelta(hours=24))
def send_temperature_prediction():
prediction = my_model.predict(get_last_temperature_for_a_week())
result = float(prediction[0])
msg = f"Tomorrow at this time will be {result}С°"
TelegramChanel().post(msg)
return msg