查看版本
>>> import sqlalchemy >>> sqlalchemy.__version__ '1.0.9'
创建连接
from sqlclachemy import create_engine engine = create_engine("sqlite:///:memory:", echo=True)
‘sqlite:///:memory:’ 是 database URL
postgresql
sqlalchemy 默认使用 psycopg2
# 默认情况(即使用psycopg2) engine = create_engine('postgresql://scott:tiger@localhost/mydatabase') # 使用psycopg2 engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase') # 使用pg8000 engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')
MySQL
默认使用mysql-python
# 默认情况(即使用mysql-python) engine = create_engine('mysql://scott:tiger@localhost/foo') # 使用mysql-python engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo') # 使用MySQL-connector-python engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo') # 使用OurSQL engine = create_engine('mysql+oursql://scott:tiger@localhost/foo')
Oracle
默认使用cx_oracle
# 默认情况(即使用cx_oracle)
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname') # 使用cx_oracle engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')
Microsoft SQL Server
默认使用pyodbc
# 使用pyodbc engine = create_engine('mssql+pyodbc://scott:tiger@mydsn') # 使用pymssql engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')
SQLite
因为sqlite是基于文件的数据库,所以database URL 和前面的不太一样。
# database URL 形式是 sqlite://<nohostname>/<path> engine = create_engine('sqlite:///foo.db') # 在Unix/Mac engine = create_engine('sqlite:////absolute/path/to/foo.db') # 在Windows engine = create_engine('sqlite:///C:\\path\\to\\foo.db') # 在Windows 中使用原始字符串 engine = create_engine(r'sqlite:///C:\path\to\foo.db') # 使用内存 engine = create_engine('sqlite://') engine = create_engine('sqlite:///:memory:')
创建表
使用ORM,需要首先描述表,然后把类映射到表上。在SQLAclhemy中这两步通常在一起,使用叫做Declarative的系统,可以让我们创建一个包含描述我们需要创建表的类
创建declarative base class
需要先创建基类
from sqlalchemy.ext.declarative import declarative_base Base = declarative_base()
创建类
比如创建User类,User类需要继承前面创建的Base。
from sqlalchemy import Column, Integer, String class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) fullname = Column(String) password = Column(String) def __repr__(self): return "<User(name='%s', fullname='%s', password='%s')>" % ( self.name, self.fullname, self.password)
需要使用tablename 指名表名。Column 表明字段。
当声名ORM的类时,Declarative系统会使用python的元类,创建一个包含表信息的Table类(可以通过User.table_ 查看),然后通过Mapper把这个Table与ORM的类联系起来。所有ORM类的Table类都包含在一个MetaData类中,通过Base.metadata 可以查看MetaData类
对于SQLAlchemy ORM来说Declarative 系统不是必须的,比如:
from sqlalchemy import Table, MetaData, Column, Integer, String, ForeignKey from sqlalchemy.orm import mapper metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(50)), Column('fullname', String(50)), Column('password', String(12)) ) class User(object): def __init__(self, name, fullname, password): self.name = name self.fullname = fullname self.password = password mapper(User, user)
创建User表
Base.metadata.create_all(engine)
基本的CURD操作
需要创建一个session来对表进行操作
创建一个session
from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) Session = sessionmaker() '''or Session = sessionmaker() Session.configure(bind=engine) ''' session = Session()
插入数据
ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') session.add(ed_user)
一次添加多行
session.add_all([
User(name='wendy', fullname='Wendy Williams', password='foobar'), User(name='mary', fullname='Mary Contrary', password='xxg527'), User(name='fred', fullname='Fred Flinstone', password='blah')])
此时数据是没有插入到数据库中的,ed_user.id 是为None的。如果想真正插入数据库中需要commit。
session.commit()
如果想要操作只是在内存中,不真正commit,ed_user.id 不为None,可以使用flush操作,它只是写到内存中。
session.flush()
回滚操作
当没有commit之前,可以回滚
session.rollback()
读取操作,Querying
执行query操作之前会自动执行flush。
query例子:
for instance in session.query(User).order_by(User.id): print(instance.name, instance.fullname) # 可以只取其中的某些字段 for name, fullname in session.query(User.name, User.fullname): print(name, fullname) for row in session.query(User, User.name).all(): print(row.User, row.name) for row in session.query(User.name.label('name_label')).all(): print(row.name_label)
可以使用alias对表取别名,可以在表join本身时用。
from sqlalchemy.orm import aliased user_alias = aliased(User, name='user_alias') for row in session.query(user_alias, user_alias.name).all(): print(row.user_alias)
使用filter_by筛选
for name, in sesson.query(User.name).filter_by(fullname='Ed Jones'): print(name)
也可以使用filter,它更加的灵活
for name, in sesson.query(User.name).filter(User.fullname=='Ed Jones'): print(name)
#相等 query.filter(User.name == 'ed') #不相等 query.filter(User.name != 'ed') #like query.filter(User.name.like('%ed%')) #in query.filter(User.name.in_(['ed', 'wendy', 'jack'])) query.filter(User.name.in_( session.query(User.name).filter(User.name.like('%ed%')) )) #not in query.filter(~User.name.in_(['ed', 'wendy', 'jack'])) #IS NULL query.filter(User.name == None) query.filter(User.name.is_(None)) #IS NOT NUKK query.filter(User.name != None) query.filter(User.name.isnot(None)) #And from sqlalchemy import and_ query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')) query.filter(User.name == 'ed', User.fullname == 'Ed Jones') query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones') #OR from sqlalchemy import or_ query.filter(or_(User.name == 'ed', User.name == 'wendy')) #Match query.filter(User.name.match('wendy'))
返回列表
all() 操作,一次取出所有,返回一个列表。
query = session.query(User).filter(Usre.name.like('%ed%')).all()
first() 取出第一个,如果没有就返回None
obj = query.first()
one() 取出一个,如果筛选的结果不只一个,报MultipleResultsFound错,如果没有报NoResultFound 错
one_or_none() 取出一个,如果筛选的结果不只一个,报MultipleResultsFound错,如果没有返回None
scalar() 跟one_or_none() 一样
使用原始的sql
query = session.query(User).filter('id<200').order_by(text('id')).all()
使用参数
query = session.query.(User).filter(text('id<:value and name=:name')).params(value=224, name='fred').order_by(User.id).one()
使用完全基于string
query = session.query(User).from_statement(
text('SELECT * FROM users where name=:name').params(name='ed').all() ).all()
count
count = session.query(User).filter(User.name.like('%ed%')).count()
from sqlalchemy import func
query = session.query(func.count(User.name), User.name).group_by(User.name).all()
session.query(func.count('*')).select_from(User).scalar()
session.query(func.count(User.id)).scalar()
更新
user.name = 'haha' session.commit()
删除
session.delete(user)