D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
3
/
cwd
/
opt
/
imh-python
/
lib
/
python3.9
/
site-packages
/
celery
/
backends
/
database
/
Filename :
models.py
back
Copy
"""Database models used by the SQLAlchemy result store backend.""" from datetime import datetime, timezone import sqlalchemy as sa from sqlalchemy.types import PickleType from celery import states from .session import ResultModelBase __all__ = ('Task', 'TaskExtended', 'TaskSet') class Task(ResultModelBase): """Task result/status.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'), primary_key=True, autoincrement=True) task_id = sa.Column(sa.String(155), unique=True) status = sa.Column(sa.String(50), default=states.PENDING) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc), nullable=True) traceback = sa.Column(sa.Text, nullable=True) def __init__(self, task_id): self.task_id = task_id def to_dict(self): return { 'task_id': self.task_id, 'status': self.status, 'result': self.result, 'traceback': self.traceback, 'date_done': self.date_done, } def __repr__(self): return '<Task {0.task_id} state: {0.status}>'.format(self) @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__ class TaskExtended(Task): """For the extend result.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True, 'extend_existing': True} name = sa.Column(sa.String(155), nullable=True) args = sa.Column(sa.LargeBinary, nullable=True) kwargs = sa.Column(sa.LargeBinary, nullable=True) worker = sa.Column(sa.String(155), nullable=True) retries = sa.Column(sa.Integer, nullable=True) queue = sa.Column(sa.String(155), nullable=True) def to_dict(self): task_dict = super().to_dict() task_dict.update({ 'name': self.name, 'args': self.args, 'kwargs': self.kwargs, 'worker': self.worker, 'retries': self.retries, 'queue': self.queue, }) return task_dict class TaskSet(ResultModelBase): """TaskSet result.""" __tablename__ = 'celery_tasksetmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'), autoincrement=True, primary_key=True) taskset_id = sa.Column(sa.String(155), unique=True) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.now(timezone.utc), nullable=True) def __init__(self, taskset_id, result): self.taskset_id = taskset_id self.result = result def to_dict(self): return { 'taskset_id': self.taskset_id, 'result': self.result, 'date_done': self.date_done, } def __repr__(self): return f'<TaskSet: {self.taskset_id}>' @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__