参考
https://segmentfault.com/a/1190000012022631
https://www.cnblogs.com/daisy89/p/5307727.html
官方文档:
https://cx-oracle.readthedocs.io/en/latest/connection.html
oracle公司官方技术文档
https://www.oracle.com/technetwork/cn/articles/dsl/mastering-oracle-python-1391323-zhs.html
第 1 部分:查询最佳实践
第 2 部分:处理时间和日期
第 3 部分:数据解析
第 4 部分:事务和大型对象
第 5 部分:存储过程、Python 编程
第 6 部分:Python 支持 XML
第 7 部分:面向服务的 Python 架构
第 8 部分:适合 Oracle DBA 使用的 Python
第 9 部分:Jython 和 IronPython — 在 Python 中使用 JDBC 和 ODP.NET
1、安装cx_Oracle包:
pip install cx_Oracle
当前版本7.0
2、创建数据库连接的三种方式:
方法一:用户名、密码和监听分开写
import cx_Oracle
db=cx_Oracle.connect(‘username/password@host/orcl’)
db.close()
方法二:用户名、密码和监听写在一起
import cx_Oracle
db=cx_Oracle.connect(‘username’,’password’,’host/orcl’)
db.close()
方法三:配置监听并连接
import cx_Oracle
tns=cx_Oracle.makedsn(‘host’,1521,’orcl’)
db=cx_Oracle.connect(‘username’,’password’,tns)
db.close()
3、建立cursor并执行SQL语句:查询、更新、插入、删除
创建数据库连接,创建游标cursor,然后执行sql语句,执行完成后,关闭游标,关闭数据库连接
创建连接后,建立cursor,并执行SQL语句
#encoding=utf-8
# 示例
import cx_Oracle
conn = cx_Oracle.connect(‘system/oracle@192.0.2.7/orcl’)
cursor = conn.cursor()
#执行语句的两种方式,预处理和直接执行
# 首先准备该语句,然后利用改变的参数执行 None。根据绑定变量时准备一个语句即足够这一原则,Oracle 将如同在上例中一样对其进行处理。准备好的语句可执行任意次。
sql = “select * from dba_users where user_id = :dbv”
cursor.prepare(sql)
rs = cursor.execute (None,{‘dbv’:63})
rsa = cursor.fetchall()
print (rsa)
# 直接执行
cursor.execute (“select * from dba_users where user_id=62”)
row = cursor.fetchone ()
print (row[0])
cursor.close ()
conn.close()
import cx_Oracle
# 创建连接后,建立cursor,并执行SQL语句
db=cx_Oracle.connect(‘system’,’oracle’,’10.98.156.148/oral’)
# db.close()
cr = db.cursor() # 创建cursor
sql = ‘select * from v$version’
cr.execute(sql) # 执行sql语句
# 一次返回所有结果集 fetchall
rs = cr.fetchall()
print(“print all:(%s)” %rs)
for x in rs:
print (x)
# 一次返回一行 fetchone
print(“fetchone”)
cr.execute(sql)
while (1):
rs = cr.fetchone()
if rs == None:
break
print(rs)
#使用参数查询
pr = {‘dbv’:’61’}
cr.execute(‘select * from dba_users where user_id = :dbv’,pr)
# 这里将参数作为一个字典来处理
rs = cr.fetchall()
print(“parameter print all:(%s)” %rs)
cr.execute(‘select * from dba_users where user_id = :dbv’,dbv = ’61’)
# 这里直接写参数
rs = cr.fetchall()
print(“parameter print all: (%s)” %rs)
cr.close()
db.close()
# 插入、更新、删除操作后需要提交commit
# 查询include:select
def sqlSelect(sql,db):
cr=db.cursor()
cr.execute(sql)
rs=cr.fetchall()
cr.close()
return rs
# 插入、更新、删除操作后需要提交include:insert,update,delete
def sqlDML(sql,db):
cr=db.cursor()
cr.execute(sql)
cr.close()
db.commit()
# execute dml with parameters
def sqlDML2(sql,params,db):
cr=db.cursor()
cr.execute(sql,params)
cr.close()
db.commit()
#增删改查 示例
#1、单条插入:
sql = “INSERT INTO T_AUTOMONITOR_TMP(point_id) VALUES(:pointId)”
cursorObj.prepare(sql)
rown = cursorObj.execute(None, {‘pointId’ : pointId})
connectObj.commit()
#2、多条插入:
sql = “INSERT INTO T_AUTOMONITOR_TMP(point_id) VALUES(:pointId)”
cursorObj.prepare(sql)
rown = cursorObj.executemany(None, recordList)
connectObj.commit()
#删
sql = “DELETE FROM T_AUTOMONITOR_TMP t WHERE t.point_id = :pointId ”
cursorObj.prepare(sql)
rown = cursorObj.execute(None, {‘pointId’ : pointId})
connectObj.commit()
#改
sql = “UPDATE t_automonitor_other t\
SET t.active = ‘2’\
WHERE t.active = ‘1’\
AND t.point_id = :pointId\
”
cursorObj.prepare(sql)
cursorObj.execute(None, {‘pointId’ : pointId})
connectObj.commit()
#查
sql = “SELECT t.describ FROM t_automonitor_tmp t WHERE t.point_id = :pointId”
cursorObj.prepare(sql)
cursorObj.execute(None, {‘pointId’ : pointId})
#在绑定时,您可以首先准备该语句,然后利用改变的参数执行 None。根据绑定变量时准备一个语句即足够这一原则,Oracle 将如同在上例中一样对其进行处理。准备好的语句可执行任意次。
r1 = cursor.execute(‘SELECT * FROM locations WHERE country_id=:1 AND city=:2’, (‘US’, ‘Seattle’))
#数据库操作简单工具
class baseUtilsX():
“””baseUtils”””
def __init__(self):
self.connectObj = “”
self.connCnt = 0
self.cursorCnt = 0
def initOracleConnect(self):
oracle_tns = cx_Oracle.makedsn(‘XXX.XXX.XXX.XXX’, 1521,’XX’)
if self.connCnt == 0:
self.connectObj = cx_Oracle.connect(‘oracleUserName’, ‘password’, oracle_tns)
self.connCnt += 1
def getOracleConnect(self):
self.initOracleConnect()
return self.connectObj
def closeOracleConnect(self, connectObj):
connectObj.close()
self.connCnt -= 1
def getOracleCursor(self):
self.initOracleConnect()
self.cursorCnt += 1
return self.connectObj.cursor()
def closeOracleCursor(self, cursorObj):
cursorObj.close()
self.cursorCnt -= 1
if self.cursorCnt == 0:
print “will close conn”
self.closeOracleConnect(self.connectObj)
def selectFromDbTable(self, sql, argsDict):
# 将查询结果由tuple转为list
queryAnsList = []
selectCursor = self.getOracleCursor()
selectCursor.prepare(sql)
queryAns = selectCursor.execute(None, argsDict)
for ansItem in queryAns:
queryAnsList.append(list(ansItem))
self.closeOracleCursor(selectCursor)
return queryAnsList
# 摘录oracle官方文档https://www.oracle.com/technetwork/cn/articles/dsl/mastering-oracle-python-1391323-zhs.html
# oracle 日期函数
# 使用 EXTRACT 语句通过 SQL 查询将年、月、日这些字段提取出来
SELECT EXTRACT(YEAR FROM hire_date) FROM employees ORDER BY 1;
使用 Python 的 datetime.date、datetime.time 和 datetime.datetime 对象作为绑定变量来查询日期
如果您需要将现有的字符串分析为 date(time) 对象,可以使用 datetime 对象的 strptime() 方法。
>>> from datetime import datetime
>>> datetime.strptime(“2007-12-31 23:59:59”, “%Y-%m-%d %H:%M:%S”)
datetime.datetime(2007, 12, 31, 23, 59, 59)
>>> import datetime
>>> d = datetime.datetime.now()
>>> print d
2007-03-03 16:48:27.734000
>>> print type(d)
<type ‘datetime.datetime’>
>>> print d.hour, d.minute, d.second
(16, 48, 27)
–EXTRACT() 函数用于提取日期/时间的单独部分,比如年、月、日、小时、分钟等等。
select sysdate,
extract(year from sysdate),
extract(month from sysdate),
extract(day from sysdate)
from dual s;
SELECT EXTRACT(HOUR FROM TIMESTAMP ‘2010-01-10 11:12:13’),
EXTRACT(minute FROM TIMESTAMP ‘2010-01-10 11:12:13’),
EXTRACT(second FROM TIMESTAMP ‘2010-01-10 11:12:13’)
FROM DUAL;
#精通 Oracle+Python,第 3 部分:数据解析
https://www.oracle.com/technetwork/cn/articles/dsl/prez-python-dataparsing-087750-zhs.html
有关使用正则表达式的最好建议是尽可能地避免使用它们。在将它们嵌入代码前,请确定没有字符串方法可以完成相同的工作,因为字符串方法更快且不会带来导入以及正则表达式处理这些额外的开销。在字符串对象上使用 dir() 就可以看到可用的内容。
下例展示了在 Python 这样一种动态语言中看待正则表达式的方式。解析 tnsnames.ora 文件以便为每个网络别名创建简单连接字符串(将 file() 函数指向您的 tnsnames.ora 文件的位置):
>>> import re
>>> tnsnames = file(r’tnsnames.ora’).read()
>>> easy_connects = {}
>>> tns_re = “^(\w+?)\s?=.*?HOST\s?=\s?(.+?)\).*?PORT\s?=\s?(\d+?)\).*?SERVICE_NAME\s?=\s?(.+?)\)”
>>> for match in re.finditer(tns_re, tnsnames, re.M+re.S):
… t = match.groups()
… easy_connects[t[0]] = “%s:%s/%s” % t[1:]
>>> print easy_connects
此程序在 Oracle Database XE 默认的 tnsnames.ora 文件上的输出是:
{‘XE’: ‘localhost:1521/XE’}