Module scenographer.scenographer

This module takes care of loading the settings, transforming them into an appropriate format if necessary and providing the user-provided settings to the rest of the application.

Expand source code
"""
This module takes care of loading the settings, transforming them
into an appropriate format if necessary and providing the
user-provided settings to the rest of the application.
"""

import json
import operator
import os
import sys
from functools import lru_cache
from pathlib import Path
from tempfile import mkdtemp
from typing import Any, Iterable, List, Mapping, NamedTuple, Optional

import sqlalchemy
from dict_digger import dig
from loguru import logger
from pyrsistent import freeze, thaw
from sqlalchemy.schema import Table

from scenographer.database import Database
from scenographer.relations import Relation, RelationDAG
from scenographer.sample import TableSample


class Settings(NamedTuple):
    """
    User-provided settings for a scenographer run.

    The defaults, ``load`` and ``empty`` all store container values as
    pyrsistent (frozen) structures, which keeps instances hashable.
    """

    # Connection URLs of the database to sample from / to load into.
    SOURCE_DATABASE_URL: str
    TARGET_DATABASE_URL: str
    # Per-table "limit" / "conditions" entries; "_default" holds the fallbacks.
    QUERY_MODIFIERS: dict = freeze({})
    # Names of the tables to leave out.
    IGNORE_TABLES: List[str] = freeze([])
    # {"pk": "table.column", "fk": "table.column"} entries to add / to drop.
    EXTEND_RELATIONS: List[Mapping[str, str]] = freeze([])
    IGNORE_RELATIONS: List[Mapping[str, str]] = freeze([])
    # Where the samples are written; a temporary directory is used when unset.
    OUTPUT_DIRECTORY: Optional[str] = None

    @classmethod
    def load(cls, path):
        """
        Build a Settings instance from the JSON (comments allowed) file at
        ``path``. Every value is frozen before being stored.

        Raises TypeError when the file holds a key that is not a Settings
        field, or lacks one of the required fields.
        """
        # Imported here, so commentjson is only needed when a file is loaded.
        import commentjson

        # JSON is UTF-8 by specification; don't depend on the platform locale.
        with open(path, encoding="utf-8") as f:
            data = commentjson.load(f)

        return cls(**freeze(data))

    @classmethod
    @lru_cache()
    def empty(cls):
        """
        Return a Settings instance with blank database URLs and example
        values for the remaining keys. The result is memoized.
        """
        return cls(
            SOURCE_DATABASE_URL="",
            TARGET_DATABASE_URL="",
            IGNORE_TABLES=freeze(["example1", "migrations"]),
            EXTEND_RELATIONS=freeze(
                [{"pk": "product.id", "fk": "product_ownership.product_id"}]
            ),
            IGNORE_RELATIONS=freeze(
                [{"pk": "product.id", "fk": "client.favorite_product_id"}]
            ),
            QUERY_MODIFIERS=freeze(
                {
                    "_default": {"conditions": [], "limit": 300},
                    "users": {"conditions": ["email ilike '%@example.com'"]},
                }
            ),
        )

    @property
    @lru_cache()
    def json(self):
        """
        The settings serialized as an indented JSON document with sorted keys.
        Frozen values are thawed first so the json module can encode them.
        """
        return json.dumps(
            {k: thaw(v) for k, v in self._asdict().items()},
            indent=4,
            sort_keys=True,
        )


