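Summary: SelectorManager calls purgeExpiredConnectionsAndReturnNextDeadline to compute the select timeout, and that method also purges (removes and closes) expired connections. Besides this purge of expired connections, a connection indirectly triggers CleanupTrigger through its FlowTube to clean up closed or broken connections.
Preface
This article takes a look at the ConnectionPool of the JDK HttpClient.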
HttpConnection
HttpConnection.getConnection
java.net.http/jdk/internal/net/http/HttpConnection.java
    /**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainTunnelingConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }
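Plain (non-HTTPS) requests look up the pool with pool.getConnection(false, addr, proxy), and HTTPS requests that are not HTTP/2 use pool.getConnection(true, addr, proxy). HTTPS requests over HTTP/2 skip the pool, because only HTTP/1.1 connections are kept in it.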
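The branching in getConnection can be condensed into a small decision function. The following is only a sketch of that decision: the class PoolLookupSketch, its Version enum, and the method poolLookupFlag are hypothetical names made up for illustration and are not part of the JDK.

```java
import java.util.Optional;

// Hypothetical illustration; not a JDK class.
final class PoolLookupSketch {

    enum Version { HTTP_1_1, HTTP_2 }

    /**
     * Returns the 'secure' flag that would be passed to the pool lookup,
     * or an empty Optional when the pool is not consulted at all.
     */
    static Optional<Boolean> poolLookupFlag(boolean secure, Version version) {
        if (!secure) {
            // plain request: pool.getConnection(false, addr, proxy)
            return Optional.of(Boolean.FALSE);
        }
        if (version != Version.HTTP_2) {
            // TLS request over HTTP/1.1: pool.getConnection(true, addr, proxy)
            return Optional.of(Boolean.TRUE);
        }
        // TLS request over HTTP/2: only HTTP/1.1 connections are pooled
        return Optional.empty();
    }
}
```

Note that a plain request still consults the pool even when the requested version is HTTP_2, which matches the first branch of the method quoted above.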
HttpConnection.closeOrReturnToCache
java.net.http/jdk/internal/net/http/HttpConnection.java
    void closeOrReturnToCache(HttpHeaders hdrs) {
        if (hdrs == null) {
            // the connection was closed by server, eof
            close();
            return;
        }
        if (!isOpen()) {
            return;
        }
        HttpClientImpl client = client();
        if (client == null) {
            close();
            return;
        }
        ConnectionPool pool = client.connectionPool();
        boolean keepAlive = hdrs.firstValue("Connection")
                .map((s) -> !s.equalsIgnoreCase("close"))
                .orElse(true);

        if (keepAlive) {
            Log.logTrace("Returning connection to the pool: {0}", this);
            pool.returnToPool(this);
        } else {
            close();
        }
    }
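If the response headers do not carry Connection: close, pool.returnToPool(this) is called to return the connection to the pool; otherwise the connection is closed.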
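The keep-alive test can be tried on its own with the public java.net.http.HttpHeaders API. This is a minimal sketch: KeepAliveSketch and its helper headers are hypothetical names, and only the expression inside keepAlive is taken from the method quoted above.

```java
import java.net.http.HttpHeaders;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not a JDK class.
final class KeepAliveSketch {

    /** Same expression as in closeOrReturnToCache: keep alive unless "Connection: close". */
    static boolean keepAlive(HttpHeaders hdrs) {
        return hdrs.firstValue("Connection")
                .map(s -> !s.equalsIgnoreCase("close"))
                .orElse(true);
    }

    /** Builds an HttpHeaders instance holding a single header (helper for the example). */
    static HttpHeaders headers(String name, String value) {
        return HttpHeaders.of(Map.of(name, List.of(value)), (k, v) -> true);
    }

    /** Builds an empty HttpHeaders instance (helper for the example). */
    static HttpHeaders noHeaders() {
        return HttpHeaders.of(Map.of(), (k, v) -> true);
    }
}
```

With these helpers, keepAlive(headers("Connection", "close")) is false, while a missing Connection header or any other value yields true. The null check on hdrs in the real method is a separate case that is handled before this expression is reached.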
ConnectionPool
java.net.http/jdk/internal/net/http/ConnectionPool.java
/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    boolean stopped;

    //......

    /**
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination only)
     * case 4: SSL over tunnel (destination and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;

        CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    synchronized HttpConnection getConnection(boolean secure,
                                              InetSocketAddress addr,
                                              InetSocketAddress proxy) {
        if (stopped) return null;
        CacheKey key = new CacheKey(addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        return c;
    }

    private HttpConnection
    findConnection(CacheKey key,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            HttpConnection c = l.removeFirst();
            expiryList.remove(c);
            return c;
        }
    }

    /**
     * Returns the connection to the pool.
     */
    void returnToPool(HttpConnection conn) {
        returnToPool(conn, Instant.now(), KEEP_ALIVE);
    }

    // Called also by whitebox tests
    void returnToPool(HttpConnection conn, Instant now, long keepAlive) {

        // Don't call registerCleanupTrigger while holding a lock,
        // but register it before the connection is added to the pool,
        // since we don't want to trigger the cleanup if the connection
        // is not in the pool.
        CleanupTrigger cleanup = registerCleanupTrigger(conn);

        // it's possible that cleanup may have been called.
        HttpConnection toClose = null;
        synchronized(this) {
            if (cleanup.isDone()) {
                return;
            } else if (stopped) {
                conn.close();
                return;
            }
            if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
                toClose = expiryList.removeOldest();
                if (toClose != null) removeFromPool(toClose);
            }
            if (conn instanceof PlainHttpConnection) {
                putConnection(conn, plainPool);
            } else {
                assert conn.isSecure();
                putConnection(conn, sslPool);
            }
            expiryList.add(conn, now, keepAlive);
        }
        if (toClose != null) {
            if (debug.on()) {
                debug.log("Maximum pool size reached: removing oldest connection %s",
                          toClose.dbgString());
            }
            close(toClose);
        }
        //System.out.println("Return to pool: " + conn);
    }

    private void removeFromPool(HttpConnection c) {
        assert Thread.holdsLock(this);
        if (c instanceof PlainHttpConnection) {
            removeFromPool(c, plainPool);
        } else {
            assert c.isSecure();
            removeFromPool(c, sslPool);
        }
    }

    private boolean
    removeFromPool(HttpConnection c,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        //System.out.println("cacheCleaner removing: " + c);
        assert Thread.holdsLock(this);
        CacheKey k = c.cacheKey();
        List<HttpConnection> l = pool.get(k);
        if (l == null || l.isEmpty()) {
            pool.remove(k);
            return false;
        }
        return l.remove(c);
    }

    private void
    putConnection(HttpConnection c,
                  HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        CacheKey key = c.cacheKey();
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null) {
            l = new LinkedList<>();
            pool.put(key, l);
        }
        l.add(c);
    }

    void stop() {
        List<HttpConnection> closelist = Collections.emptyList();
        try {
            synchronized (this) {
                stopped = true;
                closelist = expiryList.stream()
                    .map(e -> e.connection)
                    .collect(Collectors.toList());
                expiryList.clear();
                plainPool.clear();
                sslPool.clear();
            }
        } finally {
            closelist.forEach(this::close);
        }
    }
}
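Borrowing a connection goes through getConnection, which ends up in findConnection: it takes the first connection from the LinkedList<HttpConnection> stored under the key and removes it from expiryList.
Returning a connection goes through returnToPool. If expiryList has reached MAX_POOL_SIZE, the oldest connection is removed from the ExpiryList and from the HashMap<CacheKey,LinkedList<HttpConnection>>, and is then closed; the returned connection is then added to the matching HashMap and to expiryList.
As can be seen, ConnectionPool maintains HashMap<CacheKey,LinkedList<HttpConnection>> pools (plainPool and sslPool) together with an ExpiryList.
MAX_POOL_SIZE is read from jdk.httpclient.connectionPoolSize; if the property is not set it defaults to 0, which means unbounded.
ConnectionPool has a stop method, which is called when the HttpClient stops (triggered when the SelectorManager thread exits); it clears the pool and closes the connections.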
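The borrow and return paths described above can be illustrated with a much smaller pool. This is a simplified sketch under stated assumptions, not the JDK implementation: MiniPool, borrow, giveBack, and the keyOf map are hypothetical names, connections are arbitrary objects, and age order stands in for the expiry-sorted ExpiryList.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical illustration; not a JDK class.
final class MiniPool<K, C> {
    private final Map<K, LinkedList<C>> idle = new HashMap<>();
    private final LinkedList<C> byAge = new LinkedList<>(); // newest at head, oldest at tail
    private final Map<C, K> keyOf = new HashMap<>();        // stands in for conn.cacheKey()
    private final int maxSize;                              // 0 means unbounded

    MiniPool(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Takes the first idle connection for the key, or returns null if there is none. */
    synchronized C borrow(K key) {
        LinkedList<C> l = idle.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        }
        C c = l.removeFirst();
        byAge.remove(c);
        keyOf.remove(c);
        return c;
    }

    /** Adds a connection; returns the evicted oldest connection, or null if none was evicted. */
    synchronized C giveBack(K key, C conn) {
        C evicted = null;
        if (maxSize > 0 && byAge.size() >= maxSize) {
            evicted = byAge.pollLast(); // oldest entry sits at the tail
            if (evicted != null) {
                LinkedList<C> l = idle.get(keyOf.remove(evicted));
                if (l != null) {
                    l.remove(evicted);
                }
            }
        }
        idle.computeIfAbsent(key, unused -> new LinkedList<>()).add(conn);
        byAge.addFirst(conn);
        keyOf.put(conn, key);
        return evicted;
    }

    synchronized int size() {
        return byAge.size();
    }
}
```

As in returnToPool, the evicted connection is handed back to the caller so that it can be closed outside the synchronized section.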
ExpiryList
java.net.http/jdk/internal/net/http/ConnectionPool.java
    /**
     * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
     * deadline is at the tail of the list, and the entry with the farther
     * deadline is at the head. In the most common situation, new elements
     * will need to be added at the head (or close to it), and expired elements
     * will need to be purged from the tail.
     */
    private static final class ExpiryList {
        private final LinkedList<ExpiryEntry> list = new LinkedList<>();
        private volatile boolean mayContainEntries;

        int size() { return list.size(); }

        // A loosely accurate boolean whose value is computed
        // at the end of each operation performed on ExpiryList;
        // Does not require synchronizing on the ConnectionPool.
        boolean purgeMaybeRequired() {
            return mayContainEntries;
        }

        // Returns the next expiry deadline
        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        Optional<Instant> nextExpiryDeadline() {
            if (list.isEmpty()) return Optional.empty();
            else return Optional.of(list.getLast().expiry);
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        HttpConnection removeOldest() {
            ExpiryEntry entry = list.pollLast();
            return entry == null ? null : entry.connection;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void add(HttpConnection conn) {
            add(conn, Instant.now(), KEEP_ALIVE);
        }

        // Used by whitebox test.
        void add(HttpConnection conn, Instant now, long keepAlive) {
            Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                    .plus(keepAlive, ChronoUnit.SECONDS);

            // Elements with the farther deadline are at the head of
            // the list. It's more likely that the new element will
            // have the farthest deadline, and will need to be inserted
            // at the head of the list, so we're using an ascending
            // list iterator to find the right insertion point.
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();

                if (then.isAfter(entry.expiry)) {
                    li.previous();
                    // insert here
                    li.add(new ExpiryEntry(conn, then));
                    mayContainEntries = true;
                    return;
                }
            }
            // last (or first) element of list (the last element is
            // the first when the list is empty)
            list.add(new ExpiryEntry(conn, then));
            mayContainEntries = true;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void remove(HttpConnection c) {
            if (c == null || list.isEmpty()) return;
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry e = li.next();
                if (e.connection.equals(c)) {
                    li.remove();
                    mayContainEntries = !list.isEmpty();
                    return;
                }
            }
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool.
        // Purge all elements whose deadline is before now (now included).
        List<HttpConnection> purgeUntil(Instant now) {
            if (list.isEmpty()) return Collections.emptyList();

            List<HttpConnection> closelist = new ArrayList<>();

            // elements with the closest deadlines are at the tail
            // of the queue, so we're going to use a descending iterator
            // to remove them, and stop when we find the first element
            // that has not expired yet.
            Iterator<ExpiryEntry> li = list.descendingIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();
                // use !isAfter instead of isBefore in order to
                // remove the entry if its expiry == now
                if (!entry.expiry.isAfter(now)) {
                    li.remove();
                    HttpConnection c = entry.connection;
                    closelist.add(c);
                } else break; // the list is sorted
            }
            mayContainEntries = !list.isEmpty();
            return closelist;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        java.util.stream.Stream<ExpiryEntry> stream() {
            return list.stream();
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void clear() {
            list.clear();
            mayContainEntries = false;
        }
    }

    static final class ExpiryEntry {
        final HttpConnection connection;
        final Instant expiry; // absolute time in seconds of expiry time
        ExpiryEntry(HttpConnection connection, Instant expiry) {
            this.connection = connection;
            this.expiry = expiry;
        }
    }
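Internally, ExpiryList uses a LinkedList<ExpiryEntry> that is kept sorted by expiry time.
Besides the HttpConnection, ExpiryEntry holds an expiry time, the instant at which the connection expires.
Adding to ExpiryList computes expiry as the current time truncated to seconds plus KEEP_ALIVE. KEEP_ALIVE is read from jdk.httpclient.keepalive.timeout and defaults to 1200 seconds when the property is not set. The entry is then inserted into the LinkedList<ExpiryEntry> at the position that keeps the list sorted, with the farthest deadline at the head.
There are two kinds of removal from ExpiryList: removing the oldest entry, which is done with pollLast, and removing a specific connection, which walks the LinkedList<ExpiryEntry> with a ListIterator and removes the matching entry.
A mayContainEntries flag is maintained as well. It is updated whenever the LinkedList<ExpiryEntry> changes, so that purgeMaybeRequired can be checked without synchronizing on the ConnectionPool.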
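The ordering rule (farthest deadline at the head, closest at the tail) is easier to follow on a stripped-down version. The sketch below mirrors the add and purgeUntil logic quoted above, but ExpirySketch, Entry, and order are hypothetical names, and plain strings stand in for connections.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical illustration; not a JDK class.
final class ExpirySketch {

    static final class Entry {
        final String conn;
        final Instant expiry;
        Entry(String conn, Instant expiry) {
            this.conn = conn;
            this.expiry = expiry;
        }
    }

    private final LinkedList<Entry> list = new LinkedList<>();

    /** Inserts so that deadlines decrease from head to tail. */
    void add(String conn, Instant now, long keepAliveSeconds) {
        Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                .plus(keepAliveSeconds, ChronoUnit.SECONDS);
        ListIterator<Entry> li = list.listIterator();
        while (li.hasNext()) {
            Entry e = li.next();
            if (then.isAfter(e.expiry)) {
                li.previous();              // step back: insert before e
                li.add(new Entry(conn, then));
                return;
            }
        }
        list.add(new Entry(conn, then));    // closest deadline so far: goes to the tail
    }

    /** Removes and returns every entry whose deadline is at or before now. */
    List<String> purgeUntil(Instant now) {
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = list.descendingIterator(); // walk from the tail
        while (it.hasNext()) {
            Entry e = it.next();
            if (!e.expiry.isAfter(now)) {
                it.remove();
                expired.add(e.conn);
            } else {
                break;                      // sorted: the rest expire later
            }
        }
        return expired;
    }

    /** Connection names from head to tail (helper for the example). */
    List<String> order() {
        List<String> names = new ArrayList<>();
        for (Entry e : list) {
            names.add(e.conn);
        }
        return names;
    }
}
```

Adding "a", "b", and "c" at the same instant with keep-alive values of 10, 30, and 20 seconds leaves the list ordered b, c, a. Purging at that instant plus 20 seconds then returns a and c, and b stays in the list.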
java.net.http/jdk/internal/net/http/ConnectionPool.java
    /**
     * Purge expired connection and return the number of milliseconds
     * in which the next connection is scheduled to expire.
     * If no connections are scheduled to be purged return 0.
     * @return the delay in milliseconds in which the next connection will
     *         expire.
     */
    long purgeExpiredConnectionsAndReturnNextDeadline() {
        if (!expiryList.purgeMaybeRequired()) return 0;
        return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
    }

    // Used for whitebox testing
    long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
        long nextPurge = 0;

        // We may be in the process of adding new elements
        // to the expiry list - but those elements will not
        // have outlast their keep alive timer yet since we're
        // just adding them.
        if (!expiryList.purgeMaybeRequired()) return nextPurge;

        List<HttpConnection> closelist;
        synchronized (this) {
            closelist = expiryList.purgeUntil(now);
            for (HttpConnection c : closelist) {
                if (c instanceof PlainHttpConnection) {
                    boolean wasPresent = removeFromPool(c, plainPool);
                    assert wasPresent;
                } else {
                    boolean wasPresent = removeFromPool(c, sslPool);
                    assert wasPresent;
                }
            }
            nextPurge = now.until(
                    expiryList.nextExpiryDeadline().orElse(now),
                    ChronoUnit.MILLIS);
        }
        closelist.forEach(this::close);
        return nextPurge;
    }
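Because the connections in ExpiryList carry an expiry time, there is a step that purges expired connections; it is performed by purgeExpiredConnectionsAndReturnNextDeadline.
purgeExpiredConnectionsAndReturnNextDeadline is called by SelectorManager and is used to compute the timeout for selector.select.
The method first calls expiryList.purgeMaybeRequired(), which reads mayContainEntries, to see whether expiryList holds any connections, and returns 0 if it does not. It then calls expiryList.purgeUntil(now) to remove and collect the connections that have expired, removes each of them from the corresponding HashMap pool, computes the number of milliseconds until the next expiry deadline, closes the purged connections outside the lock, and returns that delay.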
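The returned delay comes from a single Instant.until call. The sketch below isolates that computation; NextDeadlineSketch and millisUntilNext are hypothetical names, while the expression itself is the one used in the method quoted above.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

// Hypothetical illustration; not a JDK class.
final class NextDeadlineSketch {

    /**
     * Milliseconds from now until the next expiry deadline.
     * An empty deadline falls back to now, so the result is 0.
     */
    static long millisUntilNext(Instant now, Optional<Instant> nextDeadline) {
        return now.until(nextDeadline.orElse(now), ChronoUnit.MILLIS);
    }
}
```

A result of 0 therefore means that no connection is scheduled to expire, which is the case the method's Javadoc describes.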
java.net.http/jdk/internal/net/http/ConnectionPool.java
    private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
        // Connect the connection flow to a pub/sub pair that will take the
        // connection out of the pool and close it if anything happens
        // while the connection is sitting in the pool.
        CleanupTrigger cleanup = new CleanupTrigger(conn);
        FlowTube flow = conn.getConnectionFlow();
        if (debug.on()) debug.log("registering %s", cleanup);
        flow.connectFlows(cleanup, cleanup);
        return cleanup;
    }

    void cleanup(HttpConnection c, Throwable error) {
        if (debug.on())
            debug.log("%s : ConnectionPool.cleanup(%s)",
                    String.valueOf(c.getConnectionFlow()), error);
        synchronized(this) {
            removeFromPool(c);
            expiryList.remove(c);
        }
        c.close();
    }

    /**
     * An object that subscribes to the flow while the connection is in
     * the pool. Anything that comes in will cause the connection to be closed
     * and removed from the pool.
     */
    private final class CleanupTrigger implements
            FlowTube.TubeSubscriber, FlowTube.TubePublisher,
            Flow.Subscription {

        private final HttpConnection connection;
        private volatile boolean done;

        public CleanupTrigger(HttpConnection connection) {
            this.connection = connection;
        }

        public boolean isDone() { return done;}

        private void triggerCleanup(Throwable error) {
            done = true;
            cleanup(connection, error);
        }

        @Override public void request(long n) {}
        @Override public void cancel() {}

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }
        @Override
        public void onError(Throwable error) { triggerCleanup(error); }
        @Override
        public void onComplete() { triggerCleanup(null); }
        @Override
        public void onNext(List<ByteBuffer> item) {
            triggerCleanup(new IOException("Data received while in pool"));
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            subscriber.onSubscribe(this);
        }

        @Override
        public String toString() {
            return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
        }
    }
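When returnToPool is called, it calls registerCleanupTrigger, which creates a CleanupTrigger, obtains the flow via conn.getConnectionFlow(), and then calls flow.connectFlows(cleanup, cleanup).
CleanupTrigger is both a FlowTube.TubeSubscriber and a FlowTube.TubePublisher. Its onComplete and onError methods (and onNext, when data arrives while the connection is idle) call cleanup, which removes the connection from the HashMap pool and from expiryList and then closes it.
CleanupTrigger works somewhat like an active connection health check: when the underlying connection is closed or fails, the pool is notified and removes the dirty connection.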
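The same "any signal means the idle connection is dead" idea can be expressed with the public java.util.concurrent.Flow interfaces. This is only a sketch: CleanupTriggerSketch is a hypothetical class, the Consumer callback stands in for ConnectionPool.cleanup, and FlowTube itself is internal to the JDK and is not used here.

```java
import java.io.IOException;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical illustration; not a JDK class.
final class CleanupTriggerSketch<T> implements Flow.Subscriber<T> {
    private final Consumer<Throwable> cleanup; // assumed stand-in for the pool's cleanup
    private volatile boolean done;

    CleanupTriggerSketch(Consumer<Throwable> cleanup) {
        this.cleanup = cleanup;
    }

    boolean isDone() {
        return done;
    }

    private void trigger(Throwable error) {
        done = true;
        cleanup.accept(error);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(1); // one signal is enough to decide
    }

    @Override
    public void onNext(T item) {
        // an idle connection should receive nothing, so data is treated as an error
        trigger(new IOException("Data received while in pool"));
    }

    @Override
    public void onError(Throwable error) {
        trigger(error);
    }

    @Override
    public void onComplete() {
        trigger(null); // EOF: the peer closed the connection
    }
}
```

The done flag plays the same role as in returnToPool: if the trigger has already fired by the time the pool takes its lock, the connection is not added to the pool.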
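Summary
Compared with Apache Commons Pool, the JDK HttpClient's ConnectionPool is fairly simple. It has a few parameters (which actually act on ExpiryList):
MAX_POOL_SIZE (jdk.httpclient.connectionPoolSize), default 0, meaning unbounded
KEEP_ALIVE (jdk.httpclient.keepalive.timeout), default 1200 seconds
ConnectionPool maintains two kinds of structures: the HashMap<CacheKey,LinkedList<HttpConnection>> pools and the ExpiryList.
SelectorManager calls purgeExpiredConnectionsAndReturnNextDeadline to compute the select timeout, and that method purges (removes and closes) expired connections.
Besides the purge of expired connections by SelectorManager, a connection also triggers CleanupTrigger indirectly through its FlowTube, which cleans up closed or broken connections.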
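Both parameters are system properties, so one way to tune them is to set them before the first client is created. The sketch below assumes that timing matters because ConnectionPool reads the values into static final fields, as the source above shows. PoolTuningSketch and newTunedClient are hypothetical names, and the values used are arbitrary.

```java
import java.net.http.HttpClient;

// Hypothetical illustration; not a JDK class.
final class PoolTuningSketch {

    /**
     * Sets the two pool properties and then creates a client.
     * Assumption: this runs before any HttpClient has been created in the JVM,
     * since the pool reads the properties into static fields.
     */
    static HttpClient newTunedClient(int poolSize, int keepAliveSeconds) {
        System.setProperty("jdk.httpclient.connectionPoolSize",
                Integer.toString(poolSize));
        System.setProperty("jdk.httpclient.keepalive.timeout",
                Integer.toString(keepAliveSeconds));
        return HttpClient.newHttpClient();
    }
}
```

The same values can be passed on the command line instead, for example -Djdk.httpclient.connectionPoolSize=10 -Djdk.httpclient.keepalive.timeout=30.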
doc
java.net.http javadoc
When reposting, please cite this article's address: http://m.specialneedsforspecialkids.com/yun/77366.html