Skip to content

Stratis

This section documents the Stratis storage management functionality.

Base Functionality

sts.stratis.base

Base Stratis functionality.

This module provides base functionality for Stratis operations: - Command execution - Common utilities

Stratis is a storage management solution that provides: - Advanced storage pools - Thin provisioning - Snapshots - RAID support - Encryption

Key

Bases: StratisBase

Stratis key management.

Manages encryption keys for: - Pool encryption - Data security - Access control

Keys are identified by: - Key description (user-friendly name) - Key file path (contains actual key)

Examples:

Create with default configuration: key = Key()

Create with error propagation: key = Key(StratisConfig(propagate=True))

Source code in sts_libs/src/sts/stratis/base.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
class Key(StratisBase):
    """Stratis key management.

    Manages encryption keys for:
    - Pool encryption
    - Data security
    - Access control

    Keys are identified by:
    - Key description (user-friendly name)
    - Key file path (contains actual key)

    Examples:
        Create with default configuration:
        key = Key()

        Create with error propagation:
        key = Key(StratisConfig(propagate=True))
    """

    def set(self, keydesc: str, keyfile_path: str) -> CommandResult:
        """Set key.

        Associates a key file with a description:
        - Key file must exist
        - Description must be unique
        - Used for pool encryption

        Args:
            keydesc: Key description (identifier)
            keyfile_path: Path to key file

        Returns:
            Command result

        Example:
            ```python
            Register encryption key:
            key.set('mykey', '/path/to/keyfile')
            ```
        """
        return self.run_command(
            'key',
            'set',
            options={'--keyfile-path': keyfile_path},
            positional_args=[keydesc],
        )

    def unset(self, keydesc: str) -> CommandResult:
        """Unset key.

        Removes key association:
        - Key must not be in use
        - Does not delete key file
        - Cannot undo operation

        Args:
            keydesc: Key description

        Returns:
            Command result

        Example:
            ```python
            Remove key registration:
            key.unset('mykey')
            ```
        """
        return self.run_command('key', 'unset', positional_args=[keydesc])

    def list(self) -> CommandResult:
        """List keys.

        Shows all registered keys:
        - Key descriptions only
        - No key file contents
        - No usage information

        Returns:
            Command result

        Example:
            ```python
            Show registered keys:
            key.list()
            ```
        """
        return self.run_command('key', 'list')

    def exists(self, keydesc: str) -> bool:
        """Check if key exists.

        Verifies key registration:
        - Checks key description
        - Does not verify key file
        - Does not check key validity

        Args:
            keydesc: Key description

        Returns:
            True if key exists, False otherwise

        Example:
            ```python
            Check key registration:
            exists = key.exists('mykey')
            ```
        """
        result = self.list()
        return bool(result.succeeded and keydesc in result.stdout)

exists(keydesc)

Check if key exists.

Verifies key registration: - Checks key description - Does not verify key file - Does not check key validity

Parameters:

Name Type Description Default
keydesc str

Key description

required

Returns:

Type Description
bool

True if key exists, False otherwise

Example
Check key registration:
exists = key.exists('mykey')
Source code in sts_libs/src/sts/stratis/base.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def exists(self, keydesc: str) -> bool:
    """Check if key exists.

    Verifies key registration:
    - Checks key description
    - Does not verify key file
    - Does not check key validity

    Args:
        keydesc: Key description

    Returns:
        True if key exists, False otherwise

    Example:
        ```python
        Check key registration:
        exists = key.exists('mykey')
        ```
    """
    result = self.list()
    return bool(result.succeeded and keydesc in result.stdout)

list()

List keys.

Shows all registered keys: - Key descriptions only - No key file contents - No usage information

Returns:

Type Description
CommandResult

Command result

Example
Show registered keys:
key.list()
Source code in sts_libs/src/sts/stratis/base.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def list(self) -> CommandResult:
    """List keys.

    Shows all registered keys:
    - Key descriptions only
    - No key file contents
    - No usage information

    Returns:
        Command result

    Example:
        ```python
        Show registered keys:
        key.list()
        ```
    """
    return self.run_command('key', 'list')

set(keydesc, keyfile_path)

Set key.

Associates a key file with a description: - Key file must exist - Description must be unique - Used for pool encryption

Parameters:

Name Type Description Default
keydesc str

Key description (identifier)

required
keyfile_path str

Path to key file

required

Returns:

Type Description
CommandResult

Command result

Example
Register encryption key:
key.set('mykey', '/path/to/keyfile')
Source code in sts_libs/src/sts/stratis/base.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def set(self, keydesc: str, keyfile_path: str) -> CommandResult:
    """Set key.

    Associates a key file with a description:
    - Key file must exist
    - Description must be unique
    - Used for pool encryption

    Args:
        keydesc: Key description (identifier)
        keyfile_path: Path to key file

    Returns:
        Command result

    Example:
        ```python
        Register encryption key:
        key.set('mykey', '/path/to/keyfile')
        ```
    """
    return self.run_command(
        'key',
        'set',
        options={'--keyfile-path': keyfile_path},
        positional_args=[keydesc],
    )

unset(keydesc)

Unset key.

Removes key association: - Key must not be in use - Does not delete key file - Cannot undo operation

Parameters:

Name Type Description Default
keydesc str

Key description

required

Returns:

Type Description
CommandResult

Command result

Example
Remove key registration:
key.unset('mykey')
Source code in sts_libs/src/sts/stratis/base.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def unset(self, keydesc: str) -> CommandResult:
    """Unset key.

    Removes key association:
    - Key must not be in use
    - Does not delete key file
    - Cannot undo operation

    Args:
        keydesc: Key description

    Returns:
        Command result

    Example:
        ```python
        Remove key registration:
        key.unset('mykey')
        ```
    """
    return self.run_command('key', 'unset', positional_args=[keydesc])

StratisBase

Base class for Stratis operations.

Provides common functionality: - Command execution with options - Error handling - Version information - System reporting

Parameters:

Name Type Description Default
config StratisConfig | None

Stratis configuration (optional)

None

Examples:

Create with default configuration: stratis = StratisBase()

Create with custom configuration: stratis = StratisBase(StratisConfig(propagate=True))

Source code in sts_libs/src/sts/stratis/base.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class StratisBase:
    """Base class for Stratis operations.

    Provides common functionality:
    - Command execution with options
    - Error handling
    - Version information
    - System reporting

    Args:
        config: Stratis configuration (optional)

    Examples:
        Create with default configuration:
        stratis = StratisBase()

        Create with custom configuration:
        stratis = StratisBase(StratisConfig(propagate=True))
    """

    def __init__(self, config: StratisConfig | None = None) -> None:
        """Initialize Stratis base.

        Args:
            config: Stratis configuration
        """
        self.config = config or StratisConfig()

    def run_command(
        self,
        subcommand: str | None = None,
        action: str | None = None,
        options: StratisOptions | None = None,
        positional_args: list[str] | None = None,
    ) -> CommandResult:
        """Run stratis command.

        Command Structure:
        The command starts with 'stratis', followed by any global options from the
        configuration. Next comes the subcommand (like 'pool' or 'filesystem'),
        then the action (like 'create' or 'list'). Finally, any command-specific
        options and positional arguments are added.

        Args:
            subcommand: Command category (pool, filesystem, key)
            action: Operation to perform (list, create, set)
            options: Command-specific options as key-value pairs
            positional_args: Additional arguments

        Returns:
            Command result

        Examples:
            List all pools:
            stratis.run_command('pool', 'list')

            Create a filesystem:
            stratis.run_command('filesystem', 'create',
                              positional_args=['pool1', 'fs1'])

            Set encryption key:
            stratis.run_command('key', 'set',
                              options={'--keyfile-path': '/path/to/key'},
                              positional_args=['mykey'])
        """
        command_list: list[str] = [CLI_NAME]
        command_list.extend(self.config.to_args())

        if subcommand is not None:
            command_list.append(subcommand)
        if action is not None:
            command_list.append(action)
        if options is not None:
            command_list.extend(k if v is None else f'{k} {v}' for k, v in options.items())
        if positional_args:
            command_list.extend(positional_args)

        result = run(' '.join(command_list))
        if result.failed and self.config.propagate:
            raise StratisError(f'Command failed: {result.stderr}')
        return result

    def version(self) -> CommandResult:
        """Get Stratis version.

        Returns version information for:
        - stratisd daemon
        - stratis-cli tool
        - libstratis library

        Returns:
            Version information

        Example:
            ```python
            Get version information:
            version = stratis.version()
            ```
        """
        return self.run_command(action='--version')

    def get_report(self) -> StratisReportData | None:
        """Get Stratis report.

        Retrieves system-wide information about:
        - Storage pools
        - Filesystems
        - Block devices
        - Cache devices

        Returns:
            Report data or None if failed

        Example:
            ```python
            Get system report:
            report = stratis.get_report()
            ```
        """
        result = self.run_command('report')
        if result.failed or not result.stdout:
            return None

        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            return None

__init__(config=None)

Initialize Stratis base.

Parameters:

Name Type Description Default
config StratisConfig | None

Stratis configuration

None
Source code in sts_libs/src/sts/stratis/base.py
103
104
105
106
107
108
109
def __init__(self, config: StratisConfig | None = None) -> None:
    """Initialize Stratis base.

    Args:
        config: Stratis configuration
    """
    self.config = config or StratisConfig()

get_report()

Get Stratis report.

Retrieves system-wide information about: - Storage pools - Filesystems - Block devices - Cache devices

Returns:

Type Description
StratisReportData | None

Report data or None if failed

Example
Get system report:
report = stratis.get_report()
Source code in sts_libs/src/sts/stratis/base.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_report(self) -> StratisReportData | None:
    """Get Stratis report.

    Retrieves system-wide information about:
    - Storage pools
    - Filesystems
    - Block devices
    - Cache devices

    Returns:
        Report data or None if failed

    Example:
        ```python
        Get system report:
        report = stratis.get_report()
        ```
    """
    result = self.run_command('report')
    if result.failed or not result.stdout:
        return None

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        return None

run_command(subcommand=None, action=None, options=None, positional_args=None)

Run stratis command.

Command Structure: The command starts with 'stratis', followed by any global options from the configuration. Next comes the subcommand (like 'pool' or 'filesystem'), then the action (like 'create' or 'list'). Finally, any command-specific options and positional arguments are added.

Parameters:

Name Type Description Default
subcommand str | None

Command category (pool, filesystem, key)

None
action str | None

Operation to perform (list, create, set)

None
options StratisOptions | None

Command-specific options as key-value pairs

None
positional_args list[str] | None

Additional arguments

None

Returns:

Type Description
CommandResult

Command result

Examples:

List all pools: stratis.run_command('pool', 'list')

Create a filesystem: stratis.run_command('filesystem', 'create', positional_args=['pool1', 'fs1'])

Set encryption key: stratis.run_command('key', 'set', options={'--keyfile-path': '/path/to/key'}, positional_args=['mykey'])

Source code in sts_libs/src/sts/stratis/base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def run_command(
    self,
    subcommand: str | None = None,
    action: str | None = None,
    options: StratisOptions | None = None,
    positional_args: list[str] | None = None,
) -> CommandResult:
    """Run stratis command.

    Command Structure:
    The command starts with 'stratis', followed by any global options from the
    configuration. Next comes the subcommand (like 'pool' or 'filesystem'),
    then the action (like 'create' or 'list'). Finally, any command-specific
    options and positional arguments are added.

    Args:
        subcommand: Command category (pool, filesystem, key)
        action: Operation to perform (list, create, set)
        options: Command-specific options as key-value pairs
        positional_args: Additional arguments

    Returns:
        Command result

    Examples:
        List all pools:
        stratis.run_command('pool', 'list')

        Create a filesystem:
        stratis.run_command('filesystem', 'create',
                          positional_args=['pool1', 'fs1'])

        Set encryption key:
        stratis.run_command('key', 'set',
                          options={'--keyfile-path': '/path/to/key'},
                          positional_args=['mykey'])
    """
    command_list: list[str] = [CLI_NAME]
    command_list.extend(self.config.to_args())

    if subcommand is not None:
        command_list.append(subcommand)
    if action is not None:
        command_list.append(action)
    if options is not None:
        command_list.extend(k if v is None else f'{k} {v}' for k, v in options.items())
    if positional_args:
        command_list.extend(positional_args)

    result = run(' '.join(command_list))
    if result.failed and self.config.propagate:
        raise StratisError(f'Command failed: {result.stderr}')
    return result

version()

Get Stratis version.

Returns version information for: - stratisd daemon - stratis-cli tool - libstratis library

Returns:

Type Description
CommandResult

Version information

Example
Get version information:
version = stratis.version()
Source code in sts_libs/src/sts/stratis/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def version(self) -> CommandResult:
    """Get Stratis version.

    Returns version information for:
    - stratisd daemon
    - stratis-cli tool
    - libstratis library

    Returns:
        Version information

    Example:
        ```python
        Get version information:
        version = stratis.version()
        ```
    """
    return self.run_command(action='--version')

StratisConfig dataclass

Stratis configuration.

Controls Stratis behavior: - Error handling: Whether to raise exceptions - UUID format: Hyphenated or not - Command execution: Global options

Parameters:

Name Type Description Default
propagate bool

Whether to propagate errors as exceptions

False
unhyphenated_uuids bool

Whether to use UUIDs without hyphens

False

Examples:

Create with default settings: stratis = StratisConfig()

Create with error propagation: stratis = StratisConfig(propagate=True)

Source code in sts_libs/src/sts/stratis/base.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass
class StratisConfig:
    """Stratis configuration.

    Controls Stratis behavior:
    - Error handling: Whether to raise exceptions
    - UUID format: Hyphenated or not
    - Command execution: Global options

    Args:
        propagate: Whether to propagate errors as exceptions
        unhyphenated_uuids: Whether to use UUIDs without hyphens

    Examples:
        Create with default settings:
        stratis = StratisConfig()

        Create with error propagation:
        stratis = StratisConfig(propagate=True)
    """

    propagate: bool = False  # Raise exceptions on error
    unhyphenated_uuids: bool = False  # Use UUIDs without hyphens

    def to_args(self) -> list[str]:
        """Convert configuration to command arguments.

        Generates CLI options based on settings:
        - --propagate: Raise exceptions
        - --unhyphenated_uuids: Change UUID format

        Returns:
            List of command arguments

        Example:
            ```python
            Get command line arguments:
            config = StratisConfig(propagate=True)
            args = config.to_args()  # Returns ['--propagate']
            ```
        """
        args = []
        if self.propagate:
            args.append('--propagate')
        if self.unhyphenated_uuids:
            args.append('--unhyphenated_uuids')
        return args

to_args()

Convert configuration to command arguments.

Generates CLI options based on settings: - --propagate: Raise exceptions - --unhyphenated_uuids: Change UUID format

Returns:

Type Description
list[str]

List of command arguments

Example
Get command line arguments:
config = StratisConfig(propagate=True)
args = config.to_args()  # Returns ['--propagate']
Source code in sts_libs/src/sts/stratis/base.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def to_args(self) -> list[str]:
    """Convert configuration to command arguments.

    Generates CLI options based on settings:
    - --propagate: Raise exceptions
    - --unhyphenated_uuids: Change UUID format

    Returns:
        List of command arguments

    Example:
        ```python
        Get command line arguments:
        config = StratisConfig(propagate=True)
        args = config.to_args()  # Returns ['--propagate']
        ```
    """
    args = []
    if self.propagate:
        args.append('--propagate')
    if self.unhyphenated_uuids:
        args.append('--unhyphenated_uuids')
    return args

Pool Management

sts.stratis.pool

Stratis pool management.

This module provides functionality for managing Stratis pools: - Pool creation - Pool operations - Pool encryption

BlockDevInfo dataclass

Block device information from stratis report.

Parameters:

Name Type Description Default
path str | None

Device path

None
size str | None

Device size in sectors

None
uuid str | None

Device UUID

None
in_use bool

Whether device is in use

False
blksizes str | None

Block size information

None
Source code in sts_libs/src/sts/stratis/pool.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@dataclass
class BlockDevInfo:
    """Block device information from stratis report.

    Args:
        path: Device path
        size: Device size in sectors
        uuid: Device UUID
        in_use: Whether device is in use
        blksizes: Block size information
    """

    path: str | None = None
    size: str | None = None
    uuid: str | None = None
    in_use: bool = False
    blksizes: str | None = None
    clevis_config: dict[str, Any] | None = None
    clevis_pin: str | None = None
    key_description: str | None = None

    @staticmethod
    def parse_bool(value: bool | int | str | None) -> bool:
        """Parse boolean value from stratis output.

        Args:
            value: Value to parse (can be bool, int, str, or None)

        Returns:
            Parsed boolean value
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, int):
            return bool(value)
        if isinstance(value, str):
            return value.lower() in ('true', '1', 'yes', 'on')
        return False

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> BlockDevInfo:
        """Create device info from dictionary.

        Args:
            data: Dictionary data

        Returns:
            BlockDevInfo instance
        """
        return cls(
            path=data.get('path'),
            size=data.get('size'),
            uuid=data.get('uuid'),
            in_use=cls.parse_bool(data.get('in_use')),
            blksizes=data.get('blksizes'),
            key_description=data.get('key_description'),
            clevis_pin=data.get('clevis_pin'),
            clevis_config=data.get('clevis_config'),
        )

from_dict(data) classmethod

Create device info from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
BlockDevInfo

BlockDevInfo instance

Source code in sts_libs/src/sts/stratis/pool.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BlockDevInfo:
    """Create device info from dictionary.

    Args:
        data: Dictionary data

    Returns:
        BlockDevInfo instance
    """
    return cls(
        path=data.get('path'),
        size=data.get('size'),
        uuid=data.get('uuid'),
        in_use=cls.parse_bool(data.get('in_use')),
        blksizes=data.get('blksizes'),
        key_description=data.get('key_description'),
        clevis_pin=data.get('clevis_pin'),
        clevis_config=data.get('clevis_config'),
    )

parse_bool(value) staticmethod

Parse boolean value from stratis output.

Parameters:

Name Type Description Default
value bool | int | str | None

Value to parse (can be bool, int, str, or None)

required

Returns:

Type Description
bool

Parsed boolean value

Source code in sts_libs/src/sts/stratis/pool.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@staticmethod
def parse_bool(value: bool | int | str | None) -> bool:
    """Parse boolean value from stratis output.

    Args:
        value: Value to parse (can be bool, int, str, or None)

    Returns:
        Parsed boolean value
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, int):
        return bool(value)
    if isinstance(value, str):
        return value.lower() in ('true', '1', 'yes', 'on')
    return False

BlockDevs dataclass

Block devices from stratis report.

Parameters:

Name Type Description Default
datadevs list[BlockDevInfo]

List of data devices

list()
cachedevs list[BlockDevInfo]

List of cache devices