class Scenographer(NamedTuple):
    """
    Ties the user-provided settings to the databases, the relation graph
    and the sampling steps.

    Most properties are memoized with lru_cache, which keys on the
    instance itself, so ``options`` has to be hashable.
    """

    options: Settings

    @property
    @lru_cache()
    def source_database(self) -> Database:
        """
        SOURCE_DATABASE_URL key.
        Abort if any issue arises!
        """
        return self._instanciate_database("SOURCE_DATABASE_URL")

    @property
    def target_database(self) -> Database:
        """
        TARGET_DATABASE_URL key.
        Abort if any issue arises!
        """
        # Not memoized, unlike source_database: every access builds a new
        # Database and tests the connection again.
        return self._instanciate_database("TARGET_DATABASE_URL")

    @property
    @lru_cache()
    def output_directory(self) -> Path:
        """
        OUTPUT_DIRECTORY key.
        Directory where the samples are saved. When the key is unset a
        temporary directory is created; otherwise the configured
        directory is created.
        Abort if the configured directory already exists!
        """
        directory = self.options.OUTPUT_DIRECTORY
        if not directory:
            directory = mkdtemp(prefix="sample-")
            logger.info(
                "Output directory unspecified. "
                "Saving samples in temporary directory {}",
                directory,
            )
            return Path(directory)

        try:
            os.mkdir(directory)
        except FileExistsError:
            logger.error(
                "Error creating output directory {}. "
                "Make sure there's nothing there. "
                "Aborting.",
                directory,
            )
            sys.exit(1)

        return Path(directory)

    @property
    @lru_cache()
    def extend_relations(self) -> Iterable[Relation]:
        """
        EXTEND_RELATIONS key.
        Scenographer will recognize the relations specified here,
        in addition to those specified by database constraints.
        This method will return the equivalent Relation instances
        """
        return self._resolve_relations(self.options.EXTEND_RELATIONS)

    @property
    @lru_cache()
    def ignore_relations(self) -> Iterable[Relation]:
        """
        IGNORE_RELATIONS key.
        Scenographer will not recognize the relations specified here.
        This method will return the equivalent Relation instances
        """
        return self._resolve_relations(self.options.IGNORE_RELATIONS)

    @property
    @lru_cache()
    def ignore_tables(self) -> Iterable[Table]:
        """
        IGNORE_TABLES key.
        Scenographer will not sample any data coming from these tables.
        This method will return the equivalent Table instances
        (bound to source_database). Unknown table names are logged
        and skipped.
        """
        ignored_tables = []
        for table_name in self.options.IGNORE_TABLES:
            if table_name not in self.source_database.tables.__dict__:
                logger.warning("Can't find table {}. Skipping.", table_name)
                continue
            ignored_tables.append(getattr(self.source_database.tables, table_name))
        return ignored_tables

    @property
    @lru_cache()
    def query_modifiers(self) -> Mapping[Table, Mapping[str, Any]]:
        """
        QUERY_MODIFIERS key.
        Scenographer will take this key into account while sampling.
        This method will return, for each table of the relation graph,
        a {"limit": ..., "conditions": ...} mapping keyed by the Table
        instance. Values missing for a table fall back to the "_default"
        entry (or to a limit of 30 and no conditions).
        A table whose entry provides neither a limit nor conditions is
        logged as malformed and left out of the result.
        """
        mods = self.options.QUERY_MODIFIERS
        # Thaw once; dig is handed a plain dict for both default lookups.
        plain_mods = thaw(mods)
        default_limit = dig(plain_mods, "_default", "limit") or 30
        default_conditions = dig(plain_mods, "_default", "conditions") or []

        entrypoint_names = {t.name for t in self.relation_dag.entrypoints}
        non_specified_entrypoints = entrypoint_names - set(mods.keys())
        if non_specified_entrypoints:
            logger.warning(
                "Entrypoints are advised to be added as query modifiers. "
                "They define what the final sample will look like"
            )
            logger.warning(
                "These entrypoints are not specified: {}",
                non_specified_entrypoints,
            )

        modifiers = {}
        for table in self.relation_dag.tables:
            if table.name in mods:
                entry = mods[table.name]
                limit = entry.get("limit")
                conditions = entry.get("conditions")
                if not limit and not conditions:
                    logger.warning("QUERY_MODIFIER for {} malformed.", table.name)
                    continue

                # Fill in whichever half of the entry was not provided.
                limit = limit or default_limit
                conditions = conditions or default_conditions
            else:
                limit, conditions = default_limit, default_conditions

            modifiers[table] = {"limit": limit, "conditions": conditions}

        return modifiers

    @property
    @lru_cache()
    def relation_dag(self) -> RelationDAG:
        """
        Graph of the source database, loaded by RelationDAG.load with
        the extended / ignored relations and ignored tables applied.
        """
        logger.info("Building the database graph")
        return RelationDAG.load(
            self.source_database,
            extend_relations=self.extend_relations,
            ignore_relations=self.ignore_relations,
            ignore_tables=self.ignore_tables,
        )

    @property
    def samples(self) -> Iterable[Table]:
        """
        Result of TableSample.sample_dag for the source database.
        Not memoized: every access starts the sampling again.
        """
        logger.info("Starting sampling")
        return TableSample.sample_dag(
            self.source_database,
            self.relation_dag,
            self.query_modifiers,
            self.output_directory,
        )

    @property
    @lru_cache()
    def column_by_name(self):
        """
        Mapping of "table.column" names to the column objects of
        every table of the source database.
        """
        return {
            f"{column.table.name}.{column.name}": column
            for table in self.source_database.tables.__dict__.values()
            for column in table.columns
        }

    def copy_schema(self):
        """Load the source schema into the target database. Returns self."""
        logger.info("Copying schema")
        self.target_database.load_schema(self.source_database)
        return self

    def copy_sample(self):
        """Load the samples into the target database. Returns self."""
        logger.info("Loading sample into target")
        self.target_database.load_samples(self.output_directory, self.samples)
        return self

    def _resolve_relations(self, specs) -> List[Relation]:
        """
        Helper turning {"pk": "table.column", "fk": "table.column"}
        mappings into Relation instances.
        Entries with a missing key or an unknown column are logged
        and skipped.
        """
        relations = []
        for spec in specs:
            try:
                relations.append(
                    Relation(
                        pk=self.column_by_name[spec["pk"]],
                        fk=self.column_by_name[spec["fk"]],
                    )
                )
            except KeyError:
                # dict() so the log shows a plain mapping for both settings keys.
                logger.warning("Can't match relation {}. Skipping.", dict(spec))
        return relations

    def _instanciate_database(self, var):
        """
        Helper method to instantiate a Database.
        Scenographer will read the url stored under the settings key
        ``var`` and test the connection with that database.
        Abort if sqlalchemy rejects the url (ArgumentError);
        any other error propagates to the caller.
        """
        try:
            database_url = getattr(self.options, var)
            return Database(database_url).test_conn()
        except sqlalchemy.exc.ArgumentError:
            logger.error("Error connecting to {}", var)
            sys.exit(1)

