Skip to content

Commit

Permalink
feat: add local token cache for cluster
Browse files Browse the repository at this point in the history
Change-Id: I6b1ece200cd6b2db181f570ab7207325f12fc12b
  • Loading branch information
absolute8511 authored and liwen.2022 committed Apr 23, 2024
1 parent cd02b1d commit 109c3b4
Show file tree
Hide file tree
Showing 8 changed files with 654 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
package com.alibaba.csp.sentinel.cluster.client;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
Expand All @@ -35,6 +41,7 @@
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;

/**
* Default implementation of {@link ClusterTokenClient}.
Expand All @@ -43,11 +50,29 @@
* @since 1.4.0
*/
public class DefaultClusterTokenClient implements ClusterTokenClient {
public class CachedTokenData {
AtomicInteger count;
AtomicInteger lastStatus;
AtomicLong lastWaitUntilMs;
AtomicInteger lastWaitPrefetchCnt;
AtomicInteger lastRemaining;
public CachedTokenData() {
count = new AtomicInteger(0);
lastStatus = new AtomicInteger(TokenResultStatus.OK);
lastWaitUntilMs = new AtomicLong(0);
lastWaitPrefetchCnt = new AtomicInteger(0);
lastRemaining = new AtomicInteger(0);
}
}

private ClusterTransportClient transportClient;
private TokenServerDescriptor serverDescriptor;

private final AtomicBoolean shouldStart = new AtomicBoolean(false);
private int checkInterval = 2;
ConcurrentHashMap<Long, CachedTokenData> localPrefetchedTokens = new ConcurrentHashMap<>();
private final ScheduledExecutorService prefetchScheduler = Executors.newScheduledThreadPool(2,
new NamedThreadFactory("sentinel-cluster-prefetch-scheduler", true));

public DefaultClusterTokenClient() {
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
Expand Down Expand Up @@ -146,6 +171,140 @@ public TokenServerDescriptor currentServer() {
return serverDescriptor;
}

public void setInterval(int val) {
checkInterval = val;
}

public void resetCache() {
localPrefetchedTokens.clear();
}

public int currentRuleCached(Long flowId) {
CachedTokenData d = localPrefetchedTokens.get(flowId);
if (d == null) {
return 0;
}
return d.count.get();
}

private void preFetch(Long flowId, CachedTokenData value, int prefetchCnt) {
long waitUntil = value.lastWaitUntilMs.get();
if (waitUntil > 0 && System.currentTimeMillis() < waitUntil) {
return;
}
if (waitUntil > 0) {
value.count.addAndGet(value.lastWaitPrefetchCnt.get());
value.lastStatus.set(TokenResultStatus.OK);
value.lastWaitUntilMs.set(0);
value.lastWaitPrefetchCnt.set(0);
}
int current = value.count.get();
if (current >= prefetchCnt / 2) {
return;
}
if (current < -1 * prefetchCnt) {
// avoid too much prefetch
current = -1 * prefetchCnt;
}
prefetchCnt = prefetchCnt - current;
TokenResult fetched = requestToken(flowId, prefetchCnt, true);
value.lastWaitUntilMs.set(0);
value.lastStatus.set(fetched.getStatus());
value.lastRemaining.set(fetched.getRemaining());
if (fetched.getStatus() == TokenResultStatus.OK) {
value.count.addAndGet(prefetchCnt);
} else if (fetched.getStatus() == TokenResultStatus.SHOULD_WAIT) {
value.lastWaitUntilMs.set(System.currentTimeMillis() + fetched.getWaitInMs());
value.lastWaitPrefetchCnt.set(prefetchCnt);
}
}

private TokenResult tryLocalCachedToken(CachedTokenData data, int acquireCount, int prefetchCnt) {
int count = data.count.get();
TokenResult ret = new TokenResult(data.lastStatus.get());
ret.setFromCached(true);
ret.setRemaining(data.lastRemaining.get());
if (count >= acquireCount) {
// here we allow the concurrency which may cause decrease to negative count, it
// is just skipped some requests
// and it will be refilled by the bg prefetch in next round.
data.count.addAndGet(-1 * acquireCount);
ret.setStatus(TokenResultStatus.OK);
return ret;
}
if (acquireCount > prefetchCnt) {
return null;
}
if (ret.getStatus() == TokenResultStatus.SHOULD_WAIT) {
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN + data.lastWaitPrefetchCnt.get() < -1 * prefetchCnt) {
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
ret.setStatus(TokenResultStatus.BLOCKED);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will
// much slower which will have less pressure to remote
return null;
}
int waitMs = (int) (data.lastWaitUntilMs.get() - System.currentTimeMillis());
if (waitMs > 0) {
ret.setWaitInMs(waitMs);
}
return ret;
} else if (ret.getStatus() == TokenResultStatus.OK) {
// last ok, but the cached count is not enough, we can preuse it to avoid remote
// request too often,
// otherwise just try remote request
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN < -1 * prefetchCnt * 2) {
// preuse failed since not enough, added it back
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
// since last is still ok, we should not block directly, make it failover to local
ret.setStatus(TokenResultStatus.FAIL);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will much slower which will have less pressure to remote
return null;
}
// preuse ok
return ret;
} else {
// should fail directly
return ret;
}
}

@Override
public TokenResult requestTokenWithCache(Long flowId, int acquireCount, int prefetchCnt) {
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
// try local prefetched first
CachedTokenData data = localPrefetchedTokens.get(flowId);
if (data != null) {
TokenResult ret = tryLocalCachedToken(data, acquireCount, prefetchCnt);
if (ret != null) {
return ret;
}
} else {
localPrefetchedTokens.computeIfAbsent(flowId, k -> {
CachedTokenData v = new CachedTokenData();
prefetchScheduler.scheduleAtFixedRate(() -> {
try {
preFetch(flowId, v, prefetchCnt);
} catch (Throwable e) {
RecordLog.info("[DefaultClusterTokenClient] prefetch failed for flowId {}", flowId, e);
}
}, 0, checkInterval, TimeUnit.MILLISECONDS);
return v;
});
}
// fallback to remote request
return requestToken(flowId, acquireCount, true);
}

@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
if (notValidRequest(flowId, acquireCount)) {
Expand Down
Loading

0 comments on commit 109c3b4

Please sign in to comment.