Module scenographer.database
Module to connect with database
Expand source code
#!/usr/bin/env python3
"""
Module to connect with database
"""
import subprocess
import uuid
from functools import lru_cache
from pathlib import Path
from pipes import quote
from types import SimpleNamespace
from typing import Any, Iterable, List, Mapping, NamedTuple, TextIO
import postgres_copy
import sqlalchemy
from loguru import logger
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.engine import Engine, ResultProxy
from sqlalchemy.schema import Table
from sqlalchemy.sql.expression import Select
from sqlalchemy.types import CHAR, TypeDecorator
from scenographer.utils import PrintAs
class Database(NamedTuple):
    "Wrapper around database operations"
    database_url: str

    def execute(self, *args, **kwargs) -> ResultProxy:
        """
        Executes query and returns the usual ResultProxy from SQLAlchemy.

        Runs inside ``engine.begin()``, so the statement executes in a
        transaction that is committed on success.
        """
        with self.engine.begin() as connection:
            return connection.execute(*args, **kwargs)

    def execute_return_list(self, *args, **kwargs) -> List[Any]:
        """
        Executes query and returns a list of the resulting values
        It raises an AssertionError if more than one column is returned
        """
        resultproxy = list(self.execute(*args, **kwargs))
        if resultproxy:
            columns = [column_name for column_name, value in resultproxy[0].items()]
            # Carry the offending column list in the assertion message instead
            # of print()-ing it to stdout.
            assert len(columns) == 1, f"Expected exactly one column, got {columns}"
        return [rowproxy.values()[0] for rowproxy in resultproxy]

    def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]:
        """
        Executes query and returns each row as a dictionary;
        It raises an AssertionError if any column name is repeated.
        """
        resultproxy = list(self.execute(*args, **kwargs))
        if resultproxy:
            columns = [column_name for column_name, value in resultproxy[0].items()]
            assert len(columns) == len(set(columns)), (
                f"Repeated column names in result: {columns}"
            )
        return [
            {column: value for column, value in rowproxy.items()}
            for rowproxy in resultproxy
        ]

    def copy_to_csv(self, file: TextIO, select: Select) -> None:
        "Executes query and write the rows into a file object in CSV format."
        postgres_copy.copy_to(select, file, self.engine, format="csv", header=True)

    # NOTE(review): lru_cache on a method keys on `self`; that is well-defined
    # here because NamedTuple instances are hashable and immutable, but it keeps
    # every Database instance (and its engine) alive for the process lifetime.
    @property
    @lru_cache()
    def engine(self) -> Engine:
        "Create, return and cache the associated sqlalchemy engine"
        return sqlalchemy.create_engine(self.database_url)

    @property
    @lru_cache()
    def tables(self) -> SimpleNamespace:
        "Reflect the database to return and cache a namespace with all of its tables"
        logger.info("Reflecting source database")
        metadata = sqlalchemy.MetaData()
        with PrintAs(logger.warning):
            metadata.reflect(self.engine, views=False)
        return SimpleNamespace(**metadata.tables)

    def load_schema(self, source_database: "Database") -> None:
        """
        pg_dump \
        --format=custom --no-owner --schema-only \
        --verbose {source_database} \
        | pg_restore \
        --format=custom --no-owner --schema-only \
        --no-acl \
        --verbose -d {target_database}
        """
        # The docstring above doubles as the shell command template; both URLs
        # are shell-quoted before interpolation.
        pg_copy_schema = self.load_schema.__doc__.format(
            source_database=quote(source_database.database_url),
            target_database=quote(self.database_url),
        )
        process = subprocess.Popen(
            pg_copy_schema,
            shell=True,  # pipeline string requires a shell
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        with process.stdout:
            for line in iter(process.stdout.readline, b""):
                logger.trace("{}", line)
        exit_code = process.wait()
        logger.debug("Command pg_dump | pg_restore exited with code {}", exit_code)
        if exit_code != 0:
            # Previously a commented-out assert; surface the failure without
            # turning this best-effort copy into a crash.
            logger.warning(
                "pg_dump | pg_restore failed with exit code {}", exit_code
            )

    def load_samples(self, directory: Path, samples: Iterable[Table]) -> None:
        "Copy the generated sample CSVs into the database"
        for table in samples:
            csv_path = directory / Path(table.name).with_suffix(".csv")
            with open(csv_path) as file:
                postgres_copy.copy_from(
                    file, table, self.engine, format="csv", header=True
                )

    def test_conn(self) -> "Database":
        "Test the database connection by creating the engine; returns self."
        return self.engine and self  # Always returns self (engine is truthy)
# Retrieved from
# https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type
class UUIDField(TypeDecorator):
    """Platform-independent GUID type.

    Uses PostgreSQL's UUID type, otherwise uses
    CHAR(32), storing as stringified hex values.
    """

    impl = CHAR

    def load_dialect_impl(self, dialect):
        # Native UUID on PostgreSQL; 32-char hex on every other backend.
        chosen = UUID() if dialect.name == "postgresql" else CHAR(32)
        return dialect.type_descriptor(chosen)

    def process_bind_param(self, value, dialect):
        if value is None:
            return value
        if dialect.name == "postgresql":
            return str(value)
        # Other backends store a zero-padded 32-char hexstring; coerce plain
        # strings to uuid.UUID first so both input forms render identically.
        as_uuid = value if isinstance(value, uuid.UUID) else uuid.UUID(value)
        return "%.32x" % as_uuid.int

    def process_result_value(self, value, dialect):
        if value is None:
            return value
        # Normalize whatever the driver returned to a uuid.UUID instance.
        if isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)