Classes

class Scenographer (options: Settings)

Scenographer(options,)

Expand source code
class Scenographer(NamedTuple):
    """
    Ties the user-provided settings to the databases, the relation graph
    and the sampling steps.

    Most properties are memoized with lru_cache, which keys on the
    instance itself, so ``options`` has to be hashable.
    """

    options: Settings

    @property
    @lru_cache()
    def source_database(self) -> Database:
        """
        SOURCE_DATABASE_URL key.
        Abort if any issue arises!
        """
        return self._instanciate_database("SOURCE_DATABASE_URL")

    @property
    def target_database(self) -> Database:
        """
        TARGET_DATABASE_URL key.
        Abort if any issue arises!
        """
        # Not memoized, unlike source_database: every access builds a new
        # Database and tests the connection again.
        return self._instanciate_database("TARGET_DATABASE_URL")

    @property
    @lru_cache()
    def output_directory(self) -> Path:
        """
        OUTPUT_DIRECTORY key.
        Directory where the samples are saved. When the key is unset a
        temporary directory is created; otherwise the configured
        directory is created.
        Abort if the configured directory already exists!
        """
        directory = self.options.OUTPUT_DIRECTORY
        if not directory:
            directory = mkdtemp(prefix="sample-")
            logger.info(
                "Output directory unspecified. "
                "Saving samples in temporary directory {}",
                directory,
            )
            return Path(directory)

        try:
            os.mkdir(directory)
        except FileExistsError:
            logger.error(
                "Error creating output directory {}. "
                "Make sure there's nothing there. "
                "Aborting.",
                directory,
            )
            sys.exit(1)

        return Path(directory)

    @property
    @lru_cache()
    def extend_relations(self) -> Iterable[Relation]:
        """
        EXTEND_RELATIONS key.
        Scenographer will recognize the relations specified here,
        in addition to those specified by database constraints.
        This method will return the equivalent Relation instances
        """
        return self._resolve_relations(self.options.EXTEND_RELATIONS)

    @property
    @lru_cache()
    def ignore_relations(self) -> Iterable[Relation]:
        """
        IGNORE_RELATIONS key.
        Scenographer will not recognize the relations specified here.
        This method will return the equivalent Relation instances
        """
        return self._resolve_relations(self.options.IGNORE_RELATIONS)

    @property
    @lru_cache()
    def ignore_tables(self) -> Iterable[Table]:
        """
        IGNORE_TABLES key.
        Scenographer will not sample any data coming from these tables.
        This method will return the equivalent Table instances
        (bound to source_database). Unknown table names are logged
        and skipped.
        """
        ignored_tables = []
        for table_name in self.options.IGNORE_TABLES:
            if table_name not in self.source_database.tables.__dict__:
                logger.warning("Can't find table {}. Skipping.", table_name)
                continue
            ignored_tables.append(getattr(self.source_database.tables, table_name))
        return ignored_tables

    @property
    @lru_cache()
    def query_modifiers(self) -> Mapping[Table, Mapping[str, Any]]:
        """
        QUERY_MODIFIERS key.
        Scenographer will take this key into account while sampling.
        This method will return, for each table of the relation graph,
        a {"limit": ..., "conditions": ...} mapping keyed by the Table
        instance. Values missing for a table fall back to the "_default"
        entry (or to a limit of 30 and no conditions).
        A table whose entry provides neither a limit nor conditions is
        logged as malformed and left out of the result.
        """
        mods = self.options.QUERY_MODIFIERS
        # Thaw once; dig is handed a plain dict for both default lookups.
        plain_mods = thaw(mods)
        default_limit = dig(plain_mods, "_default", "limit") or 30
        default_conditions = dig(plain_mods, "_default", "conditions") or []

        entrypoint_names = {t.name for t in self.relation_dag.entrypoints}
        non_specified_entrypoints = entrypoint_names - set(mods.keys())
        if non_specified_entrypoints:
            logger.warning(
                "Entrypoints are advised to be added as query modifiers. "
                "They define what the final sample will look like"
            )
            logger.warning(
                "These entrypoints are not specified: {}",
                non_specified_entrypoints,
            )

        modifiers = {}
        for table in self.relation_dag.tables:
            if table.name in mods:
                entry = mods[table.name]
                limit = entry.get("limit")
                conditions = entry.get("conditions")
                if not limit and not conditions:
                    logger.warning("QUERY_MODIFIER for {} malformed.", table.name)
                    continue

                # Fill in whichever half of the entry was not provided.
                limit = limit or default_limit
                conditions = conditions or default_conditions
            else:
                limit, conditions = default_limit, default_conditions

            modifiers[table] = {"limit": limit, "conditions": conditions}

        return modifiers

    @property
    @lru_cache()
    def relation_dag(self) -> RelationDAG:
        """
        Graph of the source database, loaded by RelationDAG.load with
        the extended / ignored relations and ignored tables applied.
        """
        logger.info("Building the database graph")
        return RelationDAG.load(
            self.source_database,
            extend_relations=self.extend_relations,
            ignore_relations=self.ignore_relations,
            ignore_tables=self.ignore_tables,
        )

    @property
    def samples(self) -> Iterable[Table]:
        """
        Result of TableSample.sample_dag for the source database.
        Not memoized: every access starts the sampling again.
        """
        logger.info("Starting sampling")
        return TableSample.sample_dag(
            self.source_database,
            self.relation_dag,
            self.query_modifiers,
            self.output_directory,
        )

    @property
    @lru_cache()
    def column_by_name(self):
        """
        Mapping of "table.column" names to the column objects of
        every table of the source database.
        """
        return {
            f"{column.table.name}.{column.name}": column
            for table in self.source_database.tables.__dict__.values()
            for column in table.columns
        }

    def copy_schema(self):
        """Load the source schema into the target database. Returns self."""
        logger.info("Copying schema")
        self.target_database.load_schema(self.source_database)
        return self

    def copy_sample(self):
        """Load the samples into the target database. Returns self."""
        logger.info("Loading sample into target")
        self.target_database.load_samples(self.output_directory, self.samples)
        return self

    def _resolve_relations(self, specs) -> List[Relation]:
        """
        Helper turning {"pk": "table.column", "fk": "table.column"}
        mappings into Relation instances.
        Entries with a missing key or an unknown column are logged
        and skipped.
        """
        relations = []
        for spec in specs:
            try:
                relations.append(
                    Relation(
                        pk=self.column_by_name[spec["pk"]],
                        fk=self.column_by_name[spec["fk"]],
                    )
                )
            except KeyError:
                # dict() so the log shows a plain mapping for both settings keys.
                logger.warning("Can't match relation {}. Skipping.", dict(spec))
        return relations

    def _instanciate_database(self, var):
        """
        Helper method to instantiate a Database.
        Scenographer will read the url stored under the settings key
        ``var`` and test the connection with that database.
        Abort if sqlalchemy rejects the url (ArgumentError);
        any other error propagates to the caller.
        """
        try:
            database_url = getattr(self.options, var)
            return Database(database_url).test_conn()
        except sqlalchemy.exc.ArgumentError:
            logger.error("Error connecting to {}", var)
            sys.exit(1)

