DataSourceConfiguration
配置类,springBoot默认采用HikariDataSource
/*** Hikari DataSource configuration.*/@Configuration(proxyBeanMethods = false)@ConditionalOnClass(HikariDataSource.class)@ConditionalOnMissingBean(DataSource.class)@ConditionalOnProperty(name = "spring.datasource.type", havingValue = "com.zaxxer.hikari.HikariDataSource",matchIfMissing = true)static class Hikari {@Bean@ConfigurationProperties(prefix = "spring.datasource.hikari")HikariDataSource dataSource(DataSourceProperties properties) {HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);if (StringUtils.hasText(properties.getName())) {dataSource.setPoolName(properties.getName());}return dataSource;}}
@Configuration(proxyBeanMethods = false)得含义是
@Configuration注解的意思是proxyBeanMethods配置类是用来指定@Bean注解标注的方法是否使用代理,默认是true使用代理,直接从IOC容器之中取得对象;如果设置为false,也就是不使用注解,每次调用@Bean标注的方法获取到的对象和IOC容器中的都不一样,是一个新的对象,所以我们可以将此属性设置为false来提高性能;
**@ConditionalOnProperty(name = “spring.datasource.type”, havingValue = “com.zaxxer.hikari.HikariDataSource”,
** matchIfMissing = true)**
如果配置spring.datasource.type为com.zaxxer.hikari.HikariDataSource才加载,如果配置为空,则默认加载此数据源
HikariDataSource
一个一个看这些接口
AutoCloseable
代表一个对象在close之前可能持有某些资源(文件或socket)。如果对象是在try-with-resources代码块中声明的,
AutoCloseable对象的close()方法会被自动执行。这种构造方式保证了最快的资源释放,避免资源耗尽异常。void close() throws Exception
关闭资源,放弃所有内在的资源。如果对象是在try-with-resources代码块中声明的,那么这个方法会自动被执行。
虽然这个接口的方法声明成抛出Exception异常,但是强烈推荐实现类抛出更详细的异常,或者不要要出异常。
需要注意实现类的close操作可能会失败。 强烈推荐优先放弃内部资源和内部标记资源已经关闭,尽量不去抛出异常。close方法不会执行
多次,所以要确保资源能够及时释放。此外,优先关闭内部资源可以降低资源封装带来的问题,假如一个资源包含了另一个资源,要依次关闭。
实现类也强烈建议不要抛出InterruptedException异常。这个异常和thread的 interrupt标志相关,而且线程运行时可能会出现一些迷惑行为。
例如实现类
import lombok.Data;@Data
public class AutoCloseAbleTest implements AutoCloseable {private boolean closed = false;@Overridepublic void close() throws IllegalArgumentException {closed = true;throw new IllegalArgumentException("测试抛出异常");}public void doSomething(){System.out.println("现在资源是开着的");}
}
测试类
public class AutoCloseAbleExample {/*** 使用try-with-resources模式声明资源* @param args*/public static void main(String[] args){try(AutoCloseAbleTest test = new AutoCloseAbleTest()){test.doSomething();}catch (IllegalArgumentException e){e.printStackTrace();}}
}
执行结果
java.lang.IllegalArgumentException: 测试抛出异常at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleTest.close(AutoCloseAbleTest.java:14)at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleExample.main(AutoCloseAbleExample.java:7)
HikariConfig
// Properties changeable at runtime through the HikariConfigMXBean//private volatile String catalog;private volatile long connectionTimeout;private volatile long validationTimeout;private volatile long idleTimeout;private volatile long leakDetectionThreshold;private volatile long maxLifetime;private volatile int maxPoolSize;private volatile int minIdle;private volatile String username;private volatile String password;// Properties NOT changeable at runtime//private long initializationFailTimeout;private String connectionInitSql;private String connectionTestQuery;private String dataSourceClassName;private String dataSourceJndiName;private String driverClassName;private String exceptionOverrideClassName;private String jdbcUrl;private String poolName;private String schema;private String transactionIsolationName;private boolean isAutoCommit;private boolean isReadOnly;private boolean isIsolateInternalQueries;private boolean isRegisterMbeans;private boolean isAllowPoolSuspension;private DataSource dataSource;private Properties dataSourceProperties;private ThreadFactory threadFactory;private ScheduledExecutorService scheduledExecutor;private MetricsTrackerFactory metricsTrackerFactory;private Object metricRegistry;private Object healthCheckRegistry;private Properties healthCheckProperties;
配置类,有很多配置项
name 描述 构造器默认值 默认配置validate之后的值 validate重置
autoCommit 自动提交从池中返回的连接 true true -
connectionTimeout 等待来自池的连接的最大毫秒数 SECONDS.toMillis(30) = 30000 30000 如果小于250毫秒,则被重置回30秒
idleTimeout 连接允许在池中闲置的最长时间 MINUTES.toMillis(10) = 600000 600000 如果idleTimeout+1秒>maxLifetime 且 maxLifetime>0,则会被重置为0(代表永远不会退出);如果idleTimeout!=0且小于10秒,则会被重置为10秒
maxLifetime 池中连接最长生命周期 MINUTES.toMillis(30) = 1800000 1800000 如果不等于0且小于30秒则会被重置回30分钟
connectionTestQuery 如果您的驱动程序支持JDBC4,我们强烈建议您不要设置此属性 null null -
minimumIdle 池中维护的最小空闲连接数 -1 10 minIdle<0或者minIdle>maxPoolSize,则被重置为maxPoolSize
maximumPoolSize 池中最大连接数,包括闲置和使用中的连接 -1 10 如果maxPoolSize小于1,则会被重置。当minIdle<=0被重置为DEFAULT_POOL_SIZE则为10;如果minIdle>0则重置为minIdle的值
metricRegistry 该属性允许您指定一个 Codahale / Dropwizard MetricRegistry 的实例,供池使用以记录各种指标 null null -
healthCheckRegistry 该属性允许您指定池使用的Codahale / Dropwizard HealthCheckRegistry的实例来报告当前健康信息 null null -
poolName 连接池的用户定义名称,主要出现在日志记录和JMX管理控制台中以识别池和池配置 null HikariPool-1 -
initializationFailTimeout 如果池无法成功初始化连接,则此属性控制池是否将 fail fast 1 1 -
isolateInternalQueries 是否在其自己的事务中隔离内部池查询,例如连接活动测试 false false -
allowPoolSuspension 控制池是否可以通过JMX暂停和恢复 false false -
readOnly 从池中获取的连接是否默认处于只读模式 false false -
registerMbeans 是否注册JMX管理Bean(MBeans) false false -
catalog 为支持 catalog 概念的数据库设置默认 catalog driver default null -
connectionInitSql 该属性设置一个SQL语句,在将每个新连接创建后,将其添加到池中之前执行该语句。 null null -
driverClassName HikariCP将尝试通过仅基于jdbcUrl的DriverManager解析驱动程序,但对于一些较旧的驱动程序,还必须指定driverClassName null null -
transactionIsolation 控制从池返回的连接的默认事务隔离级别 null null -
validationTimeout 连接将被测试活动的最大时间量 SECONDS.toMillis(5) = 5000 5000 如果小于250毫秒,则会被重置回5秒
leakDetectionThreshold 记录消息之前连接可能离开池的时间量,表示可能的连接泄漏 0 0 如果大于0且不是单元测试,则进一步判断:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),会被重置为0 . 即如果要生效则必须>0,而且不能小于2秒,而且当maxLifetime > 0时不能大于maxLifetime
dataSource 这个属性允许你直接设置数据源的实例被池包装,而不是让HikariCP通过反射来构造它 null null -
schema 该属性为支持模式概念的数据库设置默认模式 driver default null -
threadFactory 此属性允许您设置将用于创建池使用的所有线程的java.util.concurrent.ThreadFactory的实例。 null null -
scheduledExecutor 此属性允许您设置将用于各种内部计划任务的java.util.concurrent.ScheduledExecutorService实例 null null -
DataSource
两个获取数据库连接的接口
public interface DataSource extends CommonDataSource, Wrapper {/*** <p>Attempts to establish a connection with the data source that* this {@code DataSource} object represents.** @return a connection to the data source* @exception SQLException if a database access error occurs* @throws java.sql.SQLTimeoutException when the driver has determined that the* timeout value specified by the {@code setLoginTimeout} method* has been exceeded and has at least tried to cancel the* current database connection attempt*/Connection getConnection() throws SQLException;/*** <p>Attempts to establish a connection with the data source that* this {@code DataSource} object represents.** @param username the database user on whose behalf the connection is* being made* @param password the user's password* @return a connection to the data source* @exception SQLException if a database access error occurs* @throws java.sql.SQLTimeoutException when the driver has determined that the* timeout value specified by the {@code setLoginTimeout} method* has been exceeded and has at least tried to cancel the* current database connection attempt* @since 1.4*/Connection getConnection(String username, String password)throws SQLException;
}
HikariPool
hikaripool就是数据库连接池的核心了
PoolBase
构造进行初始化
PoolBase(final HikariConfig config){this.config = config;this.networkTimeout = UNINITIALIZED;this.catalog = config.getCatalog();this.schema = config.getSchema();this.isReadOnly = config.isReadOnly();this.isAutoCommit = config.isAutoCommit();this.exceptionOverride = UtilityElf.createInstance(config.getExceptionOverrideClassName(), SQLExceptionOverride.class);this.transactionIsolation = UtilityElf.getTransactionIsolation(config.getTransactionIsolation());this.isQueryTimeoutSupported = UNINITIALIZED;this.isNetworkTimeoutSupported = UNINITIALIZED;this.isUseJdbc4Validation = config.getConnectionTestQuery() == null;this.isIsolateInternalQueries = config.isIsolateInternalQueries();this.poolName = config.getPoolName();this.connectionTimeout = config.getConnectionTimeout();this.validationTimeout = config.getValidationTimeout();this.lastConnectionFailure = new AtomicReference<>();initializeDataSource();}
private void initializeDataSource(){final String jdbcUrl = config.getJdbcUrl();final String username = config.getUsername();final String password = config.getPassword();final String dsClassName = config.getDataSourceClassName();final String driverClassName = config.getDriverClassName();final String dataSourceJNDI = config.getDataSourceJNDI();final Properties dataSourceProperties = config.getDataSourceProperties();DataSource ds = config.getDataSource();if (dsClassName != null && ds == null) {ds = createInstance(dsClassName, DataSource.class);PropertyElf.setTargetFromProperties(ds, dataSourceProperties);}else if (jdbcUrl != null && ds == null) {ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password);}else if (dataSourceJNDI != null && ds == null) {try {InitialContext ic = new InitialContext();ds = (DataSource) ic.lookup(dataSourceJNDI);} catch (NamingException e) {throw new PoolInitializationException(e);}}if (ds != null) {setLoginTimeout(ds);createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl);}this.dataSource = ds;}
hikariPoll构造初始化
public HikariPool(final HikariConfig config){super(config);//数据库连接背包,相当于线程池的任务队列this.connectionBag = new ConcurrentBag<>(this);//是否控制获取连接数this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;//创建一个线程池this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();checkFailFast();if (config.getMetricsTrackerFactory() != null) {setMetricsTrackerFactory(config.getMetricsTrackerFactory());}else {setMetricRegistry(config.getMetricRegistry());}setHealthCheckRegistry(config.getHealthCheckRegistry());handleMBeans(this, true);ThreadFactory threadFactory = config.getThreadFactory();final int maxPoolSize = config.getMaximumPoolSize();LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));final long startTime = currentTime();while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {quietlySleep(MILLISECONDS.toMillis(100));}addConnectionExecutor.setCorePoolSize(1);addConnectionExecutor.setMaximumPoolSize(1);}}
com.zaxxer.hikari.pool.HikariPool#checkFailFast
private void checkFailFast(){final long initializationTimeout = config.getInitializationFailTimeout();if (initializationTimeout < 0) {return;}final long startTime = currentTime();do {final PoolEntry poolEntry = createPoolEntry();if (poolEntry != null) {if (config.getMinimumIdle() > 0) {connectionBag.add(poolEntry);logger.debug("{} - Added connection {}", poolName, poolEntry.connection);}else {quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");}return;}if (getLastConnectionFailure() instanceof ConnectionSetupException) {throwPoolInitializationException(getLastConnectionFailure().getCause());}quietlySleep(SECONDS.toMillis(1));} while (elapsedMillis(startTime) < initializationTimeout);if (initializationTimeout > 0) {throwPoolInitializationException(getLastConnectionFailure());}}
前面说过initializationFailTimeout是否要检查进行failfast快速失败(启动就检查连接报错,而不是等真正需要数据库操作时才发现错误),
PoolEntry
用来封装一个数据库连接
Connection connection;long lastAccessed;long lastBorrowed;@SuppressWarnings("FieldCanBeLocal")private volatile int state = 0;private volatile boolean evict;private volatile ScheduledFuture<?> endOfLife;private final FastList<Statement> openStatements;private final HikariPool hikariPool;private final boolean isReadOnly;private final boolean isAutoCommit;
先创建一个连接
com.zaxxer.hikari.pool.PoolBase#newConnection
private Connection newConnection() throws Exception{final long start = currentTime();Connection connection = null;try {String username = config.getUsername();String password = config.getPassword();connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);if (connection == null) {throw new SQLTransientConnectionException("DataSource returned null unexpectedly");}setupConnection(connection);lastConnectionFailure.set(null);return connection;}catch (Exception e) {if (connection != null) {quietlyCloseConnection(connection, "(Failed to create/setup connection)");}else if (getLastConnectionFailure() == null) {logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());}lastConnectionFailure.set(e);throw e;}finally {// tracker will be null during failFast checkif (metricsTracker != null) {metricsTracker.recordConnectionCreated(elapsedMillis(start));}}}
com.zaxxer.hikari.pool.PoolBase#setupConnection
根据配置设置连接,默认设置自动提交,非只读事务
private void setupConnection(final Connection connection) throws ConnectionSetupException{try {if (networkTimeout == UNINITIALIZED) {networkTimeout = getAndSetNetworkTimeout(connection, validationTimeout);}else {setNetworkTimeout(connection, validationTimeout);}if (connection.isReadOnly() != isReadOnly) {connection.setReadOnly(isReadOnly);}if (connection.getAutoCommit() != isAutoCommit) {connection.setAutoCommit(isAutoCommit);}checkDriverSupport(connection);if (transactionIsolation != defaultTransactionIsolation) {connection.setTransactionIsolation(transactionIsolation);}if (catalog != null) {connection.setCatalog(catalog);}if (schema != null) {connection.setSchema(schema);}executeSql(connection, config.getConnectionInitSql(), true);setNetworkTimeout(connection, networkTimeout);}catch (SQLException e) {throw new ConnectionSetupException(e);}}
如果上层逻辑不希望自动提交,可以重新设置是否自动提交。例如spring事务,会在获取到数据库连接后再设置不自动提交。
通过jdbc建立一个数据库连接
PoolEntry newPoolEntry() throws Exception{return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);}
存储了除了数据库连接coneection还有 是不是只读事务,是否自动提交等信息
创建poolEntry后
if (poolEntry != null) {if (config.getMinimumIdle() > 0) {connectionBag.add(poolEntry);logger.debug("{} - Added connection {}", poolName, poolEntry.connection);}else {quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");}return;}
如果配置的最小空闲时为0,放入connectionBag,否则立马关闭或这个连接
放入bag
com.zaxxer.hikari.util.ConcurrentBag#add
public void add(final T bagEntry){if (closed) {LOGGER.info("ConcurrentBag has been closed, ignoring add()");throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");}sharedList.add(bagEntry);// spin until a thread takes it or none are waitingwhile (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {Thread.yield();}}
放到sharedList中CopyOnWriteArrayList时一个线程安全的集合
private final CopyOnWriteArrayList<T> sharedList;
连接池获取一个连接
com.zaxxer.hikari.HikariDataSource#getConnection()
开启事务,或者数据库查询都会获取连接
public Connection getConnection() throws SQLException{if (isClosed()) {throw new SQLException("HikariDataSource " + this + " has been closed.");}if (fastPathPool != null) {return fastPathPool.getConnection();}// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_JavaHikariPool result = pool;if (result == null) {synchronized (this) {result = pool;if (result == null) {validate();LOGGER.info("{} - Starting...", getPoolName());try {pool = result = new HikariPool(this);this.seal();}catch (PoolInitializationException pie) {if (pie.getCause() instanceof SQLException) {throw (SQLException) pie.getCause();}else {throw pie;}}LOGGER.info("{} - Start completed.", getPoolName());}}}return result.getConnection();}
进入
com.zaxxer.hikari.pool.HikariPool#getConnection(long)
public Connection getConnection(final long hardTimeout) throws SQLException{suspendResumeLock.acquire();final long startTime = currentTime();try {long timeout = hardTimeout;do {PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);if (poolEntry == null) {break; // We timed out... break and throw exception}final long now = currentTime();if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);timeout = hardTimeout - elapsedMillis(startTime);}else {metricsTracker.recordBorrowStats(poolEntry, startTime);return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);}} while (timeout > 0L);metricsTracker.recordBorrowTimeoutStats(startTime);throw createTimeoutException(startTime);}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new SQLException(poolName + " - Interrupted during connection acquisition", e);}finally {suspendResumeLock.release();}}
suspendResumeLock
数据库连接池暂停恢复锁。如果hikari配置中设置isAllowPoolSuspension为true,那么就会在HikariPool中实例一个暂停恢复锁。
初始化
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
如果配置了isAllowPoolSuspension那么会有Semaphore进行控制,如果没配置,那么使用的FAUX_LOCK是一个空的实现,也就是没有并发控制
配置了使用一个公平锁控制并发量
其实就是在限制连接的数量,如果连接超过10K,那么就会阻塞直到有别的连接断开后释放了信号量。这是控制IO流量的一种常见的方式。
private static final int MAX_PERMITS = 10000;
private SuspendResumeLock(final boolean createSemaphore){acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);}
public class SuspendResumeLock
{public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {@Overridepublic void acquire() {}@Overridepublic void release() {}@Overridepublic void suspend() {}@Overridepublic void resume() {}};private static final int MAX_PERMITS = 10000;private final Semaphore acquisitionSemaphore;/*** Default constructor*/public SuspendResumeLock(){this(true);}private SuspendResumeLock(final boolean createSemaphore){acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);}public void acquire() throws SQLException{if (acquisitionSemaphore.tryAcquire()) {return;}else if (Boolean.getBoolean("com.zaxxer.hikari.throwIfSuspended")) {throw new SQLTransientException("The pool is currently suspended and configured to throw exceptions upon acquisition");}acquisitionSemaphore.acquireUninterruptibly();}public void release(){acquisitionSemaphore.release();}public void suspend(){acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);}public void resume(){acquisitionSemaphore.release(MAX_PERMITS);}
}
继续获取连接
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
进入ConcurrentBag
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException{// Try the thread-local list firstfinal List<Object> list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {final Object entry = list.remove(i);@SuppressWarnings("unchecked")final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// Otherwise, scan the shared list ... then poll the handoff queuefinal int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {// If we may have stolen another waiter's connection, request another bag add.if (waiting > 1) {listener.addBagItem(waiting - 1);}return bagEntry;}}listener.addBagItem(waiting);timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null;}finally {waiters.decrementAndGet();}}
尝试threadList中获取数据库连接对象
如果没用获取到,添加等待数量
尝试从sharedList中获取,获取到通过cas将未使用的状态设置成使用的状态
这里有个listener
IBagStateListener
public interface IBagStateListener{void addBagItem(int waiting);}
实现了这个接口,向bag中加入连接
PoolEntryCreator
创建新得连接
private final class PoolEntryCreator implements Callable<Boolean>{private final String loggingPrefix;PoolEntryCreator(String loggingPrefix){this.loggingPrefix = loggingPrefix;}@Overridepublic Boolean call(){long sleepBackoff = 250L;while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {final PoolEntry poolEntry = createPoolEntry();if (poolEntry != null) {connectionBag.add(poolEntry);logger.debug("{} - Added connection {}", poolName, poolEntry.connection);if (loggingPrefix != null) {logPoolState(loggingPrefix);}return Boolean.TRUE;}// failed to get connection from db, sleep and retryif (loggingPrefix != null) logger.debug("{} - Connection add failed, sleeping with backoff: {}ms", poolName, sleepBackoff);quietlySleep(sleepBackoff);sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));}// Pool is suspended or shutdown or at max sizereturn Boolean.FALSE;}
private synchronized boolean shouldCreateAnotherConnection() {return getTotalConnections() < config.getMaximumPoolSize() &&(connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());}
判断是否应该添加连接 不能大于最大配置的连接数,并且有线程在等待获取连接,并且空闲得连接数小于最小得空闲数
创建新的连接
如果sharedList中也没有获取到空闲连接
do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null;
当前线程需要等待阻塞等到空闲连接
ProxyLeakTask
class ProxyLeakTask implements Runnable
{private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);static final ProxyLeakTask NO_LEAK;private ScheduledFuture<?> scheduledFuture;private String connectionName;private Exception exception;private String threadName; private boolean isLeaked;static{NO_LEAK = new ProxyLeakTask() {@Overridevoid schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}@Overridepublic void run() {}@Overridepublic void cancel() {}};}ProxyLeakTask(final PoolEntry poolEntry){this.exception = new Exception("Apparent connection leak detected");this.threadName = Thread.currentThread().getName();this.connectionName = poolEntry.connection.toString();}private ProxyLeakTask(){}void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold){scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);}/** {@inheritDoc} */@Overridepublic void run(){isLeaked = true;final StackTraceElement[] stackTrace = exception.getStackTrace(); final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];System.arraycopy(stackTrace, 5, trace, 0, trace.length);exception.setStackTrace(trace);LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);}void cancel(){scheduledFuture.cancel(false);if (isLeaked) {LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);}}
}
leakDetectionThreshold 设定了连接泄露阀值10s,即当从连接池里取出连接超过10s不归还时,会warn Apparent connection leak detected,用来警告数据库连接的泄露风险。
在连接归还或被踢除时,会清除此调度任务,详见ProxyConnection。即在10s内归还连接,会清除调度任务。超过10s调度任务执行。
清除调度任务时,如果已经超过调度时间,会LOGGER.info(“Previously reported leaked connection {} was returned to the pool (unleaked)”, connectionName);。详见ProxyLeakTask#cancel。
这个case中最大的亮点是。Hikari warn日志中详细的打印出,存在连接泄露的代码行数,异常信息非常直白易懂。实现逻辑详见 ProxyLeakTask#ProxyLeakTask ProxyLeakTask#run。优秀的代码设计+不错的性能,也难怪SpringBoot2.0起默认数据库连接池从TomcatPool换到了「HikariCP」。
具体看如何实现如何精准打印堆栈的
ProxyLeakTask(final PoolEntry poolEntry){this.exception = new Exception("Apparent connection leak detected");this.threadName = Thread.currentThread().getName();this.connectionName = poolEntry.connection.toString();}
在构造ProxyLeakTask创建了exception,异常中
@Overridepublic void run(){isLeaked = true;final StackTraceElement[] stackTrace = exception.getStackTrace(); final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];System.arraycopy(stackTrace, 5, trace, 0, trace.length);exception.setStackTrace(trace);LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);}
可以看出来,减了5层的堆栈,刚好就是查询处的堆栈
handoffQueue
SynchronousQueue<T> handoffQueue;
我们知道SynchronousQueue是一个比较特殊的队列 投放任务的生产者在没用和消费者匹配之前,生产者线程会被阻塞。生产者和消费者配一对一通信。
这里,getConection充当一个消费者的角色,去尝试取出数据
至于怎么放入数据,我们后面继续看。
返回一个来连接后,对原生数据库连接做扩展,
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
Connection createProxyConnection(final ProxyLeakTask leakTask, final long now){return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);}
打开ProxyFactory.getProxyConnection这个方法看起来很有意思
static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit){// Body is replaced (injected) by JavassistProxyFactorythrow new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");}
发现他是一个空的实现,直接抛出异常并且注释Body is replaced (injected) by JavassistProxyFactory
而且它也没用子类实现,那么如何实现创建对应的conneection对象的呢。
当我们打开这个类就明白了
case "getProxyConnection":method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");break;
利用 javaasist直接修改ProxyFactory的字节码,
这样获取的连接被ProxyConnection包裹扩展
ProxyConnection
public abstract class ProxyConnection implements Connection
{static final int DIRTY_BIT_READONLY = 0b000001;static final int DIRTY_BIT_AUTOCOMMIT = 0b000010;static final int DIRTY_BIT_ISOLATION = 0b000100;static final int DIRTY_BIT_CATALOG = 0b001000;static final int DIRTY_BIT_NETTIMEOUT = 0b010000;static final int DIRTY_BIT_SCHEMA = 0b100000;private static final Logger LOGGER;private static final Set<String> ERROR_STATES;private static final Set<Integer> ERROR_CODES;@SuppressWarnings("WeakerAccess")protected Connection delegate;private final PoolEntry poolEntry;private final ProxyLeakTask leakTask;private final FastList<Statement> openStatements;private int dirtyBits;private long lastAccess;private boolean isCommitStateDirty;private boolean isReadOnly;private boolean isAutoCommit;private int networkTimeout;private int transactionIsolation;private String dbcatalog;private String dbschema;
}
其内部也维护了连接的一些信息。
在使用spring事务的情况下获取一个连接后
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
@Overrideprotected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;Connection con = null;try {if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {Connection newCon = obtainDataSource().getConnection();if (logger.isDebugEnabled()) {logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");}txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,// so we don't want to do it unnecessarily (for example if we've explicitly// configured the connection pool to set it already).if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);if (logger.isDebugEnabled()) {logger.debug("Switching JDBC Connection [" + con + "] to manual commit");}con.setAutoCommit(false);}prepareTransactionalConnection(con, definition);txObject.getConnectionHolder().setTransactionActive(true);int timeout = determineTimeout(definition);if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}// Bind the connection holder to the thread.if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());}}catch (Throwable ex) {if (txObject.isNewConnectionHolder()) {DataSourceUtils.releaseConnection(con, obtainDataSource());txObject.setConnectionHolder(null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);}}
会根据事务注解设置事务自动提交,隔离级别,是否为只读连接信息。
这样spring创建了他的事务
事务提交方法
@Overridepublic void commit() throws SQLException{//实际的数据库连接delegate.commit();//设置状态isCommitStateDirty = false;lastAccess = currentTime();}
关闭连接方法
@Overridepublic final void close() throws SQLException{// Closing statements can cause connection eviction, so this must run before the conditional belowcloseStatements();if (delegate != ClosedConnection.CLOSED_CONNECTION) {leakTask.cancel();try {if (isCommitStateDirty && !isAutoCommit) {delegate.rollback();lastAccess = currentTime();LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);}if (dirtyBits != 0) {poolEntry.resetConnectionState(this, dirtyBits);lastAccess = currentTime();}delegate.clearWarnings();}catch (SQLException e) {// when connections are aborted, exceptions are often thrown that should not reach the applicationif (!poolEntry.isMarkedEvicted()) {throw checkException(e);}}finally {delegate = ClosedConnection.CLOSED_CONNECTION;poolEntry.recycle(lastAccess);}}}
首先关闭此连接创建的Statement
com.zaxxer.hikari.pool.ProxyConnection#closeStatements
private synchronized void closeStatements(){final int size = openStatements.size();if (size > 0) {for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {try (Statement ignored = openStatements.get(i)) {// automatic resource cleanup}catch (SQLException e) {LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",poolEntry.getPoolName(), delegate);leakTask.cancel();poolEntry.evict("(exception closing Statements during Connection.close())");delegate = ClosedConnection.CLOSED_CONNECTION;}}openStatements.clear();}}
这里她利用了try-resources-catch来释放资源
leakTask是一个打印异常日志的定时任务
@Overridepublic void run(){isLeaked = true;final StackTraceElement[] stackTrace = exception.getStackTrace(); final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];System.arraycopy(stackTrace, 5, trace, 0, trace.length);exception.setStackTrace(trace);LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);}
如果关闭stateMent失败,那么直接关闭这个数据连接
poolEntry.evict("(exception closing Statements during Connection.close())");
事务执行完毕后需要恢复这个连接原本的状态
finally {cleanupAfterCompletion(status);}
org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion
protected void doCleanupAfterCompletion(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;// Remove the connection holder from the thread, if exposed.if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager.unbindResource(obtainDataSource());}// Reset connection.Connection con = txObject.getConnectionHolder().getConnection();try {if (txObject.isMustRestoreAutoCommit()) {con.setAutoCommit(true);}DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());}catch (Throwable ex) {logger.debug("Could not reset JDBC Connection after transaction", ex);}if (txObject.isNewConnectionHolder()) {if (logger.isDebugEnabled()) {logger.debug("Releasing JDBC Connection [" + con + "] after transaction");}DataSourceUtils.releaseConnection(con, this.dataSource);}txObject.getConnectionHolder().clear();}
把autoConmit和隔离级别恢复成连接之前的状态。
回头看close方法
isCommitStateDirty代表事务未提交时有更新操作
如果有更新操作,并且在事务中, 需要回滚
if (isCommitStateDirty && !isAutoCommit) {delegate.rollback();lastAccess = currentTime();LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);}
关闭连接
finally {delegate = ClosedConnection.CLOSED_CONNECTION;poolEntry.recycle(lastAccess);}
static final Connection CLOSED_CONNECTION = getClosedConnection();private static Connection getClosedConnection(){InvocationHandler handler = (proxy, method, args) -> {final String methodName = method.getName();if ("isClosed".equals(methodName)) {return Boolean.TRUE;}else if ("isValid".equals(methodName)) {return Boolean.FALSE;}if ("abort".equals(methodName)) {return Void.TYPE;}if ("close".equals(methodName)) {return Void.TYPE;}else if ("toString".equals(methodName)) {return ClosedConnection.class.getCanonicalName();}throw new SQLException("Connection is closed");};return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);}
先设置一个空的连接,用代理返回一个不合法的连接
进入recycle循环利用
void recycle(final PoolEntry poolEntry){metricsTracker.recordConnectionUsage(poolEntry);connectionBag.requite(poolEntry);}
public void requite(final T bagEntry){bagEntry.setState(STATE_NOT_IN_USE);for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;}else if ((i & 0xff) == 0xff) {parkNanos(MICROSECONDS.toNanos(10));}else {Thread.yield();}}final List<Object> threadLocalList = threadList.get();if (threadLocalList.size() < 50) {threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);}}
1.设置为为未使用状态
- 如果等待连接的线程数大于 0,放连接到handoffQueue中,刚好与获取连接的线程配对
handoffQueue.offer(bagEntry)
- 如果threadList大小小于50,放进去
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
weakThreadLocals可以根据配置是否设置为弱引用(发生gc就回收)
数据库连接池的三级缓存
三级缓存threadList
本地线程存储使用,回收的连接,最多存储50个,在真正关闭连接时移除
三级缓存线程隔离,数据无需同步,速度最快
二级缓存sharedList
在创建新的连接就放入,在移除连接时取出
sharedList是线程共享的集合对象,这里使用的类型是CopyOnWriteArrayList
对于CopyOnWriteArrayList需要掌握以下几点
创建:CopyOnWriteArrayList()
添加元素:即add(E)方法
获取单个对象:即get(int)方法
删除对象:即remove(E)方法
遍历所有对象:即iterator(),在实际中更常用的是增强型的for循环去做遍历
注:CopyOnWriteArrayList是一个线程安全,读操作时无锁的ArrayList。
CopyOnWriteArrayList的创建
public CopyOnWriteArrayList()
使用方法:
List list = new CopyOnWriteArrayList();
-
CopyOnWriteArrayList(写数组的拷贝)是ArrayList的一个线程安全的变体,CopyOnWriteArrayList和CopyOnWriteSet都是线程安全的集合,其中所有可变操作(add、set等等)都是通过对底层数组进行一次新的复制来实现的。
-
它绝对不会抛出ConcurrentModificationException的异常。因为该列表(CopyOnWriteArrayList)在遍历时将不会被做任何的修改。
-
CopyOnWriteArrayList适合用在“读多,写少”的“并发”应用中,换句话说,它适合使用在读操作远远大于写操作的场景里,比如缓存。它不存在“扩容”的概念,每次写操作(add or remove)都要copy一个副本,在副本的基础上修改后改变array引用,所以称为“CopyOnWrite”,因此在写操作是加锁,并且对整个list的copy操作时相当耗时的,过多的写操作不推荐使用该存储结构。
-
CopyOnWriteArrayList的功能是是创建一个列表,有三种构造方法:
(1)CopyOnWriteArrayList ()创建一个空列表。
(2)CopyOnWriteArrayList (Collection<? extendsE> c)
创建一个按 collection的迭代器返回元素的顺序包含指定 collection元素的列表。
(3)CopyOnWriteArrayList(E[] toCopyIn)
创建一个保存给定数组的副本的列表
CopyOnWriteArrayList虽然写时使用了同步锁但是读无锁,在读多的场景下效率高。
一级缓存handoffQueue
前面介绍过这是一个同步队列
获取连接的消费者线程,与关闭连接时循环使用的的生产者线程配对
使用同步,消费者线程会阻塞,效率最低
思考为什么会发生数据库连接开启事务导致锁死行纪录
数据库连接建立后,是一个长连接在数据库与客户端之间
如果客户端在未回滚的情况下断开连接,数据库服务器感知到,会回滚该连接事务并移除连接
如果客户端非正常断开连接,不是关闭线程等操作,而是突然宕机,断网,导致与服务器的连接变为死连接,如果不设置事务超时时间,那么这个事务一直存在,一直不回滚,导致锁死行记录,因此设置事务超时时间是有必要的。