Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,28 @@ private DingNotifyConst() { }

public static final String DING_NOTICE_TITLE = "动态线程池通知";

public static final String DING_ALARM_TEMPLATE =
public static final String DING_ALARM_TEMPLATE_PREFIX =
"<font color=#EA9F00>【报警】 </font> 动态线程池运行告警 \n\n" +
"<font color=#664B4B size=2>服务名称:%s</font> \n\n " +
"<font color=#664B4B size=2>实例信息:%s</font> \n\n " +
"<font color=#664B4B size=2>环境:%s</font> \n\n " +
"<font color=#664B4B size=2>线程池名称:%s</font> \n\n " +
"<font color=alarmType size=2>报警项:%s</font> \n\n " +
"<font color=alarmValue size=2>报警阈值 / 当前值:%s</font> \n\n " +
"<font color=alarmValue size=2>报警阈值 / 当前值:%s</font> \n\n ";

public static final String DING_ALARM_COMMON_TEMPLATE =
"<font color=#664B4B size=2>%s</font> \n\n ";

public static final String DING_ALARM_TEMPLATE_SUFFIX =
"<font color=#664B4B size=2>上次报警时间:%s</font> \n\n" +
"<font color=#664B4B size=2>报警时间:%s</font> \n\n" +
"<font color=#664B4B size=2>接收人:@%s</font> \n\n" +
"<font color=#664B4B size=2>trace 信息:%s</font> \n\n" +
"<font color=#22B838 size=2>报警间隔:%ss</font> \n\n" +
"<font color=#664B4B size=2>扩展信息:%s</font> \n\n";

public static final String DING_ALARM_TEMPLATE =
DING_ALARM_TEMPLATE_PREFIX +
"<font color=corePoolSize size=2>核心线程数:%d</font> \n\n " +
"<font color=maximumPoolSize size=2>最大线程数:%d</font> \n\n " +
"<font color=poolSize size=2>当前线程数:%d</font> \n\n " +
Expand All @@ -65,12 +79,7 @@ private DingNotifyConst() { }
"<font color=rejectCount size=2>总拒绝任务数量:%s</font> \n\n " +
"<font color=runTimeoutCount size=2>总执行超时任务数量:%s</font> \n\n " +
"<font color=queueTimeoutCount size=2>总等待超时任务数量:%s</font> \n\n " +
"<font color=#664B4B size=2>上次报警时间:%s</font> \n\n" +
"<font color=#664B4B size=2>报警时间:%s</font> \n\n" +
"<font color=#664B4B size=2>接收人:@%s</font> \n\n" +
"<font color=#664B4B size=2>trace 信息:%s</font> \n\n" +
"<font color=#22B838 size=2>报警间隔:%ss</font> \n\n" +
"<font color=#664B4B size=2>扩展信息:%s</font> \n\n";
DING_ALARM_TEMPLATE_SUFFIX;

public static final String DING_CHANGE_NOTICE_TEMPLATE =
"<font color=#5AB030>【通知】</font> 动态线程池参数变更 \n\n " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,15 @@ private DynamicTpConst() { }
public static final String FALSE_STR = "false";

/**
* jre
* jre 21
*/
public static final String THREAD_PER_TASK_EXECUTOR = "java.util.concurrent.ThreadPerTaskExecutor";

public static final String PINNED_EVENT = "jdk.VirtualThreadPinned";

public static final String MAX_PINNED_TIME = "maxPinnedTime";

public static final String TOTAL_PINNED_TIME = "totalPinnedTime";

public static final String PINNED_DURATION = "pinnedDuration";
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ private LarkNotifyConst() { }

public static final String SIGN_PARAM_PREFIX = "{\"timestamp\": \"%s\",\"sign\": \"%s\",";

public static final String LARK_ALARM_JSON_STR_PREFIX =
"{\"msg_type\":\"interactive\",\"card\":{\"config\":{\"wide_screen_mode\":true},\"header\":{\"template\":\"red\",\"title\":{\"tag\":\"plain_text\",\"content\":\"【报警】 动态线程池告警\"}},\"elements\":[{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**服务名称:**\\n%s\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**实例信息:**\\n%s\"}}]},{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**环境:**\\n%s\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**线程池名称:**\\n%s\"}}]},{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"alarmType **报警项:**\\n%s\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"alarmValue **报警阈值 / 当前值:**\\n%s\"}}]},{\"tag\":\"hr\"},";