Ancestors

  • builtins.tuple

Instance variables

var column_by_name
Expand source code
@property
@lru_cache()
def column_by_name(self):
    """
    Mapping of "table.column" names to the column objects of
    every table of the source database.
    """
    lookup = {}
    for table in self.source_database.tables.__dict__.values():
        for column in table.columns:
            lookup[f"{column.table.name}.{column.name}"] = column
    return lookup
var extend_relations : Iterable[Relation]

EXTEND_RELATIONS key. Scenographer will recognize the relations specified here, in addition to those specified by database constraints. This method will return the equivalent Relation instances

Expand source code
@property
@lru_cache()
def extend_relations(self) -> Iterable[Relation]:
    """
    EXTEND_RELATIONS key.
    Relations listed here are recognized by Scenographer on top of
    the ones specified by database constraints.
    Returns the matching Relation instances; entries that can't be
    matched are logged and skipped.
    """
    matched = []
    for spec in self.options.EXTEND_RELATIONS:
        try:
            pk_column = self.column_by_name[spec["pk"]]
            fk_column = self.column_by_name[spec["fk"]]
            matched.append(Relation(pk=pk_column, fk=fk_column))
        except KeyError:
            logger.warning("Can't match relation {}. Skipping.", dict(spec))

    return matched
var ignore_relations : Iterable[Relation]

