序
tomcat提供了JdbcInterceptor可以用来监控jdbc的执行情况,默认提供了好几个现成的interceptor可以用,SlowQueryReport以及SlowQueryReportJmx就是其中的两个。
JdbcInterceptor的基本原理
/**
 * Abstract class that is to be extended for implementations of interceptors.
 * Every time an operation is called on the {@link java.sql.Connection} object the
 * {@link #invoke(Object, Method, Object[])} method on the interceptor will be called.
 * Interceptors are useful to change or improve behavior of the connection pool.<br>
 * Interceptors can receive a set of properties. Each sub class is responsible for parsing the properties
 * during runtime when they are needed or simply override the {@link #setProperties(Map)} method.
 * Properties arrive in a key-value pair of Strings as they were received through the configuration.
 * {@link #setProperties(Map)} is called once per cached connection object when the object is first configured.
 *
 * @version 1.0
 */
public abstract class JdbcInterceptor implements InvocationHandler {
    /**
     * {@link java.sql.Connection#close()} method name
     */
    public static final String CLOSE_VAL = "close";
    /**
     * {@link Object#toString()} method name
     */
    public static final String TOSTRING_VAL = "toString";
    /**
     * {@link java.sql.Connection#isClosed()} method name
     */
    public static final String ISCLOSED_VAL = "isClosed";
    /**
     * {@link javax.sql.PooledConnection#getConnection()} method name
     */
    public static final String GETCONNECTION_VAL = "getConnection";
    /**
     * {@link java.sql.Wrapper#unwrap(Class)} method name
     */
    public static final String UNWRAP_VAL = "unwrap";
    /**
     * {@link java.sql.Wrapper#isWrapperFor(Class)} method name
     */
    public static final String ISWRAPPERFOR_VAL = "isWrapperFor";
    /**
     * {@link java.sql.Connection#isValid(int)} method name
     */
    public static final String ISVALID_VAL = "isValid";
    /**
     * {@link java.lang.Object#equals(Object)} method name
     */
    public static final String EQUALS_VAL = "equals";
    /**
     * {@link java.lang.Object#hashCode()} method name
     */
    public static final String HASHCODE_VAL = "hashCode";

    /**
     * Properties for this interceptor.
     */
    protected Map<String,InterceptorProperty> properties = null;

    /**
     * The next interceptor in the chain
     */
    private volatile JdbcInterceptor next = null;

    /**
     * Property that decides how we do string comparison, default is to use
     * {@link String#equals(Object)}. If set to <code>false</code> then the
     * equality operator (==) is used.
     */
    private boolean useEquals = true;

    /**
     * Public constructor for instantiation through reflection
     */
    public JdbcInterceptor() {
        // NOOP
    }

    /**
     * Gets invoked each time an operation on {@link java.sql.Connection} is invoked.
     * The default implementation simply forwards the call to the next interceptor in the chain.
     * {@inheritDoc}
     * @throws NullPointerException if no next interceptor has been configured
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Read the volatile link once so the null check and the call see the same value.
        final JdbcInterceptor target = getNext();
        if (target == null) {
            throw new NullPointerException("No next interceptor configured in the chain.");
        }
        return target.invoke(proxy, method, args);
    }

    /**
     * Returns the next interceptor in the chain
     * @return the next interceptor in the chain
     */
    public JdbcInterceptor getNext() {
        return next;
    }

    /**
     * configures the next interceptor in the chain
     * @param next The next chain item
     */
    public void setNext(JdbcInterceptor next) {
        this.next = next;
    }

    /**
     * Performs a string comparison, using {@link String#equals(Object)} when the useEquals
     * property is <code>true</code> and a reference comparison (==) otherwise.
     * @param name1 The first name
     * @param name2 The second name
     * @return true if name1 is equal to name2 based on {@link #useEquals}
     */
    public boolean compare(String name1, String name2) {
        if (isUseEquals()) {
            return name1.equals(name2);
        } else {
            return name1==name2;
        }
    }

