Skip to content

Commit 361de33

Browse files
committed
enable topic slo
1 parent 2e5502c commit 361de33

File tree

8 files changed

+154
-124
lines changed

8 files changed

+154
-124
lines changed

.github/workflows/slo.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,26 @@ jobs:
5757
--read-timeout 1000 \
5858
--write-timeout 1000
5959
cleanup-args: grpc://localhost:2135 /Root/testdb
60+
- prefix: topic
61+
workload: topic-basic
62+
create-args: |
63+
grpc://localhost:2135 /Root/testdb \
64+
--path /Root/testdb/slo_topic \
65+
--partitions-count 10
66+
run-args: |
67+
grpc://localhost:2135 /Root/testdb \
68+
--path /Root/testdb/slo_topic \
69+
--prom-pgw localhost:9091 \
70+
--partitions-count 10 \
71+
--read-threads 10 \
72+
--write-threads 10 \
73+
--report-period 250 \
74+
--time ${{inputs.slo_workload_duration_seconds || 600}} \
75+
--read-rps ${{inputs.slo_workload_read_max_rps || 100}} \
76+
--write-rps ${{inputs.slo_workload_write_max_rps || 100}} \
77+
--read-timeout 5000 \
78+
--write-timeout 5000
79+
cleanup-args: grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic
6080

6181

6282
concurrency:

tests/slo/TOPIC_SLO.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ python -m src topic-run grpc://localhost:2135 /local \
9797
- `cleanup_topic()` - очистка топика
9898

9999
2. **metrics.py** - расширен для поддержки топик метрик:
100-
- `OP_TYPE_TOPIC_WRITE` - метрики записи
101-
- `OP_TYPE_TOPIC_READ` - метрики чтения
100+
- `OP_TYPE_WRITE` - метрики записи
101+
- `OP_TYPE_READ` - метрики чтения
102102

103103
3. **options.py** - добавлена команда `topic-run` с параметрами для топиков
104104

tests/slo/src/core/metrics.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway # noqa: E402
1111

1212
OP_TYPE_READ, OP_TYPE_WRITE = "read", "write"
13-
OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE = "topic_read", "topic_write"
1413
OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err"
1514

1615
REF = environ.get("REF", "main")

tests/slo/src/jobs/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import time
55
from ratelimiter import RateLimiter
66

7+
import ydb
8+
79
logger = logging.getLogger(__name__)
810

911

1012
class BaseJobManager(ABC):
1113
def __init__(self, driver, args, metrics):
12-
self.driver = driver
14+
self.driver: ydb.Driver = driver
1315
self.args = args
1416
self.metrics = metrics
1517

tests/slo/src/jobs/topic_jobs.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from ratelimiter import RateLimiter
66

77
from .base import BaseJobManager
8-
from core.metrics import OP_TYPE_TOPIC_READ, OP_TYPE_TOPIC_WRITE
8+
from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE
99

1010
logger = logging.getLogger(__name__)
1111

@@ -24,14 +24,17 @@ def run_tests(self):
2424
def _run_topic_write_jobs(self):
2525
logger.info("Start topic write jobs")
2626

27-
write_limiter = RateLimiter(max_calls=self.args.topic_write_rps, period=1)
27+
write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1)
2828

2929
futures = []
30-
for i in range(self.args.topic_write_threads):
30+
for i in range(self.args.write_threads):
3131
future = threading.Thread(
3232
name=f"slo_topic_write_{i}",
3333
target=self._run_topic_writes,
34-
args=(write_limiter,),
34+
args=(
35+
write_limiter,
36+
i,
37+
),
3538
)
3639
future.start()
3740
futures.append(future)
@@ -41,10 +44,10 @@ def _run_topic_write_jobs(self):
4144
def _run_topic_read_jobs(self):
4245
logger.info("Start topic read jobs")
4346

44-
read_limiter = RateLimiter(max_calls=self.args.topic_read_rps, period=1)
47+
read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1)
4548

4649
futures = []
47-
for i in range(self.args.topic_read_threads):
50+
for i in range(self.args.read_threads):
4851
future = threading.Thread(
4952
name=f"slo_topic_read_{i}",
5053
target=self._run_topic_reads,
@@ -55,15 +58,16 @@ def _run_topic_read_jobs(self):
5558

5659
return futures
5760

58-
def _run_topic_writes(self, limiter):
61+
def _run_topic_writes(self, limiter, partition_id=None):
5962
start_time = time.time()
6063
logger.info("Start topic write workload")
6164

62-
writer = self.driver.topic_client.writer(self.args.topic_path, codec=ydb.TopicCodec.GZIP)
63-
logger.info("Topic writer created")
64-
65-
try:
66-
write_session = writer.__enter__()
65+
with self.driver.topic_client.writer(
66+
self.args.path,
67+
codec=ydb.TopicCodec.GZIP,
68+
partition_id=partition_id,
69+
) as writer:
70+
logger.info("Topic writer created")
6771

6872
message_count = 0
6973
while time.time() - start_time < self.args.time:
@@ -72,47 +76,45 @@ def _run_topic_writes(self, limiter):
7276

7377
content = f"message_{message_count}_{threading.current_thread().name}".encode("utf-8")
7478

75-
if len(content) < self.args.topic_message_size:
76-
content += b"x" * (self.args.topic_message_size - len(content))
79+
if len(content) < self.args.message_size:
80+
content += b"x" * (self.args.message_size - len(content))
7781

7882
message = ydb.TopicWriterMessage(data=content)
7983

80-
ts = self.metrics.start((OP_TYPE_TOPIC_WRITE,))
84+
ts = self.metrics.start((OP_TYPE_WRITE,))
8185
try:
82-
write_session.write(message)
83-
self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts)
86+
writer.write(message)
87+
if message_count % 100 == 0:
88+
writer.flush(timeout=self.args.write_timeout / 1000)
89+
90+
self.metrics.stop((OP_TYPE_WRITE,), ts)
8491
except Exception as e:
85-
self.metrics.stop((OP_TYPE_TOPIC_WRITE,), ts, error=e)
92+
self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
8693
logger.error("Write error: %s", e)
8794

