安装
sudo pip install sqlalchemy 或sudo pip3 install sqlalchemy
下载速度慢可使用国内源,如:
sudo pip install sqlalchemy -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
测试1
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Replace y-user / y-passwd / y-db / y-table with your own settings.
# A table name that really contains '-' must be quoted in the SQL below.
eng = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
#eng = create_engine('postgresql+psycopg2://y-user:y-passwd@127.0.0.1:5432/y-db',echo=True,client_encoding='utf8')
DB_Session = sessionmaker(bind=eng)
session = DB_Session()
try:
    # Raw SQL is wrapped in text(): plain strings are deprecated in
    # SQLAlchemy 1.4 and rejected by 2.0.
    data = session.execute(text("SELECT * FROM y-table"))
    for row in data:
        # One output line per row, columns separated by a single space.
        # The single-argument print() form runs on both Python 2 and 3.
        print(' '.join('%s' % (col,) for col in row))
finally:
    # Release the connection even when the query fails.
    session.close()
测试2
#coding=utf-8
from sqlalchemy import MetaData, Table, create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
# Metadata container; the engine is passed explicitly when reflecting
# because bound metadata (MetaData(engine)) was removed in SQLAlchemy 2.0.
metadata = MetaData()
# Reflect the column definitions of the existing table from the database.
y_table = Table('y-table', metadata, autoload_with=eng)
# sessionmaker replaces the removed create_session() helper.
session = sessionmaker(bind=eng)()
try:
    query = session.query(y_table)
    data = query.all()
    for row in data:
        # One output line per row, columns separated by a single space.
        print(' '.join('%s' % (col,) for col in row))
finally:
    # Release the connection even when the query fails.
    session.close()
测试3–orm方式
数据库表STUDENT的建立参考:
http://my.oschina.net/u/2245781/blog/653897?fromerr=YEVmVLAx
# coding=utf-8
from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import sys

# Python 2 only: make implicit str/unicode conversions use UTF-8.
# Python 3 has neither the reload() builtin nor sys.setdefaultencoding(),
# and its default encoding is already UTF-8, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")
# 创建对象的基类:
Base = declarative_base()
# 定义Stu对象:
class Stu(Base):
    """ORM mapping for one row of the STUDENT table."""

    # Table name
    __tablename__ = 'STUDENT'

    # Table columns
    SNO = Column(String(7), primary_key=True)
    SNAME = Column(String(8))
    SEX = Column(String(2))
    BDATE = Column(DATETIME)
    DIR = Column(String(16))

    def __repr__(self):
        """Return a debug string of all columns; BDATE is shown as YYYY-MM-DD."""
        # BDATE is not declared nullable=False, so it can be None; calling
        # strftime() on None would raise AttributeError.
        bdate = self.BDATE.strftime("%Y-%m-%d") if self.BDATE is not None else None
        return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \
            (self.SNO, self.SNAME, self.SEX, bdate, self.DIR)
# Initialise the database connection
engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
# Session factory bound to the engine
DBSession = sessionmaker(bind=engine)
# Open a session
session = DBSession()
try:
    query = session.query(Stu)
    # Iterating the query runs it; each item is a Stu instance printed via __repr__.
    for s in query:
        print(s)
    # Alternative: print selected fields instead of the repr
    # for s in query.all():
    #     print(s.SNAME + ' ' + s.SNO + ' ' + s.SEX + ' ' + s.BDATE.strftime("%Y-%m-%d") + ' ' + s.DIR)
finally:
    # Release the connection even when the query fails.
    session.close()
插入
# coding=utf-8
from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 创建对象的基类:
Base = declarative_base()
# 定义Stu对象:
class Stu(Base):
    """ORM mapping for one row of the STUDENT table."""

    # Table name
    __tablename__ = 'STUDENT'

    # Table columns
    SNO = Column(String(7), primary_key=True)
    SNAME = Column(String(8))
    SEX = Column(String(2))
    BDATE = Column(DATETIME)
    DIR = Column(String(16))

    def __repr__(self):
        """Return a debug string of all columns; BDATE is shown as YYYY-MM-DD."""
        # BDATE is not declared nullable=False, so it can be None; calling
        # strftime() on None would raise AttributeError.
        bdate = self.BDATE.strftime("%Y-%m-%d") if self.BDATE is not None else None
        return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \
            (self.SNO, self.SNAME, self.SEX, bdate, self.DIR)
from datetime import datetime