IGNORE_RELATIONS key. Scenographer will not recognize the relations specified here. This method will return the equivalent Relation instances

Expand source code
@property
@lru_cache()
def ignore_relations(self) -> Iterable[Relation]:
    """
    IGNORE_RELATIONS key.
    Scenographer will not recognize the relations specified here.
    This method will return the equivalent Relation instances;
    entries with a missing key or an unknown column are logged
    and skipped.
    """
    relations = []
    for relation in self.options.IGNORE_RELATIONS:
        try:
            relations.append(
                Relation(
                    pk=self.column_by_name[relation["pk"]],
                    fk=self.column_by_name[relation["fk"]],
                )
            )
        except KeyError:
            # Log a plain dict, the same way extend_relations does.
            logger.warning("Can't match relation {}. Skipping.", dict(relation))
            continue

    return relations
var ignore_tables : Iterable[sqlalchemy.sql.schema.Table]

IGNORE_TABLES key. Scenographer will not sample any data coming from these tables. This method will return the equivalent Table instances (bound to source_database)

Expand source code
@property
@lru_cache()
def ignore_tables(self) -> Iterable[Table]:
    """
    IGNORE_TABLES key.
    No data coming from these tables will be sampled.
    Returns the equivalent Table instances (bound to
    source_database); unknown table names are logged and skipped.
    """
    found = []
    for name in self.options.IGNORE_TABLES:
        if name not in self.source_database.tables.__dict__:
            logger.warning("Can't find table {}. Skipping.", name)
            continue
        found.append(getattr(self.source_database.tables, name))
    return found