88-
finally:
89-
writer.__exit__(None, None, None)
90-
9195
logger.info("Stop topic write workload")
9296

9397
def _run_topic_reads(self, limiter):
9498
start_time = time.time()
9599
logger.info("Start topic read workload")
96100

97-
reader = self.driver.topic_client.reader(self.args.topic_consumer, self.args.topic_path)
98-
logger.info("Topic reader created")
99-
100-
try:
101-
read_session = reader.__enter__()
101+
with self.driver.topic_client.reader(
102+
self.args.path,
103+
self.args.consumer,
104+
) as reader:
105+
logger.info("Topic reader created")
102106

103107
while time.time() - start_time < self.args.time:
104108
with limiter:
105-
ts = self.metrics.start((OP_TYPE_TOPIC_READ,))
109+
ts = self.metrics.start((OP_TYPE_READ,))
106110
try:
107-
batch = read_session.receive_message(timeout=self.args.topic_read_timeout / 1000)
108-
if batch is not None:
109-
read_session.commit_offset(batch.batches[-1].message_offset_end)
110-
self.metrics.stop((OP_TYPE_TOPIC_READ,), ts)
111-
except Exception as e:
112-
self.metrics.stop((OP_TYPE_TOPIC_READ,), ts, error=e)
113-
logger.debug("Read timeout or error: %s", e)
111+
msg = reader.receive_message(timeout=self.args.read_timeout / 1000)
112+
if msg is not None:
113+
reader.commit(msg)
114114

115-
finally:
116-
reader.__exit__(None, None, None)
115+
self.metrics.stop((OP_TYPE_READ,), ts)
116+
except Exception as e:
117+
self.metrics.stop((OP_TYPE_READ,), ts, error=e)
118+
logger.error("Read error: %s", e)
117119

118120
logger.info("Stop topic read workload")

tests/slo/src/options.py

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,46 +61,43 @@ def make_table_cleanup_parser(subparsers):
6161
add_common_options(table_cleanup_parser)
6262

6363

64+
def make_topic_create_parser(subparsers):
65+
topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
66+
add_common_options(topic_create_parser)
67+
68+
topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
69+
topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name")
70+
topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count")
71+
72+
6473
def make_topic_run_parser(subparsers):
65-
"""Создает парсер для команды topic-run - запуск SLO тестов для топиков"""
6674
topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload")
6775
add_common_options(topic_parser)
6876

69-
topic_parser.add_argument("--topic-read-rps", default=50, type=int, help="Topic read request rps")
70-
topic_parser.add_argument("--topic-read-timeout", default=5000, type=int, help="Topic read timeout [ms]")
71-
topic_parser.add_argument("--topic-write-rps", default=20, type=int, help="Topic write request rps")
72-
topic_parser.add_argument("--topic-write-timeout", default=10000, type=int, help="Topic write timeout [ms]")
73-
topic_parser.add_argument("--topic-read-threads", default=1, type=int, help="Number of threads for topic reading")
74-
topic_parser.add_argument("--topic-write-threads", default=1, type=int, help="Number of threads for topic writing")
75-
topic_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
76-
topic_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
77-
topic_parser.add_argument("--topic-message-size", default=100, type=int, help="Topic message size in bytes")
78-
topic_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
79-
topic_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
80-
topic_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
81-
82-
topic_parser.add_argument("--time", default=60, type=int, help="Time to run in seconds")
77+
topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
78+
topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name")
79+
topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count")
80+
topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps")
81+
topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]")
82+
topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps")
83+
topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]")
84+
topic_parser.add_argument("--read-threads", default=1, type=int, help="Number of threads for topic reading")
85+
topic_parser.add_argument("--write-threads", default=1, type=int, help="Number of threads for topic writing")
86+
topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes")
87+
88+
topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds")
8389
topic_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds")
84-
topic_parser.add_argument("--prom-pgw", default="", type=str, help="Prometheus push gateway (empty to disable)")
90+
topic_parser.add_argument(
91+
"--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway (empty to disable)"
92+
)
8593
topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]")
8694

8795

88-
def make_topic_create_parser(subparsers):
89-
topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer")
90-
add_common_options(topic_create_parser)
91-
92-
topic_create_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
93-
topic_create_parser.add_argument("--topic-consumer", default="slo_consumer", type=str, help="Topic consumer name")
94-
topic_create_parser.add_argument("--topic-min-partitions", default=1, type=int, help="Minimum active partitions")
95-
topic_create_parser.add_argument("--topic-max-partitions", default=10, type=int, help="Maximum active partitions")
96-
topic_create_parser.add_argument("--topic-retention-hours", default=24, type=int, help="Retention period in hours")
97-
98-
9996
def make_topic_cleanup_parser(subparsers):
10097
topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic")
10198
add_common_options(topic_cleanup_parser)
10299

103-
topic_cleanup_parser.add_argument("--topic-path", default="/local/slo_topic", type=str, help="Topic path")
100+
topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path")
104101

105102

106103
def get_root_parser():

tests/slo/src/workloads/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from abc import ABC, abstractmethod
22
import logging
3+
import ydb
34

45
logger = logging.getLogger(__name__)
56

67

78
class BaseWorkload(ABC):
89
def __init__(self, driver, args):
9-
self.driver = driver
10+
self.driver: ydb.Driver = driver
1011
self.args = args
1112
self.logger = logger
1213

0 commit comments

Comments
 (0)