# Initialise the database connection
engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
# Session factory bound to the engine
DBSession = sessionmaker(bind=engine)
# Open a session
session = DBSession()
# BDATE is a DATETIME column, so pass a real datetime instead of a string;
# Stu.__repr__ also calls strftime() on it.
new_stu = Stu(SNO='9302205', SNAME='方邪真', SEX='男', BDATE=datetime(1973, 11, 18), DIR='数理逻辑')
# SNO is the primary key, so inserting the same row twice fails. SQL to remove it again:
# delete from STUDENT where SNO ='9302205';
try:
    # Stage the new row in the session
    session.add(new_stu)
    # Commit writes it to the database
    session.commit()
except Exception:
    # Undo the failed transaction before re-raising
    session.rollback()
    raise
finally:
    # Close the session
    session.close()
根据sqlalchemy文档http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html
当数据库为postgresql时DATETIME改为DATE类型,关于查询的例子如下:
# coding=utf-8
from sqlalchemy import Column, String, create_engine
# The dialect package is named "postgresql"; "postgres" was only a deprecated alias.
from sqlalchemy.dialects.postgresql import DATE
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from datetime import *
import sys

# Python 2 only: make implicit str/unicode conversions use UTF-8.
# Python 3 has neither the reload() builtin nor sys.setdefaultencoding(),
# and its default encoding is already UTF-8, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")
# 创建对象的基类:
Base = declarative_base()
# 定义Stu对象:
class Stu(Base):
    """ORM mapping for one row of the PostgreSQL `student` table."""

    # Table name
    __tablename__ = 'student'

    # Table columns
    sno = Column(String(7), primary_key=True)
    sname = Column(String(8))
    sex = Column(String(2))
    bdate = Column(DATE)
    dir = Column(String(16))

    def __repr__(self):
        """Return a debug string of all columns; bdate is shown as YYYY-MM-DD."""
        # bdate is not declared nullable=False, so it can be None; calling
        # strftime() on None would raise AttributeError.
        bdate = self.bdate.strftime("%Y-%m-%d") if self.bdate is not None else None
        return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \
            (self.sno, self.sname, self.sex, bdate, self.dir)
# Initialise the database connection (echo=True logs the emitted SQL)
engine = create_engine('postgresql+psycopg2://user:passwd@127.0.0.1:5432/db',echo=True,client_encoding='utf8')
# Session factory bound to the engine
DBSession = sessionmaker(bind=engine)
# Open a session
session = DBSession()
try:
    query = session.query(Stu)
    # Alternative: print selected fields instead of the repr
    # for s in query.all():
    #     print(s.sname + ' ' + s.sno + ' ' + s.sex + ' ' + s.bdate.strftime("%Y-%m-%d") + ' ' + s.dir)
    # Iterating the query runs it; each item is a Stu instance printed via __repr__.
    for s in query:
        print(s)
finally:
    # Release the connection even when the query fails.
    session.close()