public static final String LARK_ALARM_JSON_COMMON_STR =
"{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"%s\"}}]},";

public static final String LARK_ALARM_JSON_STR_SUFFIX =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这是一个完整的json,不要拆分开,很难维护,格式容易配置错

"{\"tag\":\"hr\"},{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**上次报警时间:**\\n %s\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**报警时间:**\\n %s\"}}]},{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**接收人:**\\n %s\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**trace 信息:**\\n %s\"}}]},{\"tag\":\"div\",\"fields\":[{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**报警间隔:**\\n %ss\"}},{\"is_short\":true,\"text\":{\"tag\":\"lark_md\",\"content\":\"**扩展信息:**\\n %s\"}}]}]}}";

/**
* lark alarm json str
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,31 @@ private WechatNotifyConst() { }

public static final String COMMENT_COLOR = "comment";

/**
* receivers only supports userid, view more, see <a href="https://developer.work.weixin.qq.com/document/path/91770">more</a>.
*/
public static final String WECHAT_ALARM_TEMPLATE =
public static final String WECHAT_ALARM_TEMPLATE_PREFIX =
"<font color='warning'>【报警】</font> 动态线程池告警 \n" +
"> <font color='comment'>服务名称:%s</font> \n" +
"> <font color='comment'>实例信息:%s</font> \n" +
"> <font color='comment'>环境:%s</font> \n" +
"> <font color='comment'>线程池名称:%s</font> \n" +
"> <font color='alarmType'>报警项:%s</font> \n" +
"> <font color='alarmValue'>报警阈值 / 当前值:%s</font> \n" +
"> <font color='alarmValue'>报警阈值 / 当前值:%s</font> \n";

public static final String WECHAT_ALARM_COMMON_TEMPLATE =
"> <font color='comment'>%s</font> \n";

public static final String WECHAT_ALARM_TEMPLATE_SUFFIX =
"> <font color='comment'>上次报警时间:%s</font> \n" +
"> <font color='comment'>报警时间:%s</font> \n" +
"> <font color='comment'>接收人:%s</font> \n" +
"> <font color='comment'>trace 信息:%s</font> \n" +
"> <font color='info'>报警间隔:%ss</font> \n" +
"> <font color='comment'>扩展信息:%s</font> \n";

/**
* receivers only supports userid, view more, see <a href="https://developer.work.weixin.qq.com/document/path/91770">more</a>.
*/
public static final String WECHAT_ALARM_TEMPLATE =
WECHAT_ALARM_TEMPLATE_PREFIX +
"> <font color='corePoolSize'>核心线程数:%s</font> \n" +
"> <font color='maximumPoolSize'>最大线程数:%s</font> \n" +
"> <font color='poolSize'>当前线程数:%s</font> \n" +
Expand All @@ -64,12 +78,7 @@ private WechatNotifyConst() { }
"> <font color='rejectCount'>总拒绝任务数量:%s</font> \n" +
"> <font color='runTimeoutCount'>总执行超时任务数量:%s</font> \n" +
"> <font color='queueTimeoutCount'>总等待超时任务数量:%s</font> \n" +
"> <font color='comment'>上次报警时间:%s</font> \n" +
"> <font color='comment'>报警时间:%s</font> \n" +
"> <font color='comment'>接收人:%s</font> \n" +
"> <font color='comment'>trace 信息:%s</font> \n" +
"> <font color='info'>报警间隔:%ss</font> \n" +
"> <font color='comment'>扩展信息:%s</font> \n";
WECHAT_ALARM_TEMPLATE_SUFFIX;

public static final String WECHAT_CHANGE_NOTICE_TEMPLATE =
"<font color='info'>【通知】</font> 动态线程池参数变更 \n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public enum NotifyItemEnum {
/**
* Task queue wait timeout alarm.
*/
QUEUE_TIMEOUT("queue_timeout", "");
QUEUE_TIMEOUT("queue_timeout", ""),