var options : Settings

Alias for field number 0

var output_directory : pathlib.Path

OUTPUT_DIRECTORY key. Abort if the output directory already exists!

Expand source code
@property
@lru_cache()
def output_directory(self) -> Path:
    """
    OUTPUT_DIRECTORY key.
    Directory where the samples are saved. When the key is unset a
    temporary directory is created; otherwise the configured
    directory is created.
    Abort if the configured directory already exists!
    """
    configured = self.options.OUTPUT_DIRECTORY
    if configured:
        try:
            os.mkdir(configured)
        except FileExistsError:
            logger.error(
                "Error creating output directory {}. "
                "Make sure there's nothing there. "
                "Aborting.",
                configured,
            )
            sys.exit(1)
        return Path(configured)

    temporary = mkdtemp(prefix="sample-")
    logger.info(
        "Output directory unspecified. "
        "Saving samples in temporary directory {}",
        temporary,
    )
    return Path(temporary)
var query_modifiers : Mapping[sqlalchemy.sql.schema.Table, Mapping[str, Any]]

QUERY_MODIFIERS key. Scenographer will take this key into account while sampling. This method will return the modifiers keyed by the equivalent Table instances (bound to source_database)

Expand source code
@property
@lru_cache()
def query_modifiers(self) -> Mapping[Table, Mapping[str, Any]]:
    """
    QUERY_MODIFIERS key.
    Scenographer will take this key into account while sampling.
    This method will return, for each table of the relation graph,
    a {"limit": ..., "conditions": ...} mapping keyed by the Table
    instance. Values missing for a table fall back to the "_default"
    entry (or to a limit of 30 and no conditions).
    A table whose entry provides neither a limit nor conditions is
    logged as malformed and left out of the result.
    """
    mods = self.options.QUERY_MODIFIERS
    # Thaw once; dig is handed a plain dict for both default lookups.
    plain_mods = thaw(mods)
    default_limit = dig(plain_mods, "_default", "limit") or 30
    default_conditions = dig(plain_mods, "_default", "conditions") or []

    entrypoint_names = {t.name for t in self.relation_dag.entrypoints}
    non_specified_entrypoints = entrypoint_names - set(mods.keys())
    if non_specified_entrypoints:
        logger.warning(
            "Entrypoints are advised to be added as query modifiers. "
            "They define what the final sample will look like"
        )
        logger.warning(
            "These entrypoints are not specified: {}",
            non_specified_entrypoints,
        )

    modifiers = {}
    for table in self.relation_dag.tables:
        if table.name in mods:
            entry = mods[table.name]
            limit = entry.get("limit")
            conditions = entry.get("conditions")
            if not limit and not conditions:
                logger.warning("QUERY_MODIFIER for {} malformed.", table.name)
                continue

            # Fill in whichever half of the entry was not provided.
            limit = limit or default_limit
            conditions = conditions or default_conditions
        else:
            limit, conditions = default_limit, default_conditions

        modifiers[table] = {"limit": limit, "conditions": conditions}

    return modifiers