根据官方文档中的例子:
表创建crtusers.py
# coding=utf-8
from sqlalchemy import ForeignKey
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer,String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
Base = declarative_base()
#建立表users
class User(Base):
    """ORM mapping for the `users` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    fullname = Column(String(40))
    password = Column(String(12))

    def __init__(self, name, fullname, password):
        """Store the three caller-supplied fields; `id` is not set here."""
        self.name, self.fullname, self.password = name, fullname, password

    def __repr__(self):
        """Return a debug string showing name, fullname and password."""
        shown = (self.name, self.fullname, self.password)
        return "<User(name='%s', fullname='%s', password='%s')>" % shown
# Mapping for the `addresses` table
class Address(Base):
    """ORM mapping for the `addresses` table; each row references one user."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String(40), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    # Many-to-one side of the User <-> Address relationship
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a debug string showing the e-mail address."""
        return "<Address(email_address='%s')>" % self.email_address


# One-to-many side, attached after Address exists so order_by can reference Address.id.
User.addresses = relationship("Address", order_by=Address.id, back_populates="user")
# A single create_all() after both classes are defined creates every mapped
# table; the earlier call that ran before Address existed was redundant.
Base.metadata.create_all(engine)
# Seed data is inserted only when this file is run as a script. Without the
# guard, importing the module (e.g. `from crtusers import Address, User`)
# would insert the same rows again on every import.
if __name__ == "__main__":
    DBSession = sessionmaker(bind=engine)
    session = DBSession()
    try:
        # Insert a single user
        ed_user = User(name='ed', fullname='Ed Jones', password='edspassword')
        session.add(ed_user)
        # Insert several users at once
        session.add_all([
            User(name='wendy', fullname='Wendy Williams', password='foobar'),
            User(name='mary', fullname='Mary Contrary', password='xxg527'),
            User(name='lisa', fullname='lisa Contrary', password='ls123'),
            User(name='cred', fullname='cred Flinstone', password='bla123'),
            User(name='fred', fullname='Fred Flinstone', password='blah'),
        ])
        # Insert one user together with two addresses
        jack = User(name='jack', fullname='Jack Bean', password='gjffdd')
        jack.addresses = [Address(email_address='jack@google.com'), Address(email_address='j25@yahoo.com')]
        session.add(jack)
        # Commit writes everything to the database
        session.commit()
    except Exception:
        # Undo the failed transaction before re-raising
        session.rollback()
        raise
    finally:
        # Close the session
        session.close()
查询
# coding=utf-8
# Use SQLAlchemy's or_ (an SQL OR over any number of clauses); operator.or_
# is the two-argument `|` operator and only worked here by coincidence.
from sqlalchemy import or_
from sqlalchemy import text
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer,String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func
from crtusers import Address, User
engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8')
DBSession = sessionmaker(bind=engine)
session = DBSession()
try:
    # Basic query samples
    query = session.query(User)
    for u in query.all():
        print(str(u.id) + ' ' + u.name + ' ' + u.fullname + ' ' + u.password)
    for u in session.query(User).order_by(User.id)[1:4]:
        print(u)
    for name in session.query(User.name).filter_by(fullname='Ed Jones'):
        print(name)
    for u in session.query(User).filter(User.name.isnot(None)):
        print(u)
    for u in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
        print(u)
    for u in session.query(User).filter(~User.name.like('%ed')).order_by(User.id):
        print(u)
    for u in session.query(User).filter(User.name == 'ed').filter(User.fullname == 'Ed Jones'):
        print(u)
    for u in session.query(User).filter(User.name.in_(['ed', 'wendy', 'jack'])):
        print(u)

    # Using textual SQL
    for user in session.query(User).filter(text("id<224")).order_by(text("id")).all():
        print(user.name)
    for u in (session.query(User)
              .from_statement(text("SELECT * FROM users where id >= :id"))
              .params(id=2).all()):
        print(u)
    for u in (session.query(User)
              .from_statement(text("SELECT * FROM users where name=:name"))
              .params(name='ed').all()):
        print(u)

    # Counting with count()
    print(session.query(func.count('*')).select_from(User).scalar())
    print(session.query(User).filter(User.name.like('%ed')).count())
    for n in session.query(func.count(User.name), User.name).group_by(User.name).all():
        print(n)

    # Querying across User and Address
    for u, a in (session.query(User, Address)
                 .filter(User.id == Address.user_id)
                 .filter(Address.email_address == 'jack@google.com').all()):
        print(u)
        print(a)
    for u in session.query(User).join(Address).filter(Address.email_address == 'jack@google.com').all():
        print(u)
    for name, in session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%'))):
        print(name)
finally:
    # Close the session even when one of the queries fails.
    session.close()
安装psycopg2要指定路径,故用源码编译方式
psycopg2-2.6.2$ python setup.py build_ext --pg-config /opt/PostgreSQL/9.5/bin/pg_config build
psycopg2-2.6.2$ sudo python setup.py build_ext --pg-config /opt/PostgreSQL/9.5/bin/pg_config install
新版地址
wget http://initd.org/psycopg/tarballs/PSYCOPG-2-7/psycopg2-2.7.1.tar.gz
例子:根据psycopg2-2.6.2/examples/simple.py(只修改DSN以适合你的数据库环境)
# simple.py - very simple example of plain DBAPI-2.0 usage
#
# currently used as test-me-stress-me script for psycopg 2.0
#
# Copyright (C) 2001-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
## put in DSN your DSN string
DSN = 'dbname=your-db user=your-user password=your-passwd host=127.0.0.1'

## don't modify anything below this line (except for experimenting)


class SimpleQuoter(object):
    # NOTE(review): the signature has no explicit `self`; when called on an
    # instance, `x` receives the instance. Kept unchanged.
    def sqlquote(x=None):
        return "'bar'"


import sys
import psycopg2

# A DSN given on the command line overrides the default above.
if len(sys.argv) > 1:
    DSN = sys.argv[1]

# Single-argument print() calls run on both Python 2 and Python 3.
print("Opening connection using dsn: %s" % DSN)
conn = psycopg2.connect(DSN)
try:
    print("Encoding for this connection is %s" % conn.encoding)

    curs = conn.cursor()
    curs.execute("SELECT 1 AS foo")
    print(curs.fetchone())
    curs.execute("SELECT 1 AS foo")
    print(curs.fetchmany())
    curs.execute("SELECT 1 AS foo")
    print(curs.fetchall())

    conn.rollback()
finally:
    # Release the connection even when one of the statements fails.
    conn.close()

sys.exit(0)

# The original example kept the experiments below after sys.exit(0), so they
# never ran. They are kept as comments because `async` has been a reserved
# word since Python 3.7 and would stop the whole file from parsing.
# curs.execute("SELECT 1 AS foo", async=1)
# curs.execute("SELECT %(foo)s AS foo", {'foo':'bar'})
# curs.execute("SELECT %(foo)s AS foo", {'foo':None})
# curs.execute("SELECT %(foo)f AS foo", {'foo':42})
# curs.execute("SELECT %(foo)s AS foo", {'foo':SimpleQuoter()})
PyGreSQL安装
修改setup.py
f = os.popen('pg_config --%s' % s)
改为:
f = os.popen('/opt/PostgreSQL/10/bin/pg_config --%s' % s)
再安装:
sudo python3 setup.py install
测试代码PyGreSQLtest1.py:
#coding=utf-8
from pg import DB
# Open a PyGreSQL DB wrapper connection to the local testdb database.
db = DB(dbname='testdb', host='localhost', port=5432, user='mymotif', passwd='wxwpxh')
try:
    # Printing the query result shows the rows of the userinfo table.
    print(db.query("select * from userinfo"))
finally:
    # Release the connection even when the query fails.
    db.close()
测试2:
# -*- coding: utf-8 -*-
#导入pg模块
import pg
def operate_postgre_tbl_product():
    """Recreate tbl_product3, insert one row and print the table contents.

    Connects to the `testdb` database on 127.0.0.1 with PyGreSQL. If the
    connection or one of the DROP / CREATE / INSERT statements fails, a
    message is printed and the function returns early. The connection is
    always closed once it has been opened.
    """
    # Connect to the database
    try:
        pgdb_conn = pg.connect(dbname="testdb", host="127.0.0.1", user="mymotif", passwd="wxwpxh")
    except Exception as e:
        # Fall back to the exception itself when it carries no args.
        print(e.args[0] if e.args else e)
        return

    create_sql = """CREATE TABLE tbl_product3(
i_index INTEGER,
sv_productname VARCHAR(32)
);"""
    # (statement, message printed when it fails), executed in order:
    # drop the table, create it, insert one record.
    steps = (
        ("DROP TABLE IF EXISTS tbl_product3;", "drop table failed"),
        (create_sql, "create table failed"),
        ("INSERT INTO tbl_product3(sv_productname) values('apple')", "insert record into table failed"),
    )
    try:
        for sql_desc, failure_message in steps:
            try:
                pgdb_conn.query(sql_desc)
            except Exception:
                print(failure_message)
                return
        # Query the table and print each row as a dict
        sql_desc = "select * from tbl_product3"
        for row in pgdb_conn.query(sql_desc).dictresult():
            print(row)
    finally:
        # Close the connection on every path, including a failing SELECT.
        pgdb_conn.close()
if __name__ == "__main__":
    # Run the database demo
    operate_postgre_tbl_product()
PyGreSQL with SQLAlchemy
# coding=utf-8
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# PyGreSQL
# NOTE(review): the pygresql dialect is deprecated in recent SQLAlchemy
# releases — confirm that your installed version still ships it.
eng = create_engine('postgresql+pygresql://mymotif:wxwpxh@127.0.0.1:5432/mymotif')
DB_Session = sessionmaker(bind=eng)
session = DB_Session()
try:
    # Raw SQL is wrapped in text(): plain strings are deprecated in
    # SQLAlchemy 1.4 and rejected by 2.0.
    data = session.execute(text("SELECT * FROM STUDENT"))
    for row in data:
        # One output line per row, columns separated by a single space.
        # The single-argument print() form runs on both Python 2 and 3.
        print(' '.join('%s' % (col,) for col in row))
finally:
    # Release the connection even when the query fails.
    session.close()
参考:http://blog.csdn.net/phashh/article/details/51077943