Module scenographer.database

Module to connect with database

Expand source code
#!/usr/bin/env python3

"""
Module to connect with database
"""

import subprocess
import uuid
from functools import lru_cache
from pathlib import Path
from pipes import quote
from types import SimpleNamespace
from typing import Any, Iterable, List, Mapping, NamedTuple, TextIO

import postgres_copy
import sqlalchemy
from loguru import logger
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.engine import Engine, ResultProxy
from sqlalchemy.schema import Table
from sqlalchemy.sql.expression import Select
from sqlalchemy.types import CHAR, TypeDecorator

from scenographer.utils import PrintAs


class Database(NamedTuple):
    """
    Wrapper around database operations.

    Instances are immutable one-field tuples holding the connection URL.
    ``engine`` and ``tables`` are computed on first access and memoised by
    ``lru_cache``, which keys on the instance (and therefore on the URL).
    """

    database_url: str

    def execute(self, *args, **kwargs) -> ResultProxy:
        """
        Executes query and returns the usual ResultProxy from SQLAlchemy.

        All arguments are forwarded to ``Connection.execute``; the call is
        made inside an ``engine.begin()`` block.
        """
        with self.engine.begin() as connection:
            return connection.execute(*args, **kwargs)

    def execute_return_list(self, *args, **kwargs) -> List[Any]:
        """
        Executes query and returns a list with the value of its only column.

        Arguments are forwarded to ``execute``. An empty result yields ``[]``.

        Raises:
            AssertionError: if the result does not have exactly one column.
        """
        rows = list(self.execute(*args, **kwargs))

        if rows:
            columns = list(rows[0].keys())
            if len(columns) != 1:
                # Raised explicitly (not via ``assert``) so the documented
                # check is still enforced under ``python -O``.
                raise AssertionError(
                    f"Expected exactly one column, got {len(columns)}: {columns}"
                )

        return [row.values()[0] for row in rows]

    def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]:
        """
        Executes query and returns each row as a dictionary.

        Arguments are forwarded to ``execute``. An empty result yields ``[]``.

        Raises:
            AssertionError: if any column name is repeated.
        """
        rows = list(self.execute(*args, **kwargs))

        if rows:
            # Only the names are inspected here; values are read afterwards.
            columns = list(rows[0].keys())
            if len(columns) != len(set(columns)):
                # Raised explicitly (not via ``assert``) so the documented
                # check is still enforced under ``python -O``.
                raise AssertionError(f"Repeated column names in result: {columns}")

        return [dict(row.items()) for row in rows]

    def copy_to_csv(self, file: TextIO, select: Select) -> None:
        "Executes query and write the rows into a file object in CSV format."
        postgres_copy.copy_to(select, file, self.engine, format="csv", header=True)

    @property
    @lru_cache()
    def engine(self) -> Engine:
        "Create, return and cache the associated sqlalchemy engine"
        return sqlalchemy.create_engine(self.database_url)

    @property
    @lru_cache()
    def tables(self) -> SimpleNamespace:
        "Reflect the database to return and cache a namespace with all of its tables"

        logger.info("Reflecting source database")
        metadata = sqlalchemy.MetaData()
        # PrintAs is a project helper; presumably it reroutes anything printed
        # during reflection to logger.warning -- confirm in scenographer.utils.
        with PrintAs(logger.warning):
            metadata.reflect(self.engine, views=False)

        return SimpleNamespace(**metadata.tables)

    def load_schema(self, source_database: "Database") -> None:
        """
        Copy the schema (no data) of ``source_database`` into this database.

        Runs ``pg_dump | pg_restore`` through the shell with both URLs
        shell-quoted, and forwards every line of the combined stdout/stderr
        to the logger at TRACE level. A non-zero exit code is logged as a
        warning but does not raise.
        """
        # The command lives in a local template instead of being read back
        # from ``__doc__``: docstrings are dropped under ``python -OO``, which
        # would turn ``__doc__.format`` into an AttributeError on None.
        command_template = """
        pg_dump \
            --format=custom --no-owner --schema-only \
            --verbose {source_database} \
        | pg_restore \
            --format=custom --no-owner --schema-only \
            --no-acl \
            --verbose -d {target_database}
        """
        pg_copy_schema = command_template.format(
            source_database=quote(source_database.database_url),
            target_database=quote(self.database_url),
        )
        process = subprocess.Popen(
            pg_copy_schema,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        with process.stdout:
            for line in iter(process.stdout.readline, b""):
                logger.trace("{}", line)

        exit_code = process.wait()
        # NOTE(review): the original hard failure on a non-zero exit code was
        # deliberately disabled, so this stays non-fatal; the failure is only
        # made visible. The code is that of the shell pipeline as a whole.
        report = logger.debug if exit_code == 0 else logger.warning
        report("Command pg_dump | pg_restore exited with code {}", exit_code)

    def load_samples(self, directory: Path, samples: Iterable[Table]) -> None:
        "Copy the generated sample CSVs into the database"
        for table in samples:
            # One file per table: <directory>/<table.name>.csv
            with open(directory / Path(table.name).with_suffix(".csv")) as file:
                postgres_copy.copy_from(
                    file, table, self.engine, format="csv", header=True
                )

    def test_conn(self) -> "Database":
        "Access the engine property (which builds the SQLAlchemy engine) and return this instance"
        # NOTE(review): create_engine() is documented as lazy, so this
        # presumably validates the URL without opening a connection -- confirm.
        return self.engine and self  # Always returns self


# Retrieved from
# https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type
class UUIDField(TypeDecorator):
    """Backend-agnostic GUID column type.

    PostgreSQL gets its native UUID type; every other dialect gets a
    CHAR(32) column that stores the value as a 32-digit hex string.
    """

    impl = CHAR

    def load_dialect_impl(self, dialect):
        # Choose the concrete column type for the dialect in use.
        column_type = UUID() if dialect.name == "postgresql" else CHAR(32)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        # Python value -> value handed to the database driver.
        if value is None:
            return value
        if dialect.name == "postgresql":
            return str(value)
        # Other dialects: coerce to uuid.UUID, then emit zero-padded hex.
        as_uuid = value if isinstance(value, uuid.UUID) else uuid.UUID(value)
        return "%.32x" % as_uuid.int

    def process_result_value(self, value, dialect):
        # Database value -> uuid.UUID (None and ready-made UUIDs pass through).
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

Classes

class Database (database_url: str)

Wrapper around database operations

Expand source code
class Database(NamedTuple):
    """
    Wrapper around database operations.

    Instances are immutable one-field tuples holding the connection URL.
    ``engine`` and ``tables`` are computed on first access and memoised by
    ``lru_cache``, which keys on the instance (and therefore on the URL).
    """

    database_url: str

    def execute(self, *args, **kwargs) -> ResultProxy:
        """
        Executes query and returns the usual ResultProxy from SQLAlchemy.

        All arguments are forwarded to ``Connection.execute``; the call is
        made inside an ``engine.begin()`` block.
        """
        with self.engine.begin() as connection:
            return connection.execute(*args, **kwargs)

    def execute_return_list(self, *args, **kwargs) -> List[Any]:
        """
        Executes query and returns a list with the value of its only column.

        Arguments are forwarded to ``execute``. An empty result yields ``[]``.

        Raises:
            AssertionError: if the result does not have exactly one column.
        """
        rows = list(self.execute(*args, **kwargs))

        if rows:
            columns = list(rows[0].keys())
            if len(columns) != 1:
                # Raised explicitly (not via ``assert``) so the documented
                # check is still enforced under ``python -O``.
                raise AssertionError(
                    f"Expected exactly one column, got {len(columns)}: {columns}"
                )

        return [row.values()[0] for row in rows]

    def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]:
        """
        Executes query and returns each row as a dictionary.

        Arguments are forwarded to ``execute``. An empty result yields ``[]``.

        Raises:
            AssertionError: if any column name is repeated.
        """
        rows = list(self.execute(*args, **kwargs))

        if rows:
            # Only the names are inspected here; values are read afterwards.
            columns = list(rows[0].keys())
            if len(columns) != len(set(columns)):
                # Raised explicitly (not via ``assert``) so the documented
                # check is still enforced under ``python -O``.
                raise AssertionError(f"Repeated column names in result: {columns}")

        return [dict(row.items()) for row in rows]

    def copy_to_csv(self, file: TextIO, select: Select) -> None:
        "Executes query and write the rows into a file object in CSV format."
        postgres_copy.copy_to(select, file, self.engine, format="csv", header=True)

    @property
    @lru_cache()
    def engine(self) -> Engine:
        "Create, return and cache the associated sqlalchemy engine"
        return sqlalchemy.create_engine(self.database_url)

    @property
    @lru_cache()
    def tables(self) -> SimpleNamespace:
        "Reflect the database to return and cache a namespace with all of its tables"

        logger.info("Reflecting source database")
        metadata = sqlalchemy.MetaData()
        # PrintAs is a project helper; presumably it reroutes anything printed
        # during reflection to logger.warning -- confirm in scenographer.utils.
        with PrintAs(logger.warning):
            metadata.reflect(self.engine, views=False)

        return SimpleNamespace(**metadata.tables)

    def load_schema(self, source_database: "Database") -> None:
        """
        Copy the schema (no data) of ``source_database`` into this database.

        Runs ``pg_dump | pg_restore`` through the shell with both URLs
        shell-quoted, and forwards every line of the combined stdout/stderr
        to the logger at TRACE level. A non-zero exit code is logged as a
        warning but does not raise.
        """
        # The command lives in a local template instead of being read back
        # from ``__doc__``: docstrings are dropped under ``python -OO``, which
        # would turn ``__doc__.format`` into an AttributeError on None.
        command_template = """
        pg_dump \
            --format=custom --no-owner --schema-only \
            --verbose {source_database} \
        | pg_restore \
            --format=custom --no-owner --schema-only \
            --no-acl \
            --verbose -d {target_database}
        """
        pg_copy_schema = command_template.format(
            source_database=quote(source_database.database_url),
            target_database=quote(self.database_url),
        )
        process = subprocess.Popen(
            pg_copy_schema,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        with process.stdout:
            for line in iter(process.stdout.readline, b""):
                logger.trace("{}", line)

        exit_code = process.wait()
        # NOTE(review): the original hard failure on a non-zero exit code was
        # deliberately disabled, so this stays non-fatal; the failure is only
        # made visible. The code is that of the shell pipeline as a whole.
        report = logger.debug if exit_code == 0 else logger.warning
        report("Command pg_dump | pg_restore exited with code {}", exit_code)

    def load_samples(self, directory: Path, samples: Iterable[Table]) -> None:
        "Copy the generated sample CSVs into the database"
        for table in samples:
            # One file per table: <directory>/<table.name>.csv
            with open(directory / Path(table.name).with_suffix(".csv")) as file:
                postgres_copy.copy_from(
                    file, table, self.engine, format="csv", header=True
                )

    def test_conn(self) -> "Database":
        "Access the engine property (which builds the SQLAlchemy engine) and return this instance"
        # NOTE(review): create_engine() is documented as lazy, so this
        # presumably validates the URL without opening a connection -- confirm.
        return self.engine and self  # Always returns self

Ancestors

  • builtins.tuple

Instance variables

var database_url : str

Alias for field number 0

var engine : sqlalchemy.engine.base.Engine

Create, return and cache the associated sqlalchemy engine

Expand source code
@property
@lru_cache()
def engine(self) -> Engine:
    "Build the sqlalchemy engine for ``database_url``; lru_cache keeps the result for reuse"
    url = self.database_url
    return sqlalchemy.create_engine(url)
var tables : types.SimpleNamespace

Reflect the database to return and cache a namespace with all of its tables

Expand source code
@property
@lru_cache()
def tables(self) -> SimpleNamespace:
    "Reflect the database (views excluded) and expose each table as a namespace attribute; cached"

    logger.info("Reflecting source database")
    reflected = sqlalchemy.MetaData()
    # PrintAs is a project helper; presumably it reroutes anything printed
    # during reflection to logger.warning -- confirm in scenographer.utils.
    with PrintAs(logger.warning):
        reflected.reflect(self.engine, views=False)

    tables_by_name = dict(reflected.tables)
    return SimpleNamespace(**tables_by_name)

Methods

def copy_to_csv(self, file: TextIO, select: sqlalchemy.sql.selectable.Select) ‑> NoneType

Executes query and write the rows into a file object in CSV format.

Expand source code
def copy_to_csv(self, file: TextIO, select: Select) -> None:
    "Hand ``select`` to ``postgres_copy.copy_to`` so its rows are written to ``file`` as CSV (header=True)."
    copy_options = {"format": "csv", "header": True}
    postgres_copy.copy_to(select, file, self.engine, **copy_options)
def execute(self, *args, **kwargs) ‑> sqlalchemy.engine.result.ResultProxy

Executes query and returns the usual ResultProxy from SQLAlchemy.

Expand source code
def execute(self, *args, **kwargs) -> ResultProxy:
    """
    Forward all arguments to ``Connection.execute`` inside an
    ``engine.begin()`` block and return SQLAlchemy's ResultProxy.
    """
    transaction_scope = self.engine.begin()
    with transaction_scope as conn:
        result = conn.execute(*args, **kwargs)
        return result
def execute_return_dict(self, *args, **kwargs) ‑> List[Mapping[str, Any]]

Executes query and returns each row as a dictionary; It raises an AssertionError if any column name is repeated.

Expand source code
def execute_return_dict(self, *args, **kwargs) -> List[Mapping[str, Any]]:
    """
    Executes query and returns each row as a dictionary.

    Arguments are forwarded to ``execute``. An empty result yields ``[]``.

    Raises:
        AssertionError: if any column name is repeated.
    """
    rows = list(self.execute(*args, **kwargs))

    if rows:
        # Only the names are inspected here; values are read afterwards.
        columns = list(rows[0].keys())
        if len(columns) != len(set(columns)):
            # Raised explicitly (not via ``assert``) so the documented
            # check is still enforced under ``python -O``.
            raise AssertionError(f"Repeated column names in result: {columns}")

    return [dict(row.items()) for row in rows]
def execute_return_list(self, *args, **kwargs) ‑> List[Any]

Executes query and returns a list of the resulting values. It raises an AssertionError if more than one column is returned.

Expand source code
def execute_return_list(self, *args, **kwargs) -> List[Any]:
    """
    Executes query and returns a list with the value of its only column.

    Arguments are forwarded to ``execute``. An empty result yields ``[]``.

    Raises:
        AssertionError: if the result does not have exactly one column.
    """
    rows = list(self.execute(*args, **kwargs))

    if rows:
        columns = list(rows[0].keys())
        if len(columns) != 1:
            # Raised explicitly (not via ``assert``) so the documented
            # check is still enforced under ``python -O``; the offending
            # columns go into the message instead of being printed.
            raise AssertionError(
                f"Expected exactly one column, got {len(columns)}: {columns}"
            )

    return [row.values()[0] for row in rows]
def load_samples(self, directory: pathlib.Path, samples: Iterable[sqlalchemy.sql.schema.Table]) ‑> NoneType

Copy the generated sample CSVs into the database

Expand source code
def load_samples(self, directory: Path, samples: Iterable[Table]) -> None:
    "Feed ``<directory>/<table name>.csv`` to ``postgres_copy.copy_from`` for every table in ``samples``"
    for table in samples:
        csv_path = directory / Path(table.name).with_suffix(".csv")
        with open(csv_path) as csv_file:
            postgres_copy.copy_from(
                csv_file, table, self.engine, format="csv", header=True
            )
def load_schema(self, source_database: Database) ‑> NoneType

pg_dump --format=custom --no-owner --schema-only --verbose {source_database} | pg_restore --format=custom --no-owner --schema-only --no-acl --verbose -d {target_database}

Expand source code
def load_schema(self, source_database: "Database") -> None:
    """
    Copy the schema (no data) of ``source_database`` into this database.

    Runs ``pg_dump | pg_restore`` through the shell with both URLs
    shell-quoted, and forwards every line of the combined stdout/stderr
    to the logger at TRACE level. A non-zero exit code is logged as a
    warning but does not raise.
    """
    # The command lives in a local template instead of being read back
    # from ``__doc__``: docstrings are dropped under ``python -OO``, which
    # would turn ``__doc__.format`` into an AttributeError on None.
    command_template = """
    pg_dump \
        --format=custom --no-owner --schema-only \
        --verbose {source_database} \
    | pg_restore \
        --format=custom --no-owner --schema-only \
        --no-acl \
        --verbose -d {target_database}
    """
    pg_copy_schema = command_template.format(
        source_database=quote(source_database.database_url),
        target_database=quote(self.database_url),
    )
    process = subprocess.Popen(
        pg_copy_schema,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    with process.stdout:
        for line in iter(process.stdout.readline, b""):
            logger.trace("{}", line)

    exit_code = process.wait()
    # Kept non-fatal, but a failing pipeline is no longer hidden at DEBUG
    # level. The code is that of the shell pipeline as a whole.
    report = logger.debug if exit_code == 0 else logger.warning
    report("Command pg_dump | pg_restore exited with code {}", exit_code)
def test_conn(self) ‑> Database

Access the engine property (which builds the SQLAlchemy engine) and return this instance

Expand source code
def test_conn(self) -> "Database":
    "Access the engine property (which builds the SQLAlchemy engine) and return this instance"
    # NOTE(review): create_engine() is documented as lazy, so this presumably
    # validates the URL without opening a connection -- confirm.
    return self.engine and self  # Always returns self
class UUIDField (*args, **kwargs)

Platform-independent GUID type.

Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as stringified hex values.

Construct a :class:.TypeDecorator.

Arguments sent here are passed to the constructor of the class assigned to the impl class level attribute, assuming the impl is a callable, and the resulting object is assigned to the self.impl instance attribute (thus overriding the class attribute of the same name).

If the class level impl is not a callable (the unusual case), it will be assigned to the same instance attribute 'as-is', ignoring those arguments passed to the constructor.

Subclasses can override this to customize the generation of self.impl entirely.

Expand source code
class UUIDField(TypeDecorator):
    """Backend-agnostic GUID column type.

    PostgreSQL gets its native UUID type; every other dialect gets a
    CHAR(32) column that stores the value as a 32-digit hex string.
    """

    impl = CHAR

    def load_dialect_impl(self, dialect):
        # Choose the concrete column type for the dialect in use.
        column_type = UUID() if dialect.name == "postgresql" else CHAR(32)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        # Python value -> value handed to the database driver.
        if value is None:
            return value
        if dialect.name == "postgresql":
            return str(value)
        # Other dialects: coerce to uuid.UUID, then emit zero-padded hex.
        as_uuid = value if isinstance(value, uuid.UUID) else uuid.UUID(value)
        return "%.32x" % as_uuid.int

    def process_result_value(self, value, dialect):
        # Database value -> uuid.UUID (None and ready-made UUIDs pass through).
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

Ancestors

  • sqlalchemy.sql.type_api.TypeDecorator
  • sqlalchemy.sql.base.SchemaEventTarget
  • sqlalchemy.sql.type_api.TypeEngine
  • sqlalchemy.sql.visitors.Visitable

Class variables

var impl

The SQL CHAR type.

Methods

def load_dialect_impl(self, dialect)

Return a :class:.TypeEngine object corresponding to a dialect.

This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the :class:.TypeDecorator implementation of :meth:type_engine to help determine what type should ultimately be returned for a given :class:.TypeDecorator.

By default returns self.impl.

Expand source code
def load_dialect_impl(self, dialect):
    # Native UUID on PostgreSQL, CHAR(32) on every other dialect.
    column_type = UUID() if dialect.name == "postgresql" else CHAR(32)
    return dialect.type_descriptor(column_type)
def process_bind_param(self, value, dialect)

Receive a bound parameter value to be converted.

Subclasses override this method to return the value that should be passed along to the underlying :class:.TypeEngine object, and from there to the DBAPI execute() method.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

This operation should be designed with the reverse operation in mind, which would be the process_result_value method of this class.

:param value: Data to operate upon, of any type expected by this method in the subclass. Can be None. :param dialect: the :class:.Dialect in use.

Expand source code
def process_bind_param(self, value, dialect):
    # Python value -> value handed to the database driver.
    if value is None:
        return value
    if dialect.name == "postgresql":
        return str(value)
    # Other dialects: coerce to uuid.UUID, then emit zero-padded 32-digit hex.
    as_uuid = value if isinstance(value, uuid.UUID) else uuid.UUID(value)
    return "%.32x" % as_uuid.int
def process_result_value(self, value, dialect)

Receive a result-row column value to be converted.

Subclasses should implement this method to operate on data fetched from the database.

Subclasses override this method to return the value that should be passed back to the application, given a value that is already processed by the underlying :class:.TypeEngine object, originally from the DBAPI cursor method fetchone() or similar.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

:param value: Data to operate upon, of any type expected by this method in the subclass. Can be None. :param dialect: the :class:.Dialect in use.

This operation should be designed to be reversible by the "process_bind_param" method of this class.

Expand source code
def process_result_value(self, value, dialect):
    # Database value -> uuid.UUID; None and ready-made UUIDs pass through.
    if value is None or isinstance(value, uuid.UUID):
        return value
    return uuid.UUID(value)