    /**
     * Compares a method name (String) to a method (Method) by delegating to
     * {@link #compare(String,String)} with the name of the method.
     * @param methodName The method name
     * @param method The method
     * @return <code>true</code> if the name matches
     */
    public boolean compare(String methodName, Method method) {
        return compare(methodName, method.getName());
    }

    /**
     * Gets called each time the connection is borrowed from the pool
     * This means that if an interceptor holds a reference to the connection
     * the interceptor can be reused for another connection.
     * <br>
     * This method may be called with null as both arguments when we are closing down the connection.
     * @param parent - the connection pool owning the connection
     * @param con - the pooled connection
     */
    public abstract void reset(ConnectionPool parent, PooledConnection con);

    /**
     * Called when {@link java.sql.Connection#close()} is called on the underlying connection.
     * This is to notify the interceptors, that the physical connection has been released.
     * Implementation of this method should be thought through with care, as no actions should trigger an exception.
     * The default implementation does nothing.
     * @param parent - the connection pool that this connection belongs to
     * @param con - the pooled connection that holds this connection
     * @param finalizing - if this connection is finalizing. True means that the pooled connection will not reconnect the underlying connection
     */
    public void disconnected(ConnectionPool parent, PooledConnection con, boolean finalizing) {
        // NOOP
    }

    /**
     * Returns the properties configured for this interceptor
     * @return the configured properties for this interceptor
     */
    public Map<String,InterceptorProperty> getProperties() {
        return properties;
    }

    /**
     * Called during the creation of an interceptor
     * The properties can be set during the configuration of an interceptor
     * Override this method to perform type casts between string values and object properties
     * @param properties The properties; when <code>null</code> the map is stored as is and
     *                   the current useEquals setting is left untouched
     */
    public void setProperties(Map<String,InterceptorProperty> properties) {
        this.properties = properties;
        if (properties == null) {
            // Nothing to parse; avoid dereferencing a missing configuration map.
            return;
        }
        final String useEquals = "useEquals";
        InterceptorProperty p = properties.get(useEquals);
        if (p!=null) {
            setUseEquals(Boolean.parseBoolean(p.getValue()));
        }
    }

    /**
     * @return true if the compare method uses the Object.equals(Object) method
     *         false if comparison is done on a reference level
     */
    public boolean isUseEquals() {
        return useEquals;
    }

    /**
     * Set to true if string comparisons (for the {@link #compare(String, Method)} and {@link #compare(String, String)} methods) should use the Object.equals(Object) method
     * The default is true
     * @param useEquals <code>true</code> if equals will be used for comparisons
     */
    public void setUseEquals(boolean useEquals) {
        this.useEquals = useEquals;
    }

    /**
     * This method is invoked by a connection pool when the pool is closed.
     * Interceptor classes can override this method if they keep static
     * variables or other tracking means around.
     * <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b>
     * @param pool - the pool that is being closed.
     */
    public void poolClosed(ConnectionPool pool) {
        // NOOP
    }

    /**
     * This method is invoked by a connection pool when the pool is first started up, usually when the first connection is requested.
     * Interceptor classes can override this method if they keep static
     * variables or other tracking means around.
     * <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b>
     * @param pool - the pool that is being started.
     */
    public void poolStarted(ConnectionPool pool) {
        // NOOP
    }
}
可以看到它实现了InvocationHandler这个接口,也就是使用的是java内置的动态代理技术,主要是因为jdbc本身就是面向接口编程的,因而用java内置的动态代理是水到渠成的。
ConnectionPool
tomcat-jdbc-8.5.11-sources.jar!/org/apache/tomcat/jdbc/pool/ConnectionPool.java
/**
 * configures a pooled connection as a proxy.
 * This Proxy implements {@link java.sql.Connection} and {@link javax.sql.PooledConnection} interfaces.
 * All calls on {@link java.sql.Connection} methods will be propagated down to the actual JDBC connection except for the
 * {@link java.sql.Connection#close()} method.
 * <br>
 * The interceptor chain is built once per pooled connection and cached on it; on later calls
 * every interceptor of the cached chain is reset instead of being re-created.
 * @param con a {@link PooledConnection} to wrap in a Proxy
 * @return a {@link java.sql.Connection} object wrapping a pooled connection.
 * @throws SQLException if an interceptor can't be configured, if the proxy can't be instantiated
 */
