From 35d3e761f5852c2ac2ab632797341cf7192aabbd Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Wed, 26 Mar 2025 08:48:03 +0100 Subject: [PATCH 01/12] Refactored FreeConnectionBuffer to use custom linked list --- .../datasource/pool/FreeConnectionBuffer.java | 106 +++++++++++++++--- 1 file changed, 90 insertions(+), 16 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java index 73ba836..cfe31fe 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java @@ -10,45 +10,52 @@ */ final class FreeConnectionBuffer { - /** - * Buffer oriented for add and remove. - */ - private final LinkedList freeBuffer = new LinkedList<>(); + + private final Node free = Node.init(); + + int size = 0; /** * Return the number of entries in the buffer. */ int size() { - return freeBuffer.size(); + return size; } /** * Return true if the buffer is empty. */ boolean isEmpty() { - return freeBuffer.isEmpty(); + return size == 0; } /** * Add connection to the free list. */ void add(PooledConnection pc) { - freeBuffer.addFirst(pc); + new Node(pc).addAfter(free); + size++; } /** * Remove a connection from the free list. */ PooledConnection remove() { - return freeBuffer.removeFirst(); + Node node = free.next; + node.remove(); + size--; + return node.pc; } /** * Close all connections in this buffer. */ void closeAll(boolean logErrors) { - List tempList = new ArrayList<>(freeBuffer); - freeBuffer.clear(); + List tempList = new ArrayList<>(); + while (size > 0) { + tempList.add(remove()); + } + if (Log.isLoggable(System.Logger.Level.TRACE)) { Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); } @@ -62,15 +69,82 @@ void closeAll(boolean logErrors) { */ int trim(int minSize, long usedSince, long createdSince) { int trimCount = 0; - ListIterator iterator = freeBuffer.listIterator(minSize); - while (iterator.hasNext()) { - PooledConnection pooledConnection = iterator.next(); - if (pooledConnection.shouldTrim(usedSince, createdSince)) { - iterator.remove(); - pooledConnection.closeConnectionFully(true); + Node node = free; // first boundary node + do { + node = node.next; + } while (!node.isBoundaryNode() && minSize-- > 0); + + while (!node.isBoundaryNode()) { + Node current = node; + node = node.next; + if (current.pc.shouldTrim(usedSince, createdSince)) { + current.remove(); + size--; + current.pc.closeConnectionFully(true); trimCount++; } } return trimCount; } + + /** + * Node of a linkedlist. The linkedLists always have two empty nodes at the start and end. + * (boundary nodes) They are generated with the init() method. + *

+ * the first usable node is startNode.next (which could be the end edge) + */ + static final class Node { + + private Node next; + private Node prev; + final PooledConnection pc; + + private Node(PooledConnection pc) { + this.pc = pc; + } + + /** + * Creates new "list" with two empty boundary nodes + */ + public static Node init() { + Node node1 = new Node(null); + Node node2 = new Node(null); + node1.next = node2; + node2.prev = node1; + return node1; + } + + /** + * Retruns true, if this is a boundary node. (start or end node of list) + */ + private boolean isBoundaryNode() { + return pc == null; + } + + /** + * Removes the node from the list. The node can be re-added to an other list + */ + private void remove() { + assert pc != null : "called remove a boundary node"; + assert prev != null && next != null : "not part of a list"; + next.prev = prev; + prev.next = next; + prev = null; + next = null; + } + + /** + * Adds this after node. + *

+ * Node is in most cases a boundary node (e.g. start of list) + */ + public void addAfter(Node node) { + assert !this.isBoundaryNode() : "this is a boundary node"; + assert next == null & prev == null : "Node already member of a list"; + next = node.next; + prev = node; + node.next.prev = this; + node.next = this; + } + } } From 357ab8775814f744b729fa7c23142428d7932c02 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Tue, 25 Mar 2025 08:04:06 +0100 Subject: [PATCH 02/12] Renames - minimal functional/logical code changes --- ...ctionBuffer.java => ConnectionBuffer.java} | 39 ++++++----- .../pool/PooledConnectionQueue.java | 29 ++++---- ...ferTest.java => ConnectionBufferTest.java} | 66 +++++++++---------- 3 files changed, 70 insertions(+), 64 deletions(-) rename ebean-datasource/src/main/java/io/ebean/datasource/pool/{FreeConnectionBuffer.java => ConnectionBuffer.java} (81%) rename ebean-datasource/src/test/java/io/ebean/datasource/pool/{FreeConnectionBufferTest.java => ConnectionBufferTest.java} (60%) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java similarity index 81% rename from ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java rename to ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index cfe31fe..cf414a6 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -3,57 +3,62 @@ import java.util.*; /** - * A buffer designed especially to hold free pooled connections. + * A buffer designed especially to hold pooled connections (free and busy ones) *

* All thread safety controlled externally (by PooledConnectionQueue). *

*/ -final class FreeConnectionBuffer { +final class ConnectionBuffer { private final Node free = Node.init(); - int size = 0; + int freeSize = 0; /** * Return the number of entries in the buffer. */ - int size() { - return size; + int freeSize() { + return freeSize; } /** * Return true if the buffer is empty. */ - boolean isEmpty() { - return size == 0; + boolean hasFreeConnections() { + return freeSize > 0; } /** * Add connection to the free list. */ - void add(PooledConnection pc) { + void addFree(PooledConnection pc) { new Node(pc).addAfter(free); - size++; + freeSize++; } /** - * Remove a connection from the free list. + * Remove a connection from the free list. Returns null if there is not any. */ - PooledConnection remove() { + PooledConnection popFree () { Node node = free.next; + if (node.isBoundaryNode()) { + return null; + } node.remove(); - size--; + freeSize--; return node.pc; } /** - * Close all connections in this buffer. + * Close all connections in the free list. */ - void closeAll(boolean logErrors) { + void closeAllFree(boolean logErrors) { List tempList = new ArrayList<>(); - while (size > 0) { - tempList.add(remove()); + PooledConnection c = popFree(); + while (c != null) { + tempList.add(c); + c = popFree(); } if (Log.isLoggable(System.Logger.Level.TRACE)) { @@ -79,7 +84,7 @@ int trim(int minSize, long usedSince, long createdSince) { node = node.next; if (current.pc.shouldTrim(usedSince, createdSince)) { current.remove(); - size--; + freeSize--; current.pc.closeConnectionFully(true); trimCount++; } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 7a66c98..1f3f187 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -20,7 +20,7 @@ final class PooledConnectionQueue { /** * A 'circular' buffer designed specifically for free connections. */ - private final FreeConnectionBuffer freeList; + private final ConnectionBuffer buffer; /** * A 'slots' buffer designed specifically for busy connections. * Fast add remove based on slot id. @@ -75,13 +75,13 @@ final class PooledConnectionQueue { this.maxAgeMillis = pool.maxAgeMillis(); this.validateStaleMillis = pool.validateStaleMillis(); this.busyList = new BusyConnectionBuffer(maxSize, 20); - this.freeList = new FreeConnectionBuffer(); + this.buffer = new ConnectionBuffer(); this.lock = new ReentrantLock(false); this.notEmpty = lock.newCondition(); } private PoolStatus createStatus() { - return new Status(minSize, maxSize, freeList.size(), busyList.size(), waitingThreads, highWaterMark, + return new Status(minSize, maxSize, buffer.freeSize(), busyList.size(), waitingThreads, highWaterMark, waitCount, hitCount, totalAcquireNanos, maxAcquireNanos); } @@ -126,7 +126,7 @@ void setMaxSize(int maxSize) { } private int totalConnections() { - return freeList.size() + busyList.size(); + return buffer.freeSize() + busyList.size(); } void ensureMinimumConnections() throws SQLException { @@ -135,7 +135,7 @@ void ensureMinimumConnections() throws SQLException { int add = minSize - totalConnections(); if (add > 0) { for (int i = 0; i < add; i++) { - freeList.add(pool.createConnectionForQueue(connectionId++)); + buffer.addFree(pool.createConnectionForQueue(connectionId++)); } notEmpty.signal(); } @@ -156,7 +156,7 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { c.closeConnectionFully(false); } else { - freeList.add(c); + buffer.addFree(c); notEmpty.signal(); } } finally { @@ -165,10 +165,11 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { } private PooledConnection extractFromFreeList() { - if (freeList.isEmpty()) { + + PooledConnection c = buffer.popFree(); + if (c == null) { return null; } - final PooledConnection c = freeList.remove(); if (validateStaleMillis > 0 && staleEviction(c)) { c.closeConnectionFully(false); return null; @@ -291,7 +292,7 @@ private PooledConnection _obtainConnectionWaitLoop() throws SQLException, Interr try { nanos = notEmpty.awaitNanos(nanos); - if (!freeList.isEmpty()) { + if (buffer.hasFreeConnections()) { // successfully waited return extractFromFreeList(); } @@ -373,20 +374,20 @@ void trim(long maxInactiveMillis, long maxAgeMillis) { private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMillis) { final long createdSince = (maxAgeMillis == 0) ? 0 : System.currentTimeMillis() - maxAgeMillis; final int trimmedCount; - if (freeList.size() > minSize) { + if (buffer.freeSize() > minSize) { // trim on maxInactive and maxAge long usedSince = System.currentTimeMillis() - maxInactiveMillis; - trimmedCount = freeList.trim(minSize, usedSince, createdSince); + trimmedCount = buffer.trim(minSize, usedSince, createdSince); } else if (createdSince > 0) { // trim only on maxAge - trimmedCount = freeList.trim(0, createdSince, createdSince); + trimmedCount = buffer.trim(0, createdSince, createdSince); } else { trimmedCount = 0; } if (trimmedCount > 0 && Log.isLoggable(DEBUG)) { Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmedCount, totalConnections()); } - return trimmedCount > 0 && freeList.size() < minSize; + return trimmedCount > 0 && buffer.freeSize() < minSize; } /** @@ -395,7 +396,7 @@ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMilli private void closeFreeConnections(boolean logErrors) { lock.lock(); try { - freeList.closeAll(logErrors); + buffer.closeAllFree(logErrors); } finally { lock.unlock(); } diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java similarity index 60% rename from ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java rename to ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index c4ddcb7..62e0006 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -9,70 +9,70 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; -class FreeConnectionBufferTest { +class ConnectionBufferTest { @Test void test() { - FreeConnectionBuffer b = new FreeConnectionBuffer(); + ConnectionBuffer b = new ConnectionBuffer(); PooledConnection p0 = new PooledConnection("0"); PooledConnection p1 = new PooledConnection("1"); PooledConnection p2 = new PooledConnection("2"); // PooledConnection p3 = new PooledConnection("3"); - assertEquals(0, b.size()); - assertTrue(b.isEmpty()); + assertEquals(0, b.freeSize()); + assertFalse(b.hasFreeConnections()); - b.add(p0); + b.addFree(p0); - assertEquals(1, b.size()); - assertFalse(b.isEmpty()); + assertEquals(1, b.freeSize()); + assertTrue(b.hasFreeConnections()); - PooledConnection r0 = b.remove(); + PooledConnection r0 = b.popFree(); assertThat(p0).isSameAs(r0); - assertEquals(0, b.size()); - assertTrue(b.isEmpty()); + assertEquals(0, b.freeSize()); + assertFalse(b.hasFreeConnections()); - b.add(p0); - b.add(p1); - b.add(p2); + b.addFree(p0); + b.addFree(p1); + b.addFree(p2); - assertEquals(3, b.size()); + assertEquals(3, b.freeSize()); - PooledConnection r1 = b.remove(); + PooledConnection r1 = b.popFree(); assertSame(p2, r1); - PooledConnection r2 = b.remove(); + PooledConnection r2 = b.popFree(); assertSame(p1, r2); - assertEquals(1, b.size()); - b.add(p2); - assertEquals(2, b.size()); - PooledConnection r3 = b.remove(); + assertEquals(1, b.freeSize()); + b.addFree(p2); + assertEquals(2, b.freeSize()); + PooledConnection r3 = b.popFree(); assertSame(p2, r3); - assertEquals(1, b.size()); - PooledConnection r4 = b.remove(); + assertEquals(1, b.freeSize()); + PooledConnection r4 = b.popFree(); assertSame(p0, r4); - assertEquals(0, b.size()); + assertEquals(0, b.freeSize()); - b.add(p2); - b.add(p1); - b.add(p0); + b.addFree(p2); + b.addFree(p1); + b.addFree(p0); - assertEquals(3, b.size()); + assertEquals(3, b.freeSize()); - PooledConnection r5 = b.remove(); + PooledConnection r5 = b.popFree(); assertSame(p0, r5); - assertEquals(2, b.size()); + assertEquals(2, b.freeSize()); - PooledConnection r6 = b.remove(); + PooledConnection r6 = b.popFree(); assertSame(p1, r6); - assertEquals(1, b.size()); + assertEquals(1, b.freeSize()); - PooledConnection r7 = b.remove(); + PooledConnection r7 = b.popFree(); assertSame(p2, r7); - assertEquals(0, b.size()); + assertEquals(0, b.freeSize()); } From 73534c64e46a59d130336145e4876112d5dc99d5 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Wed, 26 Mar 2025 09:33:30 +0100 Subject: [PATCH 03/12] ConnectionBuffer maintains free and busy connections as LinkedList now --- .../datasource/pool/BusyConnectionBuffer.java | 159 ------------------ .../datasource/pool/ConnectionBuffer.java | 118 ++++++++++++- .../datasource/pool/PooledConnection.java | 23 ++- .../pool/PooledConnectionQueue.java | 38 ++--- .../pool/BusyConnectionBufferTest.java | 99 ----------- .../datasource/pool/ConnectionBufferTest.java | 113 +++++++++---- 6 files changed, 229 insertions(+), 321 deletions(-) delete mode 100644 ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java delete mode 100644 ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java deleted file mode 100644 index 2fe702d..0000000 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java +++ /dev/null @@ -1,159 +0,0 @@ -package io.ebean.datasource.pool; - -import java.util.Arrays; - -/** - * A buffer especially designed for Busy PooledConnections. - *

- * All thread safety controlled externally (by PooledConnectionQueue). - *

- * It has a set of 'slots' and PooledConnections know which slot they went into - * and this allows for fast addition and removal (by slotId without looping). - * The capacity will increase on demand by the 'growBy' amount. - */ -final class BusyConnectionBuffer { - - private PooledConnection[] slots; - private final int growBy; - private int size; - private int pos = -1; - - /** - * Create the buffer with an initial capacity and fixed growBy. - * We generally do not want the buffer to grow very often. - * - * @param capacity the initial capacity - * @param growBy the fixed amount to grow the buffer by. - */ - BusyConnectionBuffer(int capacity, int growBy) { - this.slots = new PooledConnection[capacity]; - this.growBy = growBy; - } - - /** - * We can only grow (not shrink) the capacity. - */ - void setCapacity(int newCapacity) { - if (newCapacity > slots.length) { - PooledConnection[] current = this.slots; - this.slots = new PooledConnection[newCapacity]; - System.arraycopy(current, 0, this.slots, 0, current.length); - } - } - - @Override - public String toString() { - return Arrays.toString(slots); - } - - int capacity() { - return slots.length; - } - - int size() { - return size; - } - - boolean isEmpty() { - return size == 0; - } - - int add(PooledConnection pc) { - if (size == slots.length) { - // grow the capacity - setCapacity(slots.length + growBy); - } - int slot = nextEmptySlot(); - pc.setSlotId(slot); - slots[slot] = pc; - return ++size; - } - - boolean remove(PooledConnection pc) { - int slotId = pc.slotId(); - if (slots[slotId] != pc) { - PooledConnection heldBy = slots[slotId]; - Log.warn("Failed to remove from slot[{0}] PooledConnection[{1}] - HeldBy[{2}]", pc.slotId(), pc, heldBy); - return false; - } - slots[slotId] = null; - --size; - return true; - } - - /** - * Close connections that should be considered leaked. - */ - void closeBusyConnections(long leakTimeMinutes) { - long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); - Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); - for (int i = 0; i < slots.length; i++) { - if (slots[i] != null) { - //tmp.add(slots[i]); - PooledConnection pc = slots[i]; - //noinspection StatementWithEmptyBody - if (pc.lastUsedTime() > olderThanTime) { - // PooledConnection has been used recently or - // expected to be longRunning so not closing... - } else { - slots[i] = null; - --size; - closeBusyConnection(pc); - } - } - } - } - - private void closeBusyConnection(PooledConnection pc) { - try { - Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); - System.out.println("CLOSING busy connection: " + pc.fullDescription()); - pc.closeConnectionFully(false); - } catch (Exception ex) { - Log.error("Error when closing potentially leaked connection " + pc.description(), ex); - } - } - - /** - * Returns information describing connections that are currently being used. - */ - String busyConnectionInformation(boolean toLogger) { - if (toLogger) { - Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", size()); - } - StringBuilder sb = new StringBuilder(); - for (PooledConnection pc : slots) { - if (pc != null) { - if (toLogger) { - Log.info("Busy Connection - {0}", pc.fullDescription()); - } else { - sb.append(pc.fullDescription()).append("\r\n"); - } - } - } - return sb.toString(); - } - - - /** - * Return the position of the next empty slot. - */ - private int nextEmptySlot() { - // search forward - while (++pos < slots.length) { - if (slots[pos] == null) { - return pos; - } - } - // search from beginning - pos = -1; - while (++pos < slots.length) { - if (slots[pos] == null) { - return pos; - } - } - // not expecting this - throw new RuntimeException("No Empty Slot Found?"); - } - -} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index cf414a6..f886e77 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -5,15 +5,22 @@ /** * A buffer designed especially to hold pooled connections (free and busy ones) *

+ * The buffer contains two linkedLists (free and busy connection nodes) + *

+ * When a node from the free list is removed, the node is attached to the + * PooledConnection, so that the node object can be reused. This avoids object + * creation/gc during remove operations. + *

* All thread safety controlled externally (by PooledConnectionQueue). *

*/ final class ConnectionBuffer { - private final Node free = Node.init(); + private final Node busy = Node.init(); int freeSize = 0; + int busySize = 0; /** * Return the number of entries in the buffer. @@ -22,6 +29,13 @@ int freeSize() { return freeSize; } + /** + * Return the number of busy connections. + */ + int busySize() { + return busySize; + } + /** * Return true if the buffer is empty. */ @@ -30,28 +44,74 @@ boolean hasFreeConnections() { } /** - * Add connection to the free list. + * Adds a new connection to the free list. */ void addFree(PooledConnection pc) { + assert pc.busyNode() == null : "Connection seems not to be new"; new Node(pc).addAfter(free); freeSize++; } + /** + * Removes the connection from the busy list. (For full close) + * Returns true, if this connection was part of the busy list or false, if not (or removed twice) + */ + boolean removeBusy(PooledConnection c) { + if (c.busyNode() == null) { + return false; + } + c.busyNode().remove(); + busySize--; + c.setBusyNode(null); + return true; + } + + /** + * Moves the connection from the busy list to the free list. + */ + boolean moveToFreeList(PooledConnection c) { + Node node = c.busyNode(); + if (node == null) { + return false; + } + node.remove(); + busySize--; + node.addAfter(free); + freeSize++; + c.setBusyNode(null); + return true; + } + /** * Remove a connection from the free list. Returns null if there is not any. */ - PooledConnection popFree () { + PooledConnection popFree() { Node node = free.next; if (node.isBoundaryNode()) { return null; } node.remove(); freeSize--; + node.pc.setBusyNode(node); // sets the node for reuse in "addBusy" return node.pc; } /** - * Close all connections in the free list. + * Adds the connection to the busy list. The connection must be either new or popped from the free list. + */ + int addBusy(PooledConnection c) { + Node node = c.busyNode(); // we try to reuse the node to avoid object creation. + if (node == null) { + node = new Node(c); + c.setBusyNode(node); + } + node.addAfter(busy); + busySize++; + return busySize; + } + + /** + * Close all free connections in this buffer. */ void closeAllFree(boolean logErrors) { List tempList = new ArrayList<>(); @@ -92,6 +152,56 @@ int trim(int minSize, long usedSince, long createdSince) { return trimCount; } + void closeBusyConnections(long leakTimeMinutes) { + long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); + Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); + Node node = busy.next; + while (!node.isBoundaryNode()) { + Node current = node; + node = node.next; + + PooledConnection pc = current.pc; + //noinspection StatementWithEmptyBody + if (pc.lastUsedTime() > olderThanTime) { + // PooledConnection has been used recently or + // expected to be longRunning so not closing... + } else { + current.remove(); + --busySize; + closeBusyConnection(pc); + } + } + } + + private void closeBusyConnection(PooledConnection pc) { + try { + Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); + System.out.println("CLOSING busy connection: " + pc.fullDescription()); + pc.closeConnectionFully(false); + } catch (Exception ex) { + Log.error("Error when closing potentially leaked connection " + pc.description(), ex); + } + } + + String busyConnectionInformation(boolean toLogger) { + if (toLogger) { + Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", busySize()); + } + StringBuilder sb = new StringBuilder(); + Node node = busy.next; + while (!node.isBoundaryNode()) { + PooledConnection pc = node.pc; + node = node.next; + if (toLogger) { + Log.info("Busy Connection - {0}", pc.fullDescription()); + } else { + sb.append(pc.fullDescription()).append("\r\n"); + } + } + return sb.toString(); + } + + /** * Node of a linkedlist. The linkedLists always have two empty nodes at the start and end. * (boundary nodes) They are generated with the init() method. diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 4d2b2bd..6d4cb0c 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -1,6 +1,13 @@ package io.ebean.datasource.pool; -import java.sql.*; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Savepoint; +import java.sql.Statement; import java.util.ArrayList; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -135,7 +142,7 @@ final class PooledConnection extends ConnectionDelegator { /** * Slot position in the BusyConnectionBuffer. */ - private int slotId; + private ConnectionBuffer.Node busyNode; /** @@ -184,17 +191,17 @@ final class PooledConnection extends ConnectionDelegator { } /** - * Return the slot position in the busy buffer. + * Return the node in the busy list. If this is empty, the connection is free */ - int slotId() { - return slotId; + ConnectionBuffer.Node busyNode() { + return busyNode; } /** - * Set the slot position in the busy buffer. + * Set the busy node. */ - void setSlotId(int slotId) { - this.slotId = slotId; + void setBusyNode(ConnectionBuffer.Node busyNode) { + this.busyNode = busyNode; } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 1f3f187..8682c5d 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -18,14 +18,9 @@ final class PooledConnectionQueue { private final String name; private final ConnectionPool pool; /** - * A 'circular' buffer designed specifically for free connections. + * A buffer designed specifically for free and busy connections. */ private final ConnectionBuffer buffer; - /** - * A 'slots' buffer designed specifically for busy connections. - * Fast add remove based on slot id. - */ - private final BusyConnectionBuffer busyList; /** * Main lock guarding all access */ @@ -74,14 +69,14 @@ final class PooledConnectionQueue { this.waitTimeoutMillis = pool.waitTimeoutMillis(); this.maxAgeMillis = pool.maxAgeMillis(); this.validateStaleMillis = pool.validateStaleMillis(); - this.busyList = new BusyConnectionBuffer(maxSize, 20); + //this.busyList = new BusyConnectionBuffer(maxSize, 20); this.buffer = new ConnectionBuffer(); this.lock = new ReentrantLock(false); this.notEmpty = lock.newCondition(); } private PoolStatus createStatus() { - return new Status(minSize, maxSize, buffer.freeSize(), busyList.size(), waitingThreads, highWaterMark, + return new Status(minSize, maxSize, buffer.freeSize(), buffer.busySize(), waitingThreads, highWaterMark, waitCount, hitCount, totalAcquireNanos, maxAcquireNanos); } @@ -100,7 +95,7 @@ PoolStatus status(boolean reset) { try { PoolStatus s = createStatus(); if (reset) { - highWaterMark = busyList.size(); + highWaterMark = buffer.busySize(); hitCount = 0; waitCount = 0; maxAcquireNanos = 0; @@ -118,7 +113,6 @@ void setMaxSize(int maxSize) { if (maxSize < this.minSize) { throw new IllegalArgumentException("maxSize " + maxSize + " < minSize " + this.minSize); } - this.busyList.setCapacity(maxSize); this.maxSize = maxSize; } finally { lock.unlock(); @@ -126,7 +120,7 @@ void setMaxSize(int maxSize) { } private int totalConnections() { - return buffer.freeSize() + busyList.size(); + return buffer.freeSize() + buffer.busySize(); } void ensureMinimumConnections() throws SQLException { @@ -150,13 +144,15 @@ void ensureMinimumConnections() throws SQLException { void returnPooledConnection(PooledConnection c, boolean forceClose) { lock.lock(); try { - if (!busyList.remove(c)) { - Log.error("Connection [{0}] not found in BusyList?", c); - } if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { + if (!buffer.removeBusy(c)) { + Log.error("Connection [{0}] not found in BusyList?", c); + } c.closeConnectionFully(false); } else { - buffer.addFree(c); + if (!buffer.moveToFreeList(c)) { + Log.error("Connection [{0}] not found in BusyList?", c); + } notEmpty.signal(); } } finally { @@ -209,7 +205,7 @@ PooledConnection obtainConnection() throws SQLException { * Register the PooledConnection with the busyList. */ private int registerBusyConnection(PooledConnection connection) { - int busySize = busyList.add(connection); + int busySize = buffer.addBusy(connection); if (busySize > highWaterMark) { highWaterMark = busySize; } @@ -255,7 +251,7 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc } private PooledConnection createConnection() throws SQLException { - if (busyList.size() < maxSize) { + if (buffer.busySize() < maxSize) { // grow the connection pool PooledConnection c = pool.createConnectionForQueue(connectionId++); int busySize = registerBusyConnection(c); @@ -314,8 +310,8 @@ PoolStatus shutdown(boolean closeBusyConnections) { // connections close on return to pool lastResetTime = System.currentTimeMillis() - 100; } else { - if (!busyList.isEmpty()) { - Log.warn("Closing busy connections on shutdown size: {0}", busyList.size()); + if (buffer.busySize() > 0) { + Log.warn("Closing busy connections on shutdown size: {0}", buffer.busySize()); dumpBusyConnectionInformation(); closeBusyConnections(0); } @@ -415,7 +411,7 @@ private void closeFreeConnections(boolean logErrors) { void closeBusyConnections(long leakTimeMinutes) { lock.lock(); try { - busyList.closeBusyConnections(leakTimeMinutes); + buffer.closeBusyConnections(leakTimeMinutes); } finally { lock.unlock(); } @@ -435,7 +431,7 @@ void dumpBusyConnectionInformation() { private String getBusyConnectionInformation(boolean toLogger) { lock.lock(); try { - return busyList.busyConnectionInformation(toLogger); + return buffer.busyConnectionInformation(toLogger); } finally { lock.unlock(); } diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java deleted file mode 100644 index 67e258b..0000000 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java +++ /dev/null @@ -1,99 +0,0 @@ -package io.ebean.datasource.pool; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class BusyConnectionBufferTest { - - @Test - public void test() { - - BusyConnectionBuffer b = new BusyConnectionBuffer(2, 4); - - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - PooledConnection p3 = new PooledConnection("3"); - - assertEquals(2, b.capacity()); - b.add(p0); - b.add(p1); - assertEquals(2, b.capacity()); - b.add(p2); - assertEquals(6, b.capacity()); - b.add(p3); - - assertEquals(0, p0.slotId()); - assertEquals(1, p1.slotId()); - assertEquals(2, p2.slotId()); - assertEquals(3, p3.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(4, p2.slotId()); - - b.remove(p0); - b.add(p0); - assertEquals(5, p0.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(0, p2.slotId()); - - } - - @Test - public void test_rotate() { - - BusyConnectionBuffer b = new BusyConnectionBuffer(2, 2); - - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - PooledConnection p3 = new PooledConnection("3"); - - assertEquals(2, b.capacity()); - assertEquals(0, b.size()); - - b.add(p0); - b.add(p1); - assertEquals(2, b.size()); - assertEquals(2, b.capacity()); - b.add(p2); - assertEquals(3, b.size()); - assertEquals(4, b.capacity()); - b.add(p3); - assertEquals(4, b.size()); - assertEquals(4, b.capacity()); - - assertEquals(0, p0.slotId()); - assertEquals(1, p1.slotId()); - assertEquals(2, p2.slotId()); - assertEquals(3, p3.slotId()); - - b.remove(p2); - assertEquals(3, b.size()); - b.remove(p0); - assertEquals(2, b.size()); - b.remove(p3); - assertEquals(1, b.size()); - b.add(p2); - assertEquals(2, b.size()); - assertEquals(0, p2.slotId()); - - b.remove(p0); - assertEquals(2, b.size()); - b.add(p0); - assertEquals(3, b.size()); - - // p1 is still in it's slot - assertEquals(2, p0.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(3, p2.slotId()); - - } - -} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index 62e0006..a26188e 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -2,12 +2,14 @@ import org.junit.jupiter.api.Test; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.ListIterator; - import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; class ConnectionBufferTest { @@ -30,81 +32,132 @@ void test() { assertTrue(b.hasFreeConnections()); PooledConnection r0 = b.popFree(); + b.addBusy(p0); assertThat(p0).isSameAs(r0); assertEquals(0, b.freeSize()); assertFalse(b.hasFreeConnections()); - b.addFree(p0); + assertEquals(0, b.freeSize()); + assertEquals(1, b.busySize()); + b.moveToFreeList(p0); + assertEquals(1, b.freeSize()); + assertEquals(0, b.busySize()); + b.addFree(p1); b.addFree(p2); assertEquals(3, b.freeSize()); PooledConnection r1 = b.popFree(); + b.addBusy(r1); assertSame(p2, r1); PooledConnection r2 = b.popFree(); + b.addBusy(r2); assertSame(p1, r2); assertEquals(1, b.freeSize()); - b.addFree(p2); + b.moveToFreeList(r1); + assertEquals(2, b.freeSize()); PooledConnection r3 = b.popFree(); + b.addBusy(r3); assertSame(p2, r3); assertEquals(1, b.freeSize()); PooledConnection r4 = b.popFree(); + b.addBusy(r4); assertSame(p0, r4); assertEquals(0, b.freeSize()); - b.addFree(p2); - b.addFree(p1); - b.addFree(p0); + b.moveToFreeList(r3); // = p2 + b.moveToFreeList(r2); // = p1 + b.moveToFreeList(r4); // = p0 assertEquals(3, b.freeSize()); PooledConnection r5 = b.popFree(); + b.addBusy(r5); assertSame(p0, r5); assertEquals(2, b.freeSize()); PooledConnection r6 = b.popFree(); + b.addBusy(r6); assertSame(p1, r6); assertEquals(1, b.freeSize()); PooledConnection r7 = b.popFree(); + b.addBusy(r7); assertSame(p2, r7); assertEquals(0, b.freeSize()); } + @Test - void listIterator() { + public void test_busy_free() { + + ConnectionBuffer b = new ConnectionBuffer(); + PooledConnection p0 = new PooledConnection("0"); PooledConnection p1 = new PooledConnection("1"); PooledConnection p2 = new PooledConnection("2"); PooledConnection p3 = new PooledConnection("3"); - var list = new LinkedList(); - list.add(p0); - list.add(p1); - list.add(p2); - list.add(p3); + assertEquals(0, b.busySize()); + assertEquals(0, b.freeSize()); + b.addBusy(p0); + b.addBusy(p1); - var set1 = listIterate(list, 1); - assertThat(set1).hasSize(3); - assertThat(set1).contains(p1, p2, p3); + assertEquals(2, b.busySize()); + assertEquals(0, b.freeSize()); - var set3 = listIterate(list, 3); - assertThat(set3).hasSize(1); - assertThat(set3).contains(p3); - } + b.addFree(p2); + b.addFree(p3); - private LinkedHashSet listIterate(LinkedList list, int position) { - ListIterator it = list.listIterator(position); - var set = new LinkedHashSet(); - while (it.hasNext()) { - set.add(it.next()); - } - return set; + assertEquals(2, b.busySize()); + assertEquals(2, b.freeSize()); + + PooledConnection c3 = b.popFree(); + assertSame(p3, c3); + assertEquals(2, b.busySize()); + assertEquals(1, b.freeSize()); + + b.addBusy(c3); + assertThatThrownBy(() -> b.addBusy(p3)).hasMessageContaining("Node already member of a list"); + assertEquals(3, b.busySize()); + + PooledConnection c2 = b.popFree(); + assertSame(p2, c2); + b.addBusy(c2); + assertSame(p2, c2); + + assertEquals(4, b.busySize()); + assertEquals(0, b.freeSize()); + + assertNull(b.popFree()); // no free connections left + + // all are busy now + assertNotNull(p0.busyNode()); + assertNotNull(p1.busyNode()); + assertNotNull(p2.busyNode()); + assertNotNull(p3.busyNode()); + + b.removeBusy(p0); + assertEquals(3, b.busySize()); + assertEquals(0, b.freeSize()); + + assertFalse(b.moveToFreeList(p0)); + assertTrue(b.moveToFreeList(p1)); + assertFalse(b.moveToFreeList(p1)); + + assertEquals(2, b.busySize()); + assertEquals(1, b.freeSize()); + + b.moveToFreeList(p2); + b.moveToFreeList(p3); + + assertEquals(0, b.busySize()); + assertEquals(3, b.freeSize()); } } From caa6f3564a7b8145f5a9433cb8ddf0a154907cbb Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Wed, 26 Mar 2025 09:45:01 +0100 Subject: [PATCH 04/12] Add affinity support --- .../ebean/datasource/DataSourceBuilder.java | 21 ++ .../io/ebean/datasource/DataSourceConfig.java | 28 +++ .../datasource/DataSourceConnection.java | 17 ++ .../io/ebean/datasource/DataSourcePool.java | 6 + .../datasource/pool/ConnectionBuffer.java | 197 +++++++++++++++++- .../ebean/datasource/pool/ConnectionPool.java | 31 ++- .../datasource/pool/PooledConnection.java | 18 +- .../pool/PooledConnectionQueue.java | 21 +- .../datasource/pool/ConnectionBufferTest.java | 71 +++++-- 9 files changed, 365 insertions(+), 45 deletions(-) create mode 100644 ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java index 501f2f2..37570aa 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java @@ -7,6 +7,7 @@ import java.util.Properties; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Builder for DataSourcePool. @@ -639,6 +640,26 @@ default DataSourceBuilder customProperties(Map customProperties) */ DataSourceBuilder addProperty(String key, int value); + /** + * sets the affinity-size (internal hashmap of distinct affinity keys). Should be a prime number. Default: 257 + */ + DataSourceBuilder affinitySize(int affinitySize); + + /** + * Returns the affinity size. + */ + int getAffinitySize(); + + /** + * Sets the affinity provider. e.g. Thread::currentThread. + */ + DataSourceBuilder affinityProvider(Supplier affinityProvider); + + /** + * Returns the affinity provider. + */ + Supplier getAffinityProvider(); + /** * Set the database owner username (used to create connection for use with InitDatabase). */ diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java index 5008fcc..fd464ac 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Properties; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Configuration information for a DataSource. @@ -84,6 +85,8 @@ public class DataSourceConfig implements DataSourceBuilder.Settings { private boolean shutdownOnJvmExit; private boolean validateOnHeartbeat = !System.getenv().containsKey("LAMBDA_TASK_ROOT"); private boolean enforceCleanClose; + private int affinitySize = 257; + private Supplier affinityProvider; @Override public Settings settings() { @@ -147,6 +150,8 @@ public DataSourceConfig copy() { copy.alert = alert; copy.listener = listener; copy.enforceCleanClose = enforceCleanClose; + copy.affinitySize = affinitySize; + copy.affinityProvider = affinityProvider; return copy; } @@ -658,6 +663,28 @@ public DataSourceConfig addProperty(String key, int value) { return addProperty(key, Integer.toString(value)); } + @Override + public DataSourceBuilder affinitySize(int affinitySize) { + this.affinitySize = affinitySize; + return this; + } + + @Override + public int getAffinitySize() { + return affinitySize; + } + + @Override + public DataSourceBuilder affinityProvider(Supplier affinityProvider) { + this.affinityProvider = affinityProvider; + return this; + } + + @Override + public Supplier getAffinityProvider() { + return affinityProvider; + } + @Override public String getOwnerUsername() { return ownerUsername; @@ -805,6 +832,7 @@ private void loadSettings(ConfigPropertiesHelper properties) { shutdownOnJvmExit = properties.getBoolean("shutdownOnJvmExit", shutdownOnJvmExit); validateOnHeartbeat = properties.getBoolean("validateOnHeartbeat", validateOnHeartbeat); enforceCleanClose = properties.getBoolean("enforceCleanClose", enforceCleanClose); + affinitySize = properties.getInt("affinityCacheSize", affinitySize); String isoLevel = properties.get("isolationLevel", _isolationLevel(isolationLevel)); diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java new file mode 100644 index 0000000..a419709 --- /dev/null +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java @@ -0,0 +1,17 @@ +package io.ebean.datasource; + +import java.sql.Connection; + +/** + * Interface for connection objects returned from the ebean-datasource connection pool + * + * @author Roland Praml, Foconis Analytics GmbH + */ +public interface DataSourceConnection extends Connection { + + /** + * Returns the affinity-ID, this connection was assigned to. + */ + Object affinityId(); + +} diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java index 97186c8..68260cd 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java @@ -81,6 +81,12 @@ static DataSourceBuilder builder() { */ void offline(); + /** + * Returns a connection for given affinity ID. It is guaranteed, that connection.affinityId in listener etc. + * is the same object. + */ + DataSourceConnection getConnection(Object affinityId) throws SQLException; + /** * Shutdown the pool. *

diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index f886e77..33b337f 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -11,14 +11,134 @@ * PooledConnection, so that the node object can be reused. This avoids object * creation/gc during remove operations. *

+ * The connectionbuffer iself has one linkedList from free to + * freeEnd. In parallel, the elements in this list can also be part + * the affinityNodes list, which implement a kind of hashmap. + *

+ * So you can prefer which connection should be taken. You can use CurrentThread or + * currentTenant as affinity ID. So you likely get a connection that has the right + * pstatement caches or is already in the CPU cache. + *

+ * Without affinityId, the first free-connection is taken. + *

+ * With affinityId, the affinityNodes-list is determined by the hashCode, then the + * list is searched, if there is a connection with the same affinity object. + *

+ * If there is no one found, we take the LAST connection in freeList, as this is + * the best candidate not to steal the affinity of a connection, that was currently + * used. This ensures (or also causes) that the pool has at least that size of the + * frequent used affinityIds. E.g. if the affinity id represents tenant id, and + * 15 tenants are active, the pool will not shrink below 15 - on the other hand, + * there is always one connection ready for each active tenant. + *

+ * A free node can be member in two lists: + *

    + *
  1. it is definitively member in the freeList
  2. + *
  3. it may be member in one of the affinity-lists (mod hash)
  4. + *
+ * The remove / transition from free to busy will remove the node from both lists. + *

+ * Graphical exammple + *

+ *     By default, the busy list is empty
+ *     busy ---------------------------------------------------> busyEnd
+ *     free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> c7 --> freeEnd
+ *     al1  ---------------------------------------------------> end
+ *     al2  ---------------------------------------------------> end
+ *     ...
+ *     al257---------------------------------------------------> end
+ *
+ *     if a popFree(1) is called, we lookup in al1 and found no usable node.
+ *     in this case, we take the last node, c7 and move it to the busy list
+ *
+ *     busy --> c7 --------------------------------------------> busyEnd
+ *     free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 ---------> freeEnd
+ *
+ *     When we put that node back in the freelist, it becomes the first node
+ *     and it will be also linked in affinity-list1
+ *
+ *     busy ---------------------------------------------------> busyEnd
+ *     free --> c7 --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> freeEnd
+ *     al1  --> c7 --> end  (the node for c7 is in 'free' and 'al1')
+ *     al2-> (empty)
+ *
+ *     subsequent popFree(1) will always return c7 as long as it is not busy.
+ *     now we call popFree(1) twice, we will get this picture
+ *
+ *     busy --> c6 --> c7 ----------------------------------------> busyEnd
+ *     free --> c1 --> c2 --> c3 --> c4 --> c5 -------------------> freeEnd
+ *     al1-> (empty)
+ *     al2-> (empty)
+ *
+ *     putting them back
+ *
+ *     busy ------------------------------------------------------> busyEnd
+ *     free --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 --> c5 -----> freeEnd
+ *     al1  --> c7 --> c6 --> end
+ *     al2-> (empty)
+ *
+ *     fetching a connection with affinity = 2 will remove c5
+ *     (we take connection from the end, as the front of the list may
+ *     contain 'hot' connections. c7 would be a bad choice here
+ *
+ *     busy --> c5 -----------------------------------------------> busyEnd
+ *     free --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 ------------> freeEnd
+ *     al1  --> c7 --> c6 --> end
+ *     al2-> (empty)
+ *
+ *     putting c5 back results in this list
+ *
+ *     busy ------------------------------------------------------> busyEnd
+ *     free --> c5 --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
+ *     al1  ---------> c7 --> c6 --> end
+ *     al2  --> c5 ----------------> end
+ *
+ *     so we have 2 connections for affinity 1 and one connection for affinity 2
+ *     (and the rest is ordered itself in the freeList)
+ *
+ *     when we now fetch a connection for affinity = 1 we will get c7:
+ *
+ *     busy --> c7 -----------------------------------------------> busyEnd
+ *     free --> c5 ---------> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
+ *     al1  ----------------> c6 --> end
+ *     al2  --> c5 ----------------> end
+ *
+ *     putting c7 back will add the connection back to freelist and affinity
+ *     list 1
+ *
+ *     busy ------------------------------------------------------> busyEnd
+ *     free --> c7 --> c5 --> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
+ *     al1  --> c7 ---------> c6 --> end
+ *     al2  ---------> c5 ---------> end
+ *
+ *     when we now only fetch connections with affinity id 1 and 2, we will
+ *     always get c7/c5 and the pool can trim c6,c1,c2,c3,c4
+ * 
+ *

* All thread safety controlled externally (by PooledConnectionQueue). *

*/ final class ConnectionBuffer { private final Node free = Node.init(); + private final Node freeEnd = free.next; private final Node busy = Node.init(); + private final Node[] affinityNodes; + private final int hashSize; + + ConnectionBuffer(int hashSize) { + this.hashSize = hashSize; + if (hashSize > 0) { + affinityNodes = new Node[hashSize]; + for (int i = 0; i < affinityNodes.length; i++) { + affinityNodes[i] = Node.init(); + } + } else { + affinityNodes = null; + } + } + int freeSize = 0; int busySize = 0; @@ -76,7 +196,11 @@ boolean moveToFreeList(PooledConnection c) { } node.remove(); busySize--; - node.addAfter(free); + if (affinityNodes != null && c.affinityId() != null) { + node.addAfter(free, affinityNodes[c.affinityId().hashCode() % hashSize]); + } else { + node.addAfter(free); + } freeSize++; c.setBusyNode(null); return true; @@ -84,9 +208,23 @@ boolean moveToFreeList(PooledConnection c) { /** * Remove a connection from the free list. Returns null if there is not any. + *

+ * Connections that are returend from this method must be either added to busyList with + * addBusy or closed fully. */ - PooledConnection popFree() { - Node node = free.next; + PooledConnection popFree(Object affinityId) { + Node node; + if (affinityId == null || affinityNodes == null) { + node = free.next; + } else { + node = affinityNodes[affinityId.hashCode() % hashSize].find(affinityId); + if (node == null) { + // when we did not find a node with that affinity, we take the last (oldest one) + // and reuse this with the new affinity. This avoids to "steal" the affinity + // from the newest one. + node = freeEnd.prev; + } + } if (node.isBoundaryNode()) { return null; } @@ -115,10 +253,10 @@ int addBusy(PooledConnection c) { */ void closeAllFree(boolean logErrors) { List tempList = new ArrayList<>(); - PooledConnection c = popFree(); + PooledConnection c = popFree(null); while (c != null) { tempList.add(c); - c = popFree(); + c = popFree(null); } if (Log.isLoggable(System.Logger.Level.TRACE)) { @@ -152,6 +290,9 @@ int trim(int minSize, long usedSince, long createdSince) { return trimCount; } + /** + * Close connections that should be considered leaked. + */ void closeBusyConnections(long leakTimeMinutes) { long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); @@ -183,6 +324,9 @@ private void closeBusyConnection(PooledConnection pc) { } } + /** + * Returns information describing connections that are currently being used. + */ String busyConnectionInformation(boolean toLogger) { if (toLogger) { Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", busySize()); @@ -206,12 +350,15 @@ String busyConnectionInformation(boolean toLogger) { * Node of a linkedlist. The linkedLists always have two empty nodes at the start and end. * (boundary nodes) They are generated with the init() method. *

- * the first usable node is startNode.next (which could be the end edge) + * the first usable node is startNode.next (which could be the end boundary) */ static final class Node { private Node next; private Node prev; + // Double-LL nodes for affinity management + private Node afNext; + private Node afPrev; final PooledConnection pc; private Node(PooledConnection pc) { @@ -226,6 +373,8 @@ public static Node init() { Node node2 = new Node(null); node1.next = node2; node2.prev = node1; + node1.afNext = node2; + node2.afPrev = node1; return node1; } @@ -240,12 +389,18 @@ private boolean isBoundaryNode() { * Removes the node from the list. The node can be re-added to an other list */ private void remove() { - assert pc != null : "called remove a boundary node"; + assert pc != null : "called remove on a boundary node"; assert prev != null && next != null : "not part of a list"; next.prev = prev; prev.next = next; prev = null; next = null; + if (afNext != null) { + afNext.afPrev = afPrev; + afPrev.afNext = afNext; + afPrev = null; + afNext = null; + } } /** @@ -255,11 +410,37 @@ private void remove() { */ public void addAfter(Node node) { assert !this.isBoundaryNode() : "this is a boundary node"; - assert next == null & prev == null : "Node already member of a list"; + assert next == null && prev == null : "Node already member of a list"; next = node.next; prev = node; node.next.prev = this; node.next = this; } + + /** + * Adds this after node AND as affinity-node after afNode. + */ + public void addAfter(Node node, Node afNode) { + addAfter(node); + assert afNext == null && afPrev == null : "Node already member of affinity-list"; + afNext = afNode.afNext; + afPrev = afNode; + afNode.afNext.afPrev = this; + afNode.afNext = this; + } + + /** + * Find the connection with given affinity id in this affinity-list. + */ + public Node find(Object affinityId) { + Node n = this.afNext; + while (!n.isBoundaryNode()) { + if (affinityId.equals(n.pc.affinityId())) { + return n; + } + n = n.afNext; + } + return null; + } } } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index 3fdc440..f37765e 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import static io.ebean.datasource.pool.TransactionIsolation.description; @@ -67,6 +68,8 @@ final class ConnectionPool implements DataSourcePool { private final String applicationName; private final DataSource source; private final boolean validateOnHeartbeat; + private final int affinitySize; + private final Supplier affinityProvider; private long nextTrimTime; /** @@ -123,6 +126,13 @@ final class ConnectionPool implements DataSourcePool { this.validateStaleMillis = params.validateStaleMillis(); this.applicationName = params.getApplicationName(); this.clientInfo = params.getClientInfo(); + if (params.getAffinityProvider() == null) { + this.affinitySize = 0; + this.affinityProvider = () -> null; // dummy + } else { + this.affinityProvider = params.getAffinityProvider(); + this.affinitySize = params.getAffinitySize(); + } this.queue = new PooledConnectionQueue(this); this.schema = params.getSchema(); this.catalog = params.catalog(); @@ -372,7 +382,7 @@ private void testConnection() { PooledConnection conn = null; try { // Get a connection from the pool and test it - conn = getPooledConnection(); + conn = getPooledConnection(affinityProvider.get()); heartbeatPoolExhaustedCount = 0; if (testConnection(conn)) { notifyDataSourceIsUp(); @@ -619,7 +629,15 @@ public Connection getConnection(String username, String password) throws SQLExce */ @Override public Connection getConnection() throws SQLException { - return getPooledConnection(); + return getPooledConnection(affinityProvider.get()); + } + + /** + * Return a pooled connection with given affinity id. + */ + @Override + public DataSourceConnection getConnection(Object affinityId) throws SQLException { + return getPooledConnection(affinityId); } /** @@ -628,8 +646,9 @@ public Connection getConnection() throws SQLException { * This will grow the pool if all the current connections are busy. This * will go into a wait if the pool has hit its maximum size. */ - private PooledConnection getPooledConnection() throws SQLException { - PooledConnection c = queue.obtainConnection(); + private PooledConnection getPooledConnection(Object affinitiyId) throws SQLException { + PooledConnection c = queue.obtainConnection(affinitiyId); + c.setAffinityId(affinitiyId); if (captureStackTrace) { c.setStackTrace(Thread.currentThread().getStackTrace()); } @@ -753,6 +772,10 @@ int pstmtCacheSize() { return pstmtCacheSize; } + int affinitySize() { + return affinitySize; + } + /** * Not implemented and shouldn't be used. */ diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 6d4cb0c..1dbc5a1 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -1,5 +1,8 @@ package io.ebean.datasource.pool; +import io.ebean.datasource.DataSourceConnection; +import io.ebean.datasource.DataSourcePool; + import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -24,7 +27,7 @@ * It has caching of Statements and PreparedStatements. Remembers the last * statement that was executed. Keeps statistics on how long it is in use. */ -final class PooledConnection extends ConnectionDelegator { +final class PooledConnection extends ConnectionDelegator implements DataSourceConnection { private static final String IDLE_CONNECTION_ACCESSED_ERROR = "Pooled Connection has been accessed whilst idle in the pool, via method: "; @@ -143,6 +146,7 @@ final class PooledConnection extends ConnectionDelegator { * Slot position in the BusyConnectionBuffer. */ private ConnectionBuffer.Node busyNode; + private Object affinityId; /** @@ -204,6 +208,18 @@ void setBusyNode(ConnectionBuffer.Node busyNode) { this.busyNode = busyNode; } + /** + * Return the affinity-id (only for busy connections!) + */ + @Override + public Object affinityId() { + return affinityId; + } + + void setAffinityId(Object affinityId) { + this.affinityId = affinityId; + } + /** * Return a string to identify the connection. */ diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 8682c5d..7a4072b 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -69,8 +69,7 @@ final class PooledConnectionQueue { this.waitTimeoutMillis = pool.waitTimeoutMillis(); this.maxAgeMillis = pool.maxAgeMillis(); this.validateStaleMillis = pool.validateStaleMillis(); - //this.busyList = new BusyConnectionBuffer(maxSize, 20); - this.buffer = new ConnectionBuffer(); + this.buffer = new ConnectionBuffer(pool.affinitySize()); this.lock = new ReentrantLock(false); this.notEmpty = lock.newCondition(); } @@ -160,9 +159,9 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { } } - private PooledConnection extractFromFreeList() { + private PooledConnection extractFromFreeList(Object affinitiyId) { - PooledConnection c = buffer.popFree(); + PooledConnection c = buffer.popFree(affinitiyId); if (c == null) { return null; } @@ -188,9 +187,9 @@ private boolean stale(PooledConnection c) { return c.lastUsedTime() < System.currentTimeMillis() - validateStaleMillis; } - PooledConnection obtainConnection() throws SQLException { + PooledConnection obtainConnection(Object affinitiyId) throws SQLException { try { - PooledConnection pc = _obtainConnection(); + PooledConnection pc = _obtainConnection(affinitiyId); pc.resetForUse(); return pc; @@ -212,7 +211,7 @@ private int registerBusyConnection(PooledConnection connection) { return busySize; } - private PooledConnection _obtainConnection() throws InterruptedException, SQLException { + private PooledConnection _obtainConnection(Object affinitiyId) throws InterruptedException, SQLException { var start = System.nanoTime(); lock.lockInterruptibly(); try { @@ -224,7 +223,7 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc hitCount++; // are other threads already waiting? (they get priority) if (waitingThreads == 0) { - PooledConnection connection = extractFromFreeList(); + PooledConnection connection = extractFromFreeList(affinitiyId); if (connection != null) { return connection; } @@ -238,7 +237,7 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc // a wait loop until connections are returned into the pool. waitCount++; waitingThreads++; - return _obtainConnectionWaitLoop(); + return _obtainConnectionWaitLoop(affinitiyId); } finally { waitingThreads--; } @@ -267,7 +266,7 @@ private PooledConnection createConnection() throws SQLException { /** * Got into a loop waiting for connections to be returned to the pool. */ - private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException { + private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQLException, InterruptedException { long nanos = MILLIS_TIME_UNIT.toNanos(waitTimeoutMillis); for (; ; ) { if (nanos <= 0) { @@ -290,7 +289,7 @@ private PooledConnection _obtainConnectionWaitLoop() throws SQLException, Interr nanos = notEmpty.awaitNanos(nanos); if (buffer.hasFreeConnections()) { // successfully waited - return extractFromFreeList(); + return extractFromFreeList(affinitiyId); } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index a26188e..c8e65e6 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -4,24 +4,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; class ConnectionBufferTest { @Test void test() { - ConnectionBuffer b = new ConnectionBuffer(); + ConnectionBuffer b = new ConnectionBuffer(257); PooledConnection p0 = new PooledConnection("0"); PooledConnection p1 = new PooledConnection("1"); PooledConnection p2 = new PooledConnection("2"); - // PooledConnection p3 = new PooledConnection("3"); + assertEquals(0, b.freeSize()); assertFalse(b.hasFreeConnections()); @@ -31,7 +26,7 @@ void test() { assertEquals(1, b.freeSize()); assertTrue(b.hasFreeConnections()); - PooledConnection r0 = b.popFree(); + PooledConnection r0 = b.popFree(null); b.addBusy(p0); assertThat(p0).isSameAs(r0); @@ -49,10 +44,10 @@ void test() { assertEquals(3, b.freeSize()); - PooledConnection r1 = b.popFree(); + PooledConnection r1 = b.popFree(null); b.addBusy(r1); assertSame(p2, r1); - PooledConnection r2 = b.popFree(); + PooledConnection r2 = b.popFree(null); b.addBusy(r2); assertSame(p1, r2); @@ -60,11 +55,11 @@ void test() { b.moveToFreeList(r1); assertEquals(2, b.freeSize()); - PooledConnection r3 = b.popFree(); + PooledConnection r3 = b.popFree(null); b.addBusy(r3); assertSame(p2, r3); assertEquals(1, b.freeSize()); - PooledConnection r4 = b.popFree(); + PooledConnection r4 = b.popFree(null); b.addBusy(r4); assertSame(p0, r4); assertEquals(0, b.freeSize()); @@ -75,17 +70,17 @@ void test() { assertEquals(3, b.freeSize()); - PooledConnection r5 = b.popFree(); + PooledConnection r5 = b.popFree(null); b.addBusy(r5); assertSame(p0, r5); assertEquals(2, b.freeSize()); - PooledConnection r6 = b.popFree(); + PooledConnection r6 = b.popFree(null); b.addBusy(r6); assertSame(p1, r6); assertEquals(1, b.freeSize()); - PooledConnection r7 = b.popFree(); + PooledConnection r7 = b.popFree(null); b.addBusy(r7); assertSame(p2, r7); assertEquals(0, b.freeSize()); @@ -96,7 +91,7 @@ void test() { @Test public void test_busy_free() { - ConnectionBuffer b = new ConnectionBuffer(); + ConnectionBuffer b = new ConnectionBuffer(257); PooledConnection p0 = new PooledConnection("0"); PooledConnection p1 = new PooledConnection("1"); @@ -117,7 +112,7 @@ public void test_busy_free() { assertEquals(2, b.busySize()); assertEquals(2, b.freeSize()); - PooledConnection c3 = b.popFree(); + PooledConnection c3 = b.popFree(null); assertSame(p3, c3); assertEquals(2, b.busySize()); assertEquals(1, b.freeSize()); @@ -126,15 +121,14 @@ public void test_busy_free() { assertThatThrownBy(() -> b.addBusy(p3)).hasMessageContaining("Node already member of a list"); assertEquals(3, b.busySize()); - PooledConnection c2 = b.popFree(); - assertSame(p2, c2); + PooledConnection c2 = b.popFree(null); b.addBusy(c2); assertSame(p2, c2); assertEquals(4, b.busySize()); assertEquals(0, b.freeSize()); - assertNull(b.popFree()); // no free connections left + assertNull(b.popFree(null)); // no free connections left // all are busy now assertNotNull(p0.busyNode()); @@ -160,4 +154,39 @@ public void test_busy_free() { assertEquals(3, b.freeSize()); } + @Test + public void test_Affinity() { + + ConnectionBuffer b = new ConnectionBuffer(257); + + PooledConnection p0 = new PooledConnection("0"); + PooledConnection p1 = new PooledConnection("1"); + PooledConnection p2 = new PooledConnection("2"); + PooledConnection p3 = new PooledConnection("3"); + + b.addFree(p0); + b.addFree(p1); + b.addFree(p2); + b.addFree(p3); + + PooledConnection c1 = getConnection(b, 42); + PooledConnection c2 = getConnection(b, 17); + b.moveToFreeList(c1); + b.moveToFreeList(c2); + + PooledConnection c3 = getConnection(b,43); + assertNotSame(c3, c1); + assertNotSame(c2, c1); + + PooledConnection c4 = getConnection(b,42); + assertSame(c4, c1); + } + + private static PooledConnection getConnection(ConnectionBuffer b, Object affinity) { + PooledConnection c1 = b.popFree(affinity); + c1.setAffinityId(affinity); + b.addBusy(c1); + return c1; + } + } From 32ccbf46ba2c637794aef736d891fb95818f9352 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Tue, 25 Mar 2025 16:18:35 +0100 Subject: [PATCH 05/12] Add test that shows benefit of affinity --- .../pool/ConnectionPoolSpeedTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java index 2b3df7e..1155a0e 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java @@ -7,7 +7,10 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -36,6 +39,7 @@ private ConnectionPool createPool() { config.setMinConnections(2); config.setMaxConnections(100); config.setAutoCommit(true); + config.affinityProvider(Thread::currentThread); return new ConnectionPool("testspeed", config); } @@ -64,6 +68,60 @@ public void test() throws SQLException { assertThat(avgNanos).isLessThan(300); } + /** + * Shows the benefit of affinity support. + *

+ * This test starts 10 threads, where each thread has its own set of statements. + * The problem is, if each thread takes the first free connection, which is most + * likely the connection from an other thread, the cached pstmts are useless. + *

+ * When we return the last used connection, we can increase the pstmt hit ratio: + *

+ * With affinity support: psc[hit:19.800 miss:200 put:20.000 rem:0] + *

+ * Without affinity support: psc[hit:7.231 miss:12.769 put:20.000 rem:12.279] + */ + @Test + public void testMultiThread() throws Exception { + warm(); + + total = 0; + List threads = new ArrayList<>(); + for (int threadCount = 0; threadCount < 10; threadCount++) { + threads.add(createThread()); + } + + long startNano = System.nanoTime(); + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + long exeNanos = System.nanoTime() - startNano; + + logger.info("exeNanos[{}]", exeNanos); + } + + private Thread createThread() { + return new Thread(() -> { + try { + for (int j = 0; j < 100; j++) { + for (int k = 0; k < 20; k++) { + try (Connection conn = pool.getConnection()) { + try (PreparedStatement stmt = conn.prepareStatement("select '" + Thread.currentThread().getName() + "', " + k)) { + stmt.execute(); + } + conn.rollback(); + } + } + } + } catch (SQLException e) { + e.printStackTrace(); + } + }); + } + private void perform() throws SQLException { for (int i = 0; i < 1_000_000; i++) { getAndCloseSome(); From cd08b306754170dab8a0b1995fa9061bfb74e090 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Wed, 26 Mar 2025 14:41:41 +0100 Subject: [PATCH 06/12] In rare cases node can be no member of busylist --- .../java/io/ebean/datasource/pool/ConnectionBuffer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index 33b337f..6bfa694 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -177,10 +177,12 @@ void addFree(PooledConnection pc) { * Returns true, if this connection was part of the busy list or false, if not (or removed twice) */ boolean removeBusy(PooledConnection c) { - if (c.busyNode() == null) { + Node node = c.busyNode(); + if (node == null || node.next == null) { + // node is not yet or no longer in busy list return false; } - c.busyNode().remove(); + node.remove(); busySize--; c.setBusyNode(null); return true; From 3f1cec3e37c9dbb070f57eebd260f924bdc9507b Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Thu, 27 Mar 2025 10:36:32 +0100 Subject: [PATCH 07/12] Allow connection pool to grow --- .../datasource/pool/ConnectionBuffer.java | 19 +++++++++++++++---- .../pool/PooledConnectionQueue.java | 8 +++++++- .../datasource/pool/ConnectionBufferTest.java | 3 +++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index 6bfa694..505708d 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -120,6 +120,8 @@ */ final class ConnectionBuffer { + static final Object POP_LAST = new Object(); + private final Node free = Node.init(); private final Node freeEnd = free.next; private final Node busy = Node.init(); @@ -213,18 +215,27 @@ boolean moveToFreeList(PooledConnection c) { *

* Connections that are returend from this method must be either added to busyList with * addBusy or closed fully. + * + * @param affinityId the preferred affinity-id. + * If null is provided, the first element in the list is + * returned. + * If the affinity-id is not present in the list, null + * is returned. The caller can decide to create a new connection or + * ask again with POP_LAST, which returns the last + * (=oldest) connection if affinity is enabled. */ PooledConnection popFree(Object affinityId) { Node node; if (affinityId == null || affinityNodes == null) { node = free.next; + } else if (affinityId == POP_LAST) { + node = freeEnd.prev; } else { node = affinityNodes[affinityId.hashCode() % hashSize].find(affinityId); if (node == null) { - // when we did not find a node with that affinity, we take the last (oldest one) - // and reuse this with the new affinity. This avoids to "steal" the affinity - // from the newest one. - node = freeEnd.prev; + // when we did not find a node with that affinity, we return null + // this allows the pool to grow to its maximum size + return null; } } if (node.isBoundaryNode()) { diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 7a4072b..0732b90 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -231,13 +231,19 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte if (connection != null) { return connection; } + if (affinitiyId != null) { + connection = extractFromFreeList(ConnectionBuffer.POP_LAST); + if (connection != null) { + return connection; + } + } } try { // The pool is at maximum size. We are going to go into // a wait loop until connections are returned into the pool. waitCount++; waitingThreads++; - return _obtainConnectionWaitLoop(affinitiyId); + return _obtainConnectionWaitLoop(null); } finally { waitingThreads--; } diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index c8e65e6..9f2105a 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -184,6 +184,9 @@ public void test_Affinity() { private static PooledConnection getConnection(ConnectionBuffer b, Object affinity) { PooledConnection c1 = b.popFree(affinity); + if (c1 == null) { + c1 = b.popFree(ConnectionBuffer.POP_LAST); + } c1.setAffinityId(affinity); b.addBusy(c1); return c1; From b41fe18273ad7679f262b3fcd13850830e245a2a Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Fri, 28 Mar 2025 13:40:28 +0100 Subject: [PATCH 08/12] refactored the list in separate classes --- .../datasource/pool/ConnectionBuffer.java | 461 ++++++------------ .../ebean/datasource/pool/ConnectionList.java | 199 ++++++++ .../datasource/pool/PooledConnection.java | 36 +- .../pool/PooledConnectionQueue.java | 19 +- .../datasource/pool/ConnectionBufferTest.java | 66 +-- .../datasource/pool/ConnectionListTest.java | 83 ++++ 6 files changed, 502 insertions(+), 362 deletions(-) create mode 100644 ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java create mode 100644 ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index 505708d..67ba168 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -1,118 +1,116 @@ package io.ebean.datasource.pool; -import java.util.*; +import java.util.ArrayList; +import java.util.List; /** * A buffer designed especially to hold pooled connections (free and busy ones) *

- * The buffer contains two linkedLists (free and busy connection nodes) + * The buffer contains two linkedLists (free and busy connection nodes) and optional + * affinityLists. *

- * When a node from the free list is removed, the node is attached to the - * PooledConnection, so that the node object can be reused. This avoids object - * creation/gc during remove operations. + * The PooledConnections holds references to list-Nodes, which can be reused. + * This avoids object creation/gc during add/unlink operations. *

- * The connectionbuffer iself has one linkedList from free to - * freeEnd. In parallel, the elements in this list can also be part - * the affinityNodes list, which implement a kind of hashmap. + * The connectionBuffer itself has one freeList, that contains all free connections + * ordered by their last-used time. (the oldest connection is at the end) + * In parallel, connections in the freeList can also be in an affinityList. + * A hashing algoritm is used to distribute the connections to the affinityLists. *

- * So you can prefer which connection should be taken. You can use CurrentThread or - * currentTenant as affinity ID. So you likely get a connection that has the right - * pstatement caches or is already in the CPU cache. + * If hashSize == 0 affinity support is disabled, and the connectionBuffer + * handles only free and busyList. *

- * Without affinityId, the first free-connection is taken. + * Otherwise, there are hashSize+1 affinityLists. *

- * With affinityId, the affinityNodes-list is determined by the hashCode, then the - * list is searched, if there is a connection with the same affinity object. + * The last one is used for affinity-less connections and for the others, the + * object.hashCode() mod hashSize is computed. *

- * If there is no one found, we take the LAST connection in freeList, as this is - * the best candidate not to steal the affinity of a connection, that was currently - * used. This ensures (or also causes) that the pool has at least that size of the - * frequent used affinityIds. E.g. if the affinity id represents tenant id, and - * 15 tenants are active, the pool will not shrink below 15 - on the other hand, - * there is always one connection ready for each active tenant. + * (When affinity is enabled, and no affinityID is used, freeList + * and affinityLists[hashSize] have the same content.) *

- * A free node can be member in two lists: + * When we call removeFree(someObjKey), + *

    + *
  • try to return a matching connection from the according affinity-list slot
  • + *
  • try to return a connection with affinityId == null
  • + *
  • return null (and let the caller decide to create a new connection or query + * again with GET_OLDEST to return the last free connecion)
  • + *
+ *

+ * A free node is member in two lists: *

    - *
  1. it is definitively member in the freeList
  2. - *
  3. it may be member in one of the affinity-lists (mod hash)
  4. + *
  5. it is member in the freeList
  6. + *
  7. it is EITHER member in one of the affinity-hash-slots + * OR it is member of the null (=last) affiinity-slot
  8. *
* The remove / transition from free to busy will remove the node from both lists. *

* Graphical exammple *

  *     By default, the busy list is empty
- *     busy ---------------------------------------------------> busyEnd
- *     free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> c7 --> freeEnd
- *     al1  ---------------------------------------------------> end
- *     al2  ---------------------------------------------------> end
+ *     busyList (empty)
+ *     freeList --> c1 --> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty - first hash slot)
+ *     al[1]    (empty)
+ *     al[2]    (empty)
  *     ...
- *     al257---------------------------------------------------> end
+ *     al[N-1]  (empty - last hash slot)
+ *     al[N]    --> c1 --> c2 --> c3 --> c4 --> (end - null slot)
  *
- *     if a popFree(1) is called, we lookup in al1 and found no usable node.
- *     in this case, we take the last node, c7 and move it to the busy list
+ *     if a removeFree(1) is called, we lookup in al[1] and found no usable node.
+ *     in this case, we return the first node of al[N] that has no affinity yet.
+ *     When we remove the node from the affinityList, it is automatically removed
+ *     from the freeList
  *
- *     busy --> c7 --------------------------------------------> busyEnd
- *     free --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 ---------> freeEnd
+ *     busyList --> c1 --> (end)
+ *     freeList ---------> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    (empty)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    ---------> c2 --> c3 --> c4 --> (end)
  *
  *     When we put that node back in the freelist, it becomes the first node
- *     and it will be also linked in affinity-list1
- *
- *     busy ---------------------------------------------------> busyEnd
- *     free --> c7 --> c1 --> c2 --> c3 --> c4 --> c5 --> c6 --> freeEnd
- *     al1  --> c7 --> end  (the node for c7 is in 'free' and 'al1')
- *     al2-> (empty)
- *
- *     subsequent popFree(1) will always return c7 as long as it is not busy.
- *     now we call popFree(1) twice, we will get this picture
- *
- *     busy --> c6 --> c7 ----------------------------------------> busyEnd
- *     free --> c1 --> c2 --> c3 --> c4 --> c5 -------------------> freeEnd
- *     al1-> (empty)
- *     al2-> (empty)
- *
- *     putting them back
- *
- *     busy ------------------------------------------------------> busyEnd
- *     free --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 --> c5 -----> freeEnd
- *     al1  --> c7 --> c6 --> end
- *     al2-> (empty)
- *
- *     fetching a connection with affinity = 2 will remove c5
- *     (we take connection from the end, as the front of the list may
- *     contain 'hot' connections. c7 would be a bad choice here
- *
- *     busy --> c5 -----------------------------------------------> busyEnd
- *     free --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 ------------> freeEnd
- *     al1  --> c7 --> c6 --> end
- *     al2-> (empty)
+ *     and it will be also linked in al[1]
  *
- *     putting c5 back results in this list
- *
- *     busy ------------------------------------------------------> busyEnd
- *     free --> c5 --> c7 --> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
- *     al1  ---------> c7 --> c6 --> end
- *     al2  --> c5 ----------------> end
+ *     busyList (empty)
+ *     freeList --> c1 --> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    ---------> c2 --> c3 --> c4 --> (end)
  *
- *     so we have 2 connections for affinity 1 and one connection for affinity 2
- *     (and the rest is ordered itself in the freeList)
+ *     now we call removeFree(2) tree times. This will move c2 to c4 to busy list
+ *     busyList --> c4 --> c3 --> c2 --> (end)
+ *     freeList --> c1 ----------------> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    (empty)
  *
- *     when we now fetch a connection for affinity = 1 we will get c7:
+ *     when we return the connections (c4 to c2), we have this picture
+ *     and there are no more affinity nodes left.
  *
- *     busy --> c7 -----------------------------------------------> busyEnd
- *     free --> c5 ---------> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
- *     al1  ----------------> c6 --> end
- *     al2  --> c5 ----------------> end
+ *     busyList (empty)
+ *     freeList --> c2 --> c3 --> c4 --> c1 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    --> c2 --> c3 --> c4 --> (end)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    (empty)
  *
- *     putting c7 back will add the connection back to freelist and affinity
- *     list 1
+ *     subsequent queries to affinityId=1 / 2 will return c1, respectively c2..c4
  *
- *     busy ------------------------------------------------------> busyEnd
- *     free --> c7 --> c5 --> c6 --> c1 --> c2 --> c3 --> c4 -----> freeEnd
- *     al1  --> c7 ---------> c6 --> end
- *     al2  ---------> c5 ---------> end
+ *     querying for a connection with affinityId=3 will return null,
+ *     because there is neither a matching one nor a null one.
  *
- *     when we now only fetch connections with affinity id 1 and 2, we will
- *     always get c7/c5 and the pool can trim c6,c1,c2,c3,c4
+ *     The caller can now decide to create a new connection "c5" or
+ *     query with GET_OLDEST for "c1"
  * 
*

* All thread safety controlled externally (by PooledConnectionQueue). @@ -120,94 +118,81 @@ */ final class ConnectionBuffer { - static final Object POP_LAST = new Object(); + // special key to return the oldest connection from freeList. + static final Object GET_OLDEST = new Object(); - private final Node free = Node.init(); - private final Node freeEnd = free.next; - private final Node busy = Node.init(); + private final ConnectionList[] affinityLists; + private final ConnectionList freeList = new ConnectionList(); + private final ConnectionList busyList = new ConnectionList(); - private final Node[] affinityNodes; private final int hashSize; ConnectionBuffer(int hashSize) { + assert hashSize >= 0; this.hashSize = hashSize; - if (hashSize > 0) { - affinityNodes = new Node[hashSize]; - for (int i = 0; i < affinityNodes.length; i++) { - affinityNodes[i] = Node.init(); - } + if (hashSize == 0) { + affinityLists = null; } else { - affinityNodes = null; + // we instantiate hashSize+1 slots. The last slot is reserved for connections + // with `null` as affinityId + affinityLists = new ConnectionList[hashSize + 1]; + for (int i = 0; i < affinityLists.length; i++) { + affinityLists[i] = new ConnectionList(); + } } } - int freeSize = 0; - int busySize = 0; - /** - * Return the number of entries in the buffer. + * Return the number of free connections. */ int freeSize() { - return freeSize; + return freeList.size(); } /** * Return the number of busy connections. */ int busySize() { - return busySize; + return busyList.size(); } /** - * Return true if the buffer is empty. + * Add the connection to the beginning of the free list. + *

+ * Note, the connection must be either new or unlinked from the busy list. */ - boolean hasFreeConnections() { - return freeSize > 0; + void addFree(PooledConnection c) { + c.unlink(); + freeList.addFirst(c.busyFree()); + if (affinityLists != null) { + if (c.affinityId() != null) { + affinityLists[c.affinityId().hashCode() % hashSize].addFirst(c.affinity()); + } else { + affinityLists[hashSize].addFirst(c.affinity()); + } + } } /** - * Adds a new connection to the free list. + * Adds the connection to the busy list. + *

+ * Note, the connection must be either new or unlinked from the free list. */ - void addFree(PooledConnection pc) { - assert pc.busyNode() == null : "Connection seems not to be new"; - new Node(pc).addAfter(free); - freeSize++; + int addBusy(PooledConnection c) { + busyList.addFirst(c.busyFree()); + return busyList.size(); } /** - * Removes the connection from the busy list. (For full close) + * Removes the connection from the busy list. * Returns true, if this connection was part of the busy list or false, if not (or removed twice) */ boolean removeBusy(PooledConnection c) { - Node node = c.busyNode(); - if (node == null || node.next == null) { - // node is not yet or no longer in busy list - return false; - } - node.remove(); - busySize--; - c.setBusyNode(null); - return true; - } - - /** - * Moves the connection from the busy list to the free list. - */ - boolean moveToFreeList(PooledConnection c) { - Node node = c.busyNode(); - if (node == null) { - return false; - } - node.remove(); - busySize--; - if (affinityNodes != null && c.affinityId() != null) { - node.addAfter(free, affinityNodes[c.affinityId().hashCode() % hashSize]); - } else { - node.addAfter(free); + if (busyList.isLinkedTo(c.busyFree())) { + c.unlink(); + return true; } - freeSize++; - c.setBusyNode(null); - return true; + return false; } /** @@ -224,41 +209,27 @@ boolean moveToFreeList(PooledConnection c) { * ask again with POP_LAST, which returns the last * (=oldest) connection if affinity is enabled. */ - PooledConnection popFree(Object affinityId) { - Node node; - if (affinityId == null || affinityNodes == null) { - node = free.next; - } else if (affinityId == POP_LAST) { - node = freeEnd.prev; - } else { - node = affinityNodes[affinityId.hashCode() % hashSize].find(affinityId); - if (node == null) { - // when we did not find a node with that affinity, we return null - // this allows the pool to grow to its maximum size - return null; + PooledConnection removeFree(Object affinityId) { + PooledConnection pc; + if (affinityId == GET_OLDEST) { + pc = freeList.peekLast(); + } else if (affinityLists == null) { + pc = freeList.peekFirst(); + } else if (affinityId == null) { + pc = affinityLists[hashSize].peekFirst(); + } else { // we have an affinity id. + pc = affinityLists[affinityId.hashCode() % hashSize].find(affinityId); + if (pc == null) { + // no pc with this affinity-id in the pool. + // query "null"-affinityList + pc = affinityLists[hashSize].peekFirst(); } } - if (node.isBoundaryNode()) { + if (pc == null) { return null; } - node.remove(); - freeSize--; - node.pc.setBusyNode(node); // sets the node for reuse in "addBusy" - return node.pc; - } - - /** - * Adds the connection to the busy list. The connection must be either new or popped from the free list. - */ - int addBusy(PooledConnection c) { - Node node = c.busyNode(); // we try to reuse the node to avoid object creation. - if (node == null) { - node = new Node(c); - c.setBusyNode(node); - } - node.addAfter(busy); - busySize++; - return busySize; + pc.unlink(); + return pc; } /** @@ -266,11 +237,11 @@ int addBusy(PooledConnection c) { */ void closeAllFree(boolean logErrors) { List tempList = new ArrayList<>(); - PooledConnection c = popFree(null); - while (c != null) { - tempList.add(c); - c = popFree(null); - } + + freeList.forEach(pc -> { + pc.unlink(); + tempList.add(pc); + }); if (Log.isLoggable(System.Logger.Level.TRACE)) { Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); @@ -284,23 +255,20 @@ void closeAllFree(boolean logErrors) { * Trim any inactive connections that have not been used since usedSince. */ int trim(int minSize, long usedSince, long createdSince) { - int trimCount = 0; - Node node = free; // first boundary node - do { - node = node.next; - } while (!node.isBoundaryNode() && minSize-- > 0); + int toTrim = freeSize() - minSize; - while (!node.isBoundaryNode()) { - Node current = node; - node = node.next; - if (current.pc.shouldTrim(usedSince, createdSince)) { - current.remove(); - freeSize--; - current.pc.closeConnectionFully(true); - trimCount++; + List ret = new ArrayList<>(toTrim); + for (PooledConnection pc : freeList.reverse()) { + if (ret.size() >= toTrim) { + break; + } + if (pc.shouldTrim(usedSince, createdSince)) { + pc.unlink(); + ret.add(pc); } } - return trimCount; + ret.forEach(pc -> pc.closeConnectionFully(true)); + return ret.size(); } /** @@ -309,22 +277,15 @@ int trim(int minSize, long usedSince, long createdSince) { void closeBusyConnections(long leakTimeMinutes) { long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); - Node node = busy.next; - while (!node.isBoundaryNode()) { - Node current = node; - node = node.next; - - PooledConnection pc = current.pc; - //noinspection StatementWithEmptyBody + busyList.forEach(pc -> { if (pc.lastUsedTime() > olderThanTime) { // PooledConnection has been used recently or // expected to be longRunning so not closing... } else { - current.remove(); - --busySize; + pc.unlink(); closeBusyConnection(pc); } - } + }); } private void closeBusyConnection(PooledConnection pc) { @@ -345,115 +306,13 @@ String busyConnectionInformation(boolean toLogger) { Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", busySize()); } StringBuilder sb = new StringBuilder(); - Node node = busy.next; - while (!node.isBoundaryNode()) { - PooledConnection pc = node.pc; - node = node.next; + busyList.forEach(pc -> { if (toLogger) { Log.info("Busy Connection - {0}", pc.fullDescription()); } else { sb.append(pc.fullDescription()).append("\r\n"); } - } + }); return sb.toString(); } - - - /** - * Node of a linkedlist. The linkedLists always have two empty nodes at the start and end. - * (boundary nodes) They are generated with the init() method. - *

- * the first usable node is startNode.next (which could be the end boundary) - */ - static final class Node { - - private Node next; - private Node prev; - // Double-LL nodes for affinity management - private Node afNext; - private Node afPrev; - final PooledConnection pc; - - private Node(PooledConnection pc) { - this.pc = pc; - } - - /** - * Creates new "list" with two empty boundary nodes - */ - public static Node init() { - Node node1 = new Node(null); - Node node2 = new Node(null); - node1.next = node2; - node2.prev = node1; - node1.afNext = node2; - node2.afPrev = node1; - return node1; - } - - /** - * Retruns true, if this is a boundary node. (start or end node of list) - */ - private boolean isBoundaryNode() { - return pc == null; - } - - /** - * Removes the node from the list. The node can be re-added to an other list - */ - private void remove() { - assert pc != null : "called remove on a boundary node"; - assert prev != null && next != null : "not part of a list"; - next.prev = prev; - prev.next = next; - prev = null; - next = null; - if (afNext != null) { - afNext.afPrev = afPrev; - afPrev.afNext = afNext; - afPrev = null; - afNext = null; - } - } - - /** - * Adds this after node. - *

- * Node is in most cases a boundary node (e.g. start of list) - */ - public void addAfter(Node node) { - assert !this.isBoundaryNode() : "this is a boundary node"; - assert next == null && prev == null : "Node already member of a list"; - next = node.next; - prev = node; - node.next.prev = this; - node.next = this; - } - - /** - * Adds this after node AND as affinity-node after afNode. - */ - public void addAfter(Node node, Node afNode) { - addAfter(node); - assert afNext == null && afPrev == null : "Node already member of affinity-list"; - afNext = afNode.afNext; - afPrev = afNode; - afNode.afNext.afPrev = this; - afNode.afNext = this; - } - - /** - * Find the connection with given affinity id in this affinity-list. - */ - public Node find(Object affinityId) { - Node n = this.afNext; - while (!n.isBoundaryNode()) { - if (affinityId.equals(n.pc.affinityId())) { - return n; - } - n = n.afNext; - } - return null; - } - } } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java new file mode 100644 index 0000000..210ca82 --- /dev/null +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java @@ -0,0 +1,199 @@ +package io.ebean.datasource.pool; + +import java.util.Iterator; +import java.util.Objects; + +/** + * A linked list implementation designed for pooledConnections. + *

+ * The linkedList supports adding and removing connections in constant time. + * In contrast to the java.util.LinkedList, this linkedList provides access + * to the list nodes. The nodes can be unlinked from one list and can be + * added to another. This saves a bit overhead of creating new node objects + * on each transition from free to busy list. + *

+ * the list implements Iterable and reverse() for reverse traversion. + * + * @author Roland Praml, Foconis Analytics GmbH + */ +final class ConnectionList implements Iterable { + + private final Node head; + private final Node tail; + private int size; + + /** + * Construct new list with two boundary nodes. + */ + ConnectionList() { + // initialize the two boundary nodes; + head = new Node(null); + tail = new Node(null); + head.next = tail; + tail.prev = head; + } + + /** + * Adds the node in front of the list. This works in constant time + */ + void addFirst(Node n) { + assert !n.isBoundaryNode() : "this is a boundary node"; + assert !n.isLinked() : "Node already member of a list"; + n.next = head.next; + n.prev = head; + head.next.prev = n; + head.next = n; + n.list = this; + size++; + } + + /** + * returns the first element in the list or null if list is empty + */ + PooledConnection peekFirst() { + Node ret = head.next; + return ret.isBoundaryNode() ? null : ret.pc; + } + + /** + * returns last first element in the list or null if list is empty + */ + PooledConnection peekLast() { + Node ret = tail.prev; + return ret.isBoundaryNode() ? null : ret.pc; + } + + /** + * Iterates the list starting with first element. + */ + public Iterator iterator() { + return new Iterator<>() { + private Node n = head.next; + private PooledConnection pc; + + @Override + public boolean hasNext() { + return !n.isBoundaryNode(); + } + + @Override + public PooledConnection next() { + pc = n.pc; + n = n.next; + return pc; + } + + @Override + public void remove() { + pc.unlink(); + } + }; + } + + /** + * Iterates the reverse way over the list + */ + Iterable reverse() { + return () -> new Iterator<>() { + private Node n = tail.prev; + private PooledConnection pc; + + @Override + public boolean hasNext() { + return !n.isBoundaryNode(); + } + + @Override + public PooledConnection next() { + pc = n.pc; + n = n.prev; + return pc; + } + + @Override + public void remove() { + pc.unlink(); + } + }; + } + + /** + * Finds the node with this affinity id. + */ + PooledConnection find(Object affinityId) { + Node n = head.next; + while (!n.isBoundaryNode()) { + if (Objects.equals(affinityId, n.pc.affinityId())) { + return n.pc; + } + n = n.next; + } + return null; + } + + int size() { + return size; + } + + /** + * Returns true, if this node is linked to that list. + */ + boolean isLinkedTo(Node node) { + // The implementation relies on node.list == this + // which is guaranteed by the add/unlink mehtod. + return node != null && node.list == this; + } + + + /** + * Node of a linkedlist. The linkedLists always have two empty nodes + * at the start and end. (boundary nodes) + *

+ * the first usable node is startNode.next (which could be the end boundary) + */ + static final class Node { + + private Node next; + private Node prev; + private ConnectionList list; + private final PooledConnection pc; + + Node(PooledConnection pc) { + this.pc = pc; + } + + /** + * Retruns true, if this is a boundary node. (start or end node of list) + */ + private boolean isBoundaryNode() { + return pc == null; + } + + /** + * Removes the node from the list. The node can be re-added to an other list. + *

+ * Note: As PooledConnections are often in two lists, always use + * PooledConnection.detach() instead of calling this method directly. + */ + boolean unlink() { + assert !isBoundaryNode() : "called remove on a boundary node"; + if (!isLinked()) { + return false; + } + list.size--; + next.prev = prev; + prev.next = next; + prev = null; + next = null; + list = null; + return true; + } + + /** + * Returns true, if this node is linked in a list. + */ + boolean isLinked() { + return list != null; + } + } +} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 1dbc5a1..e9b1288 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -1,16 +1,8 @@ package io.ebean.datasource.pool; import io.ebean.datasource.DataSourceConnection; -import io.ebean.datasource.DataSourcePool; - -import java.sql.CallableStatement; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.SQLWarning; -import java.sql.Savepoint; -import java.sql.Statement; + +import java.sql.*; import java.util.ArrayList; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -142,10 +134,11 @@ final class PooledConnection extends ConnectionDelegator implements DataSourceCo private String createdByMethod; private StackTraceElement[] stackTrace; private final int maxStackTrace; - /** - * Slot position in the BusyConnectionBuffer. - */ - private ConnectionBuffer.Node busyNode; + + // node in busyFree or affinity list + private final ConnectionList.Node busyFree = new ConnectionList.Node(this); + private final ConnectionList.Node affinity = new ConnectionList.Node(this); + private Object affinityId; @@ -197,15 +190,20 @@ final class PooledConnection extends ConnectionDelegator implements DataSourceCo /** * Return the node in the busy list. If this is empty, the connection is free */ - ConnectionBuffer.Node busyNode() { - return busyNode; + ConnectionList.Node busyFree() { + return busyFree; + } + + ConnectionList.Node affinity() { + return affinity; } /** - * Set the busy node. + * Unlinks the pooledConnection from the busyFree and affinity-list. */ - void setBusyNode(ConnectionBuffer.Node busyNode) { - this.busyNode = busyNode; + void unlink() { + busyFree.unlink(); + affinity.unlink(); } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 0732b90..d43182e 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -143,15 +143,13 @@ void ensureMinimumConnections() throws SQLException { void returnPooledConnection(PooledConnection c, boolean forceClose) { lock.lock(); try { + if (!buffer.removeBusy(c)) { + Log.error("Connection [{0}] not found in BusyList?", c); + } if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { - if (!buffer.removeBusy(c)) { - Log.error("Connection [{0}] not found in BusyList?", c); - } c.closeConnectionFully(false); } else { - if (!buffer.moveToFreeList(c)) { - Log.error("Connection [{0}] not found in BusyList?", c); - } + buffer.addFree(c); notEmpty.signal(); } } finally { @@ -161,7 +159,7 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { private PooledConnection extractFromFreeList(Object affinitiyId) { - PooledConnection c = buffer.popFree(affinitiyId); + PooledConnection c = buffer.removeFree(affinitiyId); if (c == null) { return null; } @@ -232,7 +230,7 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte return connection; } if (affinitiyId != null) { - connection = extractFromFreeList(ConnectionBuffer.POP_LAST); + connection = extractFromFreeList(ConnectionBuffer.GET_OLDEST); if (connection != null) { return connection; } @@ -293,9 +291,10 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ try { nanos = notEmpty.awaitNanos(nanos); - if (buffer.hasFreeConnections()) { + PooledConnection c = extractFromFreeList(affinitiyId); + if (c != null) { // successfully waited - return extractFromFreeList(affinitiyId); + return c; } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index 9f2105a..1ccd912 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -19,23 +19,20 @@ void test() { assertEquals(0, b.freeSize()); - assertFalse(b.hasFreeConnections()); b.addFree(p0); assertEquals(1, b.freeSize()); - assertTrue(b.hasFreeConnections()); - PooledConnection r0 = b.popFree(null); + PooledConnection r0 = b.removeFree(null); b.addBusy(p0); assertThat(p0).isSameAs(r0); assertEquals(0, b.freeSize()); - assertFalse(b.hasFreeConnections()); assertEquals(0, b.freeSize()); assertEquals(1, b.busySize()); - b.moveToFreeList(p0); + b.addFree(p0); assertEquals(1, b.freeSize()); assertEquals(0, b.busySize()); @@ -44,43 +41,43 @@ void test() { assertEquals(3, b.freeSize()); - PooledConnection r1 = b.popFree(null); + PooledConnection r1 = b.removeFree(null); b.addBusy(r1); assertSame(p2, r1); - PooledConnection r2 = b.popFree(null); + PooledConnection r2 = b.removeFree(null); b.addBusy(r2); assertSame(p1, r2); assertEquals(1, b.freeSize()); - b.moveToFreeList(r1); + b.addFree(r1); assertEquals(2, b.freeSize()); - PooledConnection r3 = b.popFree(null); + PooledConnection r3 = b.removeFree(null); b.addBusy(r3); assertSame(p2, r3); assertEquals(1, b.freeSize()); - PooledConnection r4 = b.popFree(null); + PooledConnection r4 = b.removeFree(null); b.addBusy(r4); assertSame(p0, r4); assertEquals(0, b.freeSize()); - b.moveToFreeList(r3); // = p2 - b.moveToFreeList(r2); // = p1 - b.moveToFreeList(r4); // = p0 + b.addFree(r3);// = p2 + b.addFree(r2);// = p1 + b.addFree(r4);// = p0 assertEquals(3, b.freeSize()); - PooledConnection r5 = b.popFree(null); + PooledConnection r5 = b.removeFree(null); b.addBusy(r5); assertSame(p0, r5); assertEquals(2, b.freeSize()); - PooledConnection r6 = b.popFree(null); + PooledConnection r6 = b.removeFree(null); b.addBusy(r6); assertSame(p1, r6); assertEquals(1, b.freeSize()); - PooledConnection r7 = b.popFree(null); + PooledConnection r7 = b.removeFree(null); b.addBusy(r7); assertSame(p2, r7); assertEquals(0, b.freeSize()); @@ -112,43 +109,48 @@ public void test_busy_free() { assertEquals(2, b.busySize()); assertEquals(2, b.freeSize()); - PooledConnection c3 = b.popFree(null); + PooledConnection c3 = b.removeFree(null); assertSame(p3, c3); assertEquals(2, b.busySize()); assertEquals(1, b.freeSize()); b.addBusy(c3); + assertEquals(3, b.busySize()); + assertEquals(1, b.freeSize()); assertThatThrownBy(() -> b.addBusy(p3)).hasMessageContaining("Node already member of a list"); assertEquals(3, b.busySize()); - PooledConnection c2 = b.popFree(null); + PooledConnection c2 = b.removeFree(null); b.addBusy(c2); assertSame(p2, c2); assertEquals(4, b.busySize()); assertEquals(0, b.freeSize()); - assertNull(b.popFree(null)); // no free connections left + assertNull(b.removeFree(null)); // no free connections left // all are busy now - assertNotNull(p0.busyNode()); - assertNotNull(p1.busyNode()); - assertNotNull(p2.busyNode()); - assertNotNull(p3.busyNode()); + assertTrue(p0.busyFree().isLinked()); + assertTrue(p1.busyFree().isLinked()); + assertTrue(p2.busyFree().isLinked()); + assertTrue(p3.busyFree().isLinked()); b.removeBusy(p0); assertEquals(3, b.busySize()); assertEquals(0, b.freeSize()); + assertFalse(p0.busyFree().isLinked()); - assertFalse(b.moveToFreeList(p0)); - assertTrue(b.moveToFreeList(p1)); - assertFalse(b.moveToFreeList(p1)); + assertFalse(b.removeBusy(p0)); + assertTrue(b.removeBusy(p1)); + b.addFree(p1); + assertFalse(b.removeBusy(p1)); assertEquals(2, b.busySize()); assertEquals(1, b.freeSize()); - b.moveToFreeList(p2); - b.moveToFreeList(p3); + b.addFree(p2); + b.addFree(p3); + b.addFree(p3); assertEquals(0, b.busySize()); assertEquals(3, b.freeSize()); @@ -171,8 +173,8 @@ public void test_Affinity() { PooledConnection c1 = getConnection(b, 42); PooledConnection c2 = getConnection(b, 17); - b.moveToFreeList(c1); - b.moveToFreeList(c2); + b.addFree(c1); + b.addFree(c2); PooledConnection c3 = getConnection(b,43); assertNotSame(c3, c1); @@ -183,9 +185,9 @@ public void test_Affinity() { } private static PooledConnection getConnection(ConnectionBuffer b, Object affinity) { - PooledConnection c1 = b.popFree(affinity); + PooledConnection c1 = b.removeFree(affinity); if (c1 == null) { - c1 = b.popFree(ConnectionBuffer.POP_LAST); + c1 = b.removeFree(ConnectionBuffer.GET_OLDEST); } c1.setAffinityId(affinity); b.addBusy(c1); diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java new file mode 100644 index 0000000..4140ab7 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java @@ -0,0 +1,83 @@ +package io.ebean.datasource.pool; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class ConnectionListTest { + + /** + * Test adding and unlinking from connectionList. + */ + @Test + void test() { + + ConnectionList l1 = new ConnectionList(); + ConnectionList l2 = new ConnectionList(); + + PooledConnection p0 = new PooledConnection("0"); + p0.setAffinityId(43); + PooledConnection p1 = new PooledConnection("1"); + p1.setAffinityId(44); + PooledConnection p2 = new PooledConnection("2"); + p2.setAffinityId(45); + + assertThat(l1.size()).isEqualTo(0); + assertThat(l1.peekFirst()).isNull(); + assertThat(l1.peekLast()).isNull(); + assertThat(l1.find(42)).isNull(); + + + l1.addFirst(p0.busyFree()); + l2.addFirst(p1.affinity()); + + assertThat(l1.size()).isEqualTo(1); + assertThat(l1.peekFirst()).isSameAs(p0); + assertThat(l1.peekLast()).isSameAs(p0); + assertThat(l1.find(42)).isNull(); + assertThat(l1.find(43)).isSameAs(p0); + + assertThat(l2.size()).isEqualTo(1); + + l1.addFirst(p1.busyFree()); + l1.addFirst(p2.busyFree()); + + assertThat(l1.size()).isEqualTo(3); + assertThat(l2.size()).isEqualTo(1); + + assertThat(l1).containsExactly(p2, p1, p0); + assertThat(l1.reverse()).containsExactly(p0, p1, p2); + + p0.unlink(); + assertThat(l1.size()).isEqualTo(2); + assertThat(l2.size()).isEqualTo(1); + + p1.unlink(); // p1 is member of both lists + assertThat(l1.size()).isEqualTo(1); + assertThat(l2.size()).isEqualTo(0); + + ConnectionList l3 = new ConnectionList(); + + // adding an already linked conncection will throw an error + assertThatThrownBy(() -> l3.addFirst(p2.busyFree())) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Node already member of a list"); + + assertThat(l3.size()).isEqualTo(0); + + p2.unlink(); + l3.addFirst(p2.busyFree()); + assertThat(l3.size()).isEqualTo(1); + + p2.unlink(); + p2.unlink(); // subsequent unlink must not throw error + l2.addFirst(p2.busyFree()); + assertThat(l2.size()).isEqualTo(1); + assertThat(l3.size()).isEqualTo(0); + } + +} From 865af3f56107f63788663b33214229168ae7f608 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Mon, 31 Mar 2025 13:29:55 +0200 Subject: [PATCH 09/12] Query also oldest connection in queue --- .../io/ebean/datasource/pool/ConnectionBuffer.java | 7 +++++++ .../datasource/pool/PooledConnectionQueue.java | 14 ++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index 67ba168..6a420f2 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -142,6 +142,13 @@ final class ConnectionBuffer { } } + /** + * affinity is supported by the buffer. + */ + boolean isAffinitySupported() { + return affinityLists != null; + } + /** * Return the number of free connections. */ diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index d43182e..a4a3830 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -226,15 +226,14 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte return connection; } connection = createConnection(); + if (connection == null && buffer.isAffinitySupported()) { + // we could not find connection with required affinity and + // buffer is full. So try to get oldest connection from buffer + connection = extractFromFreeList(ConnectionBuffer.GET_OLDEST); + } if (connection != null) { return connection; } - if (affinitiyId != null) { - connection = extractFromFreeList(ConnectionBuffer.GET_OLDEST); - if (connection != null) { - return connection; - } - } } try { // The pool is at maximum size. We are going to go into @@ -292,6 +291,9 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ try { nanos = notEmpty.awaitNanos(nanos); PooledConnection c = extractFromFreeList(affinitiyId); + if (c == null && buffer.isAffinitySupported()) { + c = extractFromFreeList(ConnectionBuffer.GET_OLDEST); + } if (c != null) { // successfully waited return c; From db3099eaed2546905a6ada2deafbcee635b4b217 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Mon, 31 Mar 2025 14:30:52 +0200 Subject: [PATCH 10/12] Move expensive operations outside lock --- .../datasource/pool/ConnectionBuffer.java | 63 +++++-------- .../pool/PooledConnectionQueue.java | 91 ++++++++++++------- 2 files changed, 82 insertions(+), 72 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index 6a420f2..c422b67 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -239,29 +239,44 @@ PooledConnection removeFree(Object affinityId) { return pc; } + /** - * Close all free connections in this buffer. + * Clears the freelist and return the connections. */ - void closeAllFree(boolean logErrors) { + List clearFreeList() { List tempList = new ArrayList<>(); freeList.forEach(pc -> { pc.unlink(); tempList.add(pc); }); + return tempList; + } - if (Log.isLoggable(System.Logger.Level.TRACE)) { - Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); - } - for (PooledConnection connection : tempList) { - connection.closeConnectionFully(logErrors); - } + + /** + * Clears the busy list and return the connections that should be considered leaked. + */ + List cleanupBusyList(long leakTimeMinutes) { + long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); + List tempList = new ArrayList<>(); + + busyList.forEach(pc -> { + if (pc.lastUsedTime() > olderThanTime) { + // PooledConnection has been used recently or + // expected to be longRunning so not closing... + } else { + pc.unlink(); + tempList.add(pc); + } + }); + return tempList; } /** * Trim any inactive connections that have not been used since usedSince. */ - int trim(int minSize, long usedSince, long createdSince) { + List trim(int minSize, long usedSince, long createdSince) { int toTrim = freeSize() - minSize; List ret = new ArrayList<>(toTrim); @@ -274,35 +289,7 @@ int trim(int minSize, long usedSince, long createdSince) { ret.add(pc); } } - ret.forEach(pc -> pc.closeConnectionFully(true)); - return ret.size(); - } - - /** - * Close connections that should be considered leaked. - */ - void closeBusyConnections(long leakTimeMinutes) { - long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); - Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); - busyList.forEach(pc -> { - if (pc.lastUsedTime() > olderThanTime) { - // PooledConnection has been used recently or - // expected to be longRunning so not closing... - } else { - pc.unlink(); - closeBusyConnection(pc); - } - }); - } - - private void closeBusyConnection(PooledConnection pc) { - try { - Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); - System.out.println("CLOSING busy connection: " + pc.fullDescription()); - pc.closeConnectionFully(false); - } catch (Exception ex) { - Log.error("Error when closing potentially leaked connection " + pc.description(), ex); - } + return ret; } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index a4a3830..9c601c8 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -5,6 +5,8 @@ import io.ebean.datasource.pool.ConnectionPool.Status; import java.sql.SQLException; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -107,15 +109,10 @@ PoolStatus status(boolean reset) { } void setMaxSize(int maxSize) { - lock.lock(); - try { - if (maxSize < this.minSize) { - throw new IllegalArgumentException("maxSize " + maxSize + " < minSize " + this.minSize); - } - this.maxSize = maxSize; - } finally { - lock.unlock(); + if (maxSize < this.minSize) { + throw new IllegalArgumentException("maxSize " + maxSize + " < minSize " + this.minSize); } + this.maxSize = maxSize; } private int totalConnections() { @@ -141,13 +138,14 @@ void ensureMinimumConnections() throws SQLException { * Return a PooledConnection. */ void returnPooledConnection(PooledConnection c, boolean forceClose) { + boolean closeConnection = false; lock.lock(); try { if (!buffer.removeBusy(c)) { Log.error("Connection [{0}] not found in BusyList?", c); } if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { - c.closeConnectionFully(false); + closeConnection = true; } else { buffer.addFree(c); notEmpty.signal(); @@ -155,6 +153,9 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { } finally { lock.unlock(); } + if (closeConnection) { + c.closeConnectionFully(false); + } } private PooledConnection extractFromFreeList(Object affinitiyId) { @@ -356,17 +357,12 @@ void reset(long leakTimeMinutes) { } void trim(long maxInactiveMillis, long maxAgeMillis) { - lock.lock(); - try { - if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { - try { - ensureMinimumConnections(); - } catch (SQLException e) { - Log.error("Error trying to ensure minimum connections", e); - } + if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { + try { + ensureMinimumConnections(); + } catch (SQLException e) { + Log.error("Error trying to ensure minimum connections", e); } - } finally { - lock.unlock(); } } @@ -375,33 +371,50 @@ void trim(long maxInactiveMillis, long maxAgeMillis) { */ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMillis) { final long createdSince = (maxAgeMillis == 0) ? 0 : System.currentTimeMillis() - maxAgeMillis; - final int trimmedCount; - if (buffer.freeSize() > minSize) { - // trim on maxInactive and maxAge - long usedSince = System.currentTimeMillis() - maxInactiveMillis; - trimmedCount = buffer.trim(minSize, usedSince, createdSince); - } else if (createdSince > 0) { - // trim only on maxAge - trimmedCount = buffer.trim(0, createdSince, createdSince); - } else { - trimmedCount = 0; + final List toTrim; + lock.lock(); + try { + if (buffer.freeSize() > minSize) { + // trim on maxInactive and maxAge + long usedSince = System.currentTimeMillis() - maxInactiveMillis; + toTrim = buffer.trim(minSize, usedSince, createdSince); + } else if (createdSince > 0) { + // trim only on maxAge + toTrim = buffer.trim(0, createdSince, createdSince); + } else { + toTrim = Collections.emptyList(); + } + } finally { + lock.unlock(); + } + if (toTrim.isEmpty()) { + return false; } - if (trimmedCount > 0 && Log.isLoggable(DEBUG)) { - Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmedCount, totalConnections()); + toTrim.forEach(pc -> pc.closeConnectionFully(true)); + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, toTrim.size(), totalConnections()); } - return trimmedCount > 0 && buffer.freeSize() < minSize; + return buffer.freeSize() < minSize; } /** * Close all the connections that are in the free list. */ private void closeFreeConnections(boolean logErrors) { + List tempList; lock.lock(); try { - buffer.closeAllFree(logErrors); + tempList = buffer.clearFreeList(); } finally { lock.unlock(); } + // closing the connections is done outside lock + if (Log.isLoggable(System.Logger.Level.TRACE)) { + Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); + } + for (PooledConnection connection : tempList) { + connection.closeConnectionFully(logErrors); + } } /** @@ -415,12 +428,22 @@ private void closeFreeConnections(boolean logErrors) { * closed and put back into the pool. */ void closeBusyConnections(long leakTimeMinutes) { + List tempList; lock.lock(); try { - buffer.closeBusyConnections(leakTimeMinutes); + tempList = buffer.cleanupBusyList(leakTimeMinutes); } finally { lock.unlock(); } + for (PooledConnection pc : tempList) { + try { + Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); + System.out.println("CLOSING busy connection: " + pc.fullDescription()); + pc.closeConnectionFully(false); + } catch (Exception ex) { + Log.error("Error when closing potentially leaked connection " + pc.description(), ex); + } + } } String getBusyConnectionInformation() { From 1fc59655ed52574e596eeb8166c858c82c2be108 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Fri, 4 Apr 2025 09:13:19 +0200 Subject: [PATCH 11/12] Optimized find strategy --- .../datasource/pool/ConnectionBuffer.java | 21 ++++++++++++------- .../ebean/datasource/pool/ConnectionPool.java | 3 +-- .../pool/PooledConnectionQueue.java | 21 +++++++++++-------- .../datasource/pool/ConnectionBufferTest.java | 2 +- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java index c422b67..8d83a10 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -119,7 +119,8 @@ final class ConnectionBuffer { // special key to return the oldest connection from freeList. - static final Object GET_OLDEST = new Object(); + static final Object GET_LAST = new Object(); + static final Object GET_FIRST = new Object(); private final ConnectionList[] affinityLists; private final ConnectionList freeList = new ConnectionList(); @@ -218,17 +219,23 @@ boolean removeBusy(PooledConnection c) { */ PooledConnection removeFree(Object affinityId) { PooledConnection pc; - if (affinityId == GET_OLDEST) { - pc = freeList.peekLast(); - } else if (affinityLists == null) { + if (affinityLists == null) { + // affinity disabled. Always use first in list pc = freeList.peekFirst(); } else if (affinityId == null) { + // null affinity passed pc = affinityLists[hashSize].peekFirst(); - } else { // we have an affinity id. + } else if (affinityId == GET_FIRST) { + // explicitly first one was requested (for heartbeat) + pc = freeList.peekFirst(); + } else if (affinityId == GET_LAST) { + // explicitly last one was requested (for changing affinityId) + pc = freeList.peekLast(); + } else { + // we have an affinity id request pc = affinityLists[affinityId.hashCode() % hashSize].find(affinityId); if (pc == null) { - // no pc with this affinity-id in the pool. - // query "null"-affinityList + // no pc with this affinity-id in the pool. Query "null"-affinityList pc = affinityLists[hashSize].peekFirst(); } } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index f37765e..ff660a7 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -382,7 +382,7 @@ private void testConnection() { PooledConnection conn = null; try { // Get a connection from the pool and test it - conn = getPooledConnection(affinityProvider.get()); + conn = getPooledConnection(ConnectionBuffer.GET_FIRST); heartbeatPoolExhaustedCount = 0; if (testConnection(conn)) { notifyDataSourceIsUp(); @@ -648,7 +648,6 @@ public DataSourceConnection getConnection(Object affinityId) throws SQLException */ private PooledConnection getPooledConnection(Object affinitiyId) throws SQLException { PooledConnection c = queue.obtainConnection(affinitiyId); - c.setAffinityId(affinitiyId); if (captureStackTrace) { c.setStackTrace(Thread.currentThread().getStackTrace()); } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 9c601c8..1f2f0be 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -190,6 +190,9 @@ PooledConnection obtainConnection(Object affinitiyId) throws SQLException { try { PooledConnection pc = _obtainConnection(affinitiyId); pc.resetForUse(); + if (affinitiyId != ConnectionBuffer.GET_FIRST && affinitiyId != ConnectionBuffer.GET_LAST) { + pc.setAffinityId(affinitiyId); + } return pc; } catch (InterruptedException e) { @@ -227,11 +230,6 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte return connection; } connection = createConnection(); - if (connection == null && buffer.isAffinitySupported()) { - // we could not find connection with required affinity and - // buffer is full. So try to get oldest connection from buffer - connection = extractFromFreeList(ConnectionBuffer.GET_OLDEST); - } if (connection != null) { return connection; } @@ -254,7 +252,7 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte } private PooledConnection createConnection() throws SQLException { - if (buffer.busySize() < maxSize) { + if (totalConnections() < maxSize) { // grow the connection pool PooledConnection c = pool.createConnectionForQueue(connectionId++); int busySize = registerBusyConnection(c); @@ -279,6 +277,13 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ if (conn != null) { return conn; } + // we could not create new connection, so we take the last one and change the affinity id + if (buffer.isAffinitySupported()) { + conn = extractFromFreeList(ConnectionBuffer.GET_LAST); + } + if (conn != null) { + return conn; + } String msg = "Unsuccessfully waited [" + waitTimeoutMillis + "] millis for a connection to be returned." + " No connections are free. You need to Increase the max connections of [" + maxSize + "]" + " or look for a connection pool leak using datasource.xxx.capturestacktrace=true"; @@ -292,9 +297,7 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ try { nanos = notEmpty.awaitNanos(nanos); PooledConnection c = extractFromFreeList(affinitiyId); - if (c == null && buffer.isAffinitySupported()) { - c = extractFromFreeList(ConnectionBuffer.GET_OLDEST); - } + if (c != null) { // successfully waited return c; diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java index 1ccd912..88db122 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -187,7 +187,7 @@ public void test_Affinity() { private static PooledConnection getConnection(ConnectionBuffer b, Object affinity) { PooledConnection c1 = b.removeFree(affinity); if (c1 == null) { - c1 = b.removeFree(ConnectionBuffer.GET_OLDEST); + c1 = b.removeFree(ConnectionBuffer.GET_LAST); } c1.setAffinityId(affinity); b.addBusy(c1); From 88a2a0500f45fad62e365d0b260447be1bf8b018 Mon Sep 17 00:00:00 2001 From: Roland Praml Date: Tue, 8 Apr 2025 08:45:05 +0200 Subject: [PATCH 12/12] Modified search strategy --- .../pool/PooledConnectionQueue.java | 67 ++++++++++--------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 1f2f0be..85d8167 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -158,8 +158,24 @@ void returnPooledConnection(PooledConnection c, boolean forceClose) { } } - private PooledConnection extractFromFreeList(Object affinitiyId) { + /** + * Tries to extract the best matching connection from the freeList. + * If there is none, it tries to create one or steal one from other affinity slot. + */ + private PooledConnection extractFromFreeList(Object affinitiyId, boolean create) throws SQLException { + PooledConnection c = extractFromFreeList(affinitiyId); + if (c == null && create) { + c = createConnection(); + } + if (c == null && buffer.isAffinitySupported()) { + // TODO: This "stealing" stragegy could be optimized. + // we might build a heurestic, which one would be the best candidate + c = extractFromFreeList(ConnectionBuffer.GET_LAST); + } + return c; + } + private PooledConnection extractFromFreeList(Object affinitiyId) { PooledConnection c = buffer.removeFree(affinitiyId); if (c == null) { return null; @@ -182,6 +198,21 @@ private boolean staleEviction(PooledConnection c) { return pool.invalidConnection(c); } + + private PooledConnection createConnection() throws SQLException { + if (totalConnections() < maxSize) { + // grow the connection pool + PooledConnection c = pool.createConnectionForQueue(connectionId++); + int busySize = registerBusyConnection(c); + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] grow; id[{1}] busy[{2}] max[{3}]", name, c.name(), busySize, maxSize); + } + return c; + } else { + return null; + } + } + private boolean stale(PooledConnection c) { return c.lastUsedTime() < System.currentTimeMillis() - validateStaleMillis; } @@ -225,13 +256,9 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte hitCount++; // are other threads already waiting? (they get priority) if (waitingThreads == 0) { - PooledConnection connection = extractFromFreeList(affinitiyId); - if (connection != null) { - return connection; - } - connection = createConnection(); - if (connection != null) { - return connection; + PooledConnection c = this.extractFromFreeList(affinitiyId, true); + if (c != null) { + return c; } } try { @@ -251,20 +278,6 @@ private PooledConnection _obtainConnection(Object affinitiyId) throws Interrupte } } - private PooledConnection createConnection() throws SQLException { - if (totalConnections() < maxSize) { - // grow the connection pool - PooledConnection c = pool.createConnectionForQueue(connectionId++); - int busySize = registerBusyConnection(c); - if (Log.isLoggable(DEBUG)) { - Log.debug("DataSource [{0}] grow; id[{1}] busy[{2}] max[{3}]", name, c.name(), busySize, maxSize); - } - return c; - } else { - return null; - } - } - /** * Got into a loop waiting for connections to be returned to the pool. */ @@ -277,13 +290,6 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ if (conn != null) { return conn; } - // we could not create new connection, so we take the last one and change the affinity id - if (buffer.isAffinitySupported()) { - conn = extractFromFreeList(ConnectionBuffer.GET_LAST); - } - if (conn != null) { - return conn; - } String msg = "Unsuccessfully waited [" + waitTimeoutMillis + "] millis for a connection to be returned." + " No connections are free. You need to Increase the max connections of [" + maxSize + "]" + " or look for a connection pool leak using datasource.xxx.capturestacktrace=true"; @@ -296,8 +302,7 @@ private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQ try { nanos = notEmpty.awaitNanos(nanos); - PooledConnection c = extractFromFreeList(affinitiyId); - + PooledConnection c = this.extractFromFreeList(affinitiyId, false); if (c != null) { // successfully waited return c;