Skip to content

Utilities

This section documents the utility modules provided by sts-libs.

Command Line

sts.utils.cmdline

Command execution module.

This module provides functionality for command execution: command running, command argument handling, and command validation.

CommandArgs

Bases: TypedDict

Command arguments type.

This type represents command arguments that can be passed to format_args. The values can be strings, numbers, booleans, lists, or None.

Source code in sts_libs/src/sts/utils/cmdline.py
30
31
32
33
34
35
36
37
38
class CommandArgs(TypedDict, total=False):
    """Command arguments type.

    This type represents command arguments that can be passed to format_args.
    The values can be strings, numbers, booleans, lists, or None.

    The class is declared with ``total=False``, so no key is required.
    """

    # Allow any string key with ArgValue type
    # NOTE(review): as written this declares one literal key named
    # ``__extra__``; whether the type checker in use treats it as "any extra
    # key" depends on tooling that is not visible here -- confirm.
    __extra__: ArgValue

exists(cmd)

Check if command exists in PATH.

This is a direct passthrough to testinfra's exists().

Parameters:

Name Type Description Default
cmd str

Command to check

required

Returns:

Type Description
bool

True if command exists in PATH

Example
assert exists('ls')
Source code in sts_libs/src/sts/utils/cmdline.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def exists(cmd: str) -> bool:
    """Check if command exists in PATH.

    This is a direct passthrough to testinfra's exists(): the argument is
    forwarded unchanged to ``host.exists`` and its result is returned as-is.

    Args:
        cmd: Command to check

    Returns:
        True if command exists in PATH

    Example:
        ```python
        assert exists('ls')
        ```
    """
    # No validation or logging happens here; everything is delegated to host.
    return host.exists(cmd)

format_arg(key, value)

Format command argument.

Parameters:

Name Type Description Default
key str

Argument name

required
value ArgValue

Argument value (str, int, float, bool, list, or None)

required

Returns:

Type Description
str

Formatted argument string

Example
format_arg('size', '1G')
"--size='1G'"
format_arg('quiet', True)
'--quiet'
format_arg('count', 5)
"--count='5'"
Source code in sts_libs/src/sts/utils/cmdline.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def format_arg(key: str, value: ArgValue) -> str:
    """Format a single command argument as a long option.

    Underscores in ``key`` are turned into dashes. The value decides the form:

    - ``True`` produces a bare flag (``--key``).
    - ``False`` and ``None`` produce an empty string (option omitted).
    - A list produces one ``--key='item'`` per element, space separated
      (an empty list therefore produces an empty string).
    - Anything else produces ``--key='value'``.

    Values are always wrapped in single quotes. A single quote inside a value
    is escaped so that it cannot terminate the quoting early when the result
    is passed to a shell.

    Args:
        key: Argument name
        value: Argument value (str, int, float, bool, list, or None)

    Returns:
        Formatted argument string

    Example:
        ```python
        format_arg('size', '1G')
        "--size='1G'"
        format_arg('quiet', True)
        '--quiet'
        format_arg('count', 5)
        "--count='5'"
        ```
    """
    flag = f"--{key.replace('_', '-')}"
    if value is True:
        return flag
    if value is False or value is None:
        return ''

    def quoted(item: object) -> str:
        # Close the quote, emit a backslash-escaped quote, reopen the quote.
        escaped = str(item).replace("'", "'\\''")
        return f"{flag}='{escaped}'"

    if isinstance(value, list):
        return ' '.join(quoted(item) for item in value)
    return quoted(value)

format_args(**kwargs)

Format command arguments.

Parameters:

Name Type Description Default
**kwargs ArgValue

Command arguments (str, int, float, bool, list, or None)

{}

Returns:

Type Description
str

Formatted arguments string

Example
format_args(size='1G', quiet=True, count=5)
"--size='1G' --quiet --count='5'"
Source code in sts_libs/src/sts/utils/cmdline.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def format_args(**kwargs: ArgValue) -> str:
    """Build one argument string from keyword arguments.

    Every keyword is rendered with format_arg, in the order given. Keywords
    that render to an empty string are left out, and the rest are joined with
    single spaces.

    Args:
        **kwargs: Command arguments (str, int, float, bool, list, or None)

    Returns:
        Formatted arguments string (empty when nothing was rendered)

    Example:
        ```python
        format_args(size='1G', quiet=True, count=5)
        "--size='1G' --quiet --count='5'"
        ```
    """
    rendered = (format_arg(name, val) for name, val in kwargs.items())
    return ' '.join(piece for piece in rendered if piece)

run(cmd, msg=None)

Run command and return result.

This is a thin wrapper around testinfra's run() that adds logging.

Parameters:

Name Type Description Default
cmd str

Command to execute

required
msg str | None

Optional message to log before execution

None

Returns:

Type Description
CommandResult

CommandResult from testinfra

Example
result = run('ls -l')
assert result.succeeded
print(result.stdout)
Source code in sts_libs/src/sts/utils/cmdline.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def run(cmd: str, msg: str | None = None) -> CommandResult:
    """Log a command and execute it through testinfra.

    The log line has the form ``<msg>: '<cmd>'`` and is emitted at INFO level
    before the command is handed to ``host.run``.

    Args:
        cmd: Command to execute
        msg: Optional message to log before execution; 'Running' is used when
            it is missing or empty

    Returns:
        CommandResult from testinfra

    Example:
        ```python
        result = run('ls -l')
        assert result.succeeded
        print(result.stdout)
        ```
    """
    prefix = msg if msg else 'Running'
    logging.info(f"{prefix}: '{cmd}'")
    outcome = host.run(cmd)
    return outcome

System Management

sts.utils.system

System utilities.

This module provides functionality for system operations: - System information - System logs - System state - Service management

LogFormat

Bases: Enum

Log format options.

Source code in sts_libs/src/sts/utils/system.py
33
34
35
36
37
38
class LogFormat(Enum):
    """Log format options.

    Selects which extra flag SystemManager.get_logs passes to journalctl.
    """

    DEFAULT = auto()  # get_logs adds no format-specific flag
    KERNEL = auto()  # get_logs adds ``-k``
    REVERSE = auto()  # get_logs adds ``-r``

LogOptions dataclass

Log retrieval options.

This class provides options for log retrieval: - Format options - Filtering options - Time range options

Parameters:

Name Type Description Default
format LogFormat

Log format (optional, defaults to DEFAULT)

DEFAULT
length int | None

Number of lines (optional)

None
since str | None

Since timestamp (optional)

None
grep str | None

Filter pattern (optional)

None
options list[str]

Additional options (optional)

list()
Example
opts = LogOptions()  # Uses defaults
opts = LogOptions(format=LogFormat.KERNEL)  # Custom format
Source code in sts_libs/src/sts/utils/system.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@dataclass
class LogOptions:
    """Log retrieval options.

    This class provides options for log retrieval:
    - Format options
    - Filtering options
    - Time range options

    Every field has a default, so ``LogOptions()`` is valid.

    Args:
        format: Log format (optional, defaults to DEFAULT)
        length: Number of lines (optional)
        since: Since timestamp (optional)
        grep: Filter pattern (optional)
        options: Additional options (optional)

    Example:
        ```python
        opts = LogOptions()  # Uses defaults
        opts = LogOptions(format=LogFormat.KERNEL)  # Custom format
        ```
    """

    # Format selector; note the field name shadows the ``format`` builtin
    # inside the class body only.
    format: LogFormat = LogFormat.DEFAULT

    # Optional filters; None (or an empty list) means "not set".
    length: int | None = None  # SystemManager.get_logs passes it as ``-n <length>``
    since: str | None = None  # SystemManager.get_logs passes it as ``-S <since>``
    grep: str | None = None  # SystemManager.get_logs pipes the output through grep
    # default_factory gives every instance its own list
    options: list[str] = field(default_factory=list)

SystemInfo dataclass

System information.

This class provides functionality for system information: - Operating system details - Hardware information - System state

Properties

hostname: System hostname; kernel: Kernel version; arch: System architecture; distribution: Distribution name; release: Distribution release; codename: Distribution codename

Example
info = SystemInfo()  # Discovers values when needed
print(info.hostname)  # Discovers hostname on first access
'localhost'
Source code in sts_libs/src/sts/utils/system.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
@dataclass
class SystemInfo:
    """System information.

    This class provides functionality for system information:
    - Operating system details
    - Hardware information
    - System state

    Each value is looked up the first time its property is read and is then
    cached on the instance in the matching private field.

    Properties:
        hostname: System hostname
        kernel: Kernel version
        arch: System architecture
        distribution: Distribution name
        release: Distribution release
        codename: Distribution codename

    Example:
        ```python
        info = SystemInfo()  # Discovers values when needed
        print(info.hostname)  # Discovers hostname on first access
        'localhost'
        ```
    """

    # Per-instance caches. None means "not looked up yet"; the fields are
    # excluded from __init__ so callers cannot pre-seed them by accident.
    _hostname: str | None = field(default=None, init=False)
    _kernel: str | None = field(default=None, init=False)
    _arch: str | None = field(default=None, init=False)
    _distribution: str | None = field(default=None, init=False)
    _release: str | None = field(default=None, init=False)
    _codename: str | None = field(default=None, init=False)

    @property
    def hostname(self) -> str | None:
        """Get system hostname.

        Runs ``hostname`` on first access. A failed command leaves the cache
        empty, so the lookup is retried on the next access.

        Returns:
            System hostname or None if not found

        Example:
            ```python
            info.hostname
            'localhost'
            ```
        """
        if self._hostname is None:
            result = run('hostname')
            if result.succeeded:
                self._hostname = result.stdout.strip()
        return self._hostname

    @property
    def kernel(self) -> str | None:
        """Get kernel version.

        Reads the ``kernel.osrelease`` sysctl and falls back to ``uname -r``
        when sysctl is not available. The fallback result is only cached when
        the command succeeded and printed something, so a failed lookup is
        reported as None instead of an empty string.

        Returns:
            Kernel version or None if not found

        Example:
            ```python
            info.kernel
            '5.4.0-1.el8'
            ```
        """
        if self._kernel is None:
            try:
                self._kernel = host.sysctl('kernel.osrelease')  # type: ignore[assignment]
            except ValueError:  # sysctl not available
                result = run('uname -r')
                release = result.stdout.strip() if result.succeeded else ''
                if release:
                    self._kernel = release

        return self._kernel

    @property
    def arch(self) -> str | None:
        """Get system architecture.

        Returns:
            System architecture or None if not found

        Example:
            ```python
            info.arch
            'x86_64'
            ```
        """
        if self._arch is None:
            self._arch = host.system_info.arch  # type: ignore[assignment]
        return self._arch

    @property
    def distribution(self) -> str | None:
        """Get distribution name.

        Returns:
            Distribution name or None if not found

        Example:
            ```python
            info.distribution
            'fedora'
            ```
        """
        if self._distribution is None:
            self._distribution = host.system_info.distribution  # type: ignore[assignment]
        return self._distribution

    @property
    def release(self) -> str | None:
        """Get distribution release.

        Returns:
            Distribution release or None if not found

        Example:
            ```python
            info.release
            '38'
            ```
        """
        if self._release is None:
            self._release = host.system_info.release  # type: ignore[assignment]
        return self._release

    @property
    def codename(self) -> str | None:
        """Get distribution codename.

        Returns:
            Distribution codename or None if not found

        Example:
            ```python
            info.codename
            'thirty eight'
            ```
        """
        if self._codename is None:
            self._codename = host.system_info.codename  # type: ignore[assignment]
        return self._codename

    @classmethod
    def get_current(cls) -> SystemInfo:
        """Get current system information.

        Returns:
            System information

        Example:
            ```python
            info = SystemInfo.get_current()
            info.kernel
            '5.4.0-1.el8'
            ```
        """
        return cls()  # Values will be discovered when needed

    @property
    def is_debug(self) -> bool:
        """Check if running debug kernel.

        Returns:
            True if the kernel version contains '+debug', False otherwise
            (including when the kernel version is unknown)

        Example:
            ```python
            info.is_debug
            False
            ```
        """
        return bool(self.kernel and '+debug' in self.kernel)

    @property
    def in_container(self) -> bool:
        """Check if running in container.

        Two local files are inspected in order, and the first marker found
        wins:

        - ``/proc/1/attr/current`` containing 'container_t' or 'unconfined'
        - ``/proc/self/cgroup`` containing 'docker'

        A PermissionError while reading either file is taken as a sign of a
        containerized environment. Any other OSError (file missing, attribute
        not readable) means that file gives no evidence, and the check moves
        on instead of raising.

        Returns:
            True if in container, False otherwise

        Example:
            ```python
            info.in_container
            False
            ```
        """
        probes = (
            (Path('/proc/1/attr/current'), ('container_t', 'unconfined')),
            (Path('/proc/self/cgroup'), ('docker',)),
        )
        for path, markers in probes:
            try:
                content = path.read_text()
            except PermissionError:
                logging.info('Assuming containerized environment')
                return True
            except OSError:
                # Not readable for a reason other than permissions.
                continue
            if any(marker in content for marker in markers):
                return True
        return False

    def log_all(self) -> None:
        """Log all system information.

        Each value is resolved immediately before its own log line, so any
        lookup it triggers is logged just ahead of the value it produced.

        Example:
            ```python
            info = SystemInfo()
            info.log_all()
            INFO: Hostname: localhost
            INFO: Kernel: 5.4.0-1.el8
            INFO: Architecture: x86_64
            INFO: Distribution: fedora
            INFO: Release: 38
            INFO: Codename: thirty eight
            ```
        """
        labelled = (
            ('Hostname', 'hostname'),
            ('Kernel', 'kernel'),
            ('Architecture', 'arch'),
            ('Distribution', 'distribution'),
            ('Release', 'release'),
            ('Codename', 'codename'),
        )
        for label, attr in labelled:
            logging.info(f'{label}: {getattr(self, attr)}')

arch: str | None property

Get system architecture.

Returns:

Type Description
str | None

System architecture or None if not found

Example
info.arch
'x86_64'

codename: str | None property

Get distribution codename.

Returns:

Type Description
str | None

Distribution codename or None if not found

Example
info.codename
'thirty eight'

distribution: str | None property

Get distribution name.

Returns:

Type Description
str | None

Distribution name or None if not found

Example
info.distribution
'fedora'

hostname: str | None property

Get system hostname.

Returns:

Type Description
str | None

System hostname or None if not found

Example
info.hostname
'localhost'

in_container: bool property

Check if running in container.

Returns:

Type Description
bool

True if in container, False otherwise

Example
info.in_container
False

is_debug: bool property

Check if running debug kernel.

Returns:

Type Description
bool

True if debug kernel, False otherwise

Example
info.is_debug
False

kernel: str | None property

Get kernel version.

Returns:

Type Description
str | None

Kernel version or None if not found

Example
info.kernel
'5.4.0-1.el8'

release: str | None property

Get distribution release.

Returns:

Type Description
str | None

Distribution release or None if not found

Example
info.release
'38'

get_current() classmethod

Get current system information.

Returns:

Type Description
SystemInfo

System information

Example
info = SystemInfo.get_current()
info.kernel
'5.4.0-1.el8'
Source code in sts_libs/src/sts/utils/system.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
@classmethod
def get_current(cls) -> SystemInfo:
    """Create the object describing the current system.

    Nothing is probed at construction time; the instance is created with no
    arguments.

    Returns:
        System information

    Example:
        ```python
        info = SystemInfo.get_current()
        info.kernel
        '5.4.0-1.el8'
        ```
    """
    # Values will be discovered when needed
    current = cls()
    return current

log_all()

Log all system information.

Example
info = SystemInfo()
info.log_all()
INFO: Hostname: localhost
INFO: Kernel: 5.4.0-1.el8
INFO: Architecture: x86_64
INFO: Distribution: fedora
INFO: Release: 38
INFO: Codename: thirty eight
Source code in sts_libs/src/sts/utils/system.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def log_all(self) -> None:
    """Write every system detail to the log at INFO level.

    One line is emitted per value, in a fixed order. Each value is read from
    the instance immediately before its own line is logged.

    Example:
        ```python
        info = SystemInfo()
        info.log_all()
        INFO: Hostname: localhost
        INFO: Kernel: 5.4.0-1.el8
        INFO: Architecture: x86_64
        INFO: Distribution: fedora
        INFO: Release: 38
        INFO: Codename: thirty eight
        ```
    """
    labelled = (
        ('Hostname', 'hostname'),
        ('Kernel', 'kernel'),
        ('Architecture', 'arch'),
        ('Distribution', 'distribution'),
        ('Release', 'release'),
        ('Codename', 'codename'),
    )
    for label, attr in labelled:
        logging.info(f'{label}: {getattr(self, attr)}')

SystemManager

System manager functionality.

This class provides functionality for system operations: - System information - System logs - System state - Service management

Example
sm = SystemManager()
sm.get_logs(LogOptions(format=LogFormat.KERNEL))
'Jan 1 00:00:00 kernel: ...'
Source code in sts_libs/src/sts/utils/system.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
class SystemManager:
    """System manager functionality.

    This class provides functionality for system operations:
    - System information
    - System logs
    - System state
    - Service management

    Example:
        ```python
        sm = SystemManager()
        sm.get_logs(LogOptions(format=LogFormat.KERNEL))
        'Jan 1 00:00:00 kernel: ...'
        ```
    """

    def __init__(self) -> None:
        """Initialize system manager.

        Sets ``info`` from ``SystemInfo.get_current()`` and
        ``package_manager`` to a new ``Dnf()`` instance.
        """
        self.info = SystemInfo.get_current()
        self.package_manager = Dnf()

    def get_logs(self, options: LogOptions | None = None) -> str | None:
        """Get system logs.

        Builds a ``journalctl`` command line from ``options``, runs it and
        reformats each output line: the fourth whitespace-separated field is
        cut at its first '.', and fields are re-joined with single spaces.
        Lines with too few fields are dropped.

        The grep pattern is single-quoted with embedded single quotes escaped,
        so a pattern containing a quote cannot break the shell command.
        ``since`` and ``options`` are passed through verbatim; callers must
        quote them as needed.

        Args:
            options: Log options (defaults to ``LogOptions()``)

        Returns:
            Log output or None if error

        Example:
            ```python
            sm = SystemManager()
            sm.get_logs(LogOptions(format=LogFormat.KERNEL))
            'Jan 1 00:00:00 kernel: ...'
            ```
        """
        options = options or LogOptions()

        # Build command
        cmd = ['journalctl']
        if options.format == LogFormat.KERNEL:
            cmd.append('-k')
        if options.length:
            cmd.extend(['-n', str(options.length)])
        if options.format == LogFormat.REVERSE:
            cmd.append('-r')
        if options.since:
            cmd.extend(['-S', options.since])
        if options.options:
            cmd.extend(options.options)
        if options.grep:
            # Close the quote, emit an escaped quote, reopen the quote.
            pattern = options.grep.replace("'", "'\\''")
            cmd.extend(['|', 'grep', f"'{pattern}'"])

        result = run(' '.join(cmd))
        if result.failed:
            # NOTE(review): with a grep filter, "no match" presumably also
            # ends up here -- confirm whether that should be an error.
            logging.error('Failed to get system logs')
            return None

        # Format output to match /var/log/messages
        output = []
        for line in result.stdout.splitlines():
            parts = line.split()
            # The second test keeps parts[3] safe whatever MIN_LOG_PARTS is.
            if len(parts) < MIN_LOG_PARTS or len(parts) <= 3:
                continue
            parts[3] = parts[3].split('.')[0]
            output.append(' '.join(parts))

        return '\n'.join(output)

    def generate_sosreport(self, skip_plugins: str | None = None, plugin_timeout: int = 300) -> str | None:
        """Generate sosreport.

        Ensures the ``sos`` package is installed, runs ``sos report --batch``
        and returns the first output line that mentions '/tmp/sosreport'.

        Args:
            skip_plugins: Plugins to skip
            plugin_timeout: Plugin timeout in seconds

        Returns:
            Path to sosreport or None if error

        Example:
            ```python
            sm = SystemManager()
            sm.generate_sosreport(skip_plugins='kdump,networking')
            '/tmp/sosreport-localhost-123456-2021-01-01-abcdef.tar.xz'
            ```
        """
        ensure_installed('sos')

        cmd = ['sos', 'report', '--batch', f'--plugin-timeout={plugin_timeout}']
        if skip_plugins:
            cmd.extend(['--skip-plugins', skip_plugins])

        result = run(' '.join(cmd))
        if result.failed:
            logging.error('Failed to generate sosreport')
            return None

        # Find sosreport path in output
        matches = (line.strip() for line in result.stdout.splitlines() if '/tmp/sosreport' in line)
        return next(matches, None)

    def get_timestamp(self, timezone_: Literal['utc', 'local'] = 'local') -> str:
        """Get current timestamp.

        Args:
            timezone_: Timezone to use; 'utc' selects UTC, any other value
                uses local time

        Returns:
            Timestamp string (YYYYMMDDhhmmss)

        Example:
            ```python
            sm = SystemManager()
            sm.get_timestamp()
            '20210101000000'
            ```
        """
        return datetime.now(tz=timezone.utc if timezone_ == 'utc' else None).strftime('%Y%m%d%H%M%S')

    def clear_logs(self) -> None:
        """Clear system logs.

        Runs ``dmesg -c``; the command result is not checked.

        Example:
            ```python
            sm = SystemManager()
            sm.clear_logs()
            ```
        """
        run('dmesg -c')

    def _systemctl(self, action: str, service: str) -> bool:
        """Run ``systemctl <action> <service>`` and report whether it succeeded."""
        return run(f'systemctl {action} {service}').succeeded

    def is_service_enabled(self, service: str) -> bool:
        """Check if service is enabled.

        Args:
            service: Service name

        Returns:
            True if service is enabled, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.is_service_enabled('sshd')
            True
            ```
        """
        return self._systemctl('is-enabled', service)

    def is_service_running(self, service: str) -> bool:
        """Check if service is running.

        Args:
            service: Service name

        Returns:
            True if service is running, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.is_service_running('sshd')
            True
            ```
        """
        return self._systemctl('is-active', service)

    def service_enable(self, service: str) -> bool:
        """Enable service.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.service_enable('sshd')
            True
            ```
        """
        return self._systemctl('enable', service)

    def service_disable(self, service: str) -> bool:
        """Disable service.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.service_disable('sshd')
            True
            ```
        """
        return self._systemctl('disable', service)

    def service_start(self, service: str) -> bool:
        """Start service.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.service_start('sshd')
            True
            ```
        """
        return self._systemctl('start', service)

    def service_stop(self, service: str) -> bool:
        """Stop service.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.service_stop('sshd')
            True
            ```
        """
        return self._systemctl('stop', service)

    def service_restart(self, service: str) -> bool:
        """Restart service.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.service_restart('sshd')
            True
            ```
        """
        return self._systemctl('restart', service)

    def _test_service_enable_cycle(self, service: str) -> bool:
        """Test service enable/disable cycle.

        The service is toggled away from its current enabled state and then
        back, waiting SERVICE_WAIT_TIME between the two steps.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise
        """
        if self.is_service_enabled(service):
            # Test disable -> enable
            if not self.service_disable(service):
                return False
            time.sleep(SERVICE_WAIT_TIME)
            return self.service_enable(service)
        # Test enable -> disable
        if not self.service_enable(service):
            return False
        time.sleep(SERVICE_WAIT_TIME)
        return self.service_disable(service)

    def _test_service_start_cycle(self, service: str) -> bool:
        """Test service start/stop cycle.

        The service is toggled away from its current running state and then
        back, waiting SERVICE_WAIT_TIME between the two steps.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise
        """
        if self.is_service_running(service):
            # Test stop -> start
            if not self.service_stop(service):
                return False
            time.sleep(SERVICE_WAIT_TIME)
            return self.service_start(service)
        # Test start -> stop
        if not self.service_start(service):
            return False
        time.sleep(SERVICE_WAIT_TIME)
        return self.service_stop(service)

    def _test_service_restart(self, service: str) -> bool:
        """Test service restart.

        Restarts the service, waits SERVICE_WAIT_TIME and checks that it is
        running.

        Args:
            service: Service name

        Returns:
            True if successful, False otherwise
        """
        if not self.service_restart(service):
            return False
        time.sleep(SERVICE_WAIT_TIME)
        return self.is_service_running(service)

    def test_service(self, service: str) -> bool:
        """Test service operations.

        This method tests, stopping at the first failure:
        - Enable/disable cycle
        - Start/stop cycle
        - Restart operation

        Args:
            service: Service name

        Returns:
            True if all tests pass, False otherwise

        Example:
            ```python
            sm = SystemManager()
            sm.test_service('sshd')
            True
            ```
        """
        # Test enable/disable cycle
        if not self._test_service_enable_cycle(service):
            return False

        # Test start/stop cycle
        if not self._test_service_start_cycle(service):
            return False

        # Test restart
        return self._test_service_restart(service)

__init__()

Initialize system manager.

Source code in sts_libs/src/sts/utils/system.py
310
311
312
313
def __init__(self) -> None:
    """Initialize system manager.

    Sets ``info`` from ``SystemInfo.get_current()`` and ``package_manager``
    to a new ``Dnf()`` instance.
    """
    self.info = SystemInfo.get_current()
    self.package_manager = Dnf()

clear_logs()

Clear system logs.

Example
sm = SystemManager()
sm.clear_logs()
Source code in sts_libs/src/sts/utils/system.py
417
418
419
420
421
422
423
424
425
426
def clear_logs(self) -> None:
    """Clear system logs.

    Runs ``dmesg -c`` and ignores the command result, so a failure is not
    reported to the caller.

    NOTE(review): only ``dmesg -c`` is issued here, so presumably only the
    kernel ring buffer is affected and journal entries are left alone --
    confirm this matches what callers expect from "clear logs".

    Example:
        ```python
        sm = SystemManager()
        sm.clear_logs()
        ```
    """
    run('dmesg -c')

generate_sosreport(skip_plugins=None, plugin_timeout=300)

Generate sosreport.

Parameters:

Name Type Description Default
skip_plugins str | None

Plugins to skip

None
plugin_timeout int

Plugin timeout in seconds

300

Returns:

Type Description
str | None

Path to sosreport or None if error

Example
sm = SystemManager()
sm.generate_sosreport(skip_plugins='kdump,networking')
'/tmp/sosreport-localhost-123456-2021-01-01-abcdef.tar.xz'
Source code in sts_libs/src/sts/utils/system.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def generate_sosreport(self, skip_plugins: str | None = None, plugin_timeout: int = 300) -> str | None:
    """Create a sosreport and return where it was written.

    The ``sos`` package is ensured first. ``sos report --batch`` is then run
    with the requested plugin timeout, and the first output line mentioning
    '/tmp/sosreport' is returned with surrounding whitespace removed.

    Args:
        skip_plugins: Plugins to skip
        plugin_timeout: Plugin timeout in seconds

    Returns:
        Path to sosreport or None if error

    Example:
        ```python
        sm = SystemManager()
        sm.generate_sosreport(skip_plugins='kdump,networking')
        '/tmp/sosreport-localhost-123456-2021-01-01-abcdef.tar.xz'
        ```
    """
    ensure_installed('sos')

    argv = ['sos', 'report', '--batch', f'--plugin-timeout={plugin_timeout}']
    if skip_plugins:
        argv += ['--skip-plugins', skip_plugins]

    outcome = run(' '.join(argv))
    if outcome.failed:
        logging.error('Failed to generate sosreport')
        return None

    # First line naming the report wins; None when no line matches.
    candidates = (entry.strip() for entry in outcome.stdout.splitlines() if '/tmp/sosreport' in entry)
    return next(candidates, None)

get_logs(options=None)

Get system logs.

Parameters:

Name Type Description Default
options LogOptions | None

Log options

None

Returns:

Type Description
str | None

Log output or None if error

Example
sm = SystemManager()
sm.get_logs(LogOptions(format=LogFormat.KERNEL))
'Jan 1 00:00:00 kernel: ...'
Source code in sts_libs/src/sts/utils/system.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def get_logs(self, options: LogOptions | None = None) -> str | None:
    """Collect journal output via ``journalctl``.

    Args:
        options: Log options; a default ``LogOptions()`` is used when omitted

    Returns:
        Reformatted log lines joined by newlines, or None if the command failed

    Example:
        ```python
        sm = SystemManager()
        sm.get_logs(LogOptions(format=LogFormat.KERNEL))
        'Jan 1 00:00:00 kernel: ...'
        ```
    """
    opts = options or LogOptions()

    # Assemble the journalctl invocation piece by piece
    argv = ['journalctl']
    if opts.format == LogFormat.KERNEL:
        argv.append('-k')
    if opts.length:
        argv += ['-n', str(opts.length)]
    if opts.format == LogFormat.REVERSE:
        argv.append('-r')
    if opts.since:
        argv += ['-S', opts.since]
    if opts.options:
        argv += opts.options
    if opts.grep:
        # Filtering is delegated to a shell pipeline
        argv += ['|', 'grep', f"'{opts.grep}'"]

    outcome = run(' '.join(argv))
    if outcome.failed:
        logging.error('Failed to get system logs')
        return None

    # Format output to match /var/log/messages
    formatted = []
    for raw in outcome.stdout.splitlines():
        fields = raw.split()
        if len(fields) >= MIN_LOG_PARTS:
            # Drop everything after the first '.' in the fourth field
            fields[3] = fields[3].partition('.')[0]
            formatted.append(' '.join(fields))

    return '\n'.join(formatted)

get_timestamp(timezone_='local')

Get current timestamp.

Parameters:

Name Type Description Default
timezone_ Literal['utc', 'local']

Timezone to use

'local'

Returns:

Type Description
str

Timestamp string (YYYYMMDDhhmmss)

Example
sm = SystemManager()
sm.get_timestamp()
'20210101000000'
Source code in sts_libs/src/sts/utils/system.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def get_timestamp(self, timezone_: Literal['utc', 'local'] = 'local') -> str:
    """Return the current time as a compact timestamp.

    Args:
        timezone_: 'utc' for UTC; any other value yields local time

    Returns:
        Timestamp string (YYYYMMDDhhmmss)

    Example:
        ```python
        sm = SystemManager()
        sm.get_timestamp()
        '20210101000000'
        ```
    """
    # tz=None makes datetime.now() return naive local time
    zone = timezone.utc if timezone_ == 'utc' else None
    return f'{datetime.now(tz=zone):%Y%m%d%H%M%S}'

is_service_enabled(service)

Check if service is enabled.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if service is enabled, False otherwise

Example
sm = SystemManager()
sm.is_service_enabled('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def is_service_enabled(self, service: str) -> bool:
    """Report whether ``systemctl is-enabled`` succeeds for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.is_service_enabled('sshd')
        True
        ```
    """
    return run(f'systemctl is-enabled {service}').succeeded

is_service_running(service)

Check if service is running.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if service is running, False otherwise

Example
sm = SystemManager()
sm.is_service_running('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def is_service_running(self, service: str) -> bool:
    """Report whether ``systemctl is-active`` succeeds for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.is_service_running('sshd')
        True
        ```
    """
    return run(f'systemctl is-active {service}').succeeded

service_disable(service)

Disable service.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
sm = SystemManager()
sm.service_disable('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
def service_disable(self, service: str) -> bool:
    """Run ``systemctl disable`` for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.service_disable('sshd')
        True
        ```
    """
    return run(f'systemctl disable {service}').succeeded

service_enable(service)

Enable service.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
sm = SystemManager()
sm.service_enable('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def service_enable(self, service: str) -> bool:
    """Run ``systemctl enable`` for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.service_enable('sshd')
        True
        ```
    """
    return run(f'systemctl enable {service}').succeeded

service_restart(service)

Restart service.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
sm = SystemManager()
sm.service_restart('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
def service_restart(self, service: str) -> bool:
    """Run ``systemctl restart`` for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.service_restart('sshd')
        True
        ```
    """
    return run(f'systemctl restart {service}').succeeded

service_start(service)

Start service.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
sm = SystemManager()
sm.service_start('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def service_start(self, service: str) -> bool:
    """Run ``systemctl start`` for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.service_start('sshd')
        True
        ```
    """
    return run(f'systemctl start {service}').succeeded

service_stop(service)

Stop service.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
sm = SystemManager()
sm.service_stop('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
def service_stop(self, service: str) -> bool:
    """Run ``systemctl stop`` for a service.

    Args:
        service: Service name

    Returns:
        True if the command succeeded, False otherwise

    Example:
        ```python
        sm = SystemManager()
        sm.service_stop('sshd')
        True
        ```
    """
    return run(f'systemctl stop {service}').succeeded

test_service(service)

Test service operations.

This method tests the enable/disable cycle, the start/stop cycle, and the restart operation.

Parameters:

Name Type Description Default
service str

Service name

required

Returns:

Type Description
bool

True if all tests pass, False otherwise

Example
sm = SystemManager()
sm.test_service('sshd')
True
Source code in sts_libs/src/sts/utils/system.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
def test_service(self, service: str) -> bool:
    """Exercise the basic lifecycle operations of a service.

    The checks run in this order and stop at the first failure:
    - Enable/disable cycle
    - Start/stop cycle
    - Restart operation

    Args:
        service: Service name

    Returns:
        False if the enable/disable or start/stop check fails, otherwise
        the result of the restart check

    Example:
        ```python
        sm = SystemManager()
        sm.test_service('sshd')
        True
        ```
    """
    # The first two checks gate the run; the restart check supplies the final result
    for check in (self._test_service_enable_cycle, self._test_service_start_cycle):
        if not check(service):
            return False

    return self._test_service_restart(service)

Package Management

sts.utils.packages

Package management.

This module provides functionality for managing system packages: package installation, package information, package version management, and repository management.

Dnf

DNF package manager functionality.

This class provides functionality for managing system packages: package installation, package removal, and repository management.

Example
pm = Dnf()
pm.install('bash')
True
Source code in sts_libs/src/sts/utils/packages.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
class Dnf:
    """DNF package manager functionality.

    This class wraps the ``dnf`` command line and the repository
    definition files stored under ``repo_path``:
    - Package installation
    - Package removal
    - Repository management

    Example:
        ```python
        pm = Dnf()
        pm.install('bash')
        True
        ```
    """

    def __init__(self) -> None:
        """Initialize package manager."""
        # Directory holding the '<name>.repo' files handled by the repo methods
        self.repo_path = Path('/etc/yum.repos.d')

    def install(self, package: str) -> bool:
        """Install package.

        Nothing is run when the package is already installed.

        Args:
            package: Package name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pm.install('bash')
            True
            ```
        """
        pkg: RpmPackage = host.package(package)  # type: ignore[no-any-return]
        if pkg.is_installed:
            return True

        result = run(f'dnf install -y {package}')
        if result.failed:
            logging.error(f'Failed to install {package}')
            return False

        logging.info(f'Successfully installed {package}')
        return True

    def remove(self, package: str) -> bool:
        """Remove package.

        Nothing is run when the package is not installed.

        Args:
            package: Package name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pm.remove('bash')
            True
            ```
        """
        pkg: RpmPackage = host.package(package)  # type: ignore[no-any-return]
        if not pkg.is_installed:
            return True

        result = run(f'dnf remove -y {package}')
        if result.failed:
            logging.error(f'Failed to remove {package}')
            return False

        logging.info(f'Successfully removed {package}')
        return True

    def add_repo(self, config: RepoConfig) -> bool:
        """Add repository.

        Writes ``<name lower-cased>.repo`` into ``repo_path``. An existing
        file is left untouched and counts as success.

        Args:
            config: Repository configuration

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            config = RepoConfig(
                name='epel',
                baseurl='https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/',
            )
            pm.add_repo(config)
            True
            ```
        """
        if not config.baseurl and not config.metalink:
            logging.error('Either baseurl or metalink required')
            return False

        repo_file = self.repo_path / f'{config.name.lower()}.repo'
        if repo_file.exists():
            logging.info(f'Repository {config.name} already exists')
            return True

        # Write repo file: a '[name]' section header followed by key=value lines
        try:
            content = [f'[{config.name}]']
            content.extend(f'{k}={v}' for k, v in config.to_config().items())
            repo_file.write_text('\n'.join(content))
        except OSError:
            logging.exception(f'Failed to write repository file {repo_file}')
            return False

        return True

    def remove_repo(self, name: str) -> bool:
        """Remove repository.

        A missing repository file is not an error.

        Args:
            name: Repository name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pm.remove_repo('epel')
            True
            ```
        """
        repo_file = self.repo_path / f'{name.lower()}.repo'
        try:
            repo_file.unlink(missing_ok=True)
        except OSError:
            logging.exception(f'Failed to remove repository file {repo_file}')
            return False

        return True

    def repo_exists(self, name: str) -> bool:
        """Check if repository exists.

        Args:
            name: Repository name

        Returns:
            True if ``dnf repoinfo`` succeeds for the repository

        Example:
            ```python
            pm.repo_exists('epel')
            True
            ```
        """
        result = run(f'dnf repoinfo {name}')
        if result.failed:
            logging.error(f'Repository {name} not found')
            return False
        return True

    def repo_enabled(self, name: str) -> bool:
        """Check if repository is enabled.

        Args:
            name: Repository name

        Returns:
            True if repository exists and is enabled

        Example:
            ```python
            pm.repo_enabled('epel')
            True
            ```
        """
        if not self.repo_exists(name):
            return False

        # Plain substring test on the repoinfo output
        result = run(f'dnf repoinfo {name}')
        if 'enabled' not in result.stdout:
            logging.error(f'Repository {name} not enabled')
            return False

        return True

    def download_repo(self, url: str, name: str | None = None, *, overwrite: bool = True) -> bool:
        """Download repository file.

        The file is stored in ``repo_path`` as ``<name>.repo``. When no name
        is given it is taken from the last path segment of the URL; query
        string, fragment and a trailing slash are ignored.

        Args:
            url: Repository file URL
            name: Repository name (optional, derived from URL)
            overwrite: Overwrite existing file (optional)

        Returns:
            True if successful (or the file exists and overwrite is False),
            False if curl is unavailable, no name can be derived from the
            URL, or the download fails

        Example:
            ```python
            pm.download_repo('https://example.com/repos/custom.repo')
            True
            ```
        """
        # Imported locally so the module-level imports stay untouched
        import shlex
        from urllib.parse import urlsplit

        if not self.install('curl'):
            return False

        if not name:
            # Use only the path component so '?query' parts and trailing
            # slashes do not leak into (or empty out) the file name
            name = urlsplit(url).path.rstrip('/').rsplit('/', 1)[-1]
            if not name:
                logging.error(f'Cannot derive repository name from {url}')
                return False
        if not name.endswith('.repo'):
            name = f'{name}.repo'

        repo_file = self.repo_path / name
        if repo_file.exists() and not overwrite:
            logging.info(f'Repository file {repo_file} already exists')
            return True

        # --fail: HTTP errors give a non-zero exit status instead of saving
        #         the server's error page as the repo file
        # --location: follow redirects instead of saving the redirect body
        # Arguments are quoted because the command string is interpreted by a shell
        result = run(f'curl --fail --location {shlex.quote(url)} --output {shlex.quote(str(repo_file))}')
        if result.failed:
            logging.error(f'Failed to download repository file from {url}')
            return False

        return True

__init__()

Initialize package manager.

Source code in sts_libs/src/sts/utils/packages.py
103
104
105
def __init__(self) -> None:
    """Initialize package manager.

    Sets ``repo_path`` to ``/etc/yum.repos.d``, the directory in which the
    repository methods of this class look for ``<name>.repo`` files.
    """
    self.repo_path = Path('/etc/yum.repos.d')

add_repo(config)

Add repository.

Parameters:

Name Type Description Default
config RepoConfig

Repository configuration

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
config = RepoConfig(
    name='epel',
    baseurl='https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/',
)
pm.add_repo(config)
True
Source code in sts_libs/src/sts/utils/packages.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def add_repo(self, config: RepoConfig) -> bool:
    """Create a repository definition file from a configuration.

    The file is named after the lower-cased repository name and placed in
    ``self.repo_path``. An already existing file is left untouched and
    counts as success.

    Args:
        config: Repository configuration

    Returns:
        True if the file exists or was written, False if neither baseurl
        nor metalink is set or the file could not be written

    Example:
        ```python
        config = RepoConfig(
            name='epel',
            baseurl='https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/',
        )
        pm.add_repo(config)
        True
        ```
    """
    if not (config.baseurl or config.metalink):
        logging.error('Either baseurl or metalink required')
        return False

    target = self.repo_path / f'{config.name.lower()}.repo'
    if target.exists():
        logging.info(f'Repository {config.name} already exists')
        return True

    try:
        # Section header first, then one key=value line per setting
        lines = [f'[{config.name}]', *(f'{key}={value}' for key, value in config.to_config().items())]
        target.write_text('\n'.join(lines))
    except OSError:
        logging.exception(f'Failed to write repository file {target}')
        return False
    else:
        return True

download_repo(url, name=None, *, overwrite=True)

Download repository file.

Parameters:

Name Type Description Default
url str

Repository file URL

required
name str | None

Repository name (optional, derived from URL)

None
overwrite bool

Overwrite existing file (optional)

True

Returns:

Type Description
bool

True if successful, False otherwise

Example
pm.download_repo('https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm')
True
Source code in sts_libs/src/sts/utils/packages.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def download_repo(self, url: str, name: str | None = None, *, overwrite: bool = True) -> bool:
    """Download repository file.

    Args:
        url: Repository file URL
        name: Repository name (optional, derived from URL)
        overwrite: Overwrite existing file (optional)

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pm.download_repo('https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm')
        True
        ```
    """
    if not self.install('curl'):
        return False

    if not name:
        name = url.split('/')[-1]
    if not name.endswith('.repo'):
        name = f'{name}.repo'

    repo_file = self.repo_path / name
    if repo_file.exists() and not overwrite:
        logging.info(f'Repository file {repo_file} already exists')
        return True

    result = run(f'curl {url} --output {repo_file}')
    if result.failed:
        logging.error(f'Failed to download repository file from {url}')
        return False

    return True

install(package)

Install package.

Parameters:

Name Type Description Default
package str

Package name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pm.install('bash')
True
Source code in sts_libs/src/sts/utils/packages.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def install(self, package: str) -> bool:
    """Make sure a package is installed, installing it with dnf if needed.

    Args:
        package: Package name

    Returns:
        True if the package was already installed or ``dnf install``
        did not fail, False otherwise

    Example:
        ```python
        pm.install('bash')
        True
        ```
    """
    # Already present: nothing to run
    if host.package(package).is_installed:  # type: ignore[no-any-return]
        return True

    outcome = run(f'dnf install -y {package}')
    if not outcome.failed:
        logging.info(f'Successfully installed {package}')
        return True

    logging.error(f'Failed to install {package}')
    return False

remove(package)

Remove package.

Parameters:

Name Type Description Default
package str

Package name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pm.remove('bash')
True
Source code in sts_libs/src/sts/utils/packages.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def remove(self, package: str) -> bool:
    """Make sure a package is absent, removing it with dnf if needed.

    Args:
        package: Package name

    Returns:
        True if the package was not installed or ``dnf remove`` did not
        fail, False otherwise

    Example:
        ```python
        pm.remove('bash')
        True
        ```
    """
    # Not present: nothing to run
    if not host.package(package).is_installed:  # type: ignore[no-any-return]
        return True

    outcome = run(f'dnf remove -y {package}')
    if not outcome.failed:
        logging.info(f'Successfully removed {package}')
        return True

    logging.error(f'Failed to remove {package}')
    return False

remove_repo(name)

Remove repository.

Parameters:

Name Type Description Default
name str

Repository name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pm.remove_repo('epel')
True
Source code in sts_libs/src/sts/utils/packages.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def remove_repo(self, name: str) -> bool:
    """Delete the definition file of a repository.

    The file is looked up in ``self.repo_path`` under the lower-cased
    repository name; a missing file is not an error.

    Args:
        name: Repository name

    Returns:
        True if the file is gone, False if deleting it raised OSError

    Example:
        ```python
        pm.remove_repo('epel')
        True
        ```
    """
    target = self.repo_path / f'{name.lower()}.repo'
    try:
        target.unlink(missing_ok=True)
    except OSError:
        logging.exception(f'Failed to remove repository file {target}')
        return False
    else:
        return True

repo_enabled(name)

Check if repository is enabled.

Parameters:

Name Type Description Default
name str

Repository name

required

Returns:

Type Description
bool

True if repository exists and is enabled

Example
pm.repo_enabled('epel')
True
Source code in sts_libs/src/sts/utils/packages.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def repo_enabled(self, name: str) -> bool:
    """Report whether a repository exists and is enabled.

    Args:
        name: Repository name

    Returns:
        True if ``self.repo_exists(name)`` holds and the ``dnf repoinfo``
        output contains the word 'enabled', False otherwise

    Example:
        ```python
        pm.repo_enabled('epel')
        True
        ```
    """
    if not self.repo_exists(name):
        return False

    # Plain substring test on the repoinfo output
    info = run(f'dnf repoinfo {name}').stdout
    if 'enabled' in info:
        return True

    logging.error(f'Repository {name} not enabled')
    return False

repo_exists(name)

Check if repository exists.

Parameters:

Name Type Description Default
name str

Repository name

required

Returns:

Type Description
bool

True if repository exists

Example
pm.repo_exists('epel')
True
Source code in sts_libs/src/sts/utils/packages.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def repo_exists(self, name: str) -> bool:
    """Report whether dnf knows a repository.

    Args:
        name: Repository name

    Returns:
        True if ``dnf repoinfo`` did not fail for the repository

    Example:
        ```python
        pm.repo_exists('epel')
        True
        ```
    """
    outcome = run(f'dnf repoinfo {name}')
    if not outcome.failed:
        return True

    logging.error(f'Repository {name} not found')
    return False

RepoConfig dataclass

Repository configuration.

This class provides configuration for repository management: repository metadata, repository options, and repository URLs.

Parameters:

Name Type Description Default
name str

Repository name

required
baseurl str | None

Repository URL (optional)

None
metalink str | None

Repository metalink (optional)

None
enabled bool

Enable repository

True
gpgcheck bool

Enable GPG check

False
skip_if_unavailable bool

Skip if unavailable

True
Example
config = RepoConfig(
    name='epel',
    baseurl='https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/',
)
Source code in sts_libs/src/sts/utils/packages.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class RepoConfig:
    """Repository configuration.

    This class provides configuration for repository management:
    - Repository metadata
    - Repository options
    - Repository URLs

    Args:
        name: Repository name
        baseurl: Repository URL (optional)
        metalink: Repository metalink (optional)
        enabled: Enable repository
        gpgcheck: Enable GPG check
        skip_if_unavailable: Skip if unavailable

    Example:
        ```python
        config = RepoConfig(
            name='epel',
            baseurl='https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/',
        )
        ```
    """

    name: str
    baseurl: str | None = None
    metalink: str | None = None
    enabled: bool = True
    gpgcheck: bool = False
    skip_if_unavailable: bool = True

    def to_config(self) -> dict[str, str]:
        """Convert to repository configuration.

        Returns:
            Repository configuration dictionary

        Example:
            ```python
            config = RepoConfig(name='epel', baseurl='https://example.com')
            config.to_config()
            {'name': 'epel', 'baseurl': 'https://example.com', 'enabled': '1', ...}
            ```
        """
        config = {
            'name': self.name,
            'enabled': '1' if self.enabled else '0',
            'gpgcheck': '1' if self.gpgcheck else '0',
            'skip_if_unavailable': '1' if self.skip_if_unavailable else '0',
        }
        if self.baseurl:
            config['baseurl'] = self.baseurl
        if self.metalink:
            config['metalink'] = self.metalink
        return config

to_config()

Convert to repository configuration.

Returns:

Type Description
dict[str, str]

Repository configuration dictionary

Example
config = RepoConfig(name='epel', baseurl='https://example.com')
config.to_config()
{'name': 'epel', 'baseurl': 'https://example.com', 'enabled': '1', ...}
Source code in sts_libs/src/sts/utils/packages.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def to_config(self) -> dict[str, str]:
    """Render the settings as string key/value pairs.

    Boolean options become '1' or '0'; ``baseurl`` and ``metalink`` are
    only included when set.

    Returns:
        Repository configuration dictionary

    Example:
        ```python
        config = RepoConfig(name='epel', baseurl='https://example.com')
        config.to_config()
        {'name': 'epel', 'baseurl': 'https://example.com', 'enabled': '1', ...}
        ```
    """

    def flag(value: bool) -> str:
        return '1' if value else '0'

    settings = {
        'name': self.name,
        'enabled': flag(self.enabled),
        'gpgcheck': flag(self.gpgcheck),
        'skip_if_unavailable': flag(self.skip_if_unavailable),
    }
    # Optional URLs are appended after the fixed keys, baseurl first
    optional = (('baseurl', self.baseurl), ('metalink', self.metalink))
    settings.update({key: value for key, value in optional if value})
    return settings

ensure_installed(*packages)

Ensure packages are installed.

Parameters:

Name Type Description Default
*packages str

Package names to install

()

Returns:

Type Description
bool

True if all packages are installed, False otherwise

Example
ensure_installed('lsscsi', 'curl')
True
ensure_installed('nonexistent')
False
Source code in sts_libs/src/sts/utils/packages.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def ensure_installed(*packages: str) -> bool:
    """Make sure every named package is installed.

    If at least one package is missing, ``Dnf.install`` is called for the
    packages in order, stopping at the first failure.

    Args:
        *packages: Package names to install

    Returns:
        True if all packages are installed, False otherwise

    Example:
        ```python
        ensure_installed('lsscsi', 'curl')
        True
        ensure_installed('nonexistent')
        False
        ```
    """
    if not packages:
        return True

    pm = Dnf()
    # Only go through the installer when something is actually missing
    anything_missing = any(
        not host.package(candidate).is_installed  # type: ignore[no-any-return]
        for candidate in packages
    )
    if anything_missing:
        return all(pm.install(candidate) for candidate in packages)
    return True

Module Management

sts.utils.modules

Kernel module management.

This module provides functionality for managing kernel modules: module loading/unloading, module information, and module dependencies.

ModuleInfo dataclass

Kernel module information.

This class provides functionality for managing module information: module metadata, module parameters, and module dependencies.

Parameters:

Name Type Description Default
name str | None

Module name (optional, discovers first loaded module)

None
size int | None

Module size (optional, discovered from module)

None
used_by list[str]

List of modules using this module (optional, discovered from module)

list()
state str | None

Module state (optional, discovered from module)

None
parameters dict[str, Any]

Module parameters (optional, discovered from module)

dict()
Example
info = ModuleInfo()  # Discovers first loaded module
info = ModuleInfo(name='dm_mod')  # Discovers other values
Source code in sts_libs/src/sts/utils/modules.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@dataclass
class ModuleInfo:
    """Kernel module information.

    Describes a single kernel module using ``/proc/modules`` and
    ``/sys/module/<name>/parameters``, and wraps ``modprobe`` for loading and
    unloading it:
    - Module metadata
    - Module parameters
    - Module dependencies

    Args:
        name: Module name (optional, discovers first loaded module)
        size: Module size (optional, discovered from module)
        used_by: List of modules using this module (optional, discovered from module)
        state: Module state (optional, discovered from module)
        parameters: Module parameters (optional, discovered from module)

    Example:
        ```python
        info = ModuleInfo()  # Discovers first loaded module
        info = ModuleInfo(name='dm_mod')  # Discovers other values
        ```
    """

    # Optional parameters
    name: str | None = None
    size: int | None = None
    used_by: list[str] = field(default_factory=list)
    state: str | None = None
    parameters: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Initialize module information.

        Discovers module information based on provided parameters. When the
        module has no entry in ``/proc/modules`` the size, state and usage
        fields are left as they are.
        """
        # Read the module table once; it serves both name discovery and lookup
        try:
            entries = Path('/proc/modules').read_text().splitlines()
        except OSError:
            entries = None

        # If no name provided, get first loaded module
        if not self.name:
            first = entries[0].split() if entries else []
            if not first:
                return
            self.name = first[0]

        if entries is None:
            logging.warning(f'Failed to get module info for {self.name}')
        else:
            try:
                for entry in entries:
                    # Columns: name size refcount used-by state address [taints]
                    columns = entry.split()
                    if columns[0] != self.name:
                        continue
                    size, users, state = int(columns[1]), columns[3], columns[4]
                    self.size = size
                    # The load state is the fifth column; the third one is the refcount
                    self.state = state
                    # '-' means no other module is using this one
                    self.used_by = [] if users == '-' else users.rstrip(',').split(',')
                    break
            except (IndexError, ValueError):
                logging.warning(f'Failed to get module info for {self.name}')

        # Get module parameters
        param_path = Path('/sys/module') / self.name / 'parameters'
        if param_path.is_dir():
            try:
                for param in param_path.iterdir():
                    if param.is_file():
                        self.parameters[param.name] = param.read_text().strip()
            except OSError:
                logging.warning(f'Failed to get parameters for {self.name}')

    def _reset_discovered(self) -> None:
        """Clear every field that discovery fills in, keeping only the name."""
        self.size = None
        self.state = None
        self.used_by = []
        self.parameters = {}

    @property
    def exists(self) -> bool:
        """Check if module exists.

        Returns:
            True if ``/sys/module/<name>`` exists, False otherwise

        Example:
            ```python
            info = ModuleInfo(name='dm_mod')
            info.exists
            True
            ```
        """
        return bool(self.name and Path('/sys/module', self.name).exists())

    @property
    def loaded(self) -> bool:
        """Check if module is loaded.

        Returns:
            True if a state was recorded for the module, False otherwise

        Example:
            ```python
            info = ModuleInfo(name='dm_mod')
            info.loaded
            True
            ```
        """
        return bool(self.state)

    def load(self, parameters: str | None = None) -> bool:
        """Load module.

        Args:
            parameters: Module parameters, appended verbatim to the modprobe command

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            info = ModuleInfo(name='dm_mod')
            info.load()
            True
            ```
        """
        if not self.name:
            logging.error('Module name required')
            return False

        cmd = ['modprobe', self.name]
        if parameters:
            cmd.append(parameters)

        result = run(' '.join(cmd))
        if result.failed:
            logging.error(f'Failed to load module: {result.stderr}')
            return False

        # Update module information
        self.__post_init__()
        return True

    def unload(self) -> bool:
        """Unload module.

        Returns:
            True if successful, False if no module name is set

        Raises:
            ModuleInUseError: If module is in use
            RuntimeError: If module cannot be unloaded

        Example:
            ```python
            info = ModuleInfo(name='dm_mod')
            info.unload()
            True
            ```
        """
        if not self.name:
            logging.error('Module name required')
            return False

        result = run(f'modprobe -r {self.name}')
        if result.failed:
            if f'modprobe: FATAL: Module {self.name} is in use.' in result.stderr:
                raise ModuleInUseError(self.name)
            raise RuntimeError(result.stderr)

        # Discovery only overwrites fields for modules it finds, so an unloaded
        # module would keep reporting its old state; clear the fields first.
        self._reset_discovered()
        self.__post_init__()
        return True

    def unload_with_dependencies(self) -> bool:
        """Unload module together with the modules that use it.

        Every module listed in ``used_by`` is unloaded recursively before the
        module itself.

        Returns:
            True if successful, False otherwise

        Raises:
            ModuleInUseError: If module is in use
            RuntimeError: If module cannot be unloaded

        Example:
            ```python
            info = ModuleInfo(name='dm_mod')
            info.unload_with_dependencies()
            True
            ```
        """
        if not self.name:
            logging.error('Module name required')
            return False

        if self.used_by:
            logging.info(f'Removing modules dependent on {self.name}')
            for module in self.used_by:
                if not ModuleInfo(name=module).unload_with_dependencies():
                    logging.error('Failed to unload dependent modules')
                    return False

        return self.unload()

    @classmethod
    def from_name(cls, name: str) -> ModuleInfo | None:
        """Get module information by name.

        Args:
            name: Module name

        Returns:
            Module information or None if not found

        Example:
            ```python
            info = ModuleInfo.from_name('dm_mod')
            info.used_by
            ['dm_mirror', 'dm_log']
            ```
        """
        info = cls(name=name)
        return info if info.exists else None

exists: bool property

Check if module exists.

Returns:

Type Description
bool

True if exists, False otherwise

Example
info = ModuleInfo(name='dm_mod')
info.exists
True

loaded: bool property

Check if module is loaded.

Returns:

Type Description
bool

True if loaded, False otherwise

Example
info = ModuleInfo(name='dm_mod')
info.loaded
True

__post_init__()

Initialize module information.

Discovers module information based on provided parameters.

Source code in sts_libs/src/sts/utils/modules.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __post_init__(self) -> None:
    """Initialize module information.

    Discovers module information based on provided parameters. When the
    module has no entry in ``/proc/modules`` the size, state and usage
    fields are left as they are.
    """
    # Read the module table once; it serves both name discovery and lookup
    try:
        entries = Path('/proc/modules').read_text().splitlines()
    except OSError:
        entries = None

    # If no name provided, get first loaded module
    if not self.name:
        first = entries[0].split() if entries else []
        if not first:
            return
        self.name = first[0]

    if entries is None:
        logging.warning(f'Failed to get module info for {self.name}')
    else:
        try:
            for entry in entries:
                # Columns: name size refcount used-by state address [taints]
                columns = entry.split()
                if columns[0] != self.name:
                    continue
                size, users, state = int(columns[1]), columns[3], columns[4]
                self.size = size
                # The load state is the fifth column; the third one is the refcount
                self.state = state
                # '-' means no other module is using this one
                self.used_by = [] if users == '-' else users.rstrip(',').split(',')
                break
        except (IndexError, ValueError):
            logging.warning(f'Failed to get module info for {self.name}')

    # Get module parameters
    param_path = Path('/sys/module') / self.name / 'parameters'
    if param_path.is_dir():
        try:
            for param in param_path.iterdir():
                if param.is_file():
                    self.parameters[param.name] = param.read_text().strip()
        except OSError:
            logging.warning(f'Failed to get parameters for {self.name}')

from_name(name) classmethod

Get module information by name.

Parameters:

Name Type Description Default
name str

Module name

required

Returns:

Type Description
ModuleInfo | None

Module information or None if not found

Example
info = ModuleInfo.from_name('dm_mod')
info.used_by
['dm_mirror', 'dm_log']
Source code in sts_libs/src/sts/utils/modules.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@classmethod
def from_name(cls, name: str) -> ModuleInfo | None:
    """Look up a module by name.

    Args:
        name: Module name

    Returns:
        The populated instance, or None when its ``exists`` check fails

    Example:
        ```python
        info = ModuleInfo.from_name('dm_mod')
        info.used_by
        ['dm_mirror', 'dm_log']
        ```
    """
    candidate = cls(name=name)
    if not candidate.exists:
        return None
    return candidate

load(parameters=None)

Load module.

Parameters:

Name Type Description Default
parameters str | None

Module parameters

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
info = ModuleInfo(name='dm_mod')
info.load()
True
Source code in sts_libs/src/sts/utils/modules.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def load(self, parameters: str | None = None) -> bool:
    """Load the module with ``modprobe``.

    Args:
        parameters: Module parameters, appended verbatim to the command

    Returns:
        True if successful, False when no name is set or modprobe fails

    Example:
        ```python
        info = ModuleInfo(name='dm_mod')
        info.load()
        True
        ```
    """
    if not self.name:
        logging.error('Module name required')
        return False

    # Only add the parameter string when one was actually given
    command = f'modprobe {self.name} {parameters}' if parameters else f'modprobe {self.name}'
    result = run(command)
    if not result.failed:
        # Refresh the cached fields now that the module is present
        self.__post_init__()
        return True

    logging.error(f'Failed to load module: {result.stderr}')
    return False

unload()

Unload module.

Returns:

Type Description
bool

True if successful, False otherwise

Raises:

Type Description
ModuleInUseError

If module is in use

RuntimeError

If module cannot be unloaded

Example
info = ModuleInfo(name='dm_mod')
info.unload()
True
Source code in sts_libs/src/sts/utils/modules.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def unload(self) -> bool:
    """Unload module.

    Returns:
        True if successful, False if no module name is set

    Raises:
        ModuleInUseError: If module is in use
        RuntimeError: If module cannot be unloaded

    Example:
        ```python
        info = ModuleInfo(name='dm_mod')
        info.unload()
        True
        ```
    """
    if not self.name:
        logging.error('Module name required')
        return False

    result = run(f'modprobe -r {self.name}')
    if result.failed:
        if f'modprobe: FATAL: Module {self.name} is in use.' in result.stderr:
            raise ModuleInUseError(self.name)
        raise RuntimeError(result.stderr)

    # Discovery only overwrites fields for modules it finds, so an unloaded
    # module would keep reporting its old state; clear the fields first.
    self.size = None
    self.state = None
    self.used_by = []
    self.parameters = {}

    # Update module information
    self.__post_init__()
    return True

unload_with_dependencies()

Unload module together with the modules that depend on it.

Returns:

Type Description
bool

True if successful, False otherwise

Raises:

Type Description
ModuleInUseError

If module is in use

RuntimeError

If module cannot be unloaded

Example
info = ModuleInfo(name='dm_mod')
info.unload_with_dependencies()
True
Source code in sts_libs/src/sts/utils/modules.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def unload_with_dependencies(self) -> bool:
    """Unload the module together with the modules that use it.

    Each entry of ``used_by`` is unloaded recursively first, then the module
    itself.

    Returns:
        True if successful, False otherwise

    Raises:
        ModuleInUseError: If module is in use
        RuntimeError: If module cannot be unloaded

    Example:
        ```python
        info = ModuleInfo(name='dm_mod')
        info.unload_with_dependencies()
        True
        ```
    """
    if not self.name:
        logging.error('Module name required')
        return False

    if self.used_by:
        logging.info(f'Removing modules dependent on {self.name}')
        for module in self.used_by:
            if ModuleInfo(name=module).unload_with_dependencies():
                continue
            # Stop at the first dependent module that could not be removed
            logging.error('Failed to unload dependent modules')
            return False

    return self.unload()

ModuleManager

Module manager functionality.

This class provides functionality for managing kernel modules: - Module loading/unloading - Module information - Module dependencies

Example
mm = ModuleManager()
mm.load('dm_mod')
True
Source code in sts_libs/src/sts/utils/modules.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
class ModuleManager:
    """Module manager functionality.

    Name-based front end over ``ModuleInfo``:
    - Module loading/unloading
    - Module information
    - Module dependencies

    Example:
        ```python
        mm = ModuleManager()
        mm.load('dm_mod')
        True
        ```
    """

    def __init__(self) -> None:
        """Initialize module manager."""
        self.modules_path, self.parameters_path = Path('/proc/modules'), Path('/sys/module')

    def get_all(self) -> list[ModuleInfo]:
        """Get list of all loaded modules.

        Returns:
            One entry per line of ``modules_path`` whose module exists; an
            empty list if the file cannot be read or a line is blank

        Example:
            ```python
            mm = ModuleManager()
            mm.get_all()
            [ModuleInfo(name='dm_mod', ...), ModuleInfo(name='ext4', ...)]
            ```
        """
        try:
            # The first whitespace-separated field of each line is the module name
            return [
                info
                for line in self.modules_path.read_text().splitlines()
                if (info := ModuleInfo(name=line.split(maxsplit=4)[0])).exists
            ]
        except (OSError, IndexError):
            logging.exception('Failed to get module list')
            return []

    def get_parameters(self, name: str) -> dict[str, str]:
        """Get module parameters.

        Args:
            name: Module name

        Returns:
            Dictionary of parameter names and values

        Example:
            ```python
            mm = ModuleManager()
            mm.get_parameters('dm_mod')
            {'major': '253'}
            ```
        """
        return ModuleInfo(name=name).parameters

    def load(self, name: str, parameters: str | None = None) -> bool:
        """Load module unless it is already loaded.

        Args:
            name: Module name
            parameters: Module parameters

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            mm = ModuleManager()
            mm.load('dm_mod')
            True
            ```
        """
        info = ModuleInfo(name=name)
        return info.loaded or info.load(parameters)

    def unload(self, name: str) -> bool:
        """Unload module if it is loaded.

        Args:
            name: Module name

        Returns:
            True if successful, False otherwise

        Raises:
            ModuleInUseError: If module is in use
            RuntimeError: If module cannot be unloaded

        Example:
            ```python
            mm = ModuleManager()
            mm.unload('dm_mod')
            True
            ```
        """
        info = ModuleInfo(name=name)
        if not info.loaded:
            return True
        return info.unload()

    def unload_with_dependencies(self, name: str) -> bool:
        """Unload module and the modules that use it, if it is loaded.

        Args:
            name: Module name

        Returns:
            True if successful, False otherwise

        Raises:
            ModuleInUseError: If module is in use
            RuntimeError: If module cannot be unloaded

        Example:
            ```python
            mm = ModuleManager()
            mm.unload_with_dependencies('dm_mod')
            True
            ```
        """
        info = ModuleInfo(name=name)
        if not info.loaded:
            return True
        return info.unload_with_dependencies()

__init__()

Initialize module manager.

Source code in sts_libs/src/sts/utils/modules.py
252
253
254
255
def __init__(self) -> None:
    """Initialize module manager.

    Stores the module list path and the per-module sysfs root as attributes.
    """
    self.modules_path, self.parameters_path = Path('/proc/modules'), Path('/sys/module')

get_all()

Get list of all loaded modules.

Returns:

Type Description
list[ModuleInfo]

List of module information

Example
mm = ModuleManager()
mm.get_all()
[ModuleInfo(name='dm_mod', ...), ModuleInfo(name='ext4', ...)]
Source code in sts_libs/src/sts/utils/modules.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def get_all(self) -> list[ModuleInfo]:
    """Get list of all loaded modules.

    Returns:
        One entry per line of ``modules_path`` whose module exists; an empty
        list if the file cannot be read or a line is blank

    Example:
        ```python
        mm = ModuleManager()
        mm.get_all()
        [ModuleInfo(name='dm_mod', ...), ModuleInfo(name='ext4', ...)]
        ```
    """
    try:
        # The first whitespace-separated field of each line is the module name
        return [
            info
            for line in self.modules_path.read_text().splitlines()
            if (info := ModuleInfo(name=line.split(maxsplit=4)[0])).exists
        ]
    except (OSError, IndexError):
        logging.exception('Failed to get module list')
        return []

get_parameters(name)

Get module parameters.

Parameters:

Name Type Description Default
name str

Module name

required

Returns:

Type Description
dict[str, str]

Dictionary of parameter names and values

Example
mm = ModuleManager()
mm.get_parameters('dm_mod')
{'major': '253'}
Source code in sts_libs/src/sts/utils/modules.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def get_parameters(self, name: str) -> dict[str, str]:
    """Get module parameters.

    Args:
        name: Module name

    Returns:
        Dictionary of parameter names and values

    Example:
        ```python
        mm = ModuleManager()
        mm.get_parameters('dm_mod')
        {'major': '253'}
        ```
    """
    # A ModuleInfo instance is always truthy, so its parameters are returned directly
    return ModuleInfo(name=name).parameters

load(name, parameters=None)

Load module.

Parameters:

Name Type Description Default
name str

Module name

required
parameters str | None

Module parameters

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
mm = ModuleManager()
mm.load('dm_mod')
True
Source code in sts_libs/src/sts/utils/modules.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def load(self, name: str, parameters: str | None = None) -> bool:
    """Load module unless it is already loaded.

    Args:
        name: Module name
        parameters: Module parameters

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        mm = ModuleManager()
        mm.load('dm_mod')
        True
        ```
    """
    info = ModuleInfo(name=name)
    # Short-circuits to True when the module is already loaded
    return info.loaded or info.load(parameters)

unload(name)

Unload module.

Parameters:

Name Type Description Default
name str

Module name

required

Returns:

Type Description
bool

True if successful, False otherwise

Raises:

Type Description
ModuleInUseError

If module is in use

RuntimeError

If module cannot be unloaded

Example
mm = ModuleManager()
mm.unload('dm_mod')
True
Source code in sts_libs/src/sts/utils/modules.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def unload(self, name: str) -> bool:
    """Unload module if it is loaded.

    Args:
        name: Module name

    Returns:
        True if successful, False otherwise

    Raises:
        ModuleInUseError: If module is in use
        RuntimeError: If module cannot be unloaded

    Example:
        ```python
        mm = ModuleManager()
        mm.unload('dm_mod')
        True
        ```
    """
    info = ModuleInfo(name=name)
    if not info.loaded:
        # Nothing to remove counts as success
        return True
    return info.unload()

unload_with_dependencies(name)

Unload module and its dependencies.

Parameters:

Name Type Description Default
name str

Module name

required

Returns:

Type Description
bool

True if successful, False otherwise

Raises:

Type Description
ModuleInUseError

If module is in use

RuntimeError

If module cannot be unloaded

Example
mm = ModuleManager()
mm.unload_with_dependencies('dm_mod')
True
Source code in sts_libs/src/sts/utils/modules.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def unload_with_dependencies(self, name: str) -> bool:
    """Unload module and the modules that use it, if it is loaded.

    Args:
        name: Module name

    Returns:
        True if successful, False otherwise

    Raises:
        ModuleInUseError: If module is in use
        RuntimeError: If module cannot be unloaded

    Example:
        ```python
        mm = ModuleManager()
        mm.unload_with_dependencies('dm_mod')
        True
        ```
    """
    info = ModuleInfo(name=name)
    if not info.loaded:
        # Nothing to remove counts as success
        return True
    return info.unload_with_dependencies()

Size Handling

sts.utils.size

Size conversion utilities.

This module provides functionality for converting between human-readable sizes and bytes: human-to-bytes conversion (e.g., '1KiB' -> 1024), bytes-to-human conversion (e.g., 1024 -> '1KiB'), and size validation.

Size dataclass

Size representation.

This class provides functionality for size operations: - Size parsing - Unit conversion - String representation

Parameters:

Name Type Description Default
value float

Size value (optional, defaults to 0)

0.0
unit Unit

Size unit (optional, defaults to bytes)

B
Example
size = Size()  # Zero bytes
size = Size(1024)  # 1024 bytes
size = Size(1.0, Unit.KIB)  # Custom value and unit
size = Size.from_string('1KiB')  # From string
Source code in sts_libs/src/sts/utils/size.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@dataclass
class Size:
    """Size representation.

    This class provides functionality for size operations:
    - Size parsing
    - Unit conversion
    - String representation

    Args:
        value: Size value (optional, defaults to 0)
        unit: Size unit (optional, defaults to bytes)

    Example:
        ```python
        size = Size()  # Zero bytes
        size = Size(1024)  # 1024 bytes
        size = Size(1.0, Unit.KIB)  # Custom value and unit
        size = Size.from_string('1KiB')  # From string
        ```
    """

    # Optional parameters with defaults
    value: float = 0.0
    unit: Unit = Unit.B

    # Regular expression for parsing human-readable sizes
    SIZE_PATTERN: ClassVar[re.Pattern[str]] = re.compile(
        r'^([\-0-9\.]+)(Ki|Mi|Gi|Ti|Pi|Ei|Zi|Yi)?B$',
    )

    # Unit multipliers (powers of 1024)
    MULTIPLIERS: ClassVar[dict[Unit, int]] = {
        Unit.B: BYTE,
        Unit.KIB: KIB,
        Unit.MIB: MIB,
        Unit.GIB: GIB,
        Unit.TIB: TIB,
        Unit.PIB: PIB,
        Unit.EIB: EIB,
        Unit.ZIB: ZIB,
        Unit.YIB: YIB,
    }

    @staticmethod
    def _scale(value: float) -> tuple[float, Unit]:
        """Divide a byte count by 1024 until it drops below KIB or units run out."""
        unit = Unit.B
        for next_unit in list(Unit)[1:]:  # Skip B
            if value < KIB:
                break
            value /= 1024
            unit = next_unit
        return value, unit

    def __post_init__(self) -> None:
        """Initialize size.

        Converts value to appropriate unit if needed. Only byte values of at
        least KIB are rescaled; their fractional bytes are dropped first.
        """
        # Convert to appropriate unit if value is large
        if self.unit == Unit.B and self.value >= KIB:
            self.value, self.unit = self._scale(float(int(self.value)))

    @classmethod
    def from_string(cls, size: str) -> Size | None:
        """Parse size from string.

        Args:
            size: Size string (e.g., '1KiB')

        Returns:
            Size instance or None if invalid

        Example:
            ```python
            Size.from_string('1KiB')
            Size(value=1.0, unit=Unit.KIB)
            ```
        """
        if not size:
            return None

        # Handle pure numbers as bytes. isdecimal() matches what float() can
        # parse; isdigit() also accepts characters such as superscripts,
        # which made float() raise instead of returning None.
        if size.isdecimal():
            return cls(float(size), Unit.B)

        # Parse size with unit
        match = cls.SIZE_PATTERN.match(size)
        if not match:
            logging.error(f'Invalid size format: {size}')
            return None

        try:
            value = float(match.group(1))
            unit_str = match.group(2)
            unit = Unit.B if not unit_str else Unit(f'{unit_str}B')
            return cls(value, unit)
        except (ValueError, KeyError):
            logging.exception('Failed to parse size')
            return None

    def to_bytes(self) -> int:
        """Convert to bytes.

        Returns:
            Size in bytes, truncated to an integer

        Example:
            ```python
            Size(1.0, Unit.KIB).to_bytes()
            1024
            ```
        """
        return int(self.value * self.MULTIPLIERS[self.unit])

    @classmethod
    def from_bytes(cls, bytes_: int) -> Size:
        """Convert bytes to human-readable size.

        Args:
            bytes_: Size in bytes

        Returns:
            Size instance

        Example:
            ```python
            Size.from_bytes(1024)
            Size(value=1.0, unit=Unit.KIB)
            ```
        """
        return cls(*cls._scale(float(bytes_)))

    def __str__(self) -> str:
        """Return human-readable string.

        Returns:
            Size string (e.g., '1KiB')

        Example:
            ```python
            str(Size(1.0, Unit.KIB))
            '1KiB'
            ```
        """
        # Formatting a (str, Enum) member directly yields 'Unit.KIB' on
        # Python 3.11+, so use the member's value; plain strings pass through.
        unit = getattr(self.unit, 'value', self.unit)
        # float() because int.is_integer() only exists from Python 3.12 on
        # Remove decimal part if whole number
        if float(self.value).is_integer():
            return f'{int(self.value)}{unit}'
        return f'{self.value:.1f}{unit}'

__post_init__()

Initialize size.

Converts value to appropriate unit if needed.

Source code in sts_libs/src/sts/utils/size.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __post_init__(self) -> None:
    """Initialize size.

    Rescales a byte value of at least KIB to a larger unit; every other
    combination of value and unit is kept as given.
    """
    if not (self.unit == Unit.B and self.value >= KIB):
        return

    # Fractional bytes are dropped before scaling
    scaled = float(int(self.value))
    chosen = Unit.B
    for candidate in list(Unit)[1:]:  # Skip B
        if scaled < KIB:
            break
        scaled /= 1024
        chosen = candidate

    self.value = scaled
    self.unit = chosen

__str__()

Return human-readable string.

Returns:

Type Description
str

Size string (e.g., '1KiB')

Example
str(Size(1.0, Unit.KIB))
'1KiB'
Source code in sts_libs/src/sts/utils/size.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def __str__(self) -> str:
    """Return human-readable string.

    Returns:
        Size string (e.g., '1KiB')

    Example:
        ```python
        str(Size(1.0, Unit.KIB))
        '1KiB'
        ```
    """
    # Formatting a (str, Enum) member directly yields 'Unit.KIB' on
    # Python 3.11+, so use the member's value; plain strings pass through.
    unit = getattr(self.unit, 'value', self.unit)
    # float() because int.is_integer() only exists from Python 3.12 on
    # Remove decimal part if whole number
    if float(self.value).is_integer():
        return f'{int(self.value)}{unit}'
    return f'{self.value:.1f}{unit}'

from_bytes(bytes_) classmethod

Convert bytes to human-readable size.

Parameters:

Name Type Description Default
bytes_ int

Size in bytes

required

Returns:

Type Description
Size

Size instance

Example
Size.from_bytes(1024)
Size(value=1.0, unit=Unit.KIB)
Source code in sts_libs/src/sts/utils/size.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
def from_bytes(cls, bytes_: int) -> Size:
    """Build a size from a byte count, scaled to a readable unit.

    Args:
        bytes_: Size in bytes

    Returns:
        Size instance

    Example:
        ```python
        Size.from_bytes(1024)
        Size(value=1.0, unit=Unit.KIB)
        ```
    """
    if bytes_ < KIB:
        return cls(float(bytes_), Unit.B)

    scaled = float(bytes_)
    chosen = Unit.B  # Default unit
    # Step through the larger units until the value drops below KIB
    for candidate in list(Unit)[1:]:  # Skip B
        if scaled < KIB:
            break
        scaled /= 1024
        chosen = candidate

    return cls(scaled, chosen)

from_string(size) classmethod

Parse size from string.

Parameters:

Name Type Description Default
size str

Size string (e.g., '1KiB')

required

Returns:

Type Description
Size | None

Size instance or None if invalid

Example
Size.from_string('1KiB')
Size(value=1.0, unit=Unit.KIB)
Source code in sts_libs/src/sts/utils/size.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@classmethod
def from_string(cls, size: str) -> Size | None:
    """Parse size from string.

    Args:
        size: Size string (e.g., '1KiB')

    Returns:
        Size instance or None if invalid

    Example:
        ```python
        Size.from_string('1KiB')
        Size(value=1.0, unit=Unit.KIB)
        ```
    """
    if not size:
        return None

    # Handle pure numbers as bytes. isdecimal() matches what float() can
    # parse; isdigit() also accepts characters such as superscripts, which
    # made float() raise instead of returning None.
    if size.isdecimal():
        return cls(float(size), Unit.B)

    # Parse size with unit
    match = cls.SIZE_PATTERN.match(size)
    if not match:
        logging.error(f'Invalid size format: {size}')
        return None

    try:
        value = float(match.group(1))
        unit_str = match.group(2)
        unit = Unit.B if not unit_str else Unit(f'{unit_str}B')
        return cls(value, unit)
    except (ValueError, KeyError):
        logging.exception('Failed to parse size')
        return None

to_bytes()

Convert to bytes.

Returns:

Type Description
int

Size in bytes

Example
Size(1.0, Unit.KIB).to_bytes()
1024
Source code in sts_libs/src/sts/utils/size.py
145
146
147
148
149
150
151
152
153
154
155
156
157
def to_bytes(self) -> int:
    """Express the size as a whole number of bytes.

    Returns:
        Size in bytes, truncated to an integer

    Example:
        ```python
        Size(1.0, Unit.KIB).to_bytes()
        1024
        ```
    """
    multiplier = self.MULTIPLIERS[self.unit]
    return int(self.value * multiplier)

Unit

Bases: str, Enum

Size units.

Source code in sts_libs/src/sts/utils/size.py
31
32
33
34
35
36
37
38
39
40
41
42
class Unit(str, Enum):
    """Size units.

    Each member's value is the unit's textual suffix (for example ``'KiB'``),
    so a unit can be looked up from its suffix with ``Unit('KiB')``. Because
    the class also derives from ``str``, members compare equal to those
    suffix strings.
    """

    # Members are listed from smallest to largest unit name.
    B = 'B'
    KIB = 'KiB'
    MIB = 'MiB'
    GIB = 'GiB'
    TIB = 'TiB'
    PIB = 'PiB'
    EIB = 'EiB'
    ZIB = 'ZiB'
    YIB = 'YiB'

size_bytes_2_size_human(bytes_)

Convert bytes to human-readable size.

Parameters:

Name Type Description Default
bytes_ int | str | None

Size in bytes

required

Returns:

Type Description
str | None

Human-readable size or None if invalid

Example
size_bytes_2_size_human(1024)
'1KiB'
Source code in sts_libs/src/sts/utils/size.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def size_bytes_2_size_human(bytes_: int | str | None) -> str | None:
    """Convert bytes to human-readable size.

    Args:
        bytes_: Size in bytes

    Returns:
        Human-readable size or None if invalid

    Example:
        ```python
        size_bytes_2_size_human(1024)
        '1KiB'
        ```
    """
    if not bytes_:
        return None

    try:
        size = Size.from_bytes(int(bytes_))
        return str(size)
    except (ValueError, TypeError):
        logging.exception('Invalid bytes value')
        return None

size_human_2_size_bytes(size)

Convert human-readable size to bytes.

Parameters:

Name Type Description Default
size str

Size string (e.g., '1KiB')

required

Returns:

Type Description
int | None

Size in bytes or None if invalid

Example
size_human_2_size_bytes('1KiB')
1024
Source code in sts_libs/src/sts/utils/size.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def size_human_2_size_bytes(size: str) -> int | None:
    """Translate a human-readable size string into a byte count.

    Args:
        size: Size string (e.g., '1KiB')

    Returns:
        Size in bytes, or None if the string could not be parsed

    Example:
        ```python
        size_human_2_size_bytes('1KiB')
        1024
        ```
    """
    parsed = Size.from_string(size)
    # Truthiness test on the parsed object; from_string yields None on failure.
    return parsed.to_bytes() if parsed else None

size_human_check(size)

Check if size string is valid.

Parameters:

Name Type Description Default
size str

Size string to check

required

Returns:

Type Description
bool

True if valid, False otherwise

Example
size_human_check('1KiB')
True
Source code in sts_libs/src/sts/utils/size.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def size_human_check(size: str) -> bool:
    """Tell whether a size string can be parsed.

    Args:
        size: Size string to check

    Returns:
        True if Size.from_string accepts the string, False otherwise

    Example:
        ```python
        size_human_check('1KiB')
        True
        ```
    """
    parsed = Size.from_string(size)
    return parsed is not None

String Utilities

sts.utils.string_extras

String manipulation utilities.

This module provides utilities for string manipulation: - String conversion - String formatting - String validation

none_to_empty(value)

Convert None to empty string.

Parameters:

Name Type Description Default
value str | None

Value to convert

required

Returns:

Type Description
str

Empty string if value is None, otherwise value

Example
none_to_empty(None)
''
none_to_empty('test')
'test'
Source code in sts_libs/src/sts/utils/string_extras.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def none_to_empty(value: str | None) -> str:
    """Convert None to empty string.

    Args:
        value: Value to convert

    Returns:
        Empty string if value is None, otherwise value

    Example:
        ```python
        none_to_empty(None)
        ''
        none_to_empty('test')
        'test'
        ```
    """
    return '' if value is None else value

rand_string(length=8, chars=None)

Generate random string.

Parameters:

Name Type Description Default
length int

Length of string to generate

8
chars str | None

Characters to use for generation (default: ascii_lowercase + digits)

None

Returns:

Type Description
str

Random string of specified length

Example
rand_string()
'a1b2c3d4'
rand_string(4)
'w9x8'
rand_string(4, 'ABC123')
'B1CA'
Source code in sts_libs/src/sts/utils/string_extras.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def rand_string(length: int = 8, chars: str | None = None) -> str:
    """Generate random string.

    Args:
        length: Length of string to generate
        chars: Characters to use for generation (default: ascii_lowercase + digits)

    Returns:
        Random string of specified length

    Example:
        ```python
        rand_string()
        'a1b2c3d4'
        rand_string(4)
        'w9x8'
        rand_string(4, 'ABC123')
        'B1CA'
        ```
    """
    chars = chars or string.ascii_lowercase + string.digits
    return ''.join(random.choices(chars, k=length))

File Operations

sts.utils.files

File operations module.

This module provides functionality for file system operations: - Directory validation - File counting - Path operations - Mount management - Filesystem operations

DirAccessError

Bases: DirectoryError

Directory cannot be accessed.

Source code in sts_libs/src/sts/utils/files.py
47
48
class DirAccessError(DirectoryError):
    """Directory cannot be accessed.

    Raised by ``Directory.iter_files`` when listing the directory fails with
    ``PermissionError`` or another ``OSError``.
    """

DirNotFoundError

Bases: DirectoryError

Directory does not exist.

Source code in sts_libs/src/sts/utils/files.py
39
40
class DirNotFoundError(DirectoryError):
    """Directory does not exist.

    Raised by ``Directory.validate`` when the path does not exist at all.
    """

DirTypeError

Bases: DirectoryError

Path exists but is not a directory.

Source code in sts_libs/src/sts/utils/files.py
43
44
class DirTypeError(DirectoryError):
    """Path exists but is not a directory.

    Raised by ``Directory.validate`` when the path exists but ``is_dir()``
    is false for it.
    """

Directory dataclass

Directory representation.

Provides functionality for directory operations including: - Existence checking - File counting - Path resolution

Parameters:

Name Type Description Default
path Path

Directory path (optional, defaults to current directory)

cwd()
create bool

Create directory if it doesn't exist (optional)

False
mode int

Directory creation mode (optional)

0o755 (shown as decimal 493)
Example
dir = Directory()  # Uses current directory
dir = Directory('/tmp/test')  # Uses specific path
dir = Directory('/tmp/test', create=True)  # Creates if needed
Source code in sts_libs/src/sts/utils/files.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
@dataclass
class Directory:
    """Directory representation.

    Provides functionality for directory operations including:
    - Existence checking
    - File counting
    - Path resolution

    Args:
        path: Directory path (optional, defaults to current directory).
            A plain string is accepted and converted to ``Path``.
        create: Create directory if it doesn't exist (optional)
        mode: Directory creation mode (optional)

    Example:
        ```python
        dir = Directory()  # Uses current directory
        dir = Directory('/tmp/test')  # Uses specific path
        dir = Directory('/tmp/test', create=True)  # Creates if needed
        ```
    """

    # Directory location; normalised to a Path instance in __post_init__
    path: Path = field(default_factory=Path.cwd)

    # Optional parameters
    create: bool = False
    mode: int = 0o755

    def __post_init__(self) -> None:
        """Initialize directory.

        Normalises ``path`` to a ``Path`` and creates the directory if
        requested. A failed creation is logged, not raised.
        """
        # The documented usage passes plain strings (e.g. Directory('/etc')),
        # but every method relies on Path methods such as is_dir()/iterdir().
        if not isinstance(self.path, Path):
            self.path = Path(self.path)

        # Create directory if needed
        if self.create and not self.exists:
            try:
                self.path.mkdir(mode=self.mode, parents=True, exist_ok=True)
            except OSError:
                logging.exception('Failed to create directory')

    @property
    def exists(self) -> bool:
        """Check if directory exists and is a directory."""
        return self.path.is_dir()

    def validate(self) -> None:
        """Validate directory exists and is a directory.

        Raises:
            DirNotFoundError: If directory does not exist
            DirTypeError: If path exists but is not a directory
        """
        if not self.path.exists():
            raise DirNotFoundError(f'Directory not found: {self.path}')
        if not self.exists:
            raise DirTypeError(f'Not a directory: {self.path}')

    def iter_files(self) -> Iterator[Path]:
        """Iterate over files in directory.

        Only direct children that are files are yielded; subdirectories are
        skipped and not descended into.

        Yields:
            Path objects for each file

        Raises:
            DirAccessError: If directory cannot be accessed
        """
        try:
            for item in self.path.iterdir():
                if item.is_file():
                    yield item
        except PermissionError as e:
            logging.exception(f'Permission denied accessing {self.path}')
            raise DirAccessError(f'Permission denied: {self.path}') from e
        except OSError as e:
            logging.exception(f'Error accessing {self.path}')
            raise DirAccessError(f'Error accessing directory: {e}') from e

    @staticmethod
    def should_remove_file_with_pattern(file: Path, pattern: str) -> bool:
        """Check if file should be removed because it contains pattern.

        Args:
            file: File to check
            pattern: Substring to look for in file contents

        Returns:
            True if file contains pattern and should be removed; False if it
            does not, or if the file could not be read as text
        """
        try:
            content = file.read_text()
        except (OSError, UnicodeDecodeError):
            logging.exception(f'Error reading {file}')
            return False
        return pattern in content

    @staticmethod
    def should_remove_file_without_pattern(file: Path, pattern: str) -> bool:
        """Check if file should be removed because it does not contain pattern.

        Args:
            file: File to check
            pattern: Substring to look for in file contents

        Returns:
            True if file does not contain pattern and should be removed;
            False if it does, or if the file could not be read as text
        """
        try:
            content = file.read_text()
        except (OSError, UnicodeDecodeError):
            logging.exception(f'Error reading {file}')
            # Unreadable files are kept rather than treated as "pattern absent".
            return False
        return pattern not in content

    @staticmethod
    def remove_file(file: Path) -> None:
        """Remove file safely.

        A failure to delete is logged and not raised.

        Args:
            file: File to remove
        """
        try:
            file.unlink()
        except OSError:
            logging.exception(f'Error removing {file}')

    def count_files(self) -> int:
        """Count number of files in directory.

        Returns:
            Number of files in directory (excluding directories)

        Raises:
            DirNotFoundError: If directory does not exist
            DirTypeError: If path exists but is not a directory
            DirAccessError: If directory cannot be accessed

        Example:
            ```python
            Directory('/etc').count_files()
            42
            ```
        """
        self.validate()
        return sum(1 for _ in self.iter_files())

    def rm_files_containing(self, pattern: str, *, invert: bool = False) -> None:
        """Delete files containing (or not containing) specific pattern.

        Args:
            pattern: Substring to look for in file contents
            invert: Delete files NOT containing pattern

        Raises:
            DirNotFoundError: If directory does not exist
            DirTypeError: If path exists but is not a directory
            DirAccessError: If directory cannot be accessed

        Example:
            ```python
            Directory('/tmp').rm_files_containing('error')  # Remove files containing 'error'
            Directory('/tmp').rm_files_containing('error', invert=True)  # Remove files NOT containing 'error'
            ```
        """
        self.validate()
        check_func = self.should_remove_file_without_pattern if invert else self.should_remove_file_with_pattern
        for file in self.iter_files():
            if check_func(file, pattern):
                self.remove_file(file)

    def remove_dir(self) -> None:
        """Remove directory and all its contents using shutil.rmtree.

        Errors raised while removing the tree are logged and swallowed.

        Raises:
            DirNotFoundError: If directory does not exist
            DirTypeError: If path exists but is not a directory

        Example:
            ```python
            Directory(Path('/tmp/test')).remove_dir()
            ```
        """
        self.validate()
        try:
            rmtree(self.path)
        except OSError:  # PermissionError is a subclass of OSError
            logging.exception(f'Error removing {self.path}')

exists: bool property

Check if directory exists and is a directory.

__post_init__()

Initialize directory.

Creates directory if needed.

Source code in sts_libs/src/sts/utils/files.py
80
81
82
83
84
85
86
87
88
89
90
def __post_init__(self) -> None:
    """Initialize directory.

    Normalises ``path`` to a ``Path`` and creates the directory if requested.
    A failed creation is logged, not raised.
    """
    # The documented usage passes plain strings (e.g. Directory('/etc')), but
    # the other methods rely on Path methods such as is_dir() and iterdir().
    if not isinstance(self.path, Path):
        self.path = Path(self.path)

    # Create directory if needed
    if self.create and not self.exists:
        try:
            self.path.mkdir(mode=self.mode, parents=True, exist_ok=True)
        except OSError:
            logging.exception('Failed to create directory')

count_files()

Count number of files in directory.

Returns:

Type Description
int

Number of files in directory (excluding directories)

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

DirAccessError

If directory cannot be accessed

Example
Directory('/etc').count_files()
42
Source code in sts_libs/src/sts/utils/files.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def count_files(self) -> int:
    """Return how many files sit directly in this directory.

    The directory is validated first; entries are then counted one by one as
    iter_files() yields them.

    Returns:
        Number of files in directory (excluding directories)

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory
        DirAccessError: If directory cannot be accessed

    Example:
        ```python
        Directory('/etc').count_files()
        42
        ```
    """
    self.validate()
    total = 0
    for _ in self.iter_files():
        total += 1
    return total

iter_files()

Iterate over files in directory.

Yields:

Type Description
Path

Path objects for each file

Raises:

Type Description
DirAccessError

If directory cannot be accessed

Source code in sts_libs/src/sts/utils/files.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def iter_files(self) -> Iterator[Path]:
    """Lazily yield the files that are direct children of the directory.

    Subdirectories and other non-file entries are skipped.

    Yields:
        Path objects for each file

    Raises:
        DirAccessError: If directory cannot be accessed
    """
    try:
        yield from (entry for entry in self.path.iterdir() if entry.is_file())
    except PermissionError as err:
        # Handled before the generic OSError branch so the message is specific.
        logging.exception(f'Permission denied accessing {self.path}')
        raise DirAccessError(f'Permission denied: {self.path}') from err
    except OSError as err:
        logging.exception(f'Error accessing {self.path}')
        raise DirAccessError(f'Error accessing directory: {err}') from err

remove_dir()

Remove directory and all its contents using shutil.rmtree.

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

DirAccessError

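If directory cannot be accessed (listed in the docstring, but the implementation shown below logs removal errors instead of raising this)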

Example
Directory(Path('/tmp/test')).remove_dir()
Source code in sts_libs/src/sts/utils/files.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def remove_dir(self) -> None:
    """Delete the directory tree with shutil.rmtree.

    The directory is validated first. Errors raised while removing the tree
    are logged and swallowed.

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory

    Example:
        ```python
        Directory(Path('/tmp/test')).remove_dir()
        ```
    """
    self.validate()
    target = self.path
    try:
        rmtree(target)
    except OSError:  # also covers PermissionError, which is an OSError subclass
        logging.exception(f'Error removing {target}')

remove_file(file) staticmethod

Remove file safely.

Parameters:

Name Type Description Default
file Path

File to remove

required
Source code in sts_libs/src/sts/utils/files.py
165
166
167
168
169
170
171
172
173
174
175
@staticmethod
def remove_file(file: Path) -> None:
    """Remove file safely.

    Args:
        file: File to remove
    """
    try:
        file.unlink()
    except OSError:
        logging.exception(f'Error removing {file}')

rm_files_containing(pattern, *, invert=False)

Delete files containing (or not containing) specific pattern.

Parameters:

Name Type Description Default
pattern str

Pattern to match in file contents

required
invert bool

Delete files NOT containing pattern

False

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

DirAccessError

If directory cannot be accessed

Example
Directory('/tmp').rm_files_containing('error')  # Remove files containing 'error'
Directory('/tmp').rm_files_containing('error', invert=True)  # Remove files NOT containing 'error'
Source code in sts_libs/src/sts/utils/files.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def rm_files_containing(self, pattern: str, *, invert: bool = False) -> None:
    """Remove the directory's files whose contents do (or do not) include pattern.

    Args:
        pattern: Pattern to match in file contents
        invert: Delete files NOT containing pattern

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory
        DirAccessError: If directory cannot be accessed

    Example:
        ```python
        Directory('/tmp').rm_files_containing('error')  # Remove files containing 'error'
        Directory('/tmp').rm_files_containing('error', invert=True)  # Remove files NOT containing 'error'
        ```
    """
    self.validate()
    if invert:
        predicate = self.should_remove_file_without_pattern
    else:
        predicate = self.should_remove_file_with_pattern
    # filter() is lazy, so each file is checked and removed before the next
    # one is pulled from iter_files().
    for doomed in filter(lambda candidate: predicate(candidate, pattern), self.iter_files()):
        self.remove_file(doomed)

should_remove_file_with_pattern(file, pattern) staticmethod

Check if file should be removed because it contains pattern.

Parameters:

Name Type Description Default
file Path

File to check

required
pattern str

Pattern to match in file contents

required

Returns:

Type Description
bool

True if file contains pattern and should be removed

Source code in sts_libs/src/sts/utils/files.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@staticmethod
def should_remove_file_with_pattern(file: Path, pattern: str) -> bool:
    """Check if file should be removed because it contains pattern.

    Args:
        file: File to check
        pattern: Pattern to match in file contents

    Returns:
        True if file contains pattern and should be removed
    """
    try:
        content = file.read_text()
    except (OSError, UnicodeDecodeError):
        logging.exception(f'Error reading {file}')
        return False
    return pattern in content

should_remove_file_without_pattern(file, pattern) staticmethod

Check if file should be removed because it does not contain pattern.

Parameters:

Name Type Description Default
file Path

File to check

required
pattern str

Pattern to match in file contents

required

Returns:

Type Description
bool

True if file does not contain pattern and should be removed

Source code in sts_libs/src/sts/utils/files.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@staticmethod
def should_remove_file_without_pattern(file: Path, pattern: str) -> bool:
    """Check if file should be removed because it does not contain pattern.

    Args:
        file: File to check
        pattern: Pattern to match in file contents

    Returns:
        True if file does not contain pattern and should be removed
    """
    try:
        content = file.read_text()
    except (OSError, UnicodeDecodeError):
        logging.exception(f'Error reading {file}')
        return False
    return pattern not in content

validate()

Validate directory exists and is accessible.

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

Source code in sts_libs/src/sts/utils/files.py
 97
 98
 99
100
101
102
103
104
105
106
107
def validate(self) -> None:
    """Ensure the path exists and refers to a directory.

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory
    """
    location = self.path
    if location.exists():
        if self.exists:
            return
        # Something is there, but it is not a directory.
        raise DirTypeError(f'Not a directory: {self.path}')
    raise DirNotFoundError(f'Directory not found: {self.path}')

DirectoryError

Bases: STSError

Base class for directory-related errors.

Source code in sts_libs/src/sts/utils/files.py
35
36
class DirectoryError(STSError):
    """Base class for directory-related errors.

    DirNotFoundError, DirTypeError and DirAccessError all derive from it, so
    catching DirectoryError handles any of them.
    """

count_files(directory=None)

Count number of files in directory.

Parameters:

Name Type Description Default
directory str | Path | None

Path to directory to count files in (optional)

None

Returns:

Type Description
int

Number of files in directory (excluding directories)

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

DirAccessError

If directory cannot be accessed

Example
count_files()  # Count files in current directory
count_files('/etc')  # Count files in specific directory
Source code in sts_libs/src/sts/utils/files.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def count_files(directory: str | Path | None = None) -> int:
    """Count the files in a directory, defaulting to the current one.

    Convenience wrapper around Directory.count_files().

    Args:
        directory: Path to directory to count files in (optional)

    Returns:
        Number of files in directory (excluding directories)

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory
        DirAccessError: If directory cannot be accessed

    Example:
        ```python
        count_files()  # Count files in current directory
        count_files('/etc')  # Count files in specific directory
        ```
    """
    if directory:
        target = Path(directory)
    else:
        target = Path.cwd()
    return Directory(target).count_files()

get_free_space(path=None)

Get free space in bytes.

Parameters:

Name Type Description Default
path str | Path | None

Path to check free space for (optional)

None

Returns:

Type Description
int | None

Free space in bytes or None if error

Example
get_free_space()  # Check current directory
get_free_space('/mnt')  # Check specific path
Source code in sts_libs/src/sts/utils/files.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def get_free_space(path: str | Path | None = None) -> int | None:
    """Get free space in bytes.

    Runs ``df -B 1`` on the path and reads the "Available" column.

    Args:
        path: Path to check free space for (optional, defaults to '.')

    Returns:
        Free space in bytes, or None if the command failed or its output
        could not be parsed

    Example:
        ```python
        get_free_space()  # Check current directory
        get_free_space('/mnt')  # Check specific path
        ```
    """
    import shlex  # local import: only needed to quote the path

    path_str = str(path) if path else '.'
    # run() takes a single command string, so an unquoted path containing
    # whitespace or shell metacharacters would be split or reinterpreted.
    result = run(f'df -B 1 {shlex.quote(path_str)}')
    if result.failed:
        logging.error('Failed to get free space')
        return None

    # Parse output like:
    # Filesystem     1B-blocks       Used   Available Use% Mounted on
    # /dev/sda1    1073741824   10485760  1063256064   1% /mnt
    # The header line cannot match because its columns are not numeric.
    if match := re.search(r'\S+\s+\d+\s+\d+\s+(\d+)', result.stdout):
        return int(match.group(1))

    return None

is_mounted(device=None, mountpoint=None)

Check if device or mountpoint is mounted.

Parameters:

Name Type Description Default
device str | None

Device to check (optional)

None
mountpoint str | None

Mountpoint to check (optional)

None

Returns:

Type Description
bool

True if mounted, False otherwise

Example
is_mounted(device='/dev/sda1')
True
is_mounted(mountpoint='/mnt')
False
Source code in sts_libs/src/sts/utils/files.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def is_mounted(device: str | None = None, mountpoint: str | None = None) -> bool:
    """Check if device or mountpoint is mounted.

    The check greps the output of ``mount``. When both arguments are given,
    only ``device`` is checked.

    Args:
        device: Device to check (optional)
        mountpoint: Mountpoint to check (optional)

    Returns:
        True if mounted, False otherwise (including when neither argument
        is given)

    Example:
        ```python
        is_mounted(device='/dev/sda1')
        True
        is_mounted(mountpoint='/mnt')
        False
        ```
    """
    import shlex  # local import: only needed to quote the grep pattern

    target = device or mountpoint
    if not target:
        return False
    # Quote the pattern so whitespace or shell metacharacters in it cannot
    # split or alter the pipeline.
    # NOTE(review): grep still does an unanchored regex match on the whole
    # line, so '/dev/sda1' also matches a line for '/dev/sda10'.
    return run(f'mount | grep {shlex.quote(target)}').succeeded

mkfs(device=None, fs_type=None, *, force=False)

Create filesystem on device.

Parameters:

Name Type Description Default
device str | None

Device to create filesystem on (optional)

None
fs_type str | None

Filesystem type (optional)

None
force bool

Force creation even if filesystem exists (optional)

False

Returns:

Type Description
bool

True if successful, False otherwise

Example
mkfs('/dev/sda1', 'ext4')  # Create ext4 filesystem
mkfs('/dev/sda1', 'ext4', force=True)  # Force creation
Source code in sts_libs/src/sts/utils/files.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def mkfs(device: str | None = None, fs_type: str | None = None, *, force: bool = False) -> bool:
    """Create filesystem on device.

    Runs ``mkfs.<fs_type>`` on the device, optionally with that tool's
    force flag.

    Args:
        device: Device to create filesystem on (optional)
        fs_type: Filesystem type (optional)
        force: Force creation even if filesystem exists (optional)

    Returns:
        True if successful, False otherwise (including when device or
        fs_type is missing)

    Example:
        ```python
        mkfs('/dev/sda1', 'ext4')  # Create ext4 filesystem
        mkfs('/dev/sda1', 'ext4', force=True)  # Force creation
        ```
    """
    if not device or not fs_type:
        logging.error('Device and filesystem type required')
        return False

    # mkfs.xfs, mkfs.btrfs and mkfs.f2fs spell "force" as lowercase -f;
    # other tools (e.g. the ext family) keep the previous default of -F.
    force_option = '-f' if fs_type in ('xfs', 'btrfs', 'f2fs') else '-F'
    cmd = [f'mkfs.{fs_type}']
    if force:
        cmd.append(force_option)
    cmd.append(device)

    result = run(' '.join(cmd))
    if result.failed:
        logging.error(f'Failed to create {fs_type} filesystem on {device}: {result.stderr}')
        return False
    return True

mount(device=None, mountpoint=None, fs_type=None, options=None)

Mount device at mountpoint.

Parameters:

Name Type Description Default
device str | None

Device to mount (optional)

None
mountpoint str | None

Mountpoint to mount at (optional)

None
fs_type str | None

Filesystem type (optional)

None
options str | None

Mount options (optional)

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
mount('/dev/sda1', '/mnt')  # Basic mount
mount('/dev/sda1', '/mnt', 'ext4', 'ro')  # Mount with options
Source code in sts_libs/src/sts/utils/files.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def mount(
    device: str | None = None,
    mountpoint: str | None = None,
    fs_type: str | None = None,
    options: str | None = None,
) -> bool:
    """Run ``mount`` with whichever of the given pieces are set.

    When a mountpoint is given, a Directory is instantiated for it with
    create=True before the command runs.

    Args:
        device: Device to mount (optional)
        mountpoint: Mountpoint to mount at (optional)
        fs_type: Filesystem type (optional)
        options: Mount options (optional)

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        mount('/dev/sda1', '/mnt')  # Basic mount
        mount('/dev/sda1', '/mnt', 'ext4', 'ro')  # Mount with options
        ```
    """
    argv = ['mount']
    if fs_type:
        argv += ['-t', fs_type]
    if options:
        argv += ['-o', options]
    if device:
        argv += [device]
    if mountpoint:
        # Instantiating Directory with create=True creates the mountpoint if missing.
        Directory(Path(mountpoint), create=True)
        argv += [mountpoint]

    outcome = run(' '.join(argv))
    if not outcome.failed:
        return True
    logging.error(f'Failed to mount device: {outcome.stderr}')
    return False

rm_files_containing(directory=None, pattern='', *, invert=False)

Delete files containing (or not containing) specific pattern.

Parameters:

Name Type Description Default
directory str | Path | None

Directory to search in (optional)

None
pattern str

Pattern to match in file contents (optional)

''
invert bool

Delete files NOT containing pattern (optional)

False

Raises:

Type Description
DirNotFoundError

If directory does not exist

DirTypeError

If path exists but is not a directory

DirAccessError

If directory cannot be accessed

Example
rm_files_containing()  # Remove all files in current directory
rm_files_containing('/tmp', 'error')  # Remove files containing 'error'
rm_files_containing('/tmp', 'error', invert=True)  # Remove files NOT containing 'error'
Source code in sts_libs/src/sts/utils/files.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def rm_files_containing(directory: str | Path | None = None, pattern: str = '', *, invert: bool = False) -> None:
    """Remove files whose contents do (or do not) include pattern.

    Convenience wrapper around Directory.rm_files_containing(); the current
    directory is used when no directory is given.

    Args:
        directory: Directory to search in (optional)
        pattern: Pattern to match in file contents (optional)
        invert: Delete files NOT containing pattern (optional)

    Raises:
        DirNotFoundError: If directory does not exist
        DirTypeError: If path exists but is not a directory
        DirAccessError: If directory cannot be accessed

    Example:
        ```python
        rm_files_containing()  # Remove all files in current directory
        rm_files_containing('/tmp', 'error')  # Remove files containing 'error'
        rm_files_containing('/tmp', 'error', invert=True)  # Remove files NOT containing 'error'
        ```
    """
    if directory:
        target = Path(directory)
    else:
        target = Path.cwd()
    Directory(target).rm_files_containing(pattern, invert=invert)

umount(device=None, mountpoint=None)

Unmount device or mountpoint.

Parameters:

Name Type Description Default
device str | None

Device to unmount (optional)

None
mountpoint str | None

Mountpoint to unmount (optional)

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
umount('/dev/sda1')  # Unmount device
umount(mountpoint='/mnt')  # Unmount mountpoint
Source code in sts_libs/src/sts/utils/files.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def umount(device: str | None = None, mountpoint: str | None = None) -> bool:
    """Unmount device or mountpoint.

    Each target is unmounted with its own ``umount`` call. When both a device
    and a mountpoint are given, the mountpoint is re-checked after the device
    was unmounted and skipped if it is no longer mounted, so unmounting a
    device together with its own mountpoint does not report a false failure.

    Args:
        device: Device to unmount (optional)
        mountpoint: Mountpoint to unmount (optional)

    Returns:
        True if successful (or nothing was mounted), False otherwise.
        False is also returned when neither argument is given.

    Example:
        ```python
        umount('/dev/sda1')  # Unmount device
        umount(mountpoint='/mnt')  # Unmount mountpoint
        ```
    """
    import shlex  # local import: only needed to quote targets for the shell

    if device and not is_mounted(device=device):
        return True
    if mountpoint and not is_mounted(mountpoint=mountpoint):
        return True

    targets = [target for target in (device, mountpoint) if target]
    if not targets:
        # Previously this ran a bare `umount`, which can only fail
        logging.error('Failed to unmount device: no device or mountpoint given')
        return False

    for position, target in enumerate(targets):
        # position > 0 only happens when both arguments were given, in which
        # case the second target is the mountpoint: unmounting the device
        # normally releases it already, and a second umount would then fail.
        if position > 0 and not is_mounted(mountpoint=target):
            break

        # Quote the target so paths with spaces or shell metacharacters work
        result = run(f'umount {shlex.quote(target)}')
        if result.failed:
            logging.error(f'Failed to unmount device: {result.stderr}')
            return False
    return True

Process Management

sts.utils.processes

Process management.

This module provides functionality for managing system processes:

- Process information
- Process control
- Process monitoring

ProcessInfo dataclass

Process information.

This class provides functionality for managing process information: - Process metadata - Process status - Process control

Parameters:

Name Type Description Default
pid int | None

Process ID (optional, discovered from name)

None
name str | None

Process name (optional, discovered from pid)

None
status str | None

Process status (optional, discovered from process)

None
cmdline str | None

Process command line (optional, discovered from process)

None
Example
info = ProcessInfo()  # Discovers first available process
info = ProcessInfo(pid=1234)  # Discovers other values
info = ProcessInfo(name='sleep')  # Discovers matching process
Source code in sts_libs/src/sts/utils/processes.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@dataclass
class ProcessInfo:
    """Process information.

    Snapshot of a single process read from ``/proc`` when the object is
    created, together with helpers to probe and terminate that process:
    - Process metadata
    - Process status
    - Process control

    Args:
        pid: Process ID (optional, discovered from name)
        name: Process name (optional, discovered from pid)
        status: Process status (optional, discovered from process)
        cmdline: Process command line (optional, discovered from process)

    Example:
        ```python
        info = ProcessInfo()  # Discovers first available process
        info = ProcessInfo(pid=1234)  # Discovers other values
        info = ProcessInfo(name='sleep')  # Discovers matching process
        ```
    """

    # Optional parameters
    pid: int | None = None
    name: str | None = None
    status: str | None = None
    cmdline: str | None = None

    def __post_init__(self) -> None:
        """Initialize process information.

        Discovers process information based on provided parameters:
        without a pid, one is looked up in ``/proc`` (by name when a name was
        given, otherwise the first process found); with a pid, the remaining
        fields are filled in from ``/proc/<pid>``.
        """
        if not self.pid:
            found = self._find_pid(self.name)
            if found is not None:
                self.pid = found

        if self.pid:
            self._read_details()

    @staticmethod
    def _find_pid(name: str | None) -> int | None:
        """Return the pid of the first ``/proc`` entry matching ``name`` (any process if name is empty)."""
        try:
            candidates = [entry for entry in Path('/proc').iterdir() if entry.name.isdigit() and entry.is_dir()]
        except OSError:
            return None

        for entry in candidates:
            if not name:
                return int(entry.name)
            try:
                # errors='replace': a process name with undecodable bytes
                # must not abort the scan
                if (entry / 'comm').read_text(errors='replace').strip() == name:
                    return int(entry.name)
            except (OSError, ValueError):
                # Process vanished or is not readable; try the next one
                continue
        return None

    def _read_details(self) -> None:
        """Fill name, status and cmdline from ``/proc/<pid>``; read errors are logged, not raised."""
        proc_path = Path('/proc') / str(self.pid)

        try:
            # Get process name if not provided
            if not self.name:
                comm_path = proc_path / 'comm'
                if comm_path.exists():
                    self.name = comm_path.read_text(errors='replace').strip()

            # Get process status (text after "State:" in the status file)
            status_path = proc_path / 'status'
            if status_path.exists():
                for line in status_path.read_text(errors='replace').splitlines():
                    if line.startswith('State:'):
                        self.status = line.split(':')[1].strip()
                        break

            # Get process command line. Arguments are NUL-separated and may
            # contain arbitrary bytes, so decode leniently instead of letting
            # UnicodeDecodeError escape from the constructor.
            cmdline_path = proc_path / 'cmdline'
            if cmdline_path.exists():
                raw_cmdline = cmdline_path.read_text(errors='replace')
                self.cmdline = raw_cmdline.strip('\x00').replace('\x00', ' ')

        except OSError:
            logging.exception(f'Failed to get process info for PID {self.pid}')

    @property
    def exists(self) -> bool:
        """Check if process exists.

        Returns:
            True if a pid is known and ``/proc/<pid>`` exists, False otherwise

        Example:
            ```python
            info = ProcessInfo(pid=1234)
            info.exists
            True
            ```
        """
        return bool(self.pid and Path('/proc', str(self.pid)).exists())

    @property
    def running(self) -> bool:
        """Check if process is running.

        Probes the process with signal 0, which performs the existence and
        permission checks without delivering a signal.

        Returns:
            True if running, False otherwise

        Example:
            ```python
            info = ProcessInfo(pid=1234)
            info.running
            True
            ```
        """
        if not self.pid:
            return False

        try:
            os.kill(self.pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            # Process exists but we don't have permission to send signals
            return True
        return True

    def kill(self, timeout: float = 1.0) -> bool:
        """Kill process.

        Sends SIGTERM, waits ``timeout`` seconds, and escalates to SIGKILL
        (followed by another wait) if the process is still running.

        Args:
            timeout: Timeout in seconds between SIGTERM and SIGKILL

        Returns:
            True if the process is gone (or was not running to begin with),
            False if no pid is known, permission was denied, or the process
            survived SIGKILL

        Example:
            ```python
            info = ProcessInfo(pid=1234)
            info.kill()
            True
            ```
        """
        if not self.pid:
            return False

        if not self.running:
            return True

        try:
            # Try SIGTERM first
            os.kill(self.pid, signal.SIGTERM)
            time.sleep(timeout)

            # If still running, use SIGKILL
            if self.running:
                os.kill(self.pid, signal.SIGKILL)
                time.sleep(timeout)

                # Check if process is still running
                if self.running:
                    logging.error(f'Failed to kill process {self.pid}')
                    return False

        except ProcessLookupError:
            # Process already terminated
            pass
        except PermissionError:
            logging.exception(f'Permission denied killing process {self.pid}')
            return False

        return True

    @classmethod
    def from_pid(cls, pid: int) -> ProcessInfo | None:
        """Get process information by PID.

        Args:
            pid: Process ID

        Returns:
            Process information or None if not found

        Example:
            ```python
            info = ProcessInfo.from_pid(1234)
            info.status
            'S (sleeping)'
            ```
        """
        info = cls(pid=pid)
        return info if info.exists else None

    @classmethod
    def from_name(cls, name: str) -> ProcessInfo | None:
        """Get process information by name.

        Args:
            name: Process name

        Returns:
            Process information or None if not found

        Example:
            ```python
            info = ProcessInfo.from_name('sleep')
            info.pid
            1234
            ```
        """
        info = cls(name=name)
        return info if info.exists else None

exists: bool property

Check if process exists.

Returns:

Type Description
bool

True if exists, False otherwise

Example
info = ProcessInfo(pid=1234)
info.exists
True

running: bool property

Check if process is running.

Returns:

Type Description
bool

True if running, False otherwise

Example
info = ProcessInfo(pid=1234)
info.running
True

__post_init__()

Initialize process information.

Discovers process information based on provided parameters.

Source code in sts_libs/src/sts/utils/processes.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def __post_init__(self) -> None:
    """Initialize process information.

    Discovers process information based on provided parameters:
    without a pid, one is looked up in ``/proc`` (by name when a name was
    given, otherwise the first process found); with a pid, the remaining
    fields are filled in from ``/proc/<pid>``.
    """
    # No pid yet: scan /proc once, for either "first process" or "by name"
    if not self.pid:
        try:
            candidates = [entry for entry in Path('/proc').iterdir() if entry.name.isdigit() and entry.is_dir()]
        except OSError:
            return

        for entry in candidates:
            if self.name:
                try:
                    # errors='replace': undecodable bytes in a process name
                    # must not abort the scan
                    if (entry / 'comm').read_text(errors='replace').strip() != self.name:
                        continue
                except (OSError, ValueError):
                    # Process vanished or is not readable; try the next one
                    continue
            self.pid = int(entry.name)
            break

    # If pid found or provided, get other information
    if self.pid:
        proc_path = Path('/proc') / str(self.pid)

        try:
            # Get process name if not provided
            if not self.name:
                comm_path = proc_path / 'comm'
                if comm_path.exists():
                    self.name = comm_path.read_text(errors='replace').strip()

            # Get process status (text after "State:" in the status file)
            status_path = proc_path / 'status'
            if status_path.exists():
                for line in status_path.read_text(errors='replace').splitlines():
                    if line.startswith('State:'):
                        self.status = line.split(':')[1].strip()
                        break

            # Get process command line. Arguments are NUL-separated and may
            # contain arbitrary bytes, so decode leniently instead of letting
            # UnicodeDecodeError escape from the constructor.
            cmdline_path = proc_path / 'cmdline'
            if cmdline_path.exists():
                raw_cmdline = cmdline_path.read_text(errors='replace')
                self.cmdline = raw_cmdline.strip('\x00').replace('\x00', ' ')

        except OSError:
            logging.exception(f'Failed to get process info for PID {self.pid}')

from_name(name) classmethod

Get process information by name.

Parameters:

Name Type Description Default
name str

Process name

required

Returns:

Type Description
ProcessInfo | None

Process information or None if not found

Example
info = ProcessInfo.from_name('sleep')
info.pid
1234
Source code in sts_libs/src/sts/utils/processes.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@classmethod
def from_name(cls, name: str) -> ProcessInfo | None:
    """Look up a process by its name.

    Builds an instance from the name and keeps it only if the resulting
    process actually exists.

    Args:
        name: Process name

    Returns:
        Process information or None if not found

    Example:
        ```python
        info = ProcessInfo.from_name('sleep')
        info.pid
        1234
        ```
    """
    candidate = cls(name=name)
    if not candidate.exists:
        return None
    return candidate

from_pid(pid) classmethod

Get process information by PID.

Parameters:

Name Type Description Default
pid int

Process ID

required

Returns:

Type Description
ProcessInfo | None

Process information or None if not found

Example
info = ProcessInfo.from_pid(1234)
info.status
'S (sleeping)'
Source code in sts_libs/src/sts/utils/processes.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@classmethod
def from_pid(cls, pid: int) -> ProcessInfo | None:
    """Look up a process by its PID.

    Builds an instance from the pid and keeps it only if the resulting
    process actually exists.

    Args:
        pid: Process ID

    Returns:
        Process information or None if not found

    Example:
        ```python
        info = ProcessInfo.from_pid(1234)
        info.status
        'S (sleeping)'
        ```
    """
    candidate = cls(pid=pid)
    if not candidate.exists:
        return None
    return candidate

kill(timeout=1.0)

Kill process.

Parameters:

Name Type Description Default
timeout float

Timeout in seconds between SIGTERM and SIGKILL

1.0

Returns:

Type Description
bool

True if successful, False otherwise

Example
info = ProcessInfo(pid=1234)
info.kill()
True
Source code in sts_libs/src/sts/utils/processes.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def kill(self, timeout: float = 1.0) -> bool:
    """Terminate the process, escalating from SIGTERM to SIGKILL.

    SIGTERM is sent first; if the process is still running after ``timeout``
    seconds, SIGKILL is sent and the process is checked once more after
    another ``timeout`` seconds.

    Args:
        timeout: Timeout in seconds between SIGTERM and SIGKILL

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        info = ProcessInfo(pid=1234)
        info.kill()
        True
        ```
    """
    target = self.pid
    if not target:
        # Nothing to address a signal to
        return False
    if not self.running:
        # Already gone: counts as success
        return True

    try:
        # Polite request first
        os.kill(target, signal.SIGTERM)
        time.sleep(timeout)
        if not self.running:
            return True

        # Still alive: force it
        os.kill(target, signal.SIGKILL)
        time.sleep(timeout)
        if self.running:
            logging.error(f'Failed to kill process {self.pid}')
            return False
    except ProcessLookupError:
        # Process disappeared between the check and the signal
        return True
    except PermissionError:
        logging.exception(f'Permission denied killing process {self.pid}')
        return False

    return True

ProcessManager

Process manager functionality.

This class provides functionality for managing system processes: - Process control - Process monitoring - Process information

Example
pm = ProcessManager()
pm.kill_all('sleep')
True
Source code in sts_libs/src/sts/utils/processes.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class ProcessManager:
    """Process manager functionality.

    This class provides functionality for managing system processes:
    - Process control
    - Process monitoring
    - Process information

    Example:
        ```python
        pm = ProcessManager()
        pm.kill_all('sleep')
        True
        ```
    """

    def __init__(self) -> None:
        """Initialize process manager."""
        # Directory that is scanned for per-process (numeric) entries
        self.proc_path = Path('/proc')

    def get_all(self) -> list[ProcessInfo]:
        """Get list of all processes.

        Returns:
            List of process information; empty if the process directory
            cannot be listed

        Example:
            ```python
            pm = ProcessManager()
            pm.get_all()
            [ProcessInfo(pid=1, ...), ProcessInfo(pid=2, ...)]
            ```
        """
        processes = []

        try:
            # Get list of process directories
            proc_entries = [entry for entry in self.proc_path.iterdir() if entry.is_dir()]

            # Filter numeric directories and create processes
            for entry in proc_entries:
                if entry.name.isdigit():
                    pid = int(entry.name)
                    # from_pid returns None for processes that vanished meanwhile
                    if info := ProcessInfo.from_pid(pid):
                        processes.append(info)

        except OSError:
            logging.exception('Failed to get process list')
            return []

        return processes

    def get_by_name(self, name: str) -> list[ProcessInfo]:
        """Get processes by name.

        Args:
            name: Process name

        Returns:
            List of matching processes

        Example:
            ```python
            pm = ProcessManager()
            pm.get_by_name('sleep')
            [ProcessInfo(pid=1234, name='sleep', ...)]
            ```
        """
        return [p for p in self.get_all() if p.name == name]

    def kill_all(self, name: str, timeout: float = 1.0) -> bool:
        """Kill all processes by name.

        Runs ``killall`` for the name, waits ``timeout`` seconds and then
        checks whether a process with that name is still listed. No SIGKILL
        escalation is performed here.

        Args:
            name: Process name
            timeout: Seconds to wait after ``killall`` before checking for
                remaining processes

        Returns:
            True if ``killall`` succeeded and no matching process is left,
            False otherwise

        Example:
            ```python
            pm = ProcessManager()
            pm.kill_all('sleep')
            True
            ```
        """
        import shlex  # local import: only needed to quote the name for the shell

        # Quote the name: process names may contain spaces or shell
        # metacharacters, which would otherwise be split or interpreted
        result = run(f'killall {shlex.quote(name)}')
        if result.failed:
            return False

        # Wait for processes to finish
        time.sleep(timeout)

        # Check if any processes still running
        return not self.get_by_name(name)

__init__()

Initialize process manager.

Source code in sts_libs/src/sts/utils/processes.py
264
265
266
def __init__(self) -> None:
    """Set up the manager with the location of the process directory."""
    procfs_root = Path('/proc')
    self.proc_path = procfs_root

get_all()

Get list of all processes.

Returns:

Type Description
list[ProcessInfo]

List of process information

Example
pm = ProcessManager()
pm.get_all()
[ProcessInfo(pid=1, ...), ProcessInfo(pid=2, ...)]
Source code in sts_libs/src/sts/utils/processes.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def get_all(self) -> list[ProcessInfo]:
    """Collect information about every process listed under ``proc_path``.

    Returns:
        List of process information; empty if the directory cannot be listed

    Example:
        ```python
        pm = ProcessManager()
        pm.get_all()
        [ProcessInfo(pid=1, ...), ProcessInfo(pid=2, ...)]
        ```
    """
    try:
        # Snapshot the directory entries first, then keep the numeric ones
        directories = [item for item in self.proc_path.iterdir() if item.is_dir()]
        pids = [int(item.name) for item in directories if item.name.isdigit()]

        collected = []
        for pid in pids:
            info = ProcessInfo.from_pid(pid)
            # from_pid yields None when the process is not found
            if info:
                collected.append(info)
    except OSError:
        logging.exception('Failed to get process list')
        return []

    return collected

get_by_name(name)

Get processes by name.

Parameters:

Name Type Description Default
name str

Process name

required

Returns:

Type Description
list[ProcessInfo]

List of matching processes

Example
pm = ProcessManager()
pm.get_by_name('sleep')
[ProcessInfo(pid=1234, name='sleep', ...)]
Source code in sts_libs/src/sts/utils/processes.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def get_by_name(self, name: str) -> list[ProcessInfo]:
    """Return every known process whose name equals ``name``.

    Args:
        name: Process name

    Returns:
        List of matching processes

    Example:
        ```python
        pm = ProcessManager()
        pm.get_by_name('sleep')
        [ProcessInfo(pid=1234, name='sleep', ...)]
        ```
    """
    matches = []
    for process in self.get_all():
        if process.name == name:
            matches.append(process)
    return matches

kill_all(name, timeout=1.0)

Kill all processes by name.

Parameters:

Name Type Description Default
name str

Process name

required
timeout float

Seconds to wait after running killall before checking for remaining processes (no SIGKILL is sent)

1.0

Returns:

Type Description
bool

True if successful, False otherwise

Example
pm = ProcessManager()
pm.kill_all('sleep')
True
Source code in sts_libs/src/sts/utils/processes.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def kill_all(self, name: str, timeout: float = 1.0) -> bool:
    """Kill all processes by name.

    Runs ``killall`` for the name, waits ``timeout`` seconds and then checks
    whether a process with that name is still listed. No SIGKILL escalation
    is performed here.

    Args:
        name: Process name
        timeout: Seconds to wait after ``killall`` before checking for
            remaining processes

    Returns:
        True if ``killall`` succeeded and no matching process is left,
        False otherwise

    Example:
        ```python
        pm = ProcessManager()
        pm.kill_all('sleep')
        True
        ```
    """
    import shlex  # local import: only needed to quote the name for the shell

    # Quote the name: process names may contain spaces or shell
    # metacharacters, which would otherwise be split or interpreted
    result = run(f'killall {shlex.quote(name)}')
    if result.failed:
        return False

    # Wait for processes to finish
    time.sleep(timeout)

    # Check if any processes still running
    return not self.get_by_name(name)

System Checks

sts.utils.syscheck

System state checker.

This module provides functionality to check system state:

- Kernel state (tainted, dumps)
- System logs (dmesg, messages)
- Crash dumps (kdump, abrt)

abrt_check()

Check if abrtd found any issues.

Returns:

Type Description
bool

True if no errors found, False otherwise

Example
abrt_check()
True
Source code in sts_libs/src/sts/utils/syscheck.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def abrt_check() -> bool:
    """Check if abrtd found any issues.

    Every problem directory reported by ``abrt-cli list`` is archived into
    ``LOG_PATH`` and then removed from abrt so it does not affect the next
    test. A missing abrt installation is not treated as an error.

    Returns:
        True if no errors found, False otherwise

    Example:
        ```python
        abrt_check()
        True
        ```
    """
    logging.info('Checking abrt for errors')

    # Check if abrt is installed
    result = run('rpm -q abrt')
    if result.failed:
        logging.warning('abrt not installed, skipping check')
        return True

    # Check if abrtd is running
    result = run('pidof abrtd')
    if result.failed:
        logging.error('abrtd not running')
        return False

    # Check for issues
    result = run('abrt-cli list')
    if result.failed:
        logging.error('abrt-cli failed')
        return False

    # Parse output for directories
    error = False
    for line in result.stdout.splitlines():
        match = re.match(r'Directory:\s+(\S+)', line)
        if not match:
            continue

        directory = match.group(1)
        error = True

        # Name the archive after the directory's last component only. Using
        # the full (absolute) directory here would make `LOG_PATH / filename`
        # discard LOG_PATH, leaving the archive outside the log directory.
        filename = f'{Path(directory).name.replace(":", "-")}.tar.gz'
        target = LOG_PATH / filename
        logging.info(f'Found abrt issue: {filename}')

        # Archive directory straight into LOG_PATH (no separate move step)
        archived = run(f'tar cfzP {target} {directory}')
        if archived.failed:
            # Keep the problem directory so the data is not lost
            logging.error(f'Failed to archive abrt issue: {directory}')
            continue

        # Remove from abrt to avoid affecting next test
        run(f'abrt-cli rm {directory}')

    if error:
        logging.error('Found abrt errors')
        return False

    logging.info('No abrt errors found')
    return True

check_all()

Check for errors on the system.

Returns:

Type Description
bool

True if no errors found, False otherwise

Example
check_all()
True
Source code in sts_libs/src/sts/utils/syscheck.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def check_all() -> bool:
    """Run every system check and gather diagnostics if any of them fails.

    All checks are executed regardless of earlier failures. On failure,
    ``/var/log/messages`` is copied to ``LOG_PATH`` (when present) and a
    sosreport is generated if the ``sos`` package is installed.

    Returns:
        True if no errors found, False otherwise

    Example:
        ```python
        check_all()
        True
        ```
    """
    logging.info('Checking for errors on the system')

    # Run all checks; the list is fully built so none of them is skipped
    outcomes = [
        check()
        for check in (
            kernel_check,
            abrt_check,
            messages_dump_check,
            dmesg_check,
            kdump_check,
        )
    ]
    if all(outcomes):
        return True

    # Save system logs
    syslog = Path('/var/log/messages')
    if syslog.is_file():
        logging.info('Saving /var/log/messages')
        (LOG_PATH / 'messages.log').write_bytes(syslog.read_bytes())

    # Generate sosreport if available
    # testinfra's host.package(...).is_installed works at runtime, but type checkers cannot follow it
    if host.package('sos').is_installed and (sos_file := system.generate_sosreport()):  # pyright: ignore[reportUnknownMemberType, reportCallIssue, reportAttributeAccessIssue]
        logging.info(f'Generated sosreport: {sos_file}')

    return False

dmesg_check()

Check for errors in dmesg.

Returns:

Type Description
bool

True if no errors found, False otherwise

Example
dmesg_check()
True
Source code in sts_libs/src/sts/utils/syscheck.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def dmesg_check() -> bool:
    """Scan dmesg for any of the known error messages.

    On the first hit the full dmesg output is saved to ``LOG_PATH`` and the
    check fails; dmesg is only cleared when nothing was found.

    Returns:
        True if no errors found, False otherwise

    Example:
        ```python
        dmesg_check()
        True
        ```
    """
    logging.info('Checking dmesg for errors')

    for msg in ERROR_MSGS:
        probe = run(f"dmesg | grep -i '{msg}'")
        if not probe.succeeded:
            # grep found nothing for this message
            continue

        logging.error(f'Found error in dmesg: {msg}')
        dump_file = LOG_PATH / 'dmesg.log'
        dump_file.write_text(run('dmesg').stdout)
        return False

    # Clear dmesg
    run('dmesg -c')
    logging.info('No errors found in dmesg')
    return True

kdump_check()

Check for kdump crash files.

Returns:

Type Description
bool

True if no crashes found, False otherwise

Example
kdump_check()
True
Source code in sts_libs/src/sts/utils/syscheck.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def kdump_check() -> bool:
    """Look for kdump crash directories created within the last 24 hours.

    Crash entries are read from ``/var/crash/<hostname>``; the part of each
    entry name after the first ``-`` is turned into a date string and
    converted to an epoch timestamp with ``date``.

    Returns:
        True if no crashes found, False otherwise

    Example:
        ```python
        kdump_check()
        True
        ```
    """
    logging.info('Checking for kdump crashes')

    # Get hostname
    hostname_result = run('hostname')
    if hostname_result.failed:
        logging.error('Failed to get hostname')
        return False

    crash_dir = Path('/var/crash') / hostname_result.stdout.strip()
    if not crash_dir.exists():
        logging.info('No kdump directory found')
        return True

    # Get crash timestamps
    stamps = []
    for entry in crash_dir.iterdir():
        found = re.match(r'.*?-(.*)', entry.name)
        if not found:
            continue
        raw = found.group(1).replace('.', '-')
        # The last '-' separates the date part from the time part
        split_at = raw.rfind('-')
        stamps.append(f'{raw[:split_at]} {raw[split_at + 1:]}')

    if not stamps:
        logging.info('No kdump crashes found')
        return True

    # Check crash times against "now minus 24 hours"
    cutoff = datetime.now(timezone.utc).timestamp() - 86400
    for crash in stamps:
        converted = run(f'date --date="{crash}" +%s')
        if converted.failed:
            logging.warning(f'Failed to parse crash date: {crash}')
            continue

        if int(converted.stdout) > cutoff:
            logging.error(f'Found recent crash: {crash}')
            return False

    logging.info('No recent kdump crashes found')
    return True

kernel_check()

Check if kernel is tainted.

Returns:

Type Description
bool

True if kernel is not tainted, False otherwise

Example
kernel_check()
True
Source code in sts_libs/src/sts/utils/syscheck.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def kernel_check() -> bool:
    """Inspect the kernel taint value and the modules flagged in /proc/modules.

    An untainted kernel passes immediately. Otherwise the set taint bits are
    logged and every flagged module is examined; only modules outside the
    ignore list make the check fail.

    Returns:
        True if kernel is not tainted (or only ignored modules are flagged),
        False otherwise

    Example:
        ```python
        kernel_check()
        True
        ```
    """
    logging.info('Checking for tainted kernel')

    # Get current tainted value
    taint_result = run('cat /proc/sys/kernel/tainted')
    if taint_result.failed:
        logging.error('Failed to get tainted value')
        return False

    tainted = int(taint_result.stdout)
    if tainted == 0:
        return True

    logging.warning('Kernel is tainted!')

    # Walk the taint mask bit by bit, lowest bit first
    position, remaining = 0, tainted
    while remaining:
        if remaining & 1:
            logging.info(f'TAINT bit {position} is set')
        remaining >>= 1
        position += 1

    # List tainted module definitions
    definitions = run('cat /usr/src/kernels/`uname -r`/include/linux/kernel.h | grep TAINT_')
    if definitions.succeeded:
        logging.info('Tainted bit definitions:')
        logging.info(definitions.stdout)

    # Check for tainted modules
    modules_result = run("cat /proc/modules | grep -e '(.*)' | cut -d' ' -f1")
    if modules_result.failed:
        return False

    # Skip certain modules
    ignore_modules = {'ocrdma', 'nvme_fc', 'nvmet_fc'}  # Tech Preview modules
    found_issue = False

    flagged = [entry for entry in modules_result.stdout.splitlines() if entry]
    for module in flagged:
        logging.info(f'Found tainted module: {module}')
        run(f'modinfo {module}')

        if module not in ignore_modules:
            found_issue = True
        else:
            logging.info(f'Ignoring known tainted module: {module}')

    return not found_issue

messages_dump_check()

Check for kernel dumps in system messages.

Returns:

Type Description
bool

True if no dumps found, False otherwise

Example
messages_dump_check()
True
Source code in sts_libs/src/sts/utils/syscheck.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def messages_dump_check() -> bool:
    """Check for kernel dumps in system messages.

    Scans /var/log/messages for text enclosed between a '[ cut here ]' marker
    and the next '[ end trace ' marker and logs every match as an error.

    Returns:
        True if no dumps found (or the messages file does not exist),
        False otherwise

    Example:
        ```python
        messages_dump_check()
        True
        ```
    """
    logging.info('Checking for kernel dumps')

    messages = Path('/var/log/messages')
    if not messages.is_file():
        logging.warning('No messages file found')
        return True

    # Read messages file
    try:
        content = messages.read_text()
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this retry cannot hit a decode error
        content = messages.read_text(encoding='latin-1')

    # Search for dumps
    begin = r'\[ cut here \]'
    end = r'\[ end trace '
    pattern = f'{begin}(.*?){end}'

    # The two markers sit on different lines, so '.' has to match newlines.
    # re.MULTILINE only changes '^' and '$' and would never match such a dump.
    if dumps := re.findall(pattern, content, re.DOTALL):
        logging.error('Found kernel dumps:')
        for dump in dumps:
            logging.error(dump)
        return False

    logging.info('No kernel dumps found')
    return True

Host Information

sts.utils.host

Host management utilities.

This module provides functionality for managing the test host: - Host initialization - Package installation - Command execution

host_init()

Initialize testinfra host with local backend.

Returns:

Type Description
Host

Host instance configured for local backend

Example
host = host_init()
host.exists('rpm')
True
Source code in sts_libs/src/sts/utils/host.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def host_init() -> Host:
    """Create a testinfra host bound to the local backend.

    Returns:
        Host obtained from Host.get_host() for the 'local://' spec

    Example:
        ```python
        host = host_init()
        host.exists('rpm')
        True
        ```
    """
    local_spec = 'local://'
    return Host.get_host(local_spec)

Error Handling

sts.utils.errors

Storage Test Suite error classes.

This module provides a hierarchy of error classes used across the project:

Base Classes: - STSError: Base class for all STS exceptions - DeviceError: Base class for device-related errors - ModuleError: Base class for kernel module errors - PackageError: Base class for package-related errors - SysError: Base class for system-related errors

Device Errors: - DeviceNotFoundError: Device does not exist - DeviceTypeError: Device is not of expected type

Module Errors: - ModuleLoadError: Failed to load kernel module - ModuleUnloadError: Failed to unload kernel module - ModuleInUseError: Module cannot be unloaded because it is in use

Package Errors: - PackageNotFoundError: Package does not exist - PackageInstallError: Failed to install package

System Errors: - SysNotSupportedError: Operation not supported on this system

DeviceError

Bases: STSError

Base class for device-related errors.

Source code in sts_libs/src/sts/utils/errors.py
38
39
class DeviceError(STSError):
    """Common parent of all device-related errors."""

DeviceNotFoundError

Bases: DeviceError

Device does not exist.

Source code in sts_libs/src/sts/utils/errors.py
42
43
class DeviceNotFoundError(DeviceError):
    """The requested device does not exist."""

DeviceTypeError

Bases: DeviceError

Device is not of expected type.

Source code in sts_libs/src/sts/utils/errors.py
46
47
class DeviceTypeError(DeviceError):
    """The device is not of the expected type."""

ModuleError

Bases: STSError

Base class for kernel module errors.

Source code in sts_libs/src/sts/utils/errors.py
50
51
class ModuleError(STSError):
    """Common parent of all kernel module errors."""

ModuleInUseError

Bases: ModuleError

Module cannot be unloaded because it is in use.

Source code in sts_libs/src/sts/utils/errors.py
62
63
class ModuleInUseError(ModuleError):
    """The module is in use and therefore cannot be unloaded."""

ModuleLoadError

Bases: ModuleError

Failed to load kernel module.

Source code in sts_libs/src/sts/utils/errors.py
54
55
class ModuleLoadError(ModuleError):
    """Loading a kernel module failed."""

ModuleUnloadError

Bases: ModuleError

Failed to unload kernel module.

Source code in sts_libs/src/sts/utils/errors.py
58
59
class ModuleUnloadError(ModuleError):
    """Unloading a kernel module failed."""

PackageError

Bases: STSError

Base class for package-related errors.

Source code in sts_libs/src/sts/utils/errors.py
66
67
class PackageError(STSError):
    """Common parent of all package-related errors."""

PackageInstallError

Bases: PackageError

Failed to install package.

Source code in sts_libs/src/sts/utils/errors.py
74
75
class PackageInstallError(PackageError):
    """Installing a package failed."""

PackageNotFoundError

Bases: PackageError

Package does not exist.

Source code in sts_libs/src/sts/utils/errors.py
70
71
class PackageNotFoundError(PackageError):
    """The requested package does not exist."""

STSError

Bases: Exception

Base class for all STS exceptions.

Source code in sts_libs/src/sts/utils/errors.py
34
35
class STSError(Exception):
    """Root of the STS exception hierarchy; all STS exceptions derive from it."""

SysError

Bases: STSError

Base class for system-related errors.

Source code in sts_libs/src/sts/utils/errors.py
78
79
class SysError(STSError):
    """Common parent of all system-related errors."""

SysNotSupportedError

Bases: SysError

Operation not supported on this system.

Source code in sts_libs/src/sts/utils/errors.py
82
83
class SysNotSupportedError(SysError):
    """The operation is not supported on this system."""

TMT Integration

sts.utils.tmt

Test Management Tool utilities.

This module provides functionality for working with tmt: - Test result management - Log gathering - Duration calculation - Custom result submission

For more information about tmt, see: https://tmt.readthedocs.io/en/stable/spec/tests.html#result https://tmt.readthedocs.io/en/stable/spec/plans.html#spec-plans-results

CustomResults

Bases: TypedDict

Custom test results type.

Attributes:

Name Type Description
name str

Result name (e.g. "/step-1" or "/setup/iscsi/target")

result TmtResult

Test result

note str | None

Additional notes

log list[str] | None

Paths to log files

serialnumber int | None

Serial number in test sequence

guest GuestType | None

Guest information

duration str | None

Test duration

ids dict[str, str] | None

Additional identifiers

Source code in sts_libs/src/sts/utils/tmt.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class CustomResults(TypedDict):
    """Typed dictionary describing one custom test result entry.

    Every key is declared; the optional details accept None.

    Attributes:
        name: Result name (e.g. "/step-1" or "/setup/iscsi/target")
        result: Test result
        note: Additional notes
        log: Paths to log files
        serialnumber: Serial number in test sequence
        guest: Guest information
        duration: Test duration
        ids: Additional identifiers
    """

    # Identification and outcome
    name: str
    result: TmtResult

    # Optional details, None when not provided
    note: str | None
    log: list[str] | None
    serialnumber: int | None
    guest: GuestType | None
    duration: str | None
    ids: dict[str, str] | None

GuestType

Bases: TypedDict

Guest information type.

Attributes:

Name Type Description
name str | None

Guest name

role str | None

Guest role

Source code in sts_libs/src/sts/utils/tmt.py
105
106
107
108
109
110
111
112
113
114
class GuestType(TypedDict):
    """Guest information type.

    Attributes:
        name: Guest name
        role: Guest role
    """

    name: str | None
    role: str | None

Results

TMT test results management.

This class provides functionality for managing TMT test results: - Adding test results - Managing logs - Calculating durations - Submitting results

Example
results = Results()
results.add(name='setup', result='pass')
results.add(name='test', result='pass', log=['test.log'])
results.submit()
Source code in sts_libs/src/sts/utils/tmt.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class Results:
    """Collector for TMT custom test results.

    Responsibilities:
    - Recording results, one entry per add() call
    - Attaching log paths to an entry
    - Measuring the time elapsed between consecutive entries
    - Writing all entries to results.json

    Example:
        ```python
        results = Results()
        results.add(name='setup', result='pass')
        results.add(name='test', result='pass', log=['test.log'])
        results.submit()
        ```
    """

    def __init__(self) -> None:
        """Start with an empty result list and remember the current time."""
        self.results: list[dict[str, Any]] = []
        self.timestamp = timestamp()

    def add(
        self,
        name: str = '/',
        result: TmtResult = 'pass',
        note: str | None = None,
        log: list[str] | None = None,
        errors: list[str] | None = None,
    ) -> None:
        """Record one test result.

        Intended for TMT plans using 'result: custom': call add() once per
        distinct step of the test, then submit() to write results.json.
        The duration of an entry is the time since the previous add() call,
        or since construction for the first entry.

        Args:
            name: Result name (e.g. '/setup/something' or 'setup'); a leading
                '/' is added when missing
            result: Test result
            note: Additional notes
            log: Paths to log files (relative to TMT_TEST_DATA)
            errors: Error messages; a non-empty list forces result to 'fail'
                (the messages themselves are not stored)

        Example:
            ```python
            results = Results()
            results.add(
                name='setup',
                result='pass',
                log=['setup.log'],
            )
            ```
        """
        full_name = name if name.startswith('/') else f'/{name}'
        outcome = 'fail' if errors else result

        # Time spent since the previous entry; then restart the clock
        now = timestamp()
        elapsed = calculate_duration(self.timestamp, now)
        self.timestamp = now

        # Keyword order is kept as-is: it determines the key order in results.json
        entry = CustomResults(
            name=full_name,
            result=outcome,
            note=note,
            log=log,
            duration=elapsed,
            ids=None,
            serialnumber=None,
            guest=None,
        )

        # Keys whose value is None are dropped before storing
        self.results.append(remove_nones(entry))

    def submit(self) -> None:
        """Write the collected results to disk.

        Dumps self.results as JSON to results.json in the TMT_TEST_DATA
        directory, overwriting an existing file.

        Example:
            ```python
            results = Results()
            results.add(name='test', result='pass')
            results.submit()  # Creates results.json
            ```
        """
        target = Path(TMT_TEST_DATA / 'results.json')
        with target.open('w') as handle:
            json.dump(self.results, handle)

__init__()

Initialize results manager.

Source code in sts_libs/src/sts/utils/tmt.py
181
182
183
184
def __init__(self) -> None:
    """Start with an empty result list and remember the current time."""
    self.results: list[dict[str, Any]] = []
    self.timestamp = timestamp()

add(name='/', result='pass', note=None, log=None, errors=None)

Add test result.

When the TMT plan is set to 'result: custom', use this followed by submit() to create the necessary results.json. Call it multiple times when the test has distinct steps (parts).

Parameters:

Name Type Description Default
name str

Result name (e.g. '/setup/something' or 'setup')

'/'
result TmtResult

Test result

'pass'
note str | None

Additional notes

None
log list[str] | None

Paths to log files (relative to TMT_TEST_DATA)

None
errors list[str] | None

Error messages (sets result to 'fail' if present)

None
Example
results = Results()
results.add(
    name='setup',
    result='pass',
    log=['setup.log'],
)
Source code in sts_libs/src/sts/utils/tmt.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def add(
    self,
    name: str = '/',
    result: TmtResult = 'pass',
    note: str | None = None,
    log: list[str] | None = None,
    errors: list[str] | None = None,
) -> None:
    """Record one test result.

    Intended for TMT plans using 'result: custom': call add() once per
    distinct step of the test, then submit() to write results.json.
    The duration of an entry is the time elapsed since self.timestamp was
    last set, which this method resets on every call.

    Args:
        name: Result name (e.g. '/setup/something' or 'setup'); a leading
            '/' is added when missing
        result: Test result
        note: Additional notes
        log: Paths to log files (relative to TMT_TEST_DATA)
        errors: Error messages; a non-empty list forces result to 'fail'
            (the messages themselves are not stored)

    Example:
        ```python
        results = Results()
        results.add(
            name='setup',
            result='pass',
            log=['setup.log'],
        )
        ```
    """
    full_name = name if name.startswith('/') else f'/{name}'
    outcome = 'fail' if errors else result

    # Time spent since the previous entry; then restart the clock
    now = timestamp()
    elapsed = calculate_duration(self.timestamp, now)
    self.timestamp = now

    # Keyword order is kept as-is: it determines the key order in results.json
    entry = CustomResults(
        name=full_name,
        result=outcome,
        note=note,
        log=log,
        duration=elapsed,
        ids=None,
        serialnumber=None,
        guest=None,
    )

    # Keys whose value is None are dropped before storing
    self.results.append(remove_nones(entry))

submit()

Submit test results.

Creates results.json file in TMT_TEST_DATA directory.

Example
results = Results()
results.add(name='test', result='pass')
results.submit()  # Creates results.json
Source code in sts_libs/src/sts/utils/tmt.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def submit(self) -> None:
    """Write the collected results to disk.

    Dumps self.results as JSON to results.json in the TMT_TEST_DATA
    directory, overwriting an existing file.

    Example:
        ```python
        results = Results()
        results.add(name='test', result='pass')
        results.submit()  # Creates results.json
        ```
    """
    target = Path(TMT_TEST_DATA / 'results.json')
    with target.open('w') as handle:
        json.dump(self.results, handle)

calculate_duration(start, end)

Calculate duration between timestamps.

Parameters:

Name Type Description Default
start float

Start timestamp

required
end float

End timestamp

required

Returns:

Type Description
str

Duration in hh:mm:ss format

Example
calculate_duration(1677721600.0, 1677725200.0)
'01:00:00'
Source code in sts_libs/src/sts/utils/tmt.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def calculate_duration(start: float, end: float) -> str:
    """Calculate duration between timestamps.

    Fractional seconds are truncated. Hours are zero-padded to two digits but
    not capped, so durations of 100 hours or more use three or more digits.

    Args:
        start: Start timestamp
        end: End timestamp

    Returns:
        Duration in hh:mm:ss format; '00:00:00' when end is earlier than start

    Example:
        ```python
        calculate_duration(1677721600.0, 1677725200.0)
        '01:00:00'
        ```
    """
    # Clamp at zero: a negative interval would otherwise be rendered through
    # floor division and modulo as a malformed value such as '-1:59:59'.
    total = max(0, int(end - start))
    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'

gather_logs_from_dir(logs_path, name)

Gather logs from directory into a tarfile.

Parameters:

Name Type Description Default
logs_path str

Path to directory containing logs

required
name str | None

Name for the tarfile (optional)

required

Returns:

Type Description
Path | None

Path to created tarfile or None if directory doesn't exist

Example
gather_logs_from_dir('/var/log', 'system_logs')
Path('/var/tmp/uuid/system_logs.tar')
Source code in sts_libs/src/sts/utils/tmt.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def gather_logs_from_dir(logs_path: str, name: str | None) -> Path | None:
    """Gather logs from directory into a tarfile.

    Args:
        logs_path: Path to directory containing logs
        name: Name for the tarfile (optional)

    Returns:
        Path to created tarfile or None if directory doesn't exist

    Example:
        ```python
        gather_logs_from_dir('/var/log/messages', 'system_logs')
        Path('/var/tmp/uuid/system_logs.tar')
        ```
    """
    path = Path(logs_path)
    if not path.is_dir():
        return None

    # Generate tarfile name
    if not name:
        name = str(path).replace('/', '_')
    if '.tar' not in name:
        name = f'{name}.tar'

    # Create tarfile
    tarfile_path = f'{TMT_TEST_DATA}/{name}'
    with tarfile.open(tarfile_path, 'w') as tar:
        tar.add(path, recursive=True)
    return Path(tarfile_path)

remove_nones(cr)

Remove None values from custom results.

Parameters:

Name Type Description Default
cr CustomResults

Custom results dictionary

required

Returns:

Type Description
dict[str, Any]

Dictionary with None values removed

Example
remove_nones({'name': 'test', 'note': None, 'result': 'pass'})
{'name': 'test', 'result': 'pass'}
Source code in sts_libs/src/sts/utils/tmt.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def remove_nones(cr: CustomResults) -> dict[str, Any]:
    """Return a copy of the custom results without None-valued keys.

    Only None is dropped; other falsy values such as 0 or '' are kept, and
    the original key order is preserved.

    Args:
        cr: Custom results dictionary

    Returns:
        New dictionary containing the keys whose value is not None

    Example:
        ```python
        remove_nones({'name': 'test', 'note': None, 'result': 'pass'})
        {'name': 'test', 'result': 'pass'}
        ```
    """
    cleaned: dict[str, Any] = {}
    for key, value in cr.items():
        if value is None:
            continue
        cleaned[key] = value
    return cleaned

timestamp()

Get current timestamp.

Returns:

Type Description
float

Current time in seconds since epoch

Example
timestamp()
1677721600.0
Source code in sts_libs/src/sts/utils/tmt.py
70
71
72
73
74
75
76
77
78
79
80
81
82
def timestamp() -> float:
    """Return the current time as reported by time.time().

    Returns:
        Current time in seconds since epoch

    Example:
        ```python
        timestamp()
        1677721600.0
        ```
    """
    now = time.time()
    return now