wrappers.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from contextvars import ContextVar
  2. from peewee import *
  3. from playhouse.db_url import connect
  4. from playhouse.pool import PooledPostgresqlDatabase
  5. from playhouse.shortcuts import ReconnectMixin
  6. db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
  7. db_state = ContextVar("db_state", default=db_state_default.copy())
  8. class PeeweeConnectionState(object):
  9. def __init__(self, **kwargs):
  10. super().__setattr__("_state", db_state)
  11. super().__init__(**kwargs)
  12. def __setattr__(self, name, value):
  13. self._state.get()[name] = value
  14. def __getattr__(self, name):
  15. value = self._state.get()[name]
  16. return value
  17. class ReconnectingPostgresqlDatabase(ReconnectMixin, PostgresqlDatabase):
  18. pass
  19. class ReconnectingPooledPostgresqlDatabase(ReconnectMixin, PooledPostgresqlDatabase):
  20. pass
  21. class ReconnectingSqliteDatabase(ReconnectMixin, SqliteDatabase):
  22. pass
  23. def register_connection(db_url):
  24. # Connect using the playhouse.db_url module, which supports multiple
  25. # database types, then wrap the connection in a ReconnectMixin to handle dropped connections
  26. db = connect(db_url)
  27. if isinstance(db, PostgresqlDatabase):
  28. db = ReconnectingPostgresqlDatabase(db.database, **db.connect_params)
  29. elif isinstance(db, PooledPostgresqlDatabase):
  30. db = ReconnectingPooledPostgresqlDatabase(db.database, **db.connect_params)
  31. elif isinstance(db, SqliteDatabase):
  32. db = ReconnectingSqliteDatabase(db.database, **db.connect_params)
  33. else:
  34. raise ValueError('Unsupported database connection')
  35. return db