var relation_dag : RelationDAG
Expand source code
@property
@lru_cache()
def relation_dag(self) -> RelationDAG:
    """
    Graph of the source database, loaded by RelationDAG.load with
    the extended / ignored relations and ignored tables applied.
    """
    logger.info("Building the database graph")
    loader = RelationDAG.load
    database = self.source_database
    extended = self.extend_relations
    ignored_relations = self.ignore_relations
    ignored_tables = self.ignore_tables
    return loader(
        database,
        extend_relations=extended,
        ignore_relations=ignored_relations,
        ignore_tables=ignored_tables,
    )
var samples : Iterable[sqlalchemy.sql.schema.Table]
Expand source code
@property
def samples(self) -> Iterable[Table]:
    """
    Result of TableSample.sample_dag for the source database.
    Not memoized: every access starts the sampling again.
    """
    logger.info("Starting sampling")
    sample_dag = TableSample.sample_dag
    database = self.source_database
    dag = self.relation_dag
    modifiers = self.query_modifiers
    directory = self.output_directory
    return sample_dag(database, dag, modifiers, directory)
var source_database : Database

SOURCE_DATABASE_URL key. Abort if any issue arises!

Expand source code
@property
@lru_cache()
def source_database(self) -> Database:
    """
    Database instantiated from the SOURCE_DATABASE_URL key.
    Abort if any issue arises!
    """
    settings_key = "SOURCE_DATABASE_URL"
    return self._instanciate_database(settings_key)
var target_database : Database

TARGET_DATABASE_URL key. Abort if any issue arises!

Expand source code
@property
def target_database(self) -> Database:
    """
    Database instantiated from the TARGET_DATABASE_URL key.
    Abort if any issue arises!
    """
    settings_key = "TARGET_DATABASE_URL"
    return self._instanciate_database(settings_key)

Methods

def copy_sample(self)
Expand source code
def copy_sample(self):
    """Load the samples into the target database. Returns self."""
    logger.info("Loading sample into target")
    target = self.target_database
    load_samples = target.load_samples
    directory = self.output_directory
    load_samples(directory, self.samples)
    return self
def copy_schema(self)
Expand source code
def copy_schema(self):
    """Load the source schema into the target database. Returns self."""
    logger.info("Copying schema")
    target = self.target_database
    load_schema = target.load_schema
    load_schema(self.source_database)
    return self
class Settings (SOURCE_DATABASE_URL: str, TARGET_DATABASE_URL: str, QUERY_MODIFIERS: dict = pmap({}), IGNORE_TABLES: List[str] = pvector([]), EXTEND_RELATIONS: List[Mapping[str, str]] = pvector([]), IGNORE_RELATIONS: List[Mapping[str, str]] = pvector([]), OUTPUT_DIRECTORY: Union[str, NoneType] = None)

Settings(SOURCE_DATABASE_URL, TARGET_DATABASE_URL, QUERY_MODIFIERS, IGNORE_TABLES, EXTEND_RELATIONS, IGNORE_RELATIONS, OUTPUT_DIRECTORY)