/**
* Pin timeout alarm.
*/
PIN_TIMEOUT("pin_timeout", "");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个不要归类到该枚举里,枚举里是线程池层面常规项,这个加入不太合适


private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
* ExecutorStats related
Expand Down Expand Up @@ -74,7 +77,12 @@ public class ExecutorStats extends Metrics {
/**
* 是否为虚拟线程执行器
*/
private boolean isVirtualExecutor;
private boolean isVirtualThreadExecutor;

/**
* 拓展字段
*/
private final Map<String, Object> extMap = new ConcurrentHashMap<>();

/**
* 空闲时间 (ms)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static List<NotifyItem> mergeAllNotifyItems(List<NotifyItem> source) {
val defaultItems = getAllNotifyItems().stream()
.filter(t -> !StringUtil.containsIgnoreCase(t.getType(), configuredTypes))
.collect(Collectors.toList());
List<NotifyItem> notifyItems = new ArrayList<>(6);
List<NotifyItem> notifyItems = new ArrayList<>(7);
notifyItems.addAll(defaultItems);
notifyItems.addAll(source);
return notifyItems;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private static void tryPrintError(Runnable r, Throwable t) {
if (r instanceof FutureTask) {
try {
FutureTask<?> future = (FutureTask<?>) r;
if (future.isDone()) {
if (future.isDone() && !future.isCancelled()) {
future.get();
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.dromara.dynamictp.common.util;

import jdk.jfr.consumer.RecordedFrame;
import jdk.jfr.consumer.RecordedStackTrace;
import org.apache.commons.collections4.CollectionUtils;

import java.util.Collection;
import java.util.stream.Collectors;

/**
* StringUtil related
Expand Down Expand Up @@ -64,4 +67,22 @@ public static boolean containsIgnoreCase(CharSequence str, CharSequence testStr)
}
return str.toString().toLowerCase().contains(testStr.toString().toLowerCase());
}

public static String formatJfrStackTrace(RecordedStackTrace stackTrace, int maxDepth) {
if (stackTrace == null) {
return "\t<not available>";
}
String formatted = "\t" + stackTrace.getFrames().stream()
.limit(maxDepth)
.map(StringUtil::formatStackTraceFrame)
.collect(Collectors.joining("\n\t"));
if (maxDepth < stackTrace.getFrames().size()) {
return formatted + "\n\t(...)";
}
return formatted;
}

private static String formatStackTraceFrame(RecordedFrame frame) {
return frame.getMethod().getType().getName() + "#" + frame.getMethod().getName() + ": " + frame.getLineNumber();
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;

Expand Down Expand Up @@ -300,9 +301,13 @@ private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorPro
}

private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
if (executorWrapper.isDtpExecutor()) {
if (executorWrapper.isDtpExecutor() || executorWrapper.isVirtualThreadExecutor()) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
executorWrapper.setNotifyItems(((DtpExecutor) executorWrapper.getExecutor()).getNotifyItems());
if (executorWrapper.isDtpExecutor()) {
executorWrapper.setNotifyItems(((DtpExecutor) executorWrapper.getExecutor()).getNotifyItems());
} else {
executorWrapper.setNotifyItems(((VirtualThreadExecutorProxy) executorWrapper.getExecutor().getOriginal()).getNotifyItems());
}
executorWrapper.setPlatformIds(props.getPlatformIds());
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
}
Expand Down Expand Up @@ -339,7 +344,6 @@ private static void doRefreshPoolSize(ExecutorAdapter<?> executor, DtpExecutorPr
}

private static void updateQueueProps(ExecutorAdapter<?> executor, DtpExecutorProps props) {

val blockingQueue = executor.getQueue();
if (blockingQueue instanceof MemorySafeLinkedBlockingQueue) {
((MemorySafeLinkedBlockingQueue<Runnable>) blockingQueue).setMaxFreeMemory(props.getMaxFreeMemory() * M_1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.notifier.manager.AlarmManager;
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
import org.dromara.dynamictp.core.support.ThreadPoolStatProvider;
import org.dromara.dynamictp.core.support.ExecutorStatProvider;
import org.slf4j.MDC;

import java.util.Objects;
Expand Down Expand Up @@ -51,7 +51,7 @@ public String getName() {

@Override
public void beforeReject(Runnable runnable, Executor executor) {
ThreadPoolStatProvider statProvider = statProviders.get(executor);
ExecutorStatProvider statProvider = statProviders.get(executor);
if (Objects.isNull(statProvider)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ThreadPoolStatProvider;
import org.dromara.dynamictp.core.support.ExecutorStatProvider;

import java.util.Map;
import java.util.Objects;
Expand All @@ -34,11 +34,11 @@
*/
public abstract class TaskStatAware implements ExecutorAware {

protected final Map<Executor, ThreadPoolStatProvider> statProviders = new ConcurrentHashMap<>();
protected final Map<Executor, ExecutorStatProvider> statProviders = new ConcurrentHashMap<>();

@Override
public void register(ExecutorWrapper wrapper) {
ThreadPoolStatProvider statProvider = wrapper.getThreadPoolStatProvider();
ExecutorStatProvider statProvider = wrapper.getExecutorStatProvider();
statProviders.put(wrapper.getExecutor(), statProvider);
statProviders.put(wrapper.getExecutor().getOriginal(), statProvider);
}
Expand All @@ -48,7 +48,7 @@ public void refresh(ExecutorWrapper wrapper, TpExecutorProps props) {
if (Objects.isNull(statProviders.get(wrapper.getExecutor()))) {
register(wrapper);
}
ThreadPoolStatProvider statProvider = wrapper.getThreadPoolStatProvider();
ExecutorStatProvider statProvider = wrapper.getExecutorStatProvider();
refresh(props, statProvider);
}

Expand All @@ -58,5 +58,5 @@ public void remove(ExecutorWrapper wrapper) {
statProviders.remove(wrapper.getExecutor().getOriginal());
}

protected void refresh(TpExecutorProps props, ThreadPoolStatProvider statProvider) { }
protected void refresh(TpExecutorProps props, ExecutorStatProvider statProvider) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.core.support.ThreadPoolStatProvider;
import org.dromara.dynamictp.core.support.ExecutorStatProvider;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -48,7 +48,7 @@ public String getName() {
}

@Override
protected void refresh(TpExecutorProps props, ThreadPoolStatProvider statProvider) {
protected void refresh(TpExecutorProps props, ExecutorStatProvider statProvider) {
super.refresh(props, statProvider);
if (Objects.nonNull(props)) {
statProvider.setRunTimeout(props.getRunTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ThreadPoolStatProvider;
import org.dromara.dynamictp.core.support.ExecutorStatProvider;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -58,7 +59,7 @@ public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
ExecutorStatProvider provider = wrapper.getExecutorStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
ExecutorStats executorStats = convertCommon(executor);
Expand All @@ -68,7 +69,8 @@ public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());

executorStats.setVirtualExecutor(wrapper.isVirtualThreadExecutor());
executorStats.setVirtualThreadExecutor(wrapper.isVirtualThreadExecutor());
toVirtualThreadMetrics(executorStats);

executorStats.setDynamic(executor instanceof DtpExecutor);
executorStats.setTps(performanceSnapshot.getTps());
Expand All @@ -84,6 +86,15 @@ public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
return executorStats;
}

private static void toVirtualThreadMetrics(ExecutorStats executorStats) {
if (!executorStats.isVirtualThreadExecutor()) {
return;
}
Map<String, Double> vteStats = PerformanceProvider.getVteStat(executorStats.getExecutorName());
executorStats.getExtMap().put("maxPinnedTime", vteStats.getOrDefault("maxPinnedTime", 0d));
executorStats.getExtMap().put("totalPinnedTime", vteStats.getOrDefault("totalPinnedTime", 0d));
}

private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats executorStats = new ExecutorStats();
executorStats.setCorePoolSize(executor.getCorePoolSize());
Expand Down
Loading