관리-도구
편집 파일: db.py
#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
"""SQLAlchemy models plus SQLite engine and session helpers for the WMT database."""

import os
import contextlib
import sqlite3
from datetime import datetime, timedelta
from typing import Iterator

from sqlalchemy import (
    Column,
    Boolean,
    DateTime,
    Integer,
    String,
    create_engine,
    event
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import Session

from wmt.common.const import WMT_DB, RETENTION_TIME_DAYS

Base = declarative_base()


class ScrapeResult(Base):
    """
    One row of the 'scrape_result' table.

    create_date is saved in local TZ format
    """
    __tablename__ = 'scrape_result'

    id = Column(Integer, primary_key=True)
    website = Column(String)
    # Naive local timestamp taken when the row is inserted; indexed because
    # cleanup_old_data() filters on it.
    create_date = Column(DateTime, default=lambda: datetime.now(), index=True)
    is_finished = Column(Boolean)
    response_code = Column(Integer, nullable=True)
    response_time_ms = Column(Integer, nullable=True)


class DomainAlerts(Base):
    """
    One row of the 'domain_alerts' table.

    alert_time is saved in local TZ format
    """
    __tablename__ = 'domain_alerts'

    id = Column(Integer, primary_key=True)
    website = Column(String, index=True)
    # Naive local timestamp taken when the row is inserted; indexed because
    # cleanup_old_data() filters on it.
    alert_time = Column(DateTime, default=lambda: datetime.now(), index=True)
    is_resolved = Column(Boolean, default=False)


def cleanup_old_data(engine) -> None:
    """
    Delete scrape results and domain alerts older than the retention window.

    Rows whose timestamp is more than RETENTION_TIME_DAYS days before the
    current local time are removed from both tables inside a single
    session_scope() transaction.

    :param engine: SQLAlchemy engine bound to the WMT database
    """
    n_days_ago = datetime.today() - timedelta(days=RETENTION_TIME_DAYS)
    with session_scope(engine) as session:
        session.query(ScrapeResult)\
            .filter(ScrapeResult.create_date < n_days_ago)\
            .delete()
        session.query(DomainAlerts)\
            .filter(DomainAlerts.alert_time < n_days_ago)\
            .delete()


def create_db_if_not_exist(engine) -> None:
    """
    Create every table declared on Base unless the database already has tables.

    :param engine: SQLAlchemy engine bound to the WMT database
    """
    if not is_db_present(engine):
        Base.metadata.create_all(engine)


def is_db_present(engine) -> bool:
    """
    Tell whether the database file exists and contains at least one table.

    :param engine: SQLAlchemy engine used to inspect the schema
    :return: False when WMT_DB is not a regular file or holds no tables,
             True otherwise
    """
    # Check the file first so that inspecting the schema does not open a
    # connection to a path that is not there.
    if not os.path.isfile(WMT_DB):
        return False
    database_inspection = Inspector.from_engine(engine)
    return bool(database_inspection.get_table_names())


def setup_wal_mode(dbapi_con, con_record) -> None:
    """
    Switch a freshly opened DBAPI connection to WAL journaling.

    Registered as the engine's 'connect' event listener in setup_database().

    :param dbapi_con: raw DBAPI connection the pragma is executed on
    :param con_record: second argument passed by the event hook; unused here
    """
    dbapi_con.execute('PRAGMA journal_mode = WAL')


def setup_database(readonly: bool = False):
    """
    Build the engine for the WMT SQLite database and make sure tables exist.

    :param readonly: when True the SQLite URI gets '?mode=ro' appended
    :return: the configured SQLAlchemy engine
    """
    connection_string = f'file:{WMT_DB}'
    if readonly:
        connection_string = f'{connection_string}?mode=ro'

    def creator():
        # uri=True makes sqlite3 parse the 'file:' string as a URI, which is
        # what lets the '?mode=ro' suffix take effect.
        return sqlite3.connect(connection_string, uri=True)

    # Connections are produced by `creator`, which is handed to create_engine
    # alongside the placeholder 'sqlite:////' URL.
    engine = create_engine(
        'sqlite:////',
        creator=creator,
        echo=False,
    )
    # NOTE(review): the WAL pragma listener and the table-creation step below
    # also run when readonly=True - confirm that is intended for read-only
    # callers.
    event.listen(engine, 'connect', setup_wal_mode)
    create_db_if_not_exist(engine)
    return engine


@contextlib.contextmanager
def session_scope(engine) -> Iterator[Session]:
    """
    Provide a transactional scope around a series of operations.

    Yields a Session bound to ``engine``. The session is committed when the
    managed block finishes normally; on any exception it is rolled back and
    the exception is re-raised. The session is closed in every case.

    :param engine: SQLAlchemy engine the session is bound to
    """
    session = Session(bind=engine)
    try:
        yield session
        session.commit()
    except BaseException:
        # Deliberately as broad as a bare ``except``: roll back even on
        # KeyboardInterrupt/SystemExit, then let the exception propagate.
        session.rollback()
        raise
    finally:
        session.close()