list()
Source code in sts_libs/src/sts/stratis/pool.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@dataclass
class BlockDevs:
    """Block devices from stratis report.

    Args:
        datadevs: List of data devices
        cachedevs: List of cache devices
    """

    datadevs: list[BlockDevInfo] = field(default_factory=list)
    cachedevs: list[BlockDevInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> BlockDevs:
        """Create block devices from dictionary.

        Args:
            data: Dictionary data

        Returns:
            BlockDevs instance
        """
        return cls(
            datadevs=[BlockDevInfo.from_dict(dev) for dev in data.get('datadevs', [])],
            cachedevs=[BlockDevInfo.from_dict(dev) for dev in data.get('cachedevs', [])],
        )

from_dict(data) classmethod

Create block devices from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
BlockDevs

BlockDevs instance

Source code in sts_libs/src/sts/stratis/pool.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BlockDevs:
    """Create block devices from dictionary.

    Args:
        data: Dictionary data

    Returns:
        BlockDevs instance
    """
    return cls(
        datadevs=[BlockDevInfo.from_dict(dev) for dev in data.get('datadevs', [])],
        cachedevs=[BlockDevInfo.from_dict(dev) for dev in data.get('cachedevs', [])],
    )

PoolCreateConfig dataclass

Pool creation configuration.

Parameters:

Name Type Description Default
key_desc str | None

Key description for keyring encryption (optional)

None
tang_url str | None

Tang server URL for tang encryption (optional)

None
thumbprint str | None

Tang server thumbprint (optional)

None
clevis str | None

Clevis encryption configuration (optional)

None
trust_url bool

Trust Tang server URL (optional)

False
no_overprovision bool

Disable overprovisioning (optional)

False
Example
config = PoolCreateConfig()  # Uses defaults
config = PoolCreateConfig(key_desc='mykey')  # Custom settings
Source code in sts_libs/src/sts/stratis/pool.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@dataclass
class PoolCreateConfig:
    """Pool creation configuration.

    Args:
        key_desc: Key description for keyring encryption (optional)
        tang_url: Tang server URL for tang encryption (optional)
        thumbprint: Tang server thumbprint (optional)
        clevis: Clevis encryption configuration (optional)
        trust_url: Trust Tang server URL (optional)
        no_overprovision: Disable overprovisioning (optional)

    Example:
        ```python
        config = PoolCreateConfig()  # Uses defaults
        config = PoolCreateConfig(key_desc='mykey')  # Custom settings
        ```
    """

    # Optional parameters
    key_desc: str | None = None
    tang_url: str | None = None
    thumbprint: str | None = None
    clevis: str | None = None
    trust_url: bool = False
    no_overprovision: bool = False

PoolReport dataclass

Bases: StratisBase

Pool report data.

This class provides detailed information about a Stratis pool: - Direct report fetching - Refreshable data - Complete pool information

Parameters:

Name Type Description Default
name str | None

Pool name (optional)

None
uuid str | None

Pool UUID (optional)

None
blockdevs BlockDevs

Block devices (optional)

BlockDevs()
encryption list[str]

Encryption type (optional)

list()
fs_limit int | None

Filesystem limit (optional)

None
available_actions str | None

Available actions (optional)

None
filesystems list[Any]

List of filesystems (optional)

list()
raw_data dict[str, Any]

Raw pool data from report

dict()
total_size int | None

Total pool size in sectors (optional)

None
used_size int | None

Used size of pool in sectors (optional)

None
object_path str | None

Pool Dbus object path

None
prevent_update bool

Flag to prevent updates from report (defaults to False)

False
Source code in sts_libs/src/sts/stratis/pool.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
@dataclass
class PoolReport(StratisBase):
    """Pool report data.

    This class provides detailed information about a Stratis pool:
    - Direct report fetching
    - Refreshable data
    - Complete pool information

    Args:
        name: Pool name (optional)
        uuid: Pool UUID (optional)
        blockdevs: Block devices (optional)
        encryption: Encryption type (optional)
        fs_limit: Filesystem limit (optional)
        available_actions: Available actions (optional)
        filesystems: List of filesystems (optional)
        raw_data: Raw pool data from report
        total_size: Total pool size in sectors (optional)
        used_size: Used size of pool in sectors (optional)
        object_path: Pool Dbus object path
        prevent_update: Flag to prevent updates from report (defaults to False)
    """

    name: str | None = None
    blockdevs: BlockDevs = field(default_factory=BlockDevs)
    uuid: str | None = None
    encryption: list[str] = field(default_factory=list)
    fs_limit: int | None = None
    available_actions: str | None = None
    filesystems: list[Any] = field(default_factory=list)
    raw_data: dict[str, Any] = field(default_factory=dict, repr=False)
    total_size: int | None = field(default=None)
    used_size: int | None = field(default=None)
    object_path: str | None = field(default=None)
    prevent_update: bool = field(default=False)

    def __post_init__(self) -> None:
        """Initialize the report with default config."""
        super().__init__(config=StratisConfig())

        # If name is provided, fetch the report data
        if self.name:
            self.refresh()

    def refresh(self) -> bool:
        """Refresh pool report data from system.

        Updates all fields with the latest information from stratisd.

        Returns:
            bool: True if refresh was successful, False otherwise
        """
        # If prevent_update is True, skip refresh
        if self.prevent_update:
            logging.debug('Refresh skipped due to prevent_update flag')
            return True

        # First get standard report data
        result = self.run_command('report')
        if result.failed or not result.stdout:
            logging.error('Failed to get report data')
            return False

        try:
            report_data = json.loads(result.stdout)
            if not self._update_from_report(report_data):
                return False

            # If we have a name, also fetch size information from managed objects
            if self.name:
                self.update_size_info()

        except json.JSONDecodeError:
            logging.exception('Failed to parse report JSON')
            return False
        else:
            return True

    def update_size_info(self) -> bool:
        """Update size information from managed objects.

        Fetches total and used size from the managed objects interface.

        Returns:
            bool: True if successful, False otherwise
        """
        if self.prevent_update:
            logging.debug('Size info update skipped due to prevent_update flag')
            return True

        if not self.name:
            return False

        # Get object path
        result = self.run_command(
            subcommand='pool', action='debug get-object-path', positional_args=['--name', self.name]
        )

        if result.failed or not result.stdout:
            logging.debug(f'Failed to get object path for pool {self.name}')
            return False

        self.object_path = result.stdout.strip()

        # Get managed objects report
        result = self.run_command(subcommand='report', action='managed_objects_report')

        if result.failed or not result.stdout:
            logging.error('Failed to get managed objects report')
            return False

        try:
            report_data = json.loads(result.stdout)

            # Get pool data (if object_path exists in report)
            if self.object_path not in report_data:
                logging.error(f'Object path {self.object_path} not found in managed objects report')
                return False

            # Get the pool interfaces
            pool_interfaces = report_data[self.object_path]

            # Get the last revision
            last_revision = list(pool_interfaces.keys())[-1]
            pool_interface = pool_interfaces[last_revision]

            # Get size information
            if 'TotalPhysicalSize' in pool_interface:
                self.total_size = int(pool_interface['TotalPhysicalSize'])

            # TotalPhysicalUsed is a tuple with format [1, "value"]
            if 'TotalPhysicalUsed' in pool_interface and (
                isinstance(pool_interface['TotalPhysicalUsed'], list) and len(pool_interface['TotalPhysicalUsed']) > 1
            ):
                self.used_size = int(pool_interface['TotalPhysicalUsed'][1])

        except (json.JSONDecodeError, KeyError, ValueError):
            logging.exception('Error parsing managed objects data')
            return False
        else:
            return True

    def _update_from_report(self, report_data: ReportData) -> bool:
        """Update pool information from report data.

        Args:
            report_data: Complete report data

        Returns:
            bool: True if update was successful, False otherwise
        """
        if self.prevent_update:
            logging.debug('Update from report skipped due to prevent_update flag')
            return True

        if not isinstance(report_data, dict) or 'pools' not in report_data:
            logging.error('Invalid report format')
            return False

        pools = report_data.get('pools', [])
        if not isinstance(pools, list):
            logging.error('Invalid pools format')
            return False

        # Find the pool with matching name
        for pool in pools:
            if not isinstance(pool, dict):
                continue

            if not self.name or self.name == pool.get('name'):
                # Store raw data for access to fields not explicitly mapped
                self.raw_data = pool.copy()

                # Update explicit fields
                self.name = pool.get('name')
                self.uuid = pool.get('uuid')
                self.fs_limit = pool.get('fs_limit')
                self.available_actions = pool.get('available_actions')
                self.filesystems = pool.get('filesystems', [])

                # Update blockdevs if present
                if 'blockdevs' in pool:
                    self.blockdevs = BlockDevs.from_dict(pool.get('blockdevs', {}))
                    self.encryption = []
                    if self.blockdevs.datadevs[0].key_description:
                        self.encryption.append('keyring')
                    if self.blockdevs.datadevs[0].clevis_pin:
                        self.encryption.append(self.blockdevs.datadevs[0].clevis_pin)

                return True

        # If we get here and name was specified, pool wasn't found
        if self.name:
            logging.warning(f"Pool '{self.name}' not found in report")
            return False

        # If no name was specified and no pools exist
        if not pools:
            logging.warning('No pools found in report')
            return False

        return False

    def get_device_paths(self) -> list[str]:
        """Get all device paths from the pool.

        Returns:
            List of device paths for both data and cache devices
        """
        return [dev.path for dev in self.blockdevs.datadevs if dev.path] + [
            dev.path for dev in self.blockdevs.cachedevs if dev.path
        ]

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PoolReport | None:
        """Create report from dictionary.

        Args:
            data: Dictionary data

        Returns:
            PoolReport instance or None if invalid
        """
        try:
            report = cls()
            report.raw_data = data.copy()

            report.name = data.get('name')
            report.blockdevs = BlockDevs.from_dict(data.get('blockdevs', {}))
            report.uuid = data.get('uuid')
            report.fs_limit = data.get('fs_limit')
            report.available_actions = data.get('available_actions')
            report.filesystems = data.get('filesystems', [])
            if report.blockdevs and len(report.blockdevs.datadevs) >= 1:
                report.encryption = []
                if report.blockdevs.datadevs[0].key_description:
                    report.encryption.append('keyring')
                if report.blockdevs.datadevs[0].clevis_pin:
                    report.encryption.append(report.blockdevs.datadevs[0].clevis_pin)

            # Fetch size info if name is provided
            if report.name:
                report.update_size_info()

        except (KeyError, TypeError) as e:
            logging.warning(f'Invalid pool report data: {e}')
            return None
        else:
            return report

    @classmethod
    def get_all(cls) -> list[PoolReport]:
        """Get reports for all pools.

        Returns:
            List of PoolReport instances
        """
        reports: list[PoolReport] = []

        # Create base instance
        base = cls()

        result = base.run_command('report')
        if result.failed or not result.stdout:
            return reports

        try:
            report_data = json.loads(result.stdout)

            if 'pools' in report_data and isinstance(report_data['pools'], list):
                for pool_data in report_data['pools']:
                    if not isinstance(pool_data, dict):
                        continue

                    if report := cls.from_dict(pool_data):
                        reports.append(report)
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            logging.warning(f'Failed to parse pools: {e}')

        return reports

__post_init__()

Initialize the report with default config.

Source code in sts_libs/src/sts/stratis/pool.py
158
159
160
161
162
163
164
def __post_init__(self) -> None:
    """Initialize the report with default config."""
    super().__init__(config=StratisConfig())

    # If name is provided, fetch the report data
    if self.name:
        self.refresh()

from_dict(data) classmethod

Create report from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
PoolReport | None

PoolReport instance or None if invalid

Source code in sts_libs/src/sts/stratis/pool.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PoolReport | None:
    """Create report from dictionary.

    Args:
        data: Dictionary data

    Returns:
        PoolReport instance or None if invalid
    """
    try:
        report = cls()
        report.raw_data = data.copy()

        report.name = data.get('name')
        report.blockdevs = BlockDevs.from_dict(data.get('blockdevs', {}))
        report.uuid = data.get('uuid')
        report.fs_limit = data.get('fs_limit')
        report.available_actions = data.get('available_actions')
        report.filesystems = data.get('filesystems', [])
        if report.blockdevs and len(report.blockdevs.datadevs) >= 1:
            report.encryption = []
            if report.blockdevs.datadevs[0].key_description:
                report.encryption.append('keyring')
            if report.blockdevs.datadevs[0].clevis_pin:
                report.encryption.append(report.blockdevs.datadevs[0].clevis_pin)

        # Fetch size info if name is provided
        if report.name:
            report.update_size_info()

    except (KeyError, TypeError) as e:
        logging.warning(f'Invalid pool report data: {e}')
        return None
    else:
        return report

get_all() classmethod

Get reports for all pools.

Returns:

Type Description
list[PoolReport]

List of PoolReport instances

Source code in sts_libs/src/sts/stratis/pool.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
@classmethod
def get_all(cls) -> list[PoolReport]:
    """Get reports for all pools.

    Returns:
        List of PoolReport instances
    """
    reports: list[PoolReport] = []

    # Create base instance
    base = cls()

    result = base.run_command('report')
    if result.failed or not result.stdout:
        return reports

    try:
        report_data = json.loads(result.stdout)

        if 'pools' in report_data and isinstance(report_data['pools'], list):
            for pool_data in report_data['pools']:
                if not isinstance(pool_data, dict):
                    continue

                if report := cls.from_dict(pool_data):
                    reports.append(report)
    except (json.JSONDecodeError, KeyError, ValueError) as e:
        logging.warning(f'Failed to parse pools: {e}')

    return reports

get_device_paths()

Get all device paths from the pool.

Returns:

Type Description
list[str]

List of device paths for both data and cache devices

Source code in sts_libs/src/sts/stratis/pool.py
325
326
327
328
329
330
331
332
333
def get_device_paths(self) -> list[str]:
    """Get all device paths from the pool.

    Returns:
        List of device paths for both data and cache devices
    """
    return [dev.path for dev in self.blockdevs.datadevs if dev.path] + [
        dev.path for dev in self.blockdevs.cachedevs if dev.path
    ]

refresh()

Refresh pool report data from system.

Updates all fields with the latest information from stratisd.

Returns:

Name Type Description
bool bool

True if refresh was successful, False otherwise

Source code in sts_libs/src/sts/stratis/pool.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def refresh(self) -> bool:
    """Refresh pool report data from system.

    Updates all fields with the latest information from stratisd.

    Returns:
        bool: True if refresh was successful, False otherwise
    """
    # If prevent_update is True, skip refresh
    if self.prevent_update:
        logging.debug('Refresh skipped due to prevent_update flag')
        return True

    # First get standard report data
    result = self.run_command('report')
    if result.failed or not result.stdout:
        logging.error('Failed to get report data')
        return False

    try:
        report_data = json.loads(result.stdout)
        if not self._update_from_report(report_data):
            return False

        # If we have a name, also fetch size information from managed objects
        if self.name:
            self.update_size_info()

    except json.JSONDecodeError:
        logging.exception('Failed to parse report JSON')
        return False
    else:
        return True

update_size_info()

Update size information from managed objects.

Fetches total and used size from the managed objects interface.

Returns:

Name Type Description
bool bool

True if successful, False otherwise

Source code in sts_libs/src/sts/stratis/pool.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def update_size_info(self) -> bool:
    """Update size information from managed objects.

    Fetches total and used size from the managed objects interface.

    Returns:
        bool: True if successful, False otherwise
    """
    if self.prevent_update:
        logging.debug('Size info update skipped due to prevent_update flag')
        return True

    if not self.name:
        return False

    # Get object path
    result = self.run_command(
        subcommand='pool', action='debug get-object-path', positional_args=['--name', self.name]
    )

    if result.failed or not result.stdout:
        logging.debug(f'Failed to get object path for pool {self.name}')
        return False

    self.object_path = result.stdout.strip()

    # Get managed objects report
    result = self.run_command(subcommand='report', action='managed_objects_report')

    if result.failed or not result.stdout:
        logging.error('Failed to get managed objects report')
        return False

    try:
        report_data = json.loads(result.stdout)

        # Get pool data (if object_path exists in report)
        if self.object_path not in report_data:
            logging.error(f'Object path {self.object_path} not found in managed objects report')
            return False

        # Get the pool interfaces
        pool_interfaces = report_data[self.object_path]

        # Get the last revision
        last_revision = list(pool_interfaces.keys())[-1]
        pool_interface = pool_interfaces[last_revision]

        # Get size information
        if 'TotalPhysicalSize' in pool_interface:
            self.total_size = int(pool_interface['TotalPhysicalSize'])

        # TotalPhysicalUsed is a tuple with format [1, "value"]
        if 'TotalPhysicalUsed' in pool_interface and (
            isinstance(pool_interface['TotalPhysicalUsed'], list) and len(pool_interface['TotalPhysicalUsed']) > 1
        ):
            self.used_size = int(pool_interface['TotalPhysicalUsed'][1])

    except (json.JSONDecodeError, KeyError, ValueError):
        logging.exception('Error parsing managed objects data')
        return False
    else:
        return True

StratisPool dataclass

Bases: StratisBase

Stratis pool representation.

This class provides functionality for managing Stratis pools: - Pool creation - Pool operations - Pool encryption

Parameters:

Name Type Description Default
name str | None

Pool name (optional, discovered from system)

None
uuid str | None

Pool UUID (optional, discovered from system)

None
encryption list[str]

Encryption type (optional, discovered from system)

list()
blockdevs list[str]

List of block devices (optional, discovered from system)

list()
report PoolReport | None

Pool report (optional, discovered from system)

None
Example
pool = StratisPool()  # Discovers first available pool
pool = StratisPool(name='pool1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/pool.py
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
@dataclass
class StratisPool(StratisBase):
    """Stratis pool representation.

    This class provides functionality for managing Stratis pools:
    - Pool creation
    - Pool operations
    - Pool encryption

    Args:
        name: Pool name (optional, discovered from system)
        uuid: Pool UUID (optional, discovered from system)
        encryption: Encryption type (optional, discovered from system)
        blockdevs: List of block devices (optional, discovered from system)
        report: Pool report (optional, discovered from system)

    Example:
        ```python
        pool = StratisPool()  # Discovers first available pool
        pool = StratisPool(name='pool1')  # Discovers other values
        ```
    """

    name: str | None = None
    uuid: str | None = None
    encryption: list[str] = field(default_factory=list)
    blockdevs: list[str] = field(default_factory=list)
    report: PoolReport | None = field(default=None, repr=False)
    prevent_report_updates: bool = False

    # Class-level paths
    POOL_PATH: ClassVar[str] = '/stratis/pool'

    def __post_init__(self) -> None:
        """Initialize pool."""
        # Initialize base class with default config
        super().__init__(config=StratisConfig())

        if self.name and not self.report:
            self.report = PoolReport(name=self.name, prevent_update=self.prevent_report_updates)
            if not self.prevent_report_updates:
                self._update_from_report()

    def _update_from_report(self) -> None:
        """Update pool attributes from report data.

        This centralizes all attribute updates from the report to avoid inconsistencies.
        """
        if self.prevent_report_updates:
            logging.debug('Update from report skipped due to prevent_report_updates flag')
            return

        if not self.report:
            return

        if not self.name and self.report.name:
            self.name = self.report.name

        if not self.uuid and self.report.uuid:
            self.uuid = self.report.uuid

        if self.report.encryption:
            self.encryption = self.report.encryption

        if not self.blockdevs:
            self.blockdevs = self.report.get_device_paths()

    def refresh_report(self) -> bool:
        """Refresh pool report data.

        Creates or updates the pool report with the latest information.

        Returns:
            bool: True if refresh was successful
        """
        # Create new report if needed
        if not self.report:
            # Set name after init because we are explicitly calling refresh below
            # Setting name at init would result in calling .refresh() twice
            self.report = PoolReport(prevent_update=self.prevent_report_updates)
            self.report.name = self.name

        # Refresh the report data
        success = self.report.refresh()

        # Update pool fields from report if successful
        if success and not self.prevent_report_updates:
            self._update_from_report()

        return success

    def create(self, config: PoolCreateConfig | None = None) -> bool:
        """Create pool.

        Args:
            config: Pool creation configuration

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.create(PoolCreateConfig(key_desc='mykey'))
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        if not self.blockdevs:
            raise StratisPoolError('No block devices specified')

        options: StratisOptions = {}
        if config:
            if config.key_desc:
                options['--key-desc'] = config.key_desc
            if config.clevis:
                options['--clevis'] = config.clevis
            if config.tang_url:
                options['--tang-url'] = config.tang_url
            if config.thumbprint:
                options['--thumbprint'] = config.thumbprint
            if config.trust_url:
                options['--trust-url'] = None
            if config.no_overprovision:
                options['--no-overprovision'] = None

        result = self.run_command(
            subcommand='pool',
            action='create',
            options=options,
            positional_args=[self.name, ' '.join(self.blockdevs)],
        )

        if not result.failed:
            return self.refresh_report()

        return False

    def destroy(self) -> bool:
        """Destroy pool.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.destroy()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='destroy',
            positional_args=[self.name],
        )
        return not result.failed

    def list(self, *, stopped: bool = False, current_pool: bool = True) -> str:
        """List information about pools.

        Args:
            stopped: Display information about stopped pools only
            current_pool: Display information about all pools

        Returns:
            stdout if successful, None otherwise

        Example:
            ```python
            pool.list(stopped=True)
            Name              Total / Used / Free    Properties                                   UUID   Alerts
            p1     7.28 TiB / 4.21 GiB / 7.27 TiB   ~Ca,~Cr, Op   398925ee-6efa-4fe1-bc0f-9cd13e3da8d7
            ```
        """
        options: StratisOptions = {}
        if current_pool:
            if self.uuid:
                options['--uuid'] = self.uuid
            else:
                options['--name'] = self.name
        if stopped:
            options['--stopped'] = ''

        result = self.run_command('pool', action='list', options=options)
        return result.stdout

    def start(self, unlock_method: UnlockMethod | None = None) -> bool:
        """Start pool.

        Args:
            unlock_method: Encryption unlock method

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.start(unlock_method='keyring')
            True
            ```
        """
        if not self.name and not self.uuid:
            logging.error('Pool name or UUID required')
            return False

        options: StratisOptions = {}
        if unlock_method:
            options['--unlock-method'] = unlock_method
        if self.uuid:
            options['--uuid'] = self.uuid
        else:
            options['--name'] = self.name

        result = self.run_command('pool', action='start', options=options)

        if not result.failed:
            self.refresh_report()

        return not result.failed

    def stop(self) -> bool:
        """Stop pool.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.stop()
            True
            ```
        """
        if not self.name and not self.uuid:
            logging.error('Pool name or UUID required')
            return False

        options: StratisOptions = {}
        if self.uuid:
            options['--uuid'] = self.uuid
        else:
            options['--name'] = self.name

        result = self.run_command('pool', action='stop', options=options)
        return not result.failed

    def add_data(self, blockdevs: list[str]) -> bool:
        """Add data devices to pool.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.add_data(['/dev/sdd', '/dev/sde'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='add-data',
            positional_args=[self.name, ' '.join(blockdevs)],
        )

        logging.info(result)

        if not result.failed:
            self.refresh_report()

        return not result.failed

    def extend_data(self) -> bool:
        """Extend data devices in the pool to use the full device size.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.extend_data()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='extend-data',
            positional_args=[self.name],
        )

        if not result.failed:
            self.refresh_report()

        return not result.failed

    def init_cache(self, blockdevs: list[str]) -> bool:
        """Initialize cache devices.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.init_cache(['/dev/nvme0n1'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        # Pass each device as a separate argument
        result = self.run_command(
            subcommand='pool',
            action='init-cache',
            positional_args=[self.name, *blockdevs],
        )

        if not result.failed:
            self.refresh_report()
        else:
            logging.error(f'Failed to initialize cache: {result.stderr}')

        return not result.failed

    def add_cache(self, blockdevs: list[str]) -> bool:
        """Add cache devices to pool.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.add_cache(['/dev/nvme0n2'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='add-cache',
            positional_args=[self.name, ' '.join(blockdevs)],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    def bind_keyring(self, key_desc: str) -> bool:
        """Bind pool to keyring.

        Args:
            key_desc: Key description

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_keyring('mykey')
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='bind keyring',
            positional_args=[self.name, key_desc],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    def bind_tang(self, config: TangConfig) -> bool:
        """Bind pool to Tang server.

        Args:
            config: Tang server configuration

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_tang(TangConfig('http://tang.example.com'))
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        if not config.url:
            logging.error('Tang server URL required')
            return False

        options: StratisOptions = {}
        if config.trust_url:
            options['--trust-url'] = None
        if config.thumbprint:
            options['--thumbprint'] = config.thumbprint

        result = self.run_command(
            subcommand='pool',
            action='bind tang',
            options=options,
            positional_args=[self.name, config.url],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    def bind_tpm2(self) -> bool:
        """Bind pool to TPM2.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_tpm2()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='bind tpm2',
            positional_args=[self.name],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    def rebind_keyring(self, key_desc: str) -> bool:
        """Rebind pool to keyring.

        Args:
            key_desc: Key description

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.rebind_keyring('mykey')
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='rebind keyring',
            positional_args=[self.name, key_desc],
        )

        if not result.failed:
            self.refresh_report()

        return not result.failed

    def rebind_clevis(self) -> bool:
        """Rebind pool to Clevis.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.rebind_clevis()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='rebind clevis',
            positional_args=[self.name],
        )

        if not result.failed:
            self.refresh_report()

        return not result.failed

    def unbind_keyring(self) -> bool:
        """Unbind pool from keyring.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.unbind_keyring()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='unbind keyring',
            positional_args=[self.name],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    def unbind_clevis(self) -> bool:
        """Unbind pool from Clevis.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.unbind_clevis()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='unbind clevis',
            positional_args=[self.name],
        )
        if not result.failed:
            self.refresh_report()

        return not result.failed

    @classmethod
    def from_report(cls, report: PoolReport) -> StratisPool | None:
        """Create pool from report.

        Args:
            report: Pool report data

        Returns:
            StratisPool instance or None if invalid

        Example:
            ```python
            pool = StratisPool.from_report(report)
            ```
        """
        if not report.name:
            return None

        # Get paths from report
        paths = report.get_device_paths()

        # Create pool with report already attached
        return cls(
            name=report.name,
            uuid=report.uuid,
            encryption=report.encryption,
            blockdevs=paths,
            report=report,  # Attach the report directly
        )

    @classmethod
    def get_all(cls) -> list[StratisPool]:
        """Get all Stratis pools.

        Returns:
            List of StratisPool instances

        Example:
            ```python
            StratisPool.get_all()
            [StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
            ```
        """
        pools: list[StratisPool] = []

        # Get all reports
        reports = PoolReport.get_all()

        # Create pools from reports
        pools.extend(pool for report in reports if (pool := cls.from_report(report)))

        return pools

    @classmethod
    def setup_blockdevices(cls) -> list[str]:
        """Set up block devices for testing.

        Returns:
            List of device paths

        Example:
            ```python
            StratisPool.setup_blockdevices()
            ['/dev/sda', '/dev/sdb']
            ```
        """
        # Get free disks
        from sts.blockdevice import get_free_disks

        blockdevices = get_free_disks()
        if not blockdevices:
            pytest.skip('No free disks found')

        # Group disks by block sizes
        filtered_disks_by_block_sizes: dict[tuple[int, int], list[str]] = {}
        for disk in blockdevices:
            block_sizes = (disk.sector_size, disk.block_size)
            if block_sizes in filtered_disks_by_block_sizes:
                filtered_disks_by_block_sizes[block_sizes].append(str(disk.path))
            else:
                filtered_disks_by_block_sizes[block_sizes] = [str(disk.path)]

        # Find devices with the most common block sizes
        most_common_block_sizes: list[str] = []
        for disks in filtered_disks_by_block_sizes.values():
            if len(disks) > len(most_common_block_sizes):
                most_common_block_sizes = disks

        # Clear start of devices
        for disk in most_common_block_sizes:
            run(f'dd if=/dev/zero of={disk} bs=1M count=10')

        return most_common_block_sizes

__post_init__()

Initialize pool.

Source code in sts_libs/src/sts/stratis/pool.py
487
488
489
490
491
492
493
494
495
def __post_init__(self) -> None:
    """Initialize pool."""
    # Initialize base class with default config
    super().__init__(config=StratisConfig())

    if self.name and not self.report:
        self.report = PoolReport(name=self.name, prevent_update=self.prevent_report_updates)
        if not self.prevent_report_updates:
            self._update_from_report()

add_cache(blockdevs)

Add cache devices to pool.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.add_cache(['/dev/nvme0n2'])
True
Source code in sts_libs/src/sts/stratis/pool.py
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
def add_cache(self, blockdevs: list[str]) -> bool:
    """Add cache devices to pool.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.add_cache(['/dev/nvme0n2'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='add-cache',
        positional_args=[self.name, ' '.join(blockdevs)],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

add_data(blockdevs)

Add data devices to pool.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.add_data(['/dev/sdd', '/dev/sde'])
True
Source code in sts_libs/src/sts/stratis/pool.py
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
def add_data(self, blockdevs: list[str]) -> bool:
    """Add data devices to pool.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.add_data(['/dev/sdd', '/dev/sde'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='add-data',
        positional_args=[self.name, ' '.join(blockdevs)],
    )

    logging.info(result)

    if not result.failed:
        self.refresh_report()

    return not result.failed

bind_keyring(key_desc)

Bind pool to keyring.

Parameters:

Name Type Description Default
key_desc str

Key description

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_keyring('mykey')
True
Source code in sts_libs/src/sts/stratis/pool.py
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
def bind_keyring(self, key_desc: str) -> bool:
    """Bind pool to keyring.

    Args:
        key_desc: Key description

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_keyring('mykey')
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='bind keyring',
        positional_args=[self.name, key_desc],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

bind_tang(config)

Bind pool to Tang server.

Parameters:

Name Type Description Default
config TangConfig

Tang server configuration

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_tang(TangConfig('http://tang.example.com'))
True
Source code in sts_libs/src/sts/stratis/pool.py
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
def bind_tang(self, config: TangConfig) -> bool:
    """Bind pool to Tang server.

    Args:
        config: Tang server configuration

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_tang(TangConfig('http://tang.example.com'))
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    if not config.url:
        logging.error('Tang server URL required')
        return False

    options: StratisOptions = {}
    if config.trust_url:
        options['--trust-url'] = None
    if config.thumbprint:
        options['--thumbprint'] = config.thumbprint

    result = self.run_command(
        subcommand='pool',
        action='bind tang',
        options=options,
        positional_args=[self.name, config.url],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

bind_tpm2()

Bind pool to TPM2.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_tpm2()
True
Source code in sts_libs/src/sts/stratis/pool.py
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
def bind_tpm2(self) -> bool:
    """Bind pool to TPM2.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_tpm2()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='bind tpm2',
        positional_args=[self.name],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

create(config=None)

Create pool.

Parameters:

Name Type Description Default
config PoolCreateConfig | None

Pool creation configuration

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.create(PoolCreateConfig(key_desc='mykey'))
True
Source code in sts_libs/src/sts/stratis/pool.py
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
def create(self, config: PoolCreateConfig | None = None) -> bool:
    """Create pool.

    Args:
        config: Pool creation configuration

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.create(PoolCreateConfig(key_desc='mykey'))
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    if not self.blockdevs:
        raise StratisPoolError('No block devices specified')

    options: StratisOptions = {}
    if config:
        if config.key_desc:
            options['--key-desc'] = config.key_desc
        if config.clevis:
            options['--clevis'] = config.clevis
        if config.tang_url:
            options['--tang-url'] = config.tang_url
        if config.thumbprint:
            options['--thumbprint'] = config.thumbprint
        if config.trust_url:
            options['--trust-url'] = None
        if config.no_overprovision:
            options['--no-overprovision'] = None

    result = self.run_command(
        subcommand='pool',
        action='create',
        options=options,
        positional_args=[self.name, ' '.join(self.blockdevs)],
    )

    if not result.failed:
        return self.refresh_report()

    return False

destroy()

Destroy pool.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.destroy()
True
Source code in sts_libs/src/sts/stratis/pool.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def destroy(self) -> bool:
    """Destroy pool.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.destroy()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='destroy',
        positional_args=[self.name],
    )
    return not result.failed

extend_data()

Extend data devices in the pool to use the full device size.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.extend_data()
True
Source code in sts_libs/src/sts/stratis/pool.py
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
def extend_data(self) -> bool:
    """Extend data devices in the pool to use the full device size.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.extend_data()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='extend-data',
        positional_args=[self.name],
    )

    if not result.failed:
        self.refresh_report()

    return not result.failed

from_report(report) classmethod

Create pool from report.

Parameters:

Name Type Description Default
report PoolReport

Pool report data

required

Returns:

Type Description
StratisPool | None

StratisPool instance or None if invalid

Example
pool = StratisPool.from_report(report)
Source code in sts_libs/src/sts/stratis/pool.py
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
@classmethod
def from_report(cls, report: PoolReport) -> StratisPool | None:
    """Create pool from report.

    Args:
        report: Pool report data

    Returns:
        StratisPool instance or None if invalid

    Example:
        ```python
        pool = StratisPool.from_report(report)
        ```
    """
    if not report.name:
        return None

    # Get paths from report
    paths = report.get_device_paths()

    # Create pool with report already attached
    return cls(
        name=report.name,
        uuid=report.uuid,
        encryption=report.encryption,
        blockdevs=paths,
        report=report,  # Attach the report directly
    )

get_all() classmethod

Get all Stratis pools.

Returns:

Type Description
list[StratisPool]

List of StratisPool instances

Example
StratisPool.get_all()
[StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
Source code in sts_libs/src/sts/stratis/pool.py
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
@classmethod
def get_all(cls) -> list[StratisPool]:
    """Get all Stratis pools.

    Returns:
        List of StratisPool instances

    Example:
        ```python
        StratisPool.get_all()
        [StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
        ```
    """
    pools: list[StratisPool] = []

    # Get all reports
    reports = PoolReport.get_all()

    # Create pools from reports
    pools.extend(pool for report in reports if (pool := cls.from_report(report)))

    return pools

init_cache(blockdevs)

Initialize cache devices.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.init_cache(['/dev/nvme0n1'])
True
Source code in sts_libs/src/sts/stratis/pool.py
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
def init_cache(self, blockdevs: list[str]) -> bool:
    """Initialize cache devices.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.init_cache(['/dev/nvme0n1'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    # Pass each device as a separate argument
    result = self.run_command(
        subcommand='pool',
        action='init-cache',
        positional_args=[self.name, *blockdevs],
    )

    if not result.failed:
        self.refresh_report()
    else:
        logging.error(f'Failed to initialize cache: {result.stderr}')

    return not result.failed

list(*, stopped=False, current_pool=True)

List information about pools.

Parameters:

Name Type Description Default
stopped bool

Display information about stopped pools only

False
current_pool bool

Display information about all pools

True

Returns:

Type Description
str

stdout if successful, None otherwise

Example
pool.list(stopped=True)
Name              Total / Used / Free    Properties                                   UUID   Alerts
p1     7.28 TiB / 4.21 GiB / 7.27 TiB   ~Ca,~Cr, Op   398925ee-6efa-4fe1-bc0f-9cd13e3da8d7
Source code in sts_libs/src/sts/stratis/pool.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
def list(self, *, stopped: bool = False, current_pool: bool = True) -> str:
    """List information about pools.

    Args:
        stopped: Display information about stopped pools only
        current_pool: Display information about all pools

    Returns:
        stdout if successful, None otherwise

    Example:
        ```python
        pool.list(stopped=True)
        Name              Total / Used / Free    Properties                                   UUID   Alerts
        p1     7.28 TiB / 4.21 GiB / 7.27 TiB   ~Ca,~Cr, Op   398925ee-6efa-4fe1-bc0f-9cd13e3da8d7
        ```
    """
    options: StratisOptions = {}
    if current_pool:
        if self.uuid:
            options['--uuid'] = self.uuid
        else:
            options['--name'] = self.name
    if stopped:
        options['--stopped'] = ''

    result = self.run_command('pool', action='list', options=options)
    return result.stdout

rebind_clevis()

Rebind pool to Clevis.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.rebind_clevis()
True
Source code in sts_libs/src/sts/stratis/pool.py
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
def rebind_clevis(self) -> bool:
    """Rebind pool to Clevis.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.rebind_clevis()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='rebind clevis',
        positional_args=[self.name],
    )

    if not result.failed:
        self.refresh_report()

    return not result.failed

rebind_keyring(key_desc)

Rebind pool to keyring.

Parameters:

Name Type Description Default
key_desc str

Key description

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.rebind_keyring('mykey')
True
Source code in sts_libs/src/sts/stratis/pool.py
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
def rebind_keyring(self, key_desc: str) -> bool:
    """Rebind pool to keyring.

    Args:
        key_desc: Key description

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.rebind_keyring('mykey')
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='rebind keyring',
        positional_args=[self.name, key_desc],
    )

    if not result.failed:
        self.refresh_report()

    return not result.failed

refresh_report()

Refresh pool report data.

Creates or updates the pool report with the latest information.

Returns:

Name Type Description
bool bool

True if refresh was successful

Source code in sts_libs/src/sts/stratis/pool.py
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def refresh_report(self) -> bool:
    """Refresh pool report data.

    Creates or updates the pool report with the latest information.

    Returns:
        bool: True if refresh was successful
    """
    # Create new report if needed
    if not self.report:
        # Set name after init because we are explicitly calling refresh below
        # Setting name at init would result in calling .refresh() twice
        self.report = PoolReport(prevent_update=self.prevent_report_updates)
        self.report.name = self.name

    # Refresh the report data
    success = self.report.refresh()

    # Update pool fields from report if successful
    if success and not self.prevent_report_updates:
        self._update_from_report()

    return success

setup_blockdevices() classmethod

Set up block devices for testing.

Returns:

Type Description
list[str]

List of device paths

Example
StratisPool.setup_blockdevices()
['/dev/sda', '/dev/sdb']
Source code in sts_libs/src/sts/stratis/pool.py
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
@classmethod
def setup_blockdevices(cls) -> list[str]:
    """Set up block devices for testing.

    Returns:
        List of device paths

    Example:
        ```python
        StratisPool.setup_blockdevices()
        ['/dev/sda', '/dev/sdb']
        ```
    """
    # Get free disks
    from sts.blockdevice import get_free_disks

    blockdevices = get_free_disks()
    if not blockdevices:
        pytest.skip('No free disks found')

    # Group disks by block sizes
    filtered_disks_by_block_sizes: dict[tuple[int, int], list[str]] = {}
    for disk in blockdevices:
        block_sizes = (disk.sector_size, disk.block_size)
        if block_sizes in filtered_disks_by_block_sizes:
            filtered_disks_by_block_sizes[block_sizes].append(str(disk.path))
        else:
            filtered_disks_by_block_sizes[block_sizes] = [str(disk.path)]

    # Find devices with the most common block sizes
    most_common_block_sizes: list[str] = []
    for disks in filtered_disks_by_block_sizes.values():
        if len(disks) > len(most_common_block_sizes):
            most_common_block_sizes = disks

    # Clear start of devices
    for disk in most_common_block_sizes:
        run(f'dd if=/dev/zero of={disk} bs=1M count=10')

    return most_common_block_sizes

start(unlock_method=None)

Start pool.

Parameters:

Name Type Description Default
unlock_method UnlockMethod | None

Encryption unlock method

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.start(unlock_method='keyring')
True
Source code in sts_libs/src/sts/stratis/pool.py
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
def start(self, unlock_method: UnlockMethod | None = None) -> bool:
    """Start pool.

    Args:
        unlock_method: Encryption unlock method

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.start(unlock_method='keyring')
        True
        ```
    """
    if not self.name and not self.uuid:
        logging.error('Pool name or UUID required')
        return False

    options: StratisOptions = {}
    if unlock_method:
        options['--unlock-method'] = unlock_method
    if self.uuid:
        options['--uuid'] = self.uuid
    else:
        options['--name'] = self.name

    result = self.run_command('pool', action='start', options=options)

    if not result.failed:
        self.refresh_report()

    return not result.failed

stop()

Stop pool.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.stop()
True
Source code in sts_libs/src/sts/stratis/pool.py
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
def stop(self) -> bool:
    """Stop pool.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.stop()
        True
        ```
    """
    if not self.name and not self.uuid:
        logging.error('Pool name or UUID required')
        return False

    options: StratisOptions = {}
    if self.uuid:
        options['--uuid'] = self.uuid
    else:
        options['--name'] = self.name

    result = self.run_command('pool', action='stop', options=options)
    return not result.failed

unbind_clevis()

Unbind pool from Clevis.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.unbind_clevis()
True
Source code in sts_libs/src/sts/stratis/pool.py
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
def unbind_clevis(self) -> bool:
    """Unbind pool from Clevis.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.unbind_clevis()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='unbind clevis',
        positional_args=[self.name],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

unbind_keyring()

Unbind pool from keyring.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.unbind_keyring()
True
Source code in sts_libs/src/sts/stratis/pool.py
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
def unbind_keyring(self) -> bool:
    """Unbind pool from keyring.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.unbind_keyring()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='unbind keyring',
        positional_args=[self.name],
    )
    if not result.failed:
        self.refresh_report()

    return not result.failed

TangConfig dataclass

Tang server configuration.

Parameters:

Name Type Description Default
url str | None

Tang server URL (optional, discovered from system)

None
trust_url bool

Trust server URL (optional)

False
thumbprint str | None

Server thumbprint (optional)

None
Example
config = TangConfig()  # Uses defaults
config = TangConfig(url='http://tang.example.com')  # Custom settings
Source code in sts_libs/src/sts/stratis/pool.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
@dataclass
class TangConfig:
    """Tang server configuration.

    Args:
        url: Tang server URL (optional, discovered from system)
        trust_url: Trust server URL (optional)
        thumbprint: Server thumbprint (optional)

    Example:
        ```python
        config = TangConfig()  # Uses defaults
        config = TangConfig(url='http://tang.example.com')  # Custom settings
        ```
    """

    # Optional parameters
    url: str | None = None
    trust_url: bool = False
    thumbprint: str | None = None

Filesystem Management

sts.stratis.filesystem

Stratis filesystem management.

This module provides functionality for managing Stratis filesystems: - Filesystem creation and management - Thin provisioning and size limits - Snapshot creation and tracking - Space usage monitoring

Key features: - XFS as the default filesystem - Copy-on-write snapshots - Dynamic size management - Origin tracking for snapshots

FilesystemReport dataclass

Filesystem report data.

Contains filesystem metadata: - Basic information (name, UUID) - Size information (total, limit) - Snapshot details (origin) - Usage statistics

Parameters:

Name Type Description Default
name str | None

Filesystem name (optional, discovered from system)

None
uuid str | None

Filesystem UUID (optional, discovered from system)

None
size str | None

Filesystem size (optional, discovered from system)

None
size_limit str | None

Size limit (optional, discovered from system)

None
origin str | None

Origin filesystem for snapshots (optional)

None
used str | None

Used space (optional, discovered from system)

None
Example
report = FilesystemReport()  # Discovers first available filesystem
report = FilesystemReport(name='fs1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/filesystem.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class FilesystemReport:
    """Filesystem report data.

    Contains filesystem metadata:
    - Basic information (name, UUID)
    - Size information (total, limit)
    - Snapshot details (origin)
    - Usage statistics

    Args:
        name: Filesystem name (optional, discovered from system)
        uuid: Filesystem UUID (optional, discovered from system)
        size: Filesystem size (optional, discovered from system)
        size_limit: Size limit (optional, discovered from system)
        origin: Origin filesystem for snapshots (optional)
        used: Used space (optional, discovered from system)

    Example:
        ```python
        report = FilesystemReport()  # Discovers first available filesystem
        report = FilesystemReport(name='fs1')  # Discovers other values
        ```
    """

    name: str | None = None
    uuid: str | None = None
    size: str | None = None  # Total size (e.g. "10 GiB")
    size_limit: str | None = None  # Maximum size allowed
    origin: str | None = None  # Source filesystem for snapshots
    used: str | None = None  # Space currently in use

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> FilesystemReport | None:
        """Create report from dictionary.

        Parses stratis report format:
        - Handles missing fields
        - Validates data types
        - Reports parsing errors

        Args:
            data: Dictionary data from stratis report

        Returns:
            FilesystemReport instance or None if invalid
        """
        try:
            return cls(
                name=data.get('name'),
                uuid=data.get('uuid'),
                size=data.get('size'),
                size_limit=data.get('size_limit'),
                origin=data.get('origin'),
                used=data.get('used'),
            )
        except (KeyError, TypeError) as e:
            logging.warning(f'Invalid filesystem report data: {e}')
            return None

from_dict(data) classmethod

Create report from dictionary.

Parses stratis report format: - Handles missing fields - Validates data types - Reports parsing errors

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data from stratis report

required

Returns:

Type Description
FilesystemReport | None

FilesystemReport instance or None if invalid

Source code in sts_libs/src/sts/stratis/filesystem.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FilesystemReport | None:
    """Create report from dictionary.

    Parses stratis report format:
    - Handles missing fields
    - Validates data types
    - Reports parsing errors

    Args:
        data: Dictionary data from stratis report

    Returns:
        FilesystemReport instance or None if invalid
    """
    try:
        return cls(
            name=data.get('name'),
            uuid=data.get('uuid'),
            size=data.get('size'),
            size_limit=data.get('size_limit'),
            origin=data.get('origin'),
            used=data.get('used'),
        )
    except (KeyError, TypeError) as e:
        logging.warning(f'Invalid filesystem report data: {e}')
        return None

StratisFilesystem dataclass

Bases: StratisBase

Stratis filesystem representation.

Manages filesystems with: - Thin provisioning - Snapshot capabilities - Size management - Usage tracking

Parameters:

Name Type Description Default
name str | None

Filesystem name (optional, discovered from system)

None
pool_name str | None

Pool name (optional, discovered from system)

None
uuid str | None

Filesystem UUID (optional, discovered from system)

None
size int | None

Filesystem size in bytes (optional, discovered from system)

None
size_limit str | None

Size limit (optional, discovered from system)

None
origin str | None

Origin filesystem for snapshots (optional)

None
used str | None

Used space (optional, discovered from system)

None
Example
fs = StratisFilesystem()  # Discovers first available filesystem
fs = StratisFilesystem(name='fs1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/filesystem.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
@dataclass
class StratisFilesystem(StratisBase):
    """Stratis filesystem representation.

    Manages filesystems with:
    - Thin provisioning
    - Snapshot capabilities
    - Size management
    - Usage tracking

    Args:
        name: Filesystem name (optional, discovered from system)
        pool_name: Pool name (optional, discovered from system)
        uuid: Filesystem UUID (optional, discovered from system)
        size: Filesystem size in bytes (optional, discovered from system)
        size_limit: Size limit (optional, discovered from system)
        origin: Origin filesystem for snapshots (optional)
        used: Used space (optional, discovered from system)

    Example:
        ```python
        fs = StratisFilesystem()  # Discovers first available filesystem
        fs = StratisFilesystem(name='fs1')  # Discovers other values
        ```
    """

    name: str | None = None
    pool_name: str | None = None
    uuid: str | None = None
    size: int | None = None  # Size in bytes
    size_limit: str | None = None  # Size limit (human readable)
    origin: str | None = None  # Source filesystem name
    used: str | None = None  # Used space (human readable)

    # Mount point base path
    FS_PATH: ClassVar[str] = '/stratis'

    def __post_init__(self) -> None:
        """Initialize filesystem.

        Discovery process:
        1. Initialize base class
        2. Find pool and filesystem info
        3. Parse size information
        4. Set filesystem attributes
        """
        # Initialize base class with default config
        super().__init__(config=StratisConfig())

        # Discover filesystem info if needed
        if not self.pool_name or not self.name:
            result = self.run_command('report')
            if result.succeeded and result.stdout:
                try:
                    report = json.loads(result.stdout)
                    for pool in report['pools']:
                        for fs in pool['filesystems']:
                            if not self.name or self.name == fs['name']:
                                # Set pool and filesystem names
                                if not self.pool_name:
                                    self.pool_name = pool['name']
                                if not self.name:
                                    self.name = fs['name']

                                # Parse size if available
                                if not self.size and 'size' in fs:
                                    size_bytes = size_human_2_size_bytes(fs['size'])
                                    if size_bytes is not None:
                                        self.size = int(size_bytes)

                                # Set other attributes
                                if not self.uuid:
                                    self.uuid = fs.get('uuid')
                                if not self.size_limit:
                                    self.size_limit = fs.get('size_limit')
                                if not self.origin:
                                    self.origin = fs.get('origin')
                                if not self.used:
                                    self.used = fs.get('used')
                                break
                        if self.name and self.pool_name:
                            break
                except (KeyError, ValueError) as e:
                    logging.warning(f'Failed to parse filesystem info: {e}')

    def get_fs_uuid(self) -> str | None:
        """Get filesystem UUID.

        Retrieves UUID from system:
        - Requires pool and filesystem names
        - UUID is stable across reboots
        - Used for unique identification

        Returns:
            Filesystem UUID or None if not found

        Example:
            ```python
            fs.get_fs_uuid()
            '123e4567-e89b-12d3-a456-426614174000'
            ```
        """
        if not self.pool_name or not self.name:
            return None

        result = self.run_command('report')
        if result.failed or not result.stdout:
            return None

        try:
            report = json.loads(result.stdout)
            for pool in report['pools']:
                if self.pool_name != pool['name']:
                    continue
                for fs in pool['filesystems']:
                    if self.name != fs['name']:
                        continue
                    return fs['uuid']
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to get filesystem UUID: {e}')

        return None

    def create(self, size: str | None = None, size_limit: str | None = None) -> bool:
        """Create filesystem.

        Creates filesystem with:
        - Optional initial size
        - Optional size limit
        - Thin provisioning enabled
        - Default mount options

        Args:
            size: Initial size (e.g. "10G")
            size_limit: Size limit (e.g. "20G")

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.create(size='10G', size_limit='20G')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        options: StratisOptions = {}
        if size:
            options['--size'] = size
        if size_limit:
            options['--size-limit'] = size_limit

        result = self.run_command(
            subcommand='filesystem',
            action='create',
            options=options,
            positional_args=[self.pool_name, self.name],
        )
        return not result.failed

    def destroy(self) -> bool:
        """Destroy filesystem.

        Removes filesystem:
        - Unmounts if mounted
        - Removes from pool
        - Deletes all data
        - Cannot be undone

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.destroy()
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='destroy',
            positional_args=[self.pool_name, self.name],
        )
        return not result.failed

    def rename(self, new_name: str) -> bool:
        """Rename filesystem.

        Changes filesystem name:
        - Updates mount points
        - Preserves data and settings
        - Updates snapshot references

        Args:
            new_name: New filesystem name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.rename('fs2')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='rename',
            positional_args=[self.pool_name, self.name, new_name],
        )
        if not result.failed:
            self.name = new_name
        return not result.failed

    def snapshot(self, snapshot_name: str) -> StratisFilesystem | None:
        """Create filesystem snapshot.

        Creates copy-on-write snapshot:
        - Instant creation
        - Space-efficient
        - Tracks origin
        - Writable by default

        Args:
            snapshot_name: Snapshot name

        Returns:
            New filesystem instance or None if failed

        Example:
            ```python
            fs.snapshot('snap1')
            StratisFilesystem(name='snap1', ...)
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return None

        result = self.run_command(
            subcommand='filesystem',
            action='snapshot',
            positional_args=[self.pool_name, self.name, snapshot_name],
        )
        if result.failed:
            return None

        return StratisFilesystem(
            name=snapshot_name,
            pool_name=self.pool_name,
            size=self.size,
            origin=self.name,
        )

    def set_size_limit(self, limit: str) -> bool:
        """Set filesystem size limit.

        Limits filesystem growth:
        - Thin provisioning still active
        - Prevents space exhaustion
        - Can be changed later
        - Uses human-readable sizes

        Args:
            limit: Size limit (e.g. "20G")

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.set_size_limit('20G')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='set-size-limit',
            positional_args=[self.pool_name, self.name, limit],
        )
        if not result.failed:
            self.size_limit = limit
        return not result.failed

    def unset_size_limit(self) -> bool:
        """Unset filesystem size limit.

        Removes growth limit:
        - Allows unlimited growth
        - Limited only by pool size
        - Cannot be undone (must set new limit)

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.unset_size_limit()
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='unset-size-limit',
            positional_args=[self.pool_name, self.name],
        )
        if not result.failed:
            self.size_limit = None
        return not result.failed

    @classmethod
    def from_report(cls, report: FilesystemReport, pool_name: str) -> StratisFilesystem | None:
        """Create filesystem from report.

        Parses report data:
        - Validates required fields
        - Converts size formats
        - Sets relationships

        Args:
            report: Filesystem report data
            pool_name: Pool name

        Returns:
            StratisFilesystem instance or None if invalid

        Example:
            ```python
            fs = StratisFilesystem.from_report(report, 'pool1')
            ```
        """
        if not report.name:
            return None

        size_bytes = None
        if report.size:
            size_bytes = size_human_2_size_bytes(report.size)
            if size_bytes is None:
                logging.warning(f'Invalid size: {report.size}, using None')

        return cls(
            name=report.name,
            pool_name=pool_name,
            size=int(size_bytes) if size_bytes is not None else None,
            uuid=report.uuid,
            size_limit=report.size_limit,
            origin=report.origin,
            used=report.used,
        )

    @classmethod
    def get_all(cls, pool_name: str | None = None) -> list[StratisFilesystem]:
        """Get all Stratis filesystems.

        Lists filesystems:
        - Optionally filtered by pool
        - Includes snapshots
        - Provides full details
        - Sorted by pool

        Args:
            pool_name: Filter by pool name

        Returns:
            List of StratisFilesystem instances

        Example:
            ```python
            StratisFilesystem.get_all('pool1')
            [StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
            ```
        """
        filesystems: list[StratisFilesystem] = []
        # Create base instance without __post_init__
        base = super().__new__(cls)
        StratisBase.__init__(base, config=StratisConfig())

        result = base.run_command('report')
        if result.failed or not result.stdout:
            return filesystems

        try:
            report = json.loads(result.stdout)
            for pool_data in report['pools']:
                current_pool = pool_data.get('name')
                if not current_pool:
                    logging.warning('Pool missing name')
                    continue
                if pool_name and pool_name != current_pool:
                    continue
                filesystems.extend(
                    [
                        fs
                        for fs_data in pool_data.get('filesystems', [])
                        if (
                            fs := cls.from_report(
                                FilesystemReport.from_dict(fs_data) or FilesystemReport(), current_pool
                            )
                        )
                    ]
                )
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to parse report: {e}')

        return filesystems

__post_init__()

Initialize filesystem.

Discovery process: 1. Initialize base class 2. Find pool and filesystem info 3. Parse size information 4. Set filesystem attributes

Source code in sts_libs/src/sts/stratis/filesystem.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def __post_init__(self) -> None:
    """Initialize filesystem.

    Discovery process:
    1. Initialize base class
    2. Find pool and filesystem info
    3. Parse size information
    4. Set filesystem attributes
    """
    # Initialize base class with default config
    super().__init__(config=StratisConfig())

    # Discover filesystem info if needed
    if not self.pool_name or not self.name:
        result = self.run_command('report')
        if result.succeeded and result.stdout:
            try:
                report = json.loads(result.stdout)
                for pool in report['pools']:
                    for fs in pool['filesystems']:
                        if not self.name or self.name == fs['name']:
                            # Set pool and filesystem names
                            if not self.pool_name:
                                self.pool_name = pool['name']
                            if not self.name:
                                self.name = fs['name']

                            # Parse size if available
                            if not self.size and 'size' in fs:
                                size_bytes = size_human_2_size_bytes(fs['size'])
                                if size_bytes is not None:
                                    self.size = int(size_bytes)

                            # Set other attributes
                            if not self.uuid:
                                self.uuid = fs.get('uuid')
                            if not self.size_limit:
                                self.size_limit = fs.get('size_limit')
                            if not self.origin:
                                self.origin = fs.get('origin')
                            if not self.used:
                                self.used = fs.get('used')
                            break
                    if self.name and self.pool_name:
                        break
            except (KeyError, ValueError) as e:
                logging.warning(f'Failed to parse filesystem info: {e}')

create(size=None, size_limit=None)

Create filesystem.

Creates filesystem with: - Optional initial size - Optional size limit - Thin provisioning enabled - Default mount options

Parameters:

Name Type Description Default
size str | None

Initial size (e.g. "10G")

None
size_limit str | None

Size limit (e.g. "20G")

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.create(size='10G', size_limit='20G')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def create(self, size: str | None = None, size_limit: str | None = None) -> bool:
    """Create filesystem.

    Creates filesystem with:
    - Optional initial size
    - Optional size limit
    - Thin provisioning enabled
    - Default mount options

    Args:
        size: Initial size (e.g. "10G")
        size_limit: Size limit (e.g. "20G")

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.create(size='10G', size_limit='20G')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    options: StratisOptions = {}
    if size:
        options['--size'] = size
    if size_limit:
        options['--size-limit'] = size_limit

    result = self.run_command(
        subcommand='filesystem',
        action='create',
        options=options,
        positional_args=[self.pool_name, self.name],
    )
    return not result.failed

destroy()

Destroy filesystem.

Removes filesystem: - Unmounts if mounted - Removes from pool - Deletes all data - Cannot be undone

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.destroy()
True
Source code in sts_libs/src/sts/stratis/filesystem.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def destroy(self) -> bool:
    """Destroy filesystem.

    Removes filesystem:
    - Unmounts if mounted
    - Removes from pool
    - Deletes all data
    - Cannot be undone

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.destroy()
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='destroy',
        positional_args=[self.pool_name, self.name],
    )
    return not result.failed

from_report(report, pool_name) classmethod

Create filesystem from report.

Parses report data: - Validates required fields - Converts size formats - Sets relationships

Parameters:

Name Type Description Default
report FilesystemReport

Filesystem report data

required
pool_name str

Pool name

required

Returns:

Type Description
StratisFilesystem | None

StratisFilesystem instance or None if invalid

Example
fs = StratisFilesystem.from_report(report, 'pool1')
Source code in sts_libs/src/sts/stratis/filesystem.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
@classmethod
def from_report(cls, report: FilesystemReport, pool_name: str) -> StratisFilesystem | None:
    """Create filesystem from report.

    Parses report data:
    - Validates required fields
    - Converts size formats
    - Sets relationships

    Args:
        report: Filesystem report data
        pool_name: Pool name

    Returns:
        StratisFilesystem instance or None if invalid

    Example:
        ```python
        fs = StratisFilesystem.from_report(report, 'pool1')
        ```
    """
    if not report.name:
        return None

    size_bytes = None
    if report.size:
        size_bytes = size_human_2_size_bytes(report.size)
        if size_bytes is None:
            logging.warning(f'Invalid size: {report.size}, using None')

    return cls(
        name=report.name,
        pool_name=pool_name,
        size=int(size_bytes) if size_bytes is not None else None,
        uuid=report.uuid,
        size_limit=report.size_limit,
        origin=report.origin,
        used=report.used,
    )

get_all(pool_name=None) classmethod

Get all Stratis filesystems.

Lists filesystems: - Optionally filtered by pool - Includes snapshots - Provides full details - Sorted by pool

Parameters:

Name Type Description Default
pool_name str | None

Filter by pool name

None

Returns:

Type Description
list[StratisFilesystem]

List of StratisFilesystem instances

Example
StratisFilesystem.get_all('pool1')
[StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
Source code in sts_libs/src/sts/stratis/filesystem.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
@classmethod
def get_all(cls, pool_name: str | None = None) -> list[StratisFilesystem]:
    """Get all Stratis filesystems.

    Lists filesystems:
    - Optionally filtered by pool
    - Includes snapshots
    - Provides full details
    - Sorted by pool

    Args:
        pool_name: Filter by pool name

    Returns:
        List of StratisFilesystem instances

    Example:
        ```python
        StratisFilesystem.get_all('pool1')
        [StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
        ```
    """
    filesystems: list[StratisFilesystem] = []
    # Create base instance without __post_init__
    base = super().__new__(cls)
    StratisBase.__init__(base, config=StratisConfig())

    result = base.run_command('report')
    if result.failed or not result.stdout:
        return filesystems

    try:
        report = json.loads(result.stdout)
        for pool_data in report['pools']:
            current_pool = pool_data.get('name')
            if not current_pool:
                logging.warning('Pool missing name')
                continue
            if pool_name and pool_name != current_pool:
                continue
            filesystems.extend(
                [
                    fs
                    for fs_data in pool_data.get('filesystems', [])
                    if (
                        fs := cls.from_report(
                            FilesystemReport.from_dict(fs_data) or FilesystemReport(), current_pool
                        )
                    )
                ]
            )
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to parse report: {e}')

    return filesystems

get_fs_uuid()

Get filesystem UUID.

Retrieves UUID from system: - Requires pool and filesystem names - UUID is stable across reboots - Used for unique identification

Returns:

Type Description
str | None

Filesystem UUID or None if not found

Example
fs.get_fs_uuid()
'123e4567-e89b-12d3-a456-426614174000'
Source code in sts_libs/src/sts/stratis/filesystem.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def get_fs_uuid(self) -> str | None:
    """Get filesystem UUID.

    Retrieves UUID from system:
    - Requires pool and filesystem names
    - UUID is stable across reboots
    - Used for unique identification

    Returns:
        Filesystem UUID or None if not found

    Example:
        ```python
        fs.get_fs_uuid()
        '123e4567-e89b-12d3-a456-426614174000'
        ```
    """
    if not self.pool_name or not self.name:
        return None

    result = self.run_command('report')
    if result.failed or not result.stdout:
        return None

    try:
        report = json.loads(result.stdout)
        for pool in report['pools']:
            if self.pool_name != pool['name']:
                continue
            for fs in pool['filesystems']:
                if self.name != fs['name']:
                    continue
                return fs['uuid']
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to get filesystem UUID: {e}')

    return None

rename(new_name)

Rename filesystem.

Changes filesystem name: - Updates mount points - Preserves data and settings - Updates snapshot references

Parameters:

Name Type Description Default
new_name str

New filesystem name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.rename('fs2')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def rename(self, new_name: str) -> bool:
    """Rename filesystem.

    Changes filesystem name:
    - Updates mount points
    - Preserves data and settings
    - Updates snapshot references

    Args:
        new_name: New filesystem name

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.rename('fs2')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='rename',
        positional_args=[self.pool_name, self.name, new_name],
    )
    if not result.failed:
        self.name = new_name
    return not result.failed

set_size_limit(limit)

Set filesystem size limit.

Limits filesystem growth: - Thin provisioning still active - Prevents space exhaustion - Can be changed later - Uses human-readable sizes

Parameters:

Name Type Description Default
limit str

Size limit (e.g. "20G")

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.set_size_limit('20G')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def set_size_limit(self, limit: str) -> bool:
    """Set filesystem size limit.

    Limits filesystem growth:
    - Thin provisioning still active
    - Prevents space exhaustion
    - Can be changed later
    - Uses human-readable sizes

    Args:
        limit: Size limit (e.g. "20G")

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.set_size_limit('20G')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='set-size-limit',
        positional_args=[self.pool_name, self.name, limit],
    )
    if not result.failed:
        self.size_limit = limit
    return not result.failed

snapshot(snapshot_name)

Create filesystem snapshot.

Creates copy-on-write snapshot: - Instant creation - Space-efficient - Tracks origin - Writable by default

Parameters:

Name Type Description Default
snapshot_name str

Snapshot name

required

Returns:

Type Description
StratisFilesystem | None

New filesystem instance or None if failed

Example
fs.snapshot('snap1')
StratisFilesystem(name='snap1', ...)
Source code in sts_libs/src/sts/stratis/filesystem.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def snapshot(self, snapshot_name: str) -> StratisFilesystem | None:
    """Create filesystem snapshot.

    Creates copy-on-write snapshot:
    - Instant creation
    - Space-efficient
    - Tracks origin
    - Writable by default

    Args:
        snapshot_name: Snapshot name

    Returns:
        New filesystem instance or None if failed

    Example:
        ```python
        fs.snapshot('snap1')
        StratisFilesystem(name='snap1', ...)
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return None

    result = self.run_command(
        subcommand='filesystem',
        action='snapshot',
        positional_args=[self.pool_name, self.name, snapshot_name],
    )
    if result.failed:
        return None

    return StratisFilesystem(
        name=snapshot_name,
        pool_name=self.pool_name,
        size=self.size,
        origin=self.name,
    )

unset_size_limit()

Unset filesystem size limit.

Removes growth limit: - Allows unlimited growth - Limited only by pool size - Cannot be undone (must set new limit)

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.unset_size_limit()
True
Source code in sts_libs/src/sts/stratis/filesystem.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def unset_size_limit(self) -> bool:
    """Unset filesystem size limit.

    Removes growth limit:
    - Allows unlimited growth
    - Limited only by pool size
    - Cannot be undone (must set new limit)

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.unset_size_limit()
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='unset-size-limit',
        positional_args=[self.pool_name, self.name],
    )
    if not result.failed:
        self.size_limit = None
    return not result.failed