File size: 4,123 Bytes
a08d451
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# gpu_run.py
# 用法:
#   python gpu_run.py
#   python gpu_run.py --target 0.6 --duration 0
#   CUDA_VISIBLE_DEVICES=0,1 python gpu_run.py --target 0.6 --size 4096 --streams 4
#
# 说明:
# - 目标是把每张可见 GPU 的平均利用率维持在大约 target(默认 60%)
# - 原理:每个周期里“忙一会儿 + 休一会儿”,让 nvidia-smi 统计窗口看到约 60% 的利用率
# - 不同显卡/驱动/CUDA 版本会有偏差;要更接近 60%,优先调 --size 和 --cycle-ms

import argparse
import threading
import time
import torch


def worker(
    gpu_id: int,
    target: float,
    duration: float,
    size: int,
    streams: int,
    cycle_ms: int,
):
    device = f"cuda:{gpu_id}"
    torch.cuda.set_device(device)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.set_float32_matmul_precision("high")

    # 用 FP16/BF16 大矩阵乘法制造稳定算力负载
    # 如果你的卡不太适合 fp16,也可改成 torch.float32
    dtype = torch.float16
    if torch.cuda.get_device_capability(gpu_id)[0] >= 8:
        # Ampere/Hopper 往往 BF16 也很稳
        dtype = torch.bfloat16

    # 为每个 stream 预分配张量,避免循环中频繁申请显存
    cuda_streams = [torch.cuda.Stream(device=device) for _ in range(streams)]
    tensors = []
    for _ in range(streams):
        a = torch.randn((size, size), device=device, dtype=dtype)
        b = torch.randn((size, size), device=device, dtype=dtype)
        tensors.append((a, b))

    # 预热
    for s, (a, b) in zip(cuda_streams, tensors):
        with torch.cuda.stream(s):
            for _ in range(4):
                _ = a @ b
    torch.cuda.synchronize(device)

    busy_ms = max(1, int(cycle_ms * target))
    idle_ms = max(0, cycle_ms - busy_ms)

    start = time.time()
    end_time = start + duration if duration > 0 else float("inf")

    print(
        f"[GPU {gpu_id}] start | target={target:.2f}, size={size}, streams={streams}, "
        f"cycle={cycle_ms}ms, busy={busy_ms}ms, idle={idle_ms}ms, dtype={dtype}"
    )

    while time.time() < end_time:
        t0 = time.time()

        # busy phase:在设定时间内持续发 matmul
        while (time.time() - t0) * 1000 < busy_ms:
            for s, (a, b) in zip(cuda_streams, tensors):
                with torch.cuda.stream(s):
                    _ = a @ b

        # 确保本轮计算真正执行完,避免堆积过多 kernel
        torch.cuda.synchronize(device)

        # idle phase:CPU sleep,让 GPU 闲下来,形成平均 60% 利用率
        if idle_ms > 0:
            time.sleep(idle_ms / 1000.0)

    torch.cuda.synchronize(device)
    print(f"[GPU {gpu_id}] done")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--target", type=float, default=0.6, help="目标平均利用率,默认 0.6")
    parser.add_argument("--duration", type=float, default=0, help="运行秒数;0 表示一直跑")
    parser.add_argument("--size", type=int, default=4096, help="矩阵边长,默认 4096")
    parser.add_argument("--streams", type=int, default=4, help="每张卡并发 stream 数,默认 4")
    parser.add_argument("--cycle-ms", type=int, default=1000, help="占空比周期,默认 1000ms")
    args = parser.parse_args()

    if not torch.cuda.is_available():
        raise RuntimeError("没有检测到 CUDA GPU。")

    if not (0 < args.target <= 1.0):
        raise ValueError("--target 必须在 (0, 1] 内")

    num_gpus = torch.cuda.device_count()
    print(f"Detected {num_gpus} visible GPU(s)")
    for i in range(num_gpus):
        print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")

    threads = []
    for gpu_id in range(num_gpus):
        t = threading.Thread(
            target=worker,
            args=(
                gpu_id,
                args.target,
                args.duration,
                args.size,
                args.streams,
                args.cycle_ms,
            ),
            daemon=False,
        )
        t.start()
        threads.append(t)

    for t in threads:
        t.join()


if __name__ == "__main__":
    main()