10
votes

I'm using SQL Alchemy and have some schema's that are account specific. The name of the schema is derived using the account ID, so I don't have the name of the schema until I hit my application service or repository layer. I'm wondering if it's possible to run a query against an entity that has it's schema dynamically set at runtime?

I know I need to set the __table_args__['schema'] and have tried doing that using the type() built-in, but I always get the following error:

could not assemble any primary key columns for mapped table

I'm ready to give up and just write straight sql, but I really hate to do that. Any idea how this can be done? I'm using SA 0.99 and I do have a PK mapped.

Thanks

4
can you provide a working example of what code is resulting in this error and a stack trace?Haleemur Ali
I appreciate the help here, but I'm going to have to move past the idea of a dynamic schema for now and just have one view for the application. I've spent too much time trying to get this working, and although I was successful without using SA, I then had other issues.Corey Coogan

4 Answers

7
votes
4
votes

One option would be to reflect the particular account-dependent tables. Here is the SqlAlchemy Documentation on the matter.

Alternatively, You can create the table with a static schema attribute and update it as needed at runtime and run the queries you need to. I can't think of a non-messy way to do this. So here's the messy option

Use a loop to update the schema property in each table definition whenever the account is switched.

  1. add all the tables that are account-specific to a list.

    • if the tables are expressed in the declarative syntax, then you have to modify the DeclarativeName.__table__.schema attribute. I'm not sure if you need to also modify DeclarativeName.__table_args__['schema'], but I guess it won't hurt.
    • If the tables are expressed in the old style Table syntax, then you have to modify the Table.schema attribute.
  2. If you're using text for any relationships or foreign keys, then that will break, and you have to inspect each table for such hard coded usage and change them
    example

    • user_id = Column(ForeignKey('my_schema.user.id')) needs to be written as user_id = Column(ForeignKey(User.id)). Then you can change the schema of User to my_new_schema. Otherwise, at query time sqlalchemy will be confused because the foreign key will point to my_schema.user.id while the query would point to my_new_schema.user.
  3. I'm not sure if more complicated relationships can be expressed without the use of plain text, so I guess that's the limit to my proposed solution.

Here's an example I wrote up in the terminal:

>>> from sqlalchemy import Column, Table, Integer, String, select, ForeignKey
>>> from sqlalchemy.orm import relationship, backref
>>> from sqlalchemy.ext.declarative import declarative_base
>>> B = declarative_base()
>>> 
>>> class User(B):
...   __tablename__ = 'user'
...   __table_args__ = {'schema': 'first_schema'}
...   id = Column(Integer, primary_key=True)
...   name = Column(String)
...   email = Column(String)
... 
>>> class Posts(B):
...   __tablename__ = 'posts'
...   __table_args__ = {'schema':'first_schema'}
...   id = Column(Integer, primary_key=True)
...   user_id = Column(ForeignKey(User.id))
...   text = Column(String)
... 
>>> str(select([User.id, Posts.text]).select_from(User.__table__.join(Posts)))
'SELECT first_schema."user".id, first_schema.posts.text \nFROM first_schema."user" JOIN first_schema.posts ON first_schema."user".id = first_schema.posts.user_id'
>>> account_specific = [User, Posts]
>>> for Tbl in account_specific:
...   Tbl.__table__.schema = 'second_schema'
... 
>>> str(select([User.id, Posts.text]).select_from(User.__table__.join(Posts)))
'SELECT second_schema."user".id, second_schema.posts.text \nFROM second_schema."user" JOIN second_schema.posts ON second_schema."user".id = second_schema.posts.user_id'

As you see the same query refers to the second_schema after I update the table's schema attribute.

4
votes

edit: Although you can do what I did here, using the schema translation map as shown in the the answer below is the proper way to do it.

They are set statically. Foreign keys needs the same treatment, and I have an additional issue, in that I have multiple schemas that contain multiple tables so I did this:

from sqlalchemy.ext.declarative import declarative_base

staging_dbase = declarative_base()
model_dbase = declarative_base()


def adjust_schemas(staging, model):
    for vv in staging_dbase.metadata.tables.values():
        vv.schema = staging
    for vv in model_dbase.metadata.tables.values():
        vv.schema = model


def all_tables():
    return staging_dbase.metadata.tables.union(model_dbase.metadata.tables)

Then in my startup code:

adjust_schemas(staging=staging_name, model=model_name)

You can mod this for a single declarative base.

1
votes

I'm working on a project in which I have to create postgres schemas and tables dynamically and then insert data in proper schema. Here is something I have done maybe it will help someone:

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models.user import User

engine_uri = "postgres://someusername:somepassword@localhost:5432/users"
engine = create_engine(engine_uri, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def create_schema(schema_name: str):
    """
        Creates a new postgres schema
        - **schema_name**: name of the new schema to create
    """
    if not engine.dialect.has_schema(engine, schema_name):
        engine.execute(sqlalchemy.schema.CreateSchema(schema_name))


def create_tables(schema_name: str):
    """
        Create new tables for postgres schema
        - **schema_name**: schema in which tables are to be created
    """
    if (
        engine.dialect.has_schema(engine, schema_name) and
        not engine.dialect.has_table(engine, str(User.__table__.name))
    ):
        User.__table__.schema = schema_name
        User.__table__.create(engine)


def add_data(schema_name: str):
    """
        Add data to a particular postgres schema
        - **schema_name**: schema in which data is to be added
    """
    if engine.dialect.has_table(engine, str(User.__table__.name)):
        db = SessionLocal()
        db.connection(execution_options={
            "schema_translate_map": {None: schema_name}},
        )
        user = User()
        user.name = "Moin"
        user.salary = 10000
        db.add(user)
        db.commit()