Classes
class Database (database_url: str)
-
Wrapper around database operations
Expand source code
class Database(NamedTuple): "Wrapper around database operations" database_url: str def execute(self, *args, **kwargs) -> ResultProxy: """ Executes query and returns the usual ResultProxy from SQLAlchemy. """ with self.engine.begin() as connection: return connection.execute(*args, **kwargs) def execute_return_list(self, *args, **kwargs) -> List[Any]: """ Executes query and returns a list of the resulting values It raises an AssertionError if more than one column is returned """ resultproxy = list(self.execute(*args, **kwargs)) if resultproxy: columns = [column_name for column_name, value in resultproxy[0].items()] if len(columns) != 1: print(columns) assert len(columns) == 1 return [rowproxy.values()[0] for rowproxy in resultproxy] def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]: """ Executes query and returns each row as a dictionary; It raises an AssertionError if any column name is repeated. """ resultproxy = list(self.execute(*args, **kwargs)) if resultproxy: columns = [column_name for column_name, value in resultproxy[0].items()] assert len(columns) == len(set(columns)) return [ {column: value for column, value in rowproxy.items()} for rowproxy in resultproxy ] def copy_to_csv(self, file: TextIO, select: Select) -> None: "Executes query and write the rows into a file object in CSV format." 
postgres_copy.copy_to(select, file, self.engine, format="csv", header=True) @property @lru_cache() def engine(self) -> Engine: "Create, return and cache the associated sqlalchemy engine" return sqlalchemy.create_engine(self.database_url) @property @lru_cache() def tables(self) -> SimpleNamespace: "Reflect the database to return and cache a namespace with all of its tables" logger.info("Reflecting source database") metadata = sqlalchemy.MetaData() with PrintAs(logger.warning): metadata.reflect(self.engine, views=False) return SimpleNamespace(**metadata.tables) def load_schema(self, source_database: "Database") -> None: """ pg_dump \ --format=custom --no-owner --schema-only \ --verbose {source_database} \ | pg_restore \ --format=custom --no-owner --schema-only \ --no-acl \ --verbose -d {target_database} """ pg_copy_schema = self.load_schema.__doc__.format( source_database=quote(source_database.database_url), target_database=quote(self.database_url), ) process = subprocess.Popen( pg_copy_schema, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) with process.stdout: for line in iter(process.stdout.readline, b""): logger.trace("{}", line) exit_code = process.wait() logger.debug("Command pg_dump | pg_restore exited with code {}", exit_code) # assert exit_code == 0 def load_samples(self, directory: Path, samples: Iterable[Table]) -> None: "Copy the generated sample CSVs into the database" for table in samples: with open(directory / Path(table.name).with_suffix(".csv")) as file: postgres_copy.copy_from( file, table, self.engine, format="csv", header=True ) def test_conn(self) -> "Database": "Copy the generated sample CSVs into the database" return self.engine and self # Always returns self
Ancestors
- builtins.tuple
Instance variables
var database_url : str
-
Alias for field number 0
var engine : sqlalchemy.engine.base.Engine
-
Create, return and cache the associated sqlalchemy engine
Expand source code
@property @lru_cache() def engine(self) -> Engine: "Create, return and cache the associated sqlalchemy engine" return sqlalchemy.create_engine(self.database_url)
var tables : types.SimpleNamespace
-
Reflect the database to return and cache a namespace with all of its tables
Expand source code
@property @lru_cache() def tables(self) -> SimpleNamespace: "Reflect the database to return and cache a namespace with all of its tables" logger.info("Reflecting source database") metadata = sqlalchemy.MetaData() with PrintAs(logger.warning): metadata.reflect(self.engine, views=False) return SimpleNamespace(**metadata.tables)
Methods
def copy_to_csv(self, file: TextIO, select: sqlalchemy.sql.selectable.Select) ‑> NoneType
-
Executes query and write the rows into a file object in CSV format.
Expand source code
def copy_to_csv(self, file: TextIO, select: Select) -> None: "Executes query and write the rows into a file object in CSV format." postgres_copy.copy_to(select, file, self.engine, format="csv", header=True)
def execute(self, *args, **kwargs) ‑> sqlalchemy.engine.result.ResultProxy
-
Executes query and returns the usual ResultProxy from SQLAlchemy.
Expand source code
def execute(self, *args, **kwargs) -> ResultProxy: """ Executes query and returns the usual ResultProxy from SQLAlchemy. """ with self.engine.begin() as connection: return connection.execute(*args, **kwargs)
def execute_return_dict(self, *args, **kwargs) ‑> List[Mapping[str, Any]]
-
Executes query and returns each row as a dictionary; It raises an AssertionError if any column name is repeated.
Expand source code
def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]: """ Executes query and returns each row as a dictionary; It raises an AssertionError if any column name is repeated. """ resultproxy = list(self.execute(*args, **kwargs)) if resultproxy: columns = [column_name for column_name, value in resultproxy[0].items()] assert len(columns) == len(set(columns)) return [ {column: value for column, value in rowproxy.items()} for rowproxy in resultproxy ]
def execute_return_list(self, *args, **kwargs) ‑> List[Any]
-
Executes query and returns a list of the resulting values It raises an AssertionError if more than one column is returned
Expand source code
def execute_return_list(self, *args, **kwargs) -> List[Any]: """ Executes query and returns a list of the resulting values It raises an AssertionError if more than one column is returned """ resultproxy = list(self.execute(*args, **kwargs)) if resultproxy: columns = [column_name for column_name, value in resultproxy[0].items()] if len(columns) != 1: print(columns) assert len(columns) == 1 return [rowproxy.values()[0] for rowproxy in resultproxy]
def load_samples(self, directory: pathlib.Path, samples: Iterable[sqlalchemy.sql.schema.Table]) ‑> NoneType
-
Copy the generated sample CSVs into the database
Expand source code
def load_samples(self, directory: Path, samples: Iterable[Table]) -> None: "Copy the generated sample CSVs into the database" for table in samples: with open(directory / Path(table.name).with_suffix(".csv")) as file: postgres_copy.copy_from( file, table, self.engine, format="csv", header=True )
def load_schema(self, source_database: Database) ‑> NoneType
-
pg_dump --format=custom --no-owner --schema-only --verbose {source_database} | pg_restore --format=custom --no-owner --schema-only --no-acl --verbose -d {target_database}
Expand source code
def load_schema(self, source_database: "Database") -> None: """ pg_dump \ --format=custom --no-owner --schema-only \ --verbose {source_database} \ | pg_restore \ --format=custom --no-owner --schema-only \ --no-acl \ --verbose -d {target_database} """ pg_copy_schema = self.load_schema.__doc__.format( source_database=quote(source_database.database_url), target_database=quote(self.database_url), ) process = subprocess.Popen( pg_copy_schema, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) with process.stdout: for line in iter(process.stdout.readline, b""): logger.trace("{}", line) exit_code = process.wait() logger.debug("Command pg_dump | pg_restore exited with code {}", exit_code)
def test_conn(self) ‑> Database
-
Test the database connection by creating the engine; always returns self.
Expand source code
def test_conn(self) -> "Database": "Copy the generated sample CSVs into the database" return self.engine and self # Always returns self
class UUIDField (*args, **kwargs)
-
Platform-independent GUID type.
Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as stringified hex values.
Construct a :class:
.TypeDecorator
.Arguments sent here are passed to the constructor of the class assigned to the
impl
class level attribute, assuming theimpl
is a callable, and the resulting object is assigned to theself.impl
instance attribute (thus overriding the class attribute of the same name).If the class level
impl
is not a callable (the unusual case), it will be assigned to the same instance attribute 'as-is', ignoring those arguments passed to the constructor.Subclasses can override this to customize the generation of
self.impl
entirely.Expand source code
class UUIDField(TypeDecorator): """Platform-independent GUID type. Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as stringified hex values. """ impl = CHAR def load_dialect_impl(self, dialect): if dialect.name == "postgresql": return dialect.type_descriptor(UUID()) else: return dialect.type_descriptor(CHAR(32)) def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == "postgresql": return str(value) else: if not isinstance(value, uuid.UUID): return "%.32x" % uuid.UUID(value).int else: # hexstring return "%.32x" % value.int def process_result_value(self, value, dialect): if value is None: return value else: if not isinstance(value, uuid.UUID): value = uuid.UUID(value) return value
Ancestors
- sqlalchemy.sql.type_api.TypeDecorator
- sqlalchemy.sql.base.SchemaEventTarget
- sqlalchemy.sql.type_api.TypeEngine
- sqlalchemy.sql.visitors.Visitable
Class variables
var impl
-
The SQL CHAR type.
Methods
def load_dialect_impl(self, dialect)
-
Return a :class:
.TypeEngine
object corresponding to a dialect.This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the :class:
.TypeDecorator
implementation of :meth:type_engine
to help determine what type should ultimately be returned for a given :class:.TypeDecorator
.By default returns
self.impl
.Expand source code
def load_dialect_impl(self, dialect): if dialect.name == "postgresql": return dialect.type_descriptor(UUID()) else: return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect)
-
Receive a bound parameter value to be converted.
Subclasses override this method to return the value that should be passed along to the underlying :class:
.TypeEngine
object, and from there to the DBAPIexecute()
method.The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.
This operation should be designed with the reverse operation in mind, which would be the process_result_value method of this class.
:param value: Data to operate upon, of any type expected by this method in the subclass. Can be
None
. :param dialect: the :class:.Dialect
in use.Expand source code
def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == "postgresql": return str(value) else: if not isinstance(value, uuid.UUID): return "%.32x" % uuid.UUID(value).int else: # hexstring return "%.32x" % value.int
def process_result_value(self, value, dialect)
-
Receive a result-row column value to be converted.
Subclasses should implement this method to operate on data fetched from the database.
Subclasses override this method to return the value that should be passed back to the application, given a value that is already processed by the underlying :class:
.TypeEngine
object, originally from the DBAPI cursor methodfetchone()
or similar.The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.
:param value: Data to operate upon, of any type expected by this method in the subclass. Can be
None
. :param dialect: the :class:.Dialect
in use.This operation should be designed to be reversible by the "process_bind_param" method of this class.
Expand source code
def process_result_value(self, value, dialect): if value is None: return value else: if not isinstance(value, uuid.UUID): value = uuid.UUID(value) return value