Expand source code
class Settings(NamedTuple):
    """
    User-provided settings for a scenographer run.

    The defaults, ``load`` and ``empty`` all store container values as
    pyrsistent (frozen) structures, which keeps instances hashable.
    """

    # Connection URLs of the database to sample from / to load into.
    SOURCE_DATABASE_URL: str
    TARGET_DATABASE_URL: str
    # Per-table "limit" / "conditions" entries; "_default" holds the fallbacks.
    QUERY_MODIFIERS: dict = freeze({})
    # Names of the tables to leave out.
    IGNORE_TABLES: List[str] = freeze([])
    # {"pk": "table.column", "fk": "table.column"} entries to add / to drop.
    EXTEND_RELATIONS: List[Mapping[str, str]] = freeze([])
    IGNORE_RELATIONS: List[Mapping[str, str]] = freeze([])
    # Where the samples are written; a temporary directory is used when unset.
    OUTPUT_DIRECTORY: Optional[str] = None

    @classmethod
    def load(cls, path):
        """
        Build a Settings instance from the JSON (comments allowed) file at
        ``path``. Every value is frozen before being stored.

        Raises TypeError when the file holds a key that is not a Settings
        field, or lacks one of the required fields.
        """
        # Imported here, so commentjson is only needed when a file is loaded.
        import commentjson

        # JSON is UTF-8 by specification; don't depend on the platform locale.
        with open(path, encoding="utf-8") as f:
            data = commentjson.load(f)

        return cls(**freeze(data))

    @classmethod
    @lru_cache()
    def empty(cls):
        """
        Return a Settings instance with blank database URLs and example
        values for the remaining keys. The result is memoized.
        """
        return cls(
            SOURCE_DATABASE_URL="",
            TARGET_DATABASE_URL="",
            IGNORE_TABLES=freeze(["example1", "migrations"]),
            EXTEND_RELATIONS=freeze(
                [{"pk": "product.id", "fk": "product_ownership.product_id"}]
            ),
            IGNORE_RELATIONS=freeze(
                [{"pk": "product.id", "fk": "client.favorite_product_id"}]
            ),
            QUERY_MODIFIERS=freeze(
                {
                    "_default": {"conditions": [], "limit": 300},
                    "users": {"conditions": ["email ilike '%@example.com'"]},
                }
            ),
        )

    @property
    @lru_cache()
    def json(self):
        """
        The settings serialized as an indented JSON document with sorted keys.
        Frozen values are thawed first so the json module can encode them.
        """
        return json.dumps(
            {k: thaw(v) for k, v in self._asdict().items()},
            indent=4,
            sort_keys=True,
        )

Ancestors

  • builtins.tuple

Static methods

def empty(cls)
Expand source code
@classmethod
@lru_cache()
def empty(cls):
    """
    Return a Settings instance with blank database URLs and example
    values for the remaining keys. The result is memoized.
    """
    ignored_tables = freeze(["example1", "migrations"])
    extended = freeze([{"pk": "product.id", "fk": "product_ownership.product_id"}])
    ignored_relations = freeze(
        [{"pk": "product.id", "fk": "client.favorite_product_id"}]
    )
    modifiers = freeze(
        {
            "_default": {"conditions": [], "limit": 300},
            "users": {"conditions": ["email ilike '%@example.com'"]},
        }
    )
    return cls(
        SOURCE_DATABASE_URL="",
        TARGET_DATABASE_URL="",
        IGNORE_TABLES=ignored_tables,
        EXTEND_RELATIONS=extended,
        IGNORE_RELATIONS=ignored_relations,
        QUERY_MODIFIERS=modifiers,
    )
def load(path)
Expand source code
@classmethod
def load(cls, path):
    """
    Build an instance from the JSON (comments allowed) file at ``path``.
    Every value is frozen before being handed to the constructor as
    keyword arguments.
    """
    # Imported here, so commentjson is only needed when a file is loaded.
    import commentjson

    # JSON is UTF-8 by specification; don't depend on the platform locale.
    with open(path, encoding="utf-8") as f:
        data = commentjson.load(f)

    return cls(**freeze(data))

Instance variables

var EXTEND_RELATIONS : List[Mapping[str, str]]

Alias for field number 4

var IGNORE_RELATIONS : List[Mapping[str, str]]

Alias for field number 5

var IGNORE_TABLES : List[str]

Alias for field number 3

var OUTPUT_DIRECTORY : Union[str, NoneType]

Alias for field number 6

var QUERY_MODIFIERS : dict

Alias for field number 2

var SOURCE_DATABASE_URL : str

Alias for field number 0

var TARGET_DATABASE_URL : str

Alias for field number 1

var json
Expand source code
@property
@lru_cache()
def json(self):
    """
    The settings serialized as an indented JSON document with sorted keys.
    Frozen values are thawed first so the json module can encode them.
    """
    plain = {}
    for key, value in self._asdict().items():
        plain[key] = thaw(value)
    return json.dumps(plain, indent=4, sort_keys=True)