protected Connection setupConnection(PooledConnection con) throws SQLException {
    //fetch previously cached interceptor proxy - one per connection
    JdbcInterceptor handler = con.getHandler();
    if (handler==null) {
        //build the proxy handler, it is the innermost element of the chain
        handler = new ProxyConnection(this,con,getPoolProperties().isUseEquals());
        //set up the interceptor chain, walking the configuration backwards so that
        //the first configured interceptor ends up as the outermost handler
        PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
        for (int i=proxies.length-1; i>=0; i--) {
            try {
                //create a new instance through the public no-arg constructor
                //(Class.newInstance() is deprecated)
                JdbcInterceptor interceptor = proxies[i].getInterceptorClass().getConstructor().newInstance();
                //configure properties
                interceptor.setProperties(proxies[i].getProperties());
                //setup the chain
                interceptor.setNext(handler);
                //call reset
                interceptor.reset(this, con);
                //configure the last one to be held by the connection
                handler = interceptor;
            }catch(Exception x) {
                throw new SQLException("Unable to instantiate interceptor chain.", x);
            }
        }
        //cache handler for the next iteration
        con.setHandler(handler);
    } else {
        JdbcInterceptor next = handler;
        //we have a cached handler, reset every element of the chain
        while (next!=null) {
            next.reset(this, con);
            next = next.getNext();
        }
    }

    try {
        //presumably initializes proxyClassConstructor for the requested (XA or plain) proxy type - verify against getProxyConstructor
        getProxyConstructor(con.getXAConnection() != null);
        //create the proxy
        //TODO possible optimization, keep track if this connection was returned properly, and don't generate a new facade
        Connection connection = null;
        if (getPoolProperties().getUseDisposableConnectionFacade() ) {
            connection = (Connection)proxyClassConstructor.newInstance(new Object[] { new DisposableConnectionFacade(handler) });
        } else {
            connection = (Connection)proxyClassConstructor.newInstance(new Object[] {handler});
        }
        //return the connection
        return connection;
    }catch (Exception x) {
        //give the failure a message instead of an anonymous SQLException
        throw new SQLException("Unable to create connection proxy.", x);
    }
}
这里先判断该连接上是否已经缓存了handler:没有的话,则创建ProxyConnection,然后按配置逆序构造interceptor的链并缓存到连接上;已经有的话,则对链上的每个interceptor依次调用reset
ProxyConnection
/**
 * Innermost element of the interceptor chain: a {@link JdbcInterceptor} that holds the
 * {@link PooledConnection} and its owning {@link ConnectionPool} and dispatches proxied
 * method calls to them.
 */
public class ProxyConnection extends JdbcInterceptor {

    /** The wrapped pooled connection; set to <code>null</code> once close has been handled. */
    protected PooledConnection connection = null;

    /** The pool the connection is returned to on close. */
    protected ConnectionPool pool = null;

    public PooledConnection getConnection() {
        return connection;
    }

    public void setConnection(PooledConnection connection) {
        this.connection = connection;
    }

    public ConnectionPool getPool() {
        return pool;
    }

    public void setPool(ConnectionPool pool) {
        this.pool = pool;
    }

    /**
     * @param parent the pool owning the connection
     * @param con the pooled connection to wrap
     * @param useEquals passed on to {@link #setUseEquals(boolean)}
     */
    protected ProxyConnection(ConnectionPool parent, PooledConnection con,
            boolean useEquals) {
        pool = parent;
        connection = con;
        setUseEquals(useEquals);
    }

    /**
     * Re-binds this handler to the given pool and pooled connection.
     */
    @Override
    public void reset(ConnectionPool parent, PooledConnection con) {
        this.pool = parent;
        this.connection = con;
    }

    /**
     * @param iface the interface being asked for
     * @return <code>true</code> if an XA connection is available and {@link XAConnection} is requested,
     *         or if the underlying connection is an instance of <code>iface</code>
     */
    public boolean isWrapperFor(Class<?> iface) {
        if (iface == XAConnection.class && connection.getXAConnection()!=null) {
            return true;
        } else {
            return (iface.isInstance(connection.getConnection()));
        }
    }

    /**
     * @param iface the interface to unwrap to
     * @return the pooled connection, the XA connection or the underlying connection, depending on <code>iface</code>
     * @throws SQLException if this object does not wrap the requested interface
     */
    public Object unwrap(Class<?> iface) throws SQLException {
        if (iface == PooledConnection.class) {
            return connection;
        }else if (iface == XAConnection.class) {
            return connection.getXAConnection();
        } else if (isWrapperFor(iface)) {
            return connection.getConnection();
        } else {
            throw new SQLException("Not a wrapper of "+iface.getName());
        }
    }

    /**
     * Dispatches a proxied call. <code>isClosed</code>, <code>close</code>, <code>toString</code> and
     * <code>getConnection</code> are answered locally, methods declared by {@link XAConnection} go to the
     * XA connection, <code>unwrap</code>/<code>isWrapperFor</code> are answered by this class and everything
     * else is invoked reflectively on the underlying connection.
     *
     * @throws SQLException if the connection has already been closed and the call needs it
     * @throws Throwable whatever the target method throws (unwrapped from {@link InvocationTargetException})
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (compare(ISCLOSED_VAL,method)) {
            return Boolean.valueOf(isClosed());
        }
        if (compare(CLOSE_VAL,method)) {
            if (connection==null) return null; //noop for already closed.
            PooledConnection poolc = this.connection;
            // drop our reference before handing the connection back to the pool
            this.connection = null;
            pool.returnConnection(poolc);
            return null;
        } else if (compare(TOSTRING_VAL,method)) {
            return this.toString();
        } else if (compare(GETCONNECTION_VAL,method) && connection!=null) {
            return connection.getConnection();
        } else if (method.getDeclaringClass().equals(XAConnection.class)) {
            // This branch runs before the isClosed() guard below, so check for a closed
            // connection here rather than failing with a NullPointerException.
            PooledConnection poolc = this.connection;
            if (poolc == null) {
                throw new SQLException("Connection has already been closed.");
            }
            return invokeUnwrapped(method, poolc.getXAConnection(), args);
        }
        if (isClosed()) throw new SQLException("Connection has already been closed.");
        if (compare(UNWRAP_VAL,method)) {
            return unwrap((Class<?>)args[0]);
        } else if (compare(ISWRAPPERFOR_VAL,method)) {
            return Boolean.valueOf(this.isWrapperFor((Class<?>)args[0]));
        }
        PooledConnection poolc = connection;
        if (poolc == null) {
            throw new SQLException("Connection has already been closed.");
        }
        return invokeUnwrapped(method, poolc.getConnection(), args);
    }

    /**
     * Invokes <code>method</code> on <code>target</code> and rethrows the cause of an
     * {@link InvocationTargetException} (when it has one) so callers see the original exception.
     */
    private static Object invokeUnwrapped(Method method, Object target, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause() != null ? e.getCause() : e;
        }
    }

    /**
     * @return <code>true</code> if there is no pooled connection or it has been discarded
     */
    public boolean isClosed() {
        return connection==null || connection.isDiscarded();
    }

    public PooledConnection getDelegateConnection() {
        return connection;
    }

    public ConnectionPool getParentPool() {
        return pool;
    }

    @Override
    public String toString() {
        return "ProxyConnection["+(connection!=null?connection.toString():"null")+"]";
    }
}
ProxyConnection本身就是JdbcInterceptor,包装了PooledConnection
AbstractCreateStatementInterceptor
这个是JdbcInterceptor的一个比较重要的扩展,SlowQueryReport就是基于这个扩展的。这个定义了一些抽象方法供子类实现。
/**
 * Base interceptor that hooks into every call creating a SQL statement.
 * Sub classes implement {@link #createStatement(Object, Method, Object[], Object, long)} to
 * observe or wrap the statement that was just created, and {@link #closeInvoked()} to be told
 * when close is called.
 * @version 1.0
 */
public abstract class AbstractCreateStatementInterceptor extends JdbcInterceptor {

    protected static final String CREATE_STATEMENT = "createStatement";
    protected static final int CREATE_STATEMENT_IDX = 0;
    protected static final String PREPARE_STATEMENT = "prepareStatement";
    protected static final int PREPARE_STATEMENT_IDX = 1;
    protected static final String PREPARE_CALL = "prepareCall";
    protected static final int PREPARE_CALL_IDX = 2;

    /** Names of the statement-creating methods, positioned according to the *_IDX constants. */
    protected static final String[] STATEMENT_TYPES = {CREATE_STATEMENT, PREPARE_STATEMENT, PREPARE_CALL};
    protected static final int STATEMENT_TYPE_COUNT = STATEMENT_TYPES.length;

    protected static final String EXECUTE = "execute";
    protected static final String EXECUTE_QUERY = "executeQuery";
    protected static final String EXECUTE_UPDATE = "executeUpdate";
    protected static final String EXECUTE_BATCH = "executeBatch";

    /** Names of the methods that execute a statement. */
    protected static final String[] EXECUTE_TYPES = {EXECUTE, EXECUTE_QUERY, EXECUTE_UPDATE, EXECUTE_BATCH};

    public AbstractCreateStatementInterceptor() {
        super();
    }

    /**
     * Notifies {@link #closeInvoked()} on close, times statement-creating calls and hands their
     * result to {@link #createStatement(Object, Method, Object[], Object, long)}; every other
     * call is passed straight down the chain.
     * {@inheritDoc}
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (compare(CLOSE_VAL, method)) {
            closeInvoked();
            return super.invoke(proxy, method, args);
        }
        if (!isStatement(method, false)) {
            // not a statement factory method, nothing to measure
            return super.invoke(proxy, method, args);
        }
        final long begin = System.currentTimeMillis();
        final Object created = super.invoke(proxy, method, args);
        final long elapsed = System.currentTimeMillis() - begin;
        return createStatement(proxy, method, args, created, elapsed);
    }

    /**
     * Callback invoked after a statement has been created successfully. Implementations may hand
     * back the statement unchanged or a wrapper around it; a wrapper should implement one of
     * {@link java.sql.Statement}, {@link java.sql.PreparedStatement} or {@link java.sql.CallableStatement}.
     * @param proxy the actual proxy object
     * @param method the method that was called. It will be one of the methods defined in {@link #STATEMENT_TYPES}
     * @param args the arguments to the method
     * @param statement the statement that the underlying connection created
     * @param time Elapsed time
     * @return a {@link java.sql.Statement} object
     */
    public abstract Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time);

    /**
     * Callback invoked when {@link java.sql.Connection#close()} is called.
     */
    public abstract void closeInvoked();

    /**
     * Tells whether the invoked method is one of the {@link #STATEMENT_TYPES}.
     *
     * @param method the method being invoked on the proxy
     * @param process result carried in from a previous check; <code>true</code> is returned as is
     * @return returns true if the method name matched
     */
    protected boolean isStatement(Method method, boolean process){
        return process(STATEMENT_TYPES, method, process);
    }

    /**
     * Tells whether the invoked method is one of the {@link #EXECUTE_TYPES}.
     *
     * @param method the method being invoked on the proxy
     * @param process result carried in from a previous check; <code>true</code> is returned as is
     * @return returns true if the method name matched
     */
    protected boolean isExecute(Method method, boolean process){
        return process(EXECUTE_TYPES, method, process);
    }

    /**
     * Tells whether the invoked method's name is among the given names.
     * @param names list of method names that we want to intercept
     * @param method the method being invoked on the proxy
     * @param process result carried in from a previous check; when already <code>true</code> no name is compared
     * @return returns true if the method name matched
     */
    protected boolean process(String[] names, Method method, boolean process) {
        final String invoked = method.getName();
        if (process) {
            return true;
        }
        for (String candidate : names) {
            if (compare(candidate, invoked)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Nothing to reset, this interceptor keeps no per-connection state.
     */
    @Override
    public void reset(ConnectionPool parent, PooledConnection con) {
        // NOOP
    }
}
AbstractQueryReport
主要实现了createStatement方法:
/**
 * Wraps a freshly created statement in a proxy backed by a {@link StatementProxy}.
 * If the method is not one of the known statement factory methods, or building the proxy
 * fails, the original statement is returned unchanged.
 */
@Override
public Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time) {
    try {
        final String methodName = method.getName();
        final String sql;
        final Constructor<?> proxyCtor;
        if (compare(CREATE_STATEMENT, methodName)) {
            // createStatement: no SQL is known at creation time
            sql = null;
            proxyCtor = getConstructor(CREATE_STATEMENT_IDX, Statement.class);
        } else if (compare(PREPARE_STATEMENT, methodName)) {
            // prepareStatement: first argument is the SQL text
            sql = (String) args[0];
            proxyCtor = getConstructor(PREPARE_STATEMENT_IDX, PreparedStatement.class);
            if (sql != null) {
                prepareStatement(sql, time);
            }
        } else if (compare(PREPARE_CALL, methodName)) {
            // prepareCall: first argument is the SQL text
            sql = (String) args[0];
            proxyCtor = getConstructor(PREPARE_CALL_IDX, CallableStatement.class);
            prepareCall(sql, time);
        } else {
            // unknown (possibly future) method: leave the statement alone
            return statement;
        }
        return proxyCtor.newInstance(new Object[] { new StatementProxy(statement, sql) });
    } catch (Exception x) {
        log.warn("Unable to create statement proxy for slow query report.",x);
        return statement;
    }
}
这里同样使用了jdk的动态代理,包装了statement
/**
 * Invocation handler placed in front of a statement to time its execute calls
 * and report failed, slow and regular queries.
 */
protected class StatementProxy implements InvocationHandler {

    /** Becomes <code>true</code> once close has gone through this proxy. */
    protected boolean closed = false;

    /** The real statement; cleared when the statement is closed. */
    protected Object delegate;

    /** SQL text associated with the statement, may be <code>null</code>. */
    protected final String query;

    public StatementProxy(Object parent, String query) {
        this.delegate = parent;
        this.query = query;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        final String methodName = method.getName();
        final boolean closing = compare(JdbcInterceptor.CLOSE_VAL, methodName);

        // close may be called any number of times
        if (closing && closed) {
            return null;
        }
        // isClosed is answered from our own flag
        if (compare(JdbcInterceptor.ISCLOSED_VAL, methodName)) {
            return Boolean.valueOf(closed);
        }
        // anything else on a closed statement is an error
        if (closed) {
            throw new SQLException("Statement closed.");
        }

        // only execute-type calls are timed
        final boolean timed = isExecute(method, false);
        final long start;
        if (timed) {
            start = System.currentTimeMillis();
        } else {
            start = 0;
        }

        final Object result;
        try {
            result = method.invoke(delegate, args);
        } catch (Throwable t) {
            reportFailedQuery(query, args, methodName, start, t);
            // surface the real exception rather than the reflection wrapper
            if (t instanceof InvocationTargetException) {
                final Throwable cause = t.getCause();
                if (cause != null) {
                    throw cause;
                }
            }
            throw t;
        }

        if (timed) {
            final long delta = System.currentTimeMillis() - start;
            if (delta > threshold) {
                try {
                    reportSlowQuery(query, args, methodName, start, delta);
                } catch (Exception t) {
                    if (log.isWarnEnabled()) log.warn("Unable to process slow query",t);
                }
            } else {
                reportQuery(query, args, methodName, start, delta);
            }
        }

        // remember the close and let go of the real statement
        if (closing) {
            closed = true;
            delegate = null;
        }
        return result;
    }
}
这里记录了sql的执行耗时,然后跟阈值对比,判断是否记录到slow query