Skip to content

Stratis

This section documents the Stratis storage management functionality.

Base Functionality

sts.stratis.base

Base Stratis functionality.

This module provides base functionality for Stratis operations: - Command execution - Common utilities

Stratis is a storage management solution that provides: - Advanced storage pools - Thin provisioning - Snapshots - RAID support - Encryption

Key

Bases: StratisBase

Stratis key management.

Manages encryption keys for: - Pool encryption - Data security - Access control

Keys are identified by: - Key description (user-friendly name) - Key file path (contains actual key)

Examples:

Create with default configuration: key = Key()

Create with error propagation: key = Key(StratisConfig(propagate=True))

Source code in sts_libs/src/sts/stratis/base.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
class Key(StratisBase):
    """Stratis key management.

    Manages encryption keys for:
    - Pool encryption
    - Data security
    - Access control

    Keys are identified by:
    - Key description (user-friendly name)
    - Key file path (contains actual key)

    Examples:
        Create with default configuration:
        key = Key()

        Create with error propagation:
        key = Key(StratisConfig(propagate=True))
    """

    def set(self, keydesc: str, keyfile_path: str) -> CommandResult:
        """Set key.

        Associates a key file with a description:
        - Key file must exist
        - Description must be unique
        - Used for pool encryption

        Args:
            keydesc: Key description (identifier)
            keyfile_path: Path to key file

        Returns:
            Command result

        Example:
            ```python
            Register encryption key:
            key.set('mykey', '/path/to/keyfile')
            ```
        """
        return self.run_command(
            'key',
            'set',
            options={'--keyfile-path': keyfile_path},
            positional_args=[keydesc],
        )

    def unset(self, keydesc: str) -> CommandResult:
        """Unset key.

        Removes key association:
        - Key must not be in use
        - Does not delete key file
        - Cannot undo operation

        Args:
            keydesc: Key description

        Returns:
            Command result

        Example:
            ```python
            Remove key registration:
            key.unset('mykey')
            ```
        """
        return self.run_command('key', 'unset', positional_args=[keydesc])

    def list(self) -> CommandResult:
        """List keys.

        Shows all registered keys:
        - Key descriptions only
        - No key file contents
        - No usage information

        Returns:
            Command result

        Example:
            ```python
            Show registered keys:
            key.list()
            ```
        """
        return self.run_command('key', 'list')

    def exists(self, keydesc: str) -> bool:
        """Check if key exists.

        Verifies key registration:
        - Checks key description
        - Does not verify key file
        - Does not check key validity

        Args:
            keydesc: Key description

        Returns:
            True if key exists, False otherwise

        Example:
            ```python
            Check key registration:
            exists = key.exists('mykey')
            ```
        """
        result = self.list()
        return bool(result.succeeded and keydesc in result.stdout)

exists(keydesc)

Check if key exists.

Verifies key registration: - Checks key description - Does not verify key file - Does not check key validity

Parameters:

Name Type Description Default
keydesc str

Key description

required

Returns:

Type Description
bool

True if key exists, False otherwise

Example
Check key registration:
exists = key.exists('mykey')
Source code in sts_libs/src/sts/stratis/base.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def exists(self, keydesc: str) -> bool:
    """Check if key exists.

    Verifies key registration:
    - Checks key description
    - Does not verify key file
    - Does not check key validity

    Args:
        keydesc: Key description

    Returns:
        True if key exists, False otherwise

    Example:
        ```python
        Check key registration:
        exists = key.exists('mykey')
        ```
    """
    result = self.list()
    return bool(result.succeeded and keydesc in result.stdout)

list()

List keys.

Shows all registered keys: - Key descriptions only - No key file contents - No usage information

Returns:

Type Description
CommandResult

Command result

Example
Show registered keys:
key.list()
Source code in sts_libs/src/sts/stratis/base.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def list(self) -> CommandResult:
    """List keys.

    Shows all registered keys:
    - Key descriptions only
    - No key file contents
    - No usage information

    Returns:
        Command result

    Example:
        ```python
        Show registered keys:
        key.list()
        ```
    """
    return self.run_command('key', 'list')

set(keydesc, keyfile_path)

Set key.

Associates a key file with a description: - Key file must exist - Description must be unique - Used for pool encryption

Parameters:

Name Type Description Default
keydesc str

Key description (identifier)

required
keyfile_path str

Path to key file

required

Returns:

Type Description
CommandResult

Command result

Example
Register encryption key:
key.set('mykey', '/path/to/keyfile')
Source code in sts_libs/src/sts/stratis/base.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def set(self, keydesc: str, keyfile_path: str) -> CommandResult:
    """Set key.

    Associates a key file with a description:
    - Key file must exist
    - Description must be unique
    - Used for pool encryption

    Args:
        keydesc: Key description (identifier)
        keyfile_path: Path to key file

    Returns:
        Command result

    Example:
        ```python
        Register encryption key:
        key.set('mykey', '/path/to/keyfile')
        ```
    """
    return self.run_command(
        'key',
        'set',
        options={'--keyfile-path': keyfile_path},
        positional_args=[keydesc],
    )

unset(keydesc)

Unset key.

Removes key association: - Key must not be in use - Does not delete key file - Cannot undo operation

Parameters:

Name Type Description Default
keydesc str

Key description

required

Returns:

Type Description
CommandResult

Command result

Example
Remove key registration:
key.unset('mykey')
Source code in sts_libs/src/sts/stratis/base.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def unset(self, keydesc: str) -> CommandResult:
    """Unset key.

    Removes key association:
    - Key must not be in use
    - Does not delete key file
    - Cannot undo operation

    Args:
        keydesc: Key description

    Returns:
        Command result

    Example:
        ```python
        Remove key registration:
        key.unset('mykey')
        ```
    """
    return self.run_command('key', 'unset', positional_args=[keydesc])

StratisBase

Base class for Stratis operations.

Provides common functionality: - Command execution with options - Error handling - Version information - System reporting

Parameters:

Name Type Description Default
config StratisConfig | None

Stratis configuration (optional)

None

Examples:

Create with default configuration: stratis = StratisBase()

Create with custom configuration: stratis = StratisBase(StratisConfig(propagate=True))

Source code in sts_libs/src/sts/stratis/base.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class StratisBase:
    """Base class for Stratis operations.

    Provides common functionality:
    - Command execution with options
    - Error handling
    - Version information
    - System reporting

    Args:
        config: Stratis configuration (optional)

    Examples:
        Create with default configuration:
        stratis = StratisBase()

        Create with custom configuration:
        stratis = StratisBase(StratisConfig(propagate=True))
    """

    def __init__(self, config: StratisConfig | None = None) -> None:
        """Initialize Stratis base.

        Args:
            config: Stratis configuration
        """
        self.config = config or StratisConfig()

    def run_command(
        self,
        subcommand: str | None = None,
        action: str | None = None,
        options: StratisOptions | None = None,
        positional_args: list[str] | None = None,
    ) -> CommandResult:
        """Run stratis command.

        Command Structure:
        The command starts with 'stratis', followed by any global options from the
        configuration. Next comes the subcommand (like 'pool' or 'filesystem'),
        then the action (like 'create' or 'list'). Finally, any command-specific
        options and positional arguments are added.

        Args:
            subcommand: Command category (pool, filesystem, key)
            action: Operation to perform (list, create, set)
            options: Command-specific options as key-value pairs
            positional_args: Additional arguments

        Returns:
            Command result

        Examples:
            List all pools:
            stratis.run_command('pool', 'list')

            Create a filesystem:
            stratis.run_command('filesystem', 'create',
                              positional_args=['pool1', 'fs1'])

            Set encryption key:
            stratis.run_command('key', 'set',
                              options={'--keyfile-path': '/path/to/key'},
                              positional_args=['mykey'])
        """
        command_list: list[str] = [CLI_NAME]
        command_list.extend(self.config.to_args())

        if subcommand is not None:
            command_list.append(subcommand)
        if action is not None:
            command_list.append(action)
        if options is not None:
            command_list.extend(k if v is None else f'{k} {v}' for k, v in options.items())
        if positional_args:
            command_list.extend(positional_args)

        result = run(' '.join(command_list))
        if result.failed and self.config.propagate:
            raise StratisError(f'Command failed: {result.stderr}')
        return result

    def version(self) -> CommandResult:
        """Get Stratis version.

        Returns version information for:
        - stratisd daemon
        - stratis-cli tool
        - libstratis library

        Returns:
            Version information

        Example:
            ```python
            Get version information:
            version = stratis.version()
            ```
        """
        return self.run_command(action='--version')

    def get_report(self) -> StratisReportData | None:
        """Get Stratis report.

        Retrieves system-wide information about:
        - Storage pools
        - Filesystems
        - Block devices
        - Cache devices

        Returns:
            Report data or None if failed

        Example:
            ```python
            Get system report:
            report = stratis.get_report()
            ```
        """
        result = self.run_command('report')
        if result.failed or not result.stdout:
            return None

        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            return None

__init__(config=None)

Initialize Stratis base.

Parameters:

Name Type Description Default
config StratisConfig | None

Stratis configuration

None
Source code in sts_libs/src/sts/stratis/base.py
103
104
105
106
107
108
109
def __init__(self, config: StratisConfig | None = None) -> None:
    """Initialize Stratis base.

    Args:
        config: Stratis configuration
    """
    self.config = config or StratisConfig()

get_report()

Get Stratis report.

Retrieves system-wide information about: - Storage pools - Filesystems - Block devices - Cache devices

Returns:

Type Description
StratisReportData | None

Report data or None if failed

Example
Get system report:
report = stratis.get_report()
Source code in sts_libs/src/sts/stratis/base.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_report(self) -> StratisReportData | None:
    """Get Stratis report.

    Retrieves system-wide information about:
    - Storage pools
    - Filesystems
    - Block devices
    - Cache devices

    Returns:
        Report data or None if failed

    Example:
        ```python
        Get system report:
        report = stratis.get_report()
        ```
    """
    result = self.run_command('report')
    if result.failed or not result.stdout:
        return None

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        return None

run_command(subcommand=None, action=None, options=None, positional_args=None)

Run stratis command.

Command Structure: The command starts with 'stratis', followed by any global options from the configuration. Next comes the subcommand (like 'pool' or 'filesystem'), then the action (like 'create' or 'list'). Finally, any command-specific options and positional arguments are added.

Parameters:

Name Type Description Default
subcommand str | None

Command category (pool, filesystem, key)

None
action str | None

Operation to perform (list, create, set)

None
options StratisOptions | None

Command-specific options as key-value pairs

None
positional_args list[str] | None

Additional arguments

None

Returns:

Type Description
CommandResult

Command result

Examples:

List all pools: stratis.run_command('pool', 'list')

Create a filesystem: stratis.run_command('filesystem', 'create', positional_args=['pool1', 'fs1'])

Set encryption key: stratis.run_command('key', 'set', options={'--keyfile-path': '/path/to/key'}, positional_args=['mykey'])

Source code in sts_libs/src/sts/stratis/base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def run_command(
    self,
    subcommand: str | None = None,
    action: str | None = None,
    options: StratisOptions | None = None,
    positional_args: list[str] | None = None,
) -> CommandResult:
    """Run stratis command.

    Command Structure:
    The command starts with 'stratis', followed by any global options from the
    configuration. Next comes the subcommand (like 'pool' or 'filesystem'),
    then the action (like 'create' or 'list'). Finally, any command-specific
    options and positional arguments are added.

    Args:
        subcommand: Command category (pool, filesystem, key)
        action: Operation to perform (list, create, set)
        options: Command-specific options as key-value pairs
        positional_args: Additional arguments

    Returns:
        Command result

    Examples:
        List all pools:
        stratis.run_command('pool', 'list')

        Create a filesystem:
        stratis.run_command('filesystem', 'create',
                          positional_args=['pool1', 'fs1'])

        Set encryption key:
        stratis.run_command('key', 'set',
                          options={'--keyfile-path': '/path/to/key'},
                          positional_args=['mykey'])
    """
    command_list: list[str] = [CLI_NAME]
    command_list.extend(self.config.to_args())

    if subcommand is not None:
        command_list.append(subcommand)
    if action is not None:
        command_list.append(action)
    if options is not None:
        command_list.extend(k if v is None else f'{k} {v}' for k, v in options.items())
    if positional_args:
        command_list.extend(positional_args)

    result = run(' '.join(command_list))
    if result.failed and self.config.propagate:
        raise StratisError(f'Command failed: {result.stderr}')
    return result

version()

Get Stratis version.

Returns version information for: - stratisd daemon - stratis-cli tool - libstratis library

Returns:

Type Description
CommandResult

Version information

Example
Get version information:
version = stratis.version()
Source code in sts_libs/src/sts/stratis/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def version(self) -> CommandResult:
    """Get Stratis version.

    Returns version information for:
    - stratisd daemon
    - stratis-cli tool
    - libstratis library

    Returns:
        Version information

    Example:
        ```python
        Get version information:
        version = stratis.version()
        ```
    """
    return self.run_command(action='--version')

StratisConfig dataclass

Stratis configuration.

Controls Stratis behavior: - Error handling: Whether to raise exceptions - UUID format: Hyphenated or not - Command execution: Global options

Parameters:

Name Type Description Default
propagate bool

Whether to propagate errors as exceptions

False
unhyphenated_uuids bool

Whether to use UUIDs without hyphens

False

Examples:

Create with default settings: stratis = StratisConfig()

Create with error propagation: stratis = StratisConfig(propagate=True)

Source code in sts_libs/src/sts/stratis/base.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass
class StratisConfig:
    """Stratis configuration.

    Controls Stratis behavior:
    - Error handling: Whether to raise exceptions
    - UUID format: Hyphenated or not
    - Command execution: Global options

    Args:
        propagate: Whether to propagate errors as exceptions
        unhyphenated_uuids: Whether to use UUIDs without hyphens

    Examples:
        Create with default settings:
        stratis = StratisConfig()

        Create with error propagation:
        stratis = StratisConfig(propagate=True)
    """

    propagate: bool = False  # Raise exceptions on error
    unhyphenated_uuids: bool = False  # Use UUIDs without hyphens

    def to_args(self) -> list[str]:
        """Convert configuration to command arguments.

        Generates CLI options based on settings:
        - --propagate: Raise exceptions
        - --unhyphenated_uuids: Change UUID format

        Returns:
            List of command arguments

        Example:
            ```python
            Get command line arguments:
            config = StratisConfig(propagate=True)
            args = config.to_args()  # Returns ['--propagate']
            ```
        """
        args = []
        if self.propagate:
            args.append('--propagate')
        if self.unhyphenated_uuids:
            args.append('--unhyphenated_uuids')
        return args

to_args()

Convert configuration to command arguments.

Generates CLI options based on settings: - --propagate: Raise exceptions - --unhyphenated_uuids: Change UUID format

Returns:

Type Description
list[str]

List of command arguments

Example
Get command line arguments:
config = StratisConfig(propagate=True)
args = config.to_args()  # Returns ['--propagate']
Source code in sts_libs/src/sts/stratis/base.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def to_args(self) -> list[str]:
    """Convert configuration to command arguments.

    Generates CLI options based on settings:
    - --propagate: Raise exceptions
    - --unhyphenated_uuids: Change UUID format

    Returns:
        List of command arguments

    Example:
        ```python
        Get command line arguments:
        config = StratisConfig(propagate=True)
        args = config.to_args()  # Returns ['--propagate']
        ```
    """
    args = []
    if self.propagate:
        args.append('--propagate')
    if self.unhyphenated_uuids:
        args.append('--unhyphenated_uuids')
    return args

Pool Management

sts.stratis.pool

Stratis pool management.

This module provides functionality for managing Stratis pools: - Pool creation - Pool operations - Pool encryption

BlockDevInfo dataclass

Block device information from stratis report.

Parameters:

Name Type Description Default
path str | None

Device path

None
size str | None

Device size in sectors

None
uuid str | None

Device UUID

None
in_use bool

Whether device is in use

False
blksizes str | None

Block size information

None
Source code in sts_libs/src/sts/stratis/pool.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass
class BlockDevInfo:
    """Block device information from stratis report.

    Args:
        path: Device path
        size: Device size in sectors
        uuid: Device UUID
        in_use: Whether device is in use
        blksizes: Block size information
    """

    path: str | None = None
    size: str | None = None
    uuid: str | None = None
    in_use: bool = False
    blksizes: str | None = None

    @staticmethod
    def parse_bool(value: bool | int | str | None) -> bool:
        """Parse boolean value from stratis output.

        Args:
            value: Value to parse (can be bool, int, str, or None)

        Returns:
            Parsed boolean value
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, int):
            return bool(value)
        if isinstance(value, str):
            return value.lower() in ('true', '1', 'yes', 'on')
        return False

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> BlockDevInfo:
        """Create device info from dictionary.

        Args:
            data: Dictionary data

        Returns:
            BlockDevInfo instance
        """
        return cls(
            path=data.get('path'),
            size=data.get('size'),
            uuid=data.get('uuid'),
            in_use=cls.parse_bool(data.get('in_use')),
            blksizes=data.get('blksizes'),
        )

from_dict(data) classmethod

Create device info from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
BlockDevInfo

BlockDevInfo instance

Source code in sts_libs/src/sts/stratis/pool.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BlockDevInfo:
    """Create device info from dictionary.

    Args:
        data: Dictionary data

    Returns:
        BlockDevInfo instance
    """
    return cls(
        path=data.get('path'),
        size=data.get('size'),
        uuid=data.get('uuid'),
        in_use=cls.parse_bool(data.get('in_use')),
        blksizes=data.get('blksizes'),
    )

parse_bool(value) staticmethod

Parse boolean value from stratis output.

Parameters:

Name Type Description Default
value bool | int | str | None

Value to parse (can be bool, int, str, or None)

required

Returns:

Type Description
bool

Parsed boolean value

Source code in sts_libs/src/sts/stratis/pool.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@staticmethod
def parse_bool(value: bool | int | str | None) -> bool:
    """Parse boolean value from stratis output.

    Args:
        value: Value to parse (can be bool, int, str, or None)

    Returns:
        Parsed boolean value
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, int):
        return bool(value)
    if isinstance(value, str):
        return value.lower() in ('true', '1', 'yes', 'on')
    return False

BlockDevs dataclass

Block devices from stratis report.

Parameters:

Name Type Description Default
datadevs list[BlockDevInfo]

List of data devices

list()
cachedevs list[BlockDevInfo]

List of cache devices

list()
Source code in sts_libs/src/sts/stratis/pool.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@dataclass
class BlockDevs:
    """Block devices from stratis report.

    Args:
        datadevs: List of data devices
        cachedevs: List of cache devices
    """

    datadevs: list[BlockDevInfo] = field(default_factory=list)
    cachedevs: list[BlockDevInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> BlockDevs:
        """Create block devices from dictionary.

        Args:
            data: Dictionary data

        Returns:
            BlockDevs instance
        """
        return cls(
            datadevs=[BlockDevInfo.from_dict(dev) for dev in data.get('datadevs', [])],
            cachedevs=[BlockDevInfo.from_dict(dev) for dev in data.get('cachedevs', [])],
        )

from_dict(data) classmethod

Create block devices from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
BlockDevs

BlockDevs instance

Source code in sts_libs/src/sts/stratis/pool.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BlockDevs:
    """Create block devices from dictionary.

    Args:
        data: Dictionary data

    Returns:
        BlockDevs instance
    """
    return cls(
        datadevs=[BlockDevInfo.from_dict(dev) for dev in data.get('datadevs', [])],
        cachedevs=[BlockDevInfo.from_dict(dev) for dev in data.get('cachedevs', [])],
    )

PoolCreateConfig dataclass

Pool creation configuration.

Parameters:

Name Type Description Default
key_desc str | None

Key description for keyring encryption (optional)

None
tang_url str | None

Tang server URL for tang encryption (optional)

None
thumbprint str | None

Tang server thumbprint (optional)

None
clevis str | None

Clevis encryption configuration (optional)

None
trust_url bool

Trust Tang server URL (optional)

False
no_overprovision bool

Disable overprovisioning (optional)

False
Example
config = PoolCreateConfig()  # Uses defaults
config = PoolCreateConfig(key_desc='mykey')  # Custom settings
Source code in sts_libs/src/sts/stratis/pool.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@dataclass
class PoolCreateConfig:
    """Pool creation configuration.

    Args:
        key_desc: Key description for keyring encryption (optional)
        tang_url: Tang server URL for tang encryption (optional)
        thumbprint: Tang server thumbprint (optional)
        clevis: Clevis encryption configuration (optional)
        trust_url: Trust Tang server URL (optional)
        no_overprovision: Disable overprovisioning (optional)

    Example:
        ```python
        config = PoolCreateConfig()  # Uses defaults
        config = PoolCreateConfig(key_desc='mykey')  # Custom settings
        ```
    """

    # Optional parameters
    key_desc: str | None = None
    tang_url: str | None = None
    thumbprint: str | None = None
    clevis: str | None = None
    trust_url: bool = False
    no_overprovision: bool = False

PoolReport dataclass

Pool report data.

Parameters:

Name Type Description Default
name str | None

Pool name (optional, discovered from system)

None
blockdevs BlockDevs

Block devices (optional, discovered from system)

BlockDevs()
uuid str | None

Pool UUID (optional, discovered from system)

None
encryption EncryptionType | None

Encryption type (optional, discovered from system)

None
fs_limit int | None

Filesystem limit (optional)

None
available_actions str | None

Available actions (optional)

None
filesystems list[Any]

List of filesystems (optional)

list()
Example
report = PoolReport()  # Discovers first available pool
report = PoolReport(name='pool1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/pool.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@dataclass
class PoolReport:
    """Pool report data.

    Args:
        name: Pool name (optional, discovered from system)
        blockdevs: Block devices (optional, discovered from system)
        uuid: Pool UUID (optional, discovered from system)
        encryption: Encryption type (optional, discovered from system)
        fs_limit: Filesystem limit (optional)
        available_actions: Available actions (optional)
        filesystems: List of filesystems (optional)

    Example:
        ```python
        report = PoolReport()  # Discovers first available pool
        report = PoolReport(name='pool1')  # Discovers other values
        ```
    """

    name: str | None = None
    blockdevs: BlockDevs = field(default_factory=BlockDevs)
    uuid: str | None = None
    encryption: EncryptionType | None = None
    fs_limit: int | None = None
    available_actions: str | None = None
    filesystems: list[Any] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PoolReport | None:
        """Create report from dictionary.

        Args:
            data: Dictionary data

        Returns:
            PoolReport instance or None if invalid
        """
        try:
            return cls(
                name=data.get('name'),
                blockdevs=BlockDevs.from_dict(data.get('blockdevs', {})),
                uuid=data.get('uuid'),
                encryption=data.get('encryption'),
                fs_limit=data.get('fs_limit'),
                available_actions=data.get('available_actions'),
                filesystems=data.get('filesystems', []),
            )
        except (KeyError, TypeError) as e:
            logging.warning(f'Invalid pool report data: {e}')
            return None

from_dict(data) classmethod

Create report from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data

required

Returns:

Type Description
PoolReport | None

PoolReport instance or None if invalid

Source code in sts_libs/src/sts/stratis/pool.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PoolReport | None:
    """Create report from dictionary.

    Args:
        data: Dictionary data

    Returns:
        PoolReport instance or None if invalid
    """
    try:
        return cls(
            name=data.get('name'),
            blockdevs=BlockDevs.from_dict(data.get('blockdevs', {})),
            uuid=data.get('uuid'),
            encryption=data.get('encryption'),
            fs_limit=data.get('fs_limit'),
            available_actions=data.get('available_actions'),
            filesystems=data.get('filesystems', []),
        )
    except (KeyError, TypeError) as e:
        logging.warning(f'Invalid pool report data: {e}')
        return None

StratisPool dataclass

Bases: StratisBase

Stratis pool representation.

This class provides functionality for managing Stratis pools: - Pool creation - Pool operations - Pool encryption

Parameters:

Name Type Description Default
name str | None

Pool name (optional, discovered from system)

None
uuid str | None

Pool UUID (optional, discovered from system)

None
encryption EncryptionType | None

Encryption type (optional, discovered from system)

None
blockdevs list[str]

List of block devices (optional, discovered from system)

list()
Example
pool = StratisPool()  # Discovers first available pool
pool = StratisPool(name='pool1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/pool.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
@dataclass
class StratisPool(StratisBase):
    """Stratis pool representation.

    This class provides functionality for managing Stratis pools:
    - Pool creation
    - Pool operations
    - Pool encryption

    Args:
        name: Pool name (optional, discovered from system)
        uuid: Pool UUID (optional, discovered from system)
        encryption: Encryption type (optional, discovered from system)
        blockdevs: List of block devices (optional, discovered from system)

    Example:
        ```python
        pool = StratisPool()  # Discovers first available pool
        pool = StratisPool(name='pool1')  # Discovers other values
        ```
    """

    name: str | None = None
    uuid: str | None = None
    encryption: EncryptionType | None = None
    blockdevs: list[str] = field(default_factory=list)

    # Class-level paths
    POOL_PATH: ClassVar[str] = '/stratis/pool'

    def __post_init__(self) -> None:
        """Initialize pool."""
        # Initialize base class with default config
        super().__init__(config=StratisConfig())

        # Discover pool info if needed
        if not self.uuid or not self.encryption or not self.blockdevs:
            result = self.run_command('report')
            if result.succeeded and result.stdout:
                try:
                    report = json.loads(result.stdout)
                    if not isinstance(report, dict):
                        logging.warning('Invalid report format: not a dictionary')
                        return

                    pools = report.get('pools', [])
                    if not isinstance(pools, list):
                        logging.warning('Invalid pools format: not a list')
                        return

                    for pool in pools:
                        if not isinstance(pool, dict):
                            logging.warning('Invalid pool format: not a dictionary')
                            continue

                        if not self.name or self.name == pool.get('name'):
                            if not self.name:
                                self.name = pool.get('name')
                            if not self.uuid:
                                self.uuid = pool.get('uuid')
                            if not self.encryption:
                                self.encryption = pool.get('encryption')
                            if not self.blockdevs:
                                blockdevs = pool.get('blockdevs', {})
                                if not isinstance(blockdevs, dict):
                                    logging.warning('Invalid blockdevs format: not a dictionary')
                                    continue

                                # Get paths from both data and cache devices
                                self.blockdevs = [
                                    path
                                    for dev in blockdevs.get('datadevs', [])
                                    if isinstance(dev, dict) and (path := dev.get('path'))
                                ] + [
                                    path
                                    for dev in blockdevs.get('cachedevs', [])
                                    if isinstance(dev, dict) and (path := dev.get('path'))
                                ]
                            break
                except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
                    logging.warning(f'Failed to parse pool info: {e}')

    def get_pool_uuid(self) -> str | None:
        """Get pool UUID.

        Returns:
            Pool UUID or None if not found

        Example:
            ```python
            pool.get_pool_uuid()
            '123e4567-e89b-12d3-a456-426614174000'
            ```
        """
        if not self.name:
            return None

        result = self.run_command('report')
        if result.failed or not result.stdout:
            return None

        try:
            report = json.loads(result.stdout)
            for pool in report['pools']:
                if self.name == pool['name']:
                    return pool['uuid']
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to get pool UUID: {e}')

        return None

    def create(self, config: PoolCreateConfig | None = None) -> bool:
        """Create pool.

        Args:
            config: Pool creation configuration

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.create(PoolCreateConfig(key_desc='mykey'))
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        if not self.blockdevs:
            raise StratisPoolError('No block devices specified')

        options: StratisOptions = {}
        if config:
            if config.key_desc:
                options['--key-desc'] = config.key_desc
            if config.clevis:
                options['--clevis'] = config.clevis
            if config.tang_url:
                options['--tang-url'] = config.tang_url
            if config.thumbprint:
                options['--thumbprint'] = config.thumbprint
            if config.trust_url:
                options['--trust-url'] = None
            if config.no_overprovision:
                options['--no-overprovision'] = None

        result = self.run_command(
            subcommand='pool',
            action='create',
            options=options,
            positional_args=[self.name, ' '.join(self.blockdevs)],
        )
        return not result.failed

    def destroy(self) -> bool:
        """Destroy pool.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.destroy()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='destroy',
            positional_args=[self.name],
        )
        return not result.failed

    def start(self, unlock_method: EncryptionType | None = None) -> bool:
        """Start pool.

        Args:
            unlock_method: Encryption unlock method

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.start(unlock_method='keyring')
            True
            ```
        """
        if not self.name and not self.uuid:
            logging.error('Pool name or UUID required')
            return False

        options: StratisOptions = {}
        if unlock_method:
            options['--unlock-method'] = unlock_method
        if self.uuid:
            options['--uuid'] = self.uuid
        else:
            options['--name'] = self.name

        result = self.run_command('pool', action='start', options=options)
        return not result.failed

    def stop(self) -> bool:
        """Stop pool.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.stop()
            True
            ```
        """
        if not self.name and not self.uuid:
            logging.error('Pool name or UUID required')
            return False

        options: StratisOptions = {}
        if self.uuid:
            options['--uuid'] = self.uuid
        else:
            options['--name'] = self.name

        result = self.run_command('pool', action='stop', options=options)
        return not result.failed

    def add_data(self, blockdevs: list[str]) -> bool:
        """Add data devices to pool.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.add_data(['/dev/sdd', '/dev/sde'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='add-data',
            positional_args=[self.name, ' '.join(blockdevs)],
        )
        if not result.failed:
            self.blockdevs.extend(blockdevs)
        return not result.failed

    def init_cache(self, blockdevs: list[str]) -> bool:
        """Initialize cache devices.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.init_cache(['/dev/nvme0n1'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        # Pass each device as a separate argument
        result = self.run_command(
            subcommand='pool',
            action='init-cache',
            positional_args=[self.name, *blockdevs],
        )
        if result.failed:
            logging.error(f'Failed to initialize cache: {result.stderr}')
        return not result.failed

    def add_cache(self, blockdevs: list[str]) -> bool:
        """Add cache devices to pool.

        Args:
            blockdevs: List of block devices

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.add_cache(['/dev/nvme0n2'])
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='add-cache',
            positional_args=[self.name, ' '.join(blockdevs)],
        )
        return not result.failed

    def bind_keyring(self, key_desc: str) -> bool:
        """Bind pool to keyring.

        Args:
            key_desc: Key description

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_keyring('mykey')
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='bind keyring',
            positional_args=[self.name, key_desc],
        )
        if not result.failed:
            self.encryption = 'keyring'
        return not result.failed

    def bind_tang(self, config: TangConfig) -> bool:
        """Bind pool to Tang server.

        Args:
            config: Tang server configuration

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_tang(TangConfig('http://tang.example.com'))
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        if not config.url:
            logging.error('Tang server URL required')
            return False

        options: StratisOptions = {}
        if config.trust_url:
            options['--trust-url'] = None
        if config.thumbprint:
            options['--thumbprint'] = config.thumbprint

        result = self.run_command(
            subcommand='pool',
            action='bind tang',
            options=options,
            positional_args=[self.name, config.url],
        )
        if not result.failed:
            self.encryption = 'tang'
        return not result.failed

    def bind_tpm2(self) -> bool:
        """Bind pool to TPM2.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.bind_tpm2()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='bind tpm2',
            positional_args=[self.name],
        )
        if not result.failed:
            self.encryption = 'tpm2'
        return not result.failed

    def unbind_keyring(self) -> bool:
        """Unbind pool from keyring.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.unbind_keyring()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='unbind keyring',
            positional_args=[self.name],
        )
        if not result.failed:
            self.encryption = None
        return not result.failed

    def unbind_clevis(self) -> bool:
        """Unbind pool from Clevis.

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            pool.unbind_clevis()
            True
            ```
        """
        if not self.name:
            logging.error('Pool name required')
            return False

        result = self.run_command(
            subcommand='pool',
            action='unbind clevis',
            positional_args=[self.name],
        )
        if not result.failed:
            self.encryption = None
        return not result.failed

    @classmethod
    def from_report(cls, report: PoolReport) -> StratisPool | None:
        """Create pool from report.

        Args:
            report: Pool report data

        Returns:
            StratisPool instance or None if invalid

        Example:
            ```python
            pool = StratisPool.from_report(report)
            ```
        """
        if not report.name:
            return None

        # Get paths from both data and cache devices
        paths = [dev.path for dev in report.blockdevs.datadevs if dev.path] + [
            dev.path for dev in report.blockdevs.cachedevs if dev.path
        ]

        return cls(
            name=report.name,
            uuid=report.uuid,
            encryption=report.encryption,
            blockdevs=paths,
        )

    @classmethod
    def get_all(cls) -> list[StratisPool]:
        """Get all Stratis pools.

        Returns:
            List of StratisPool instances

        Example:
            ```python
            StratisPool.get_all()
            [StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
            ```
        """
        pools: list[StratisPool] = []
        # Create base instance without __post_init__
        base = super().__new__(cls)
        super(cls, base).__init__()

        result = base.run_command('report')
        if result.failed or not result.stdout:
            return pools

        try:
            report = json.loads(result.stdout)
            pools.extend(
                pool
                for pool_data in report['pools']
                if (pool := cls.from_report(PoolReport.from_dict(pool_data) or PoolReport())) is not None
            )
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to parse report: {e}')

        return pools

    @classmethod
    def setup_blockdevices(cls) -> list[str]:
        """Set up block devices for testing.

        Returns:
            List of device paths

        Example:
            ```python
            StratisPool.setup_blockdevices()
            ['/dev/sda', '/dev/sdb']
            ```
        """
        # Get free disks
        from sts.blockdevice import get_free_disks

        blockdevices = get_free_disks()
        if not blockdevices:
            pytest.skip('No free disks found')

        # Group disks by block sizes
        filtered_disks_by_block_sizes: dict[tuple[int, int], list[str]] = {}
        for disk in blockdevices:
            block_sizes = (disk.sector_size, disk.block_size)
            if block_sizes in filtered_disks_by_block_sizes:
                filtered_disks_by_block_sizes[block_sizes].append(str(disk.path))
            else:
                filtered_disks_by_block_sizes[block_sizes] = [str(disk.path)]

        # Find devices with the most common block sizes
        most_common_block_sizes: list[str] = []
        for disks in filtered_disks_by_block_sizes.values():
            if len(disks) > len(most_common_block_sizes):
                most_common_block_sizes = disks

        # Clear start of devices
        for disk in most_common_block_sizes:
            run(f'dd if=/dev/zero of={disk} bs=1M count=10')

        return most_common_block_sizes

__post_init__()

Initialize pool.

Source code in sts_libs/src/sts/stratis/pool.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def __post_init__(self) -> None:
    """Initialize pool."""
    # Initialize base class with default config
    super().__init__(config=StratisConfig())

    # Discover pool info if needed
    if not self.uuid or not self.encryption or not self.blockdevs:
        result = self.run_command('report')
        if result.succeeded and result.stdout:
            try:
                report = json.loads(result.stdout)
                if not isinstance(report, dict):
                    logging.warning('Invalid report format: not a dictionary')
                    return

                pools = report.get('pools', [])
                if not isinstance(pools, list):
                    logging.warning('Invalid pools format: not a list')
                    return

                for pool in pools:
                    if not isinstance(pool, dict):
                        logging.warning('Invalid pool format: not a dictionary')
                        continue

                    if not self.name or self.name == pool.get('name'):
                        if not self.name:
                            self.name = pool.get('name')
                        if not self.uuid:
                            self.uuid = pool.get('uuid')
                        if not self.encryption:
                            self.encryption = pool.get('encryption')
                        if not self.blockdevs:
                            blockdevs = pool.get('blockdevs', {})
                            if not isinstance(blockdevs, dict):
                                logging.warning('Invalid blockdevs format: not a dictionary')
                                continue

                            # Get paths from both data and cache devices
                            self.blockdevs = [
                                path
                                for dev in blockdevs.get('datadevs', [])
                                if isinstance(dev, dict) and (path := dev.get('path'))
                            ] + [
                                path
                                for dev in blockdevs.get('cachedevs', [])
                                if isinstance(dev, dict) and (path := dev.get('path'))
                            ]
                        break
            except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
                logging.warning(f'Failed to parse pool info: {e}')

add_cache(blockdevs)

Add cache devices to pool.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.add_cache(['/dev/nvme0n2'])
True
Source code in sts_libs/src/sts/stratis/pool.py
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
def add_cache(self, blockdevs: list[str]) -> bool:
    """Add cache devices to pool.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.add_cache(['/dev/nvme0n2'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='add-cache',
        positional_args=[self.name, ' '.join(blockdevs)],
    )
    return not result.failed

add_data(blockdevs)

Add data devices to pool.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.add_data(['/dev/sdd', '/dev/sde'])
True
Source code in sts_libs/src/sts/stratis/pool.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def add_data(self, blockdevs: list[str]) -> bool:
    """Add data devices to pool.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.add_data(['/dev/sdd', '/dev/sde'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='add-data',
        positional_args=[self.name, ' '.join(blockdevs)],
    )
    if not result.failed:
        self.blockdevs.extend(blockdevs)
    return not result.failed

bind_keyring(key_desc)

Bind pool to keyring.

Parameters:

Name Type Description Default
key_desc str

Key description

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_keyring('mykey')
True
Source code in sts_libs/src/sts/stratis/pool.py
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
def bind_keyring(self, key_desc: str) -> bool:
    """Bind pool to keyring.

    Args:
        key_desc: Key description

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_keyring('mykey')
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='bind keyring',
        positional_args=[self.name, key_desc],
    )
    if not result.failed:
        self.encryption = 'keyring'
    return not result.failed

bind_tang(config)

Bind pool to Tang server.

Parameters:

Name Type Description Default
config TangConfig

Tang server configuration

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_tang(TangConfig('http://tang.example.com'))
True
Source code in sts_libs/src/sts/stratis/pool.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
def bind_tang(self, config: TangConfig) -> bool:
    """Bind pool to Tang server.

    Args:
        config: Tang server configuration

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_tang(TangConfig('http://tang.example.com'))
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    if not config.url:
        logging.error('Tang server URL required')
        return False

    options: StratisOptions = {}
    if config.trust_url:
        options['--trust-url'] = None
    if config.thumbprint:
        options['--thumbprint'] = config.thumbprint

    result = self.run_command(
        subcommand='pool',
        action='bind tang',
        options=options,
        positional_args=[self.name, config.url],
    )
    if not result.failed:
        self.encryption = 'tang'
    return not result.failed

bind_tpm2()

Bind pool to TPM2.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.bind_tpm2()
True
Source code in sts_libs/src/sts/stratis/pool.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def bind_tpm2(self) -> bool:
    """Bind pool to TPM2.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.bind_tpm2()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='bind tpm2',
        positional_args=[self.name],
    )
    if not result.failed:
        self.encryption = 'tpm2'
    return not result.failed

create(config=None)

Create pool.

Parameters:

Name Type Description Default
config PoolCreateConfig | None

Pool creation configuration

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.create(PoolCreateConfig(key_desc='mykey'))
True
Source code in sts_libs/src/sts/stratis/pool.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def create(self, config: PoolCreateConfig | None = None) -> bool:
    """Create pool.

    Args:
        config: Pool creation configuration

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.create(PoolCreateConfig(key_desc='mykey'))
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    if not self.blockdevs:
        raise StratisPoolError('No block devices specified')

    options: StratisOptions = {}
    if config:
        if config.key_desc:
            options['--key-desc'] = config.key_desc
        if config.clevis:
            options['--clevis'] = config.clevis
        if config.tang_url:
            options['--tang-url'] = config.tang_url
        if config.thumbprint:
            options['--thumbprint'] = config.thumbprint
        if config.trust_url:
            options['--trust-url'] = None
        if config.no_overprovision:
            options['--no-overprovision'] = None

    result = self.run_command(
        subcommand='pool',
        action='create',
        options=options,
        positional_args=[self.name, ' '.join(self.blockdevs)],
    )
    return not result.failed

destroy()

Destroy pool.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.destroy()
True
Source code in sts_libs/src/sts/stratis/pool.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def destroy(self) -> bool:
    """Destroy pool.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.destroy()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='destroy',
        positional_args=[self.name],
    )
    return not result.failed

from_report(report) classmethod

Create pool from report.

Parameters:

Name Type Description Default
report PoolReport

Pool report data

required

Returns:

Type Description
StratisPool | None

StratisPool instance or None if invalid

Example
pool = StratisPool.from_report(report)
Source code in sts_libs/src/sts/stratis/pool.py
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
@classmethod
def from_report(cls, report: PoolReport) -> StratisPool | None:
    """Create pool from report.

    Args:
        report: Pool report data

    Returns:
        StratisPool instance or None if invalid

    Example:
        ```python
        pool = StratisPool.from_report(report)
        ```
    """
    if not report.name:
        return None

    # Get paths from both data and cache devices
    paths = [dev.path for dev in report.blockdevs.datadevs if dev.path] + [
        dev.path for dev in report.blockdevs.cachedevs if dev.path
    ]

    return cls(
        name=report.name,
        uuid=report.uuid,
        encryption=report.encryption,
        blockdevs=paths,
    )

get_all() classmethod

Get all Stratis pools.

Returns:

Type Description
list[StratisPool]

List of StratisPool instances

Example
StratisPool.get_all()
[StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
Source code in sts_libs/src/sts/stratis/pool.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
@classmethod
def get_all(cls) -> list[StratisPool]:
    """Get all Stratis pools.

    Returns:
        List of StratisPool instances

    Example:
        ```python
        StratisPool.get_all()
        [StratisPool(name='pool1', ...), StratisPool(name='pool2', ...)]
        ```
    """
    pools: list[StratisPool] = []
    # Create base instance without __post_init__
    base = super().__new__(cls)
    super(cls, base).__init__()

    result = base.run_command('report')
    if result.failed or not result.stdout:
        return pools

    try:
        report = json.loads(result.stdout)
        pools.extend(
            pool
            for pool_data in report['pools']
            if (pool := cls.from_report(PoolReport.from_dict(pool_data) or PoolReport())) is not None
        )
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to parse report: {e}')

    return pools

get_pool_uuid()

Get pool UUID.

Returns:

Type Description
str | None

Pool UUID or None if not found

Example
pool.get_pool_uuid()
'123e4567-e89b-12d3-a456-426614174000'
Source code in sts_libs/src/sts/stratis/pool.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def get_pool_uuid(self) -> str | None:
    """Get pool UUID.

    Returns:
        Pool UUID or None if not found

    Example:
        ```python
        pool.get_pool_uuid()
        '123e4567-e89b-12d3-a456-426614174000'
        ```
    """
    if not self.name:
        return None

    result = self.run_command('report')
    if result.failed or not result.stdout:
        return None

    try:
        report = json.loads(result.stdout)
        for pool in report['pools']:
            if self.name == pool['name']:
                return pool['uuid']
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to get pool UUID: {e}')

    return None

init_cache(blockdevs)

Initialize cache devices.

Parameters:

Name Type Description Default
blockdevs list[str]

List of block devices

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.init_cache(['/dev/nvme0n1'])
True
Source code in sts_libs/src/sts/stratis/pool.py
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def init_cache(self, blockdevs: list[str]) -> bool:
    """Initialize cache devices.

    Args:
        blockdevs: List of block devices

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.init_cache(['/dev/nvme0n1'])
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    # Pass each device as a separate argument
    result = self.run_command(
        subcommand='pool',
        action='init-cache',
        positional_args=[self.name, *blockdevs],
    )
    if result.failed:
        logging.error(f'Failed to initialize cache: {result.stderr}')
    return not result.failed

setup_blockdevices() classmethod

Set up block devices for testing.

Returns:

Type Description
list[str]

List of device paths

Example
StratisPool.setup_blockdevices()
['/dev/sda', '/dev/sdb']
Source code in sts_libs/src/sts/stratis/pool.py
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
@classmethod
def setup_blockdevices(cls) -> list[str]:
    """Set up block devices for testing.

    Returns:
        List of device paths

    Example:
        ```python
        StratisPool.setup_blockdevices()
        ['/dev/sda', '/dev/sdb']
        ```
    """
    # Get free disks
    from sts.blockdevice import get_free_disks

    blockdevices = get_free_disks()
    if not blockdevices:
        pytest.skip('No free disks found')

    # Group disks by block sizes
    filtered_disks_by_block_sizes: dict[tuple[int, int], list[str]] = {}
    for disk in blockdevices:
        block_sizes = (disk.sector_size, disk.block_size)
        if block_sizes in filtered_disks_by_block_sizes:
            filtered_disks_by_block_sizes[block_sizes].append(str(disk.path))
        else:
            filtered_disks_by_block_sizes[block_sizes] = [str(disk.path)]

    # Find devices with the most common block sizes
    most_common_block_sizes: list[str] = []
    for disks in filtered_disks_by_block_sizes.values():
        if len(disks) > len(most_common_block_sizes):
            most_common_block_sizes = disks

    # Clear start of devices
    for disk in most_common_block_sizes:
        run(f'dd if=/dev/zero of={disk} bs=1M count=10')

    return most_common_block_sizes

start(unlock_method=None)

Start pool.

Parameters:

Name Type Description Default
unlock_method EncryptionType | None

Encryption unlock method

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.start(unlock_method='keyring')
True
Source code in sts_libs/src/sts/stratis/pool.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def start(self, unlock_method: EncryptionType | None = None) -> bool:
    """Start pool.

    Args:
        unlock_method: Encryption unlock method

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.start(unlock_method='keyring')
        True
        ```
    """
    if not self.name and not self.uuid:
        logging.error('Pool name or UUID required')
        return False

    options: StratisOptions = {}
    if unlock_method:
        options['--unlock-method'] = unlock_method
    if self.uuid:
        options['--uuid'] = self.uuid
    else:
        options['--name'] = self.name

    result = self.run_command('pool', action='start', options=options)
    return not result.failed

stop()

Stop pool.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.stop()
True
Source code in sts_libs/src/sts/stratis/pool.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def stop(self) -> bool:
    """Stop pool.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.stop()
        True
        ```
    """
    if not self.name and not self.uuid:
        logging.error('Pool name or UUID required')
        return False

    options: StratisOptions = {}
    if self.uuid:
        options['--uuid'] = self.uuid
    else:
        options['--name'] = self.name

    result = self.run_command('pool', action='stop', options=options)
    return not result.failed

unbind_clevis()

Unbind pool from Clevis.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.unbind_clevis()
True
Source code in sts_libs/src/sts/stratis/pool.py
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
def unbind_clevis(self) -> bool:
    """Unbind pool from Clevis.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.unbind_clevis()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='unbind clevis',
        positional_args=[self.name],
    )
    if not result.failed:
        self.encryption = None
    return not result.failed

unbind_keyring()

Unbind pool from keyring.

Returns:

Type Description
bool

True if successful, False otherwise

Example
pool.unbind_keyring()
True
Source code in sts_libs/src/sts/stratis/pool.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
def unbind_keyring(self) -> bool:
    """Unbind pool from keyring.

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        pool.unbind_keyring()
        True
        ```
    """
    if not self.name:
        logging.error('Pool name required')
        return False

    result = self.run_command(
        subcommand='pool',
        action='unbind keyring',
        positional_args=[self.name],
    )
    if not result.failed:
        self.encryption = None
    return not result.failed

TangConfig dataclass

Tang server configuration.

Parameters:

Name Type Description Default
url str | None

Tang server URL (optional, discovered from system)

None
trust_url bool

Trust server URL (optional)

False
thumbprint str | None

Server thumbprint (optional)

None
Example
config = TangConfig()  # Uses defaults
config = TangConfig(url='http://tang.example.com')  # Custom settings
Source code in sts_libs/src/sts/stratis/pool.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@dataclass
class TangConfig:
    """Tang server configuration.

    Args:
        url: Tang server URL (optional, discovered from system)
        trust_url: Trust server URL (optional)
        thumbprint: Server thumbprint (optional)

    Example:
        ```python
        config = TangConfig()  # Uses defaults
        config = TangConfig(url='http://tang.example.com')  # Custom settings
        ```
    """

    # Optional parameters
    url: str | None = None
    trust_url: bool = False
    thumbprint: str | None = None

Filesystem Management

sts.stratis.filesystem

Stratis filesystem management.

This module provides functionality for managing Stratis filesystems: - Filesystem creation and management - Thin provisioning and size limits - Snapshot creation and tracking - Space usage monitoring

Key features: - XFS as the default filesystem - Copy-on-write snapshots - Dynamic size management - Origin tracking for snapshots

FilesystemReport dataclass

Filesystem report data.

Contains filesystem metadata: - Basic information (name, UUID) - Size information (total, limit) - Snapshot details (origin) - Usage statistics

Parameters:

Name Type Description Default
name str | None

Filesystem name (optional, discovered from system)

None
uuid str | None

Filesystem UUID (optional, discovered from system)

None
size str | None

Filesystem size (optional, discovered from system)

None
size_limit str | None

Size limit (optional, discovered from system)

None
origin str | None

Origin filesystem for snapshots (optional)

None
used str | None

Used space (optional, discovered from system)

None
Example
report = FilesystemReport()  # Discovers first available filesystem
report = FilesystemReport(name='fs1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/filesystem.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class FilesystemReport:
    """Filesystem report data.

    Contains filesystem metadata:
    - Basic information (name, UUID)
    - Size information (total, limit)
    - Snapshot details (origin)
    - Usage statistics

    Args:
        name: Filesystem name (optional, discovered from system)
        uuid: Filesystem UUID (optional, discovered from system)
        size: Filesystem size (optional, discovered from system)
        size_limit: Size limit (optional, discovered from system)
        origin: Origin filesystem for snapshots (optional)
        used: Used space (optional, discovered from system)

    Example:
        ```python
        report = FilesystemReport()  # Discovers first available filesystem
        report = FilesystemReport(name='fs1')  # Discovers other values
        ```
    """

    name: str | None = None
    uuid: str | None = None
    size: str | None = None  # Total size (e.g. "10 GiB")
    size_limit: str | None = None  # Maximum size allowed
    origin: str | None = None  # Source filesystem for snapshots
    used: str | None = None  # Space currently in use

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> FilesystemReport | None:
        """Create report from dictionary.

        Parses stratis report format:
        - Handles missing fields
        - Validates data types
        - Reports parsing errors

        Args:
            data: Dictionary data from stratis report

        Returns:
            FilesystemReport instance or None if invalid
        """
        try:
            return cls(
                name=data.get('name'),
                uuid=data.get('uuid'),
                size=data.get('size'),
                size_limit=data.get('size_limit'),
                origin=data.get('origin'),
                used=data.get('used'),
            )
        except (KeyError, TypeError) as e:
            logging.warning(f'Invalid filesystem report data: {e}')
            return None

from_dict(data) classmethod

Create report from dictionary.

Parses stratis report format: - Handles missing fields - Validates data types - Reports parsing errors

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary data from stratis report

required

Returns:

Type Description
FilesystemReport | None

FilesystemReport instance or None if invalid

Source code in sts_libs/src/sts/stratis/filesystem.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FilesystemReport | None:
    """Create report from dictionary.

    Parses stratis report format:
    - Handles missing fields
    - Validates data types
    - Reports parsing errors

    Args:
        data: Dictionary data from stratis report

    Returns:
        FilesystemReport instance or None if invalid
    """
    try:
        return cls(
            name=data.get('name'),
            uuid=data.get('uuid'),
            size=data.get('size'),
            size_limit=data.get('size_limit'),
            origin=data.get('origin'),
            used=data.get('used'),
        )
    except (KeyError, TypeError) as e:
        logging.warning(f'Invalid filesystem report data: {e}')
        return None

StratisFilesystem dataclass

Bases: StratisBase

Stratis filesystem representation.

Manages filesystems with: - Thin provisioning - Snapshot capabilities - Size management - Usage tracking

Parameters:

Name Type Description Default
name str | None

Filesystem name (optional, discovered from system)

None
pool_name str | None

Pool name (optional, discovered from system)

None
uuid str | None

Filesystem UUID (optional, discovered from system)

None
size int | None

Filesystem size in bytes (optional, discovered from system)

None
size_limit str | None

Size limit (optional, discovered from system)

None
origin str | None

Origin filesystem for snapshots (optional)

None
used str | None

Used space (optional, discovered from system)

None
Example
fs = StratisFilesystem()  # Discovers first available filesystem
fs = StratisFilesystem(name='fs1')  # Discovers other values
Source code in sts_libs/src/sts/stratis/filesystem.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
@dataclass
class StratisFilesystem(StratisBase):
    """Stratis filesystem representation.

    Manages filesystems with:
    - Thin provisioning
    - Snapshot capabilities
    - Size management
    - Usage tracking

    Args:
        name: Filesystem name (optional, discovered from system)
        pool_name: Pool name (optional, discovered from system)
        uuid: Filesystem UUID (optional, discovered from system)
        size: Filesystem size in bytes (optional, discovered from system)
        size_limit: Size limit (optional, discovered from system)
        origin: Origin filesystem for snapshots (optional)
        used: Used space (optional, discovered from system)

    Example:
        ```python
        fs = StratisFilesystem()  # Discovers first available filesystem
        fs = StratisFilesystem(name='fs1')  # Discovers other values
        ```
    """

    name: str | None = None
    pool_name: str | None = None
    uuid: str | None = None
    size: int | None = None  # Size in bytes
    size_limit: str | None = None  # Size limit (human readable)
    origin: str | None = None  # Source filesystem name
    used: str | None = None  # Used space (human readable)

    # Mount point base path
    FS_PATH: ClassVar[str] = '/stratis'

    def __post_init__(self) -> None:
        """Initialize filesystem.

        Discovery process:
        1. Initialize base class
        2. Find pool and filesystem info
        3. Parse size information
        4. Set filesystem attributes
        """
        # Initialize base class with default config
        super().__init__(config=StratisConfig())

        # Discover filesystem info if needed
        if not self.pool_name or not self.name:
            result = self.run_command('report')
            if result.succeeded and result.stdout:
                try:
                    report = json.loads(result.stdout)
                    for pool in report['pools']:
                        for fs in pool['filesystems']:
                            if not self.name or self.name == fs['name']:
                                # Set pool and filesystem names
                                if not self.pool_name:
                                    self.pool_name = pool['name']
                                if not self.name:
                                    self.name = fs['name']

                                # Parse size if available
                                if not self.size and 'size' in fs:
                                    size_bytes = size_human_2_size_bytes(fs['size'])
                                    if size_bytes is not None:
                                        self.size = int(size_bytes)

                                # Set other attributes
                                if not self.uuid:
                                    self.uuid = fs.get('uuid')
                                if not self.size_limit:
                                    self.size_limit = fs.get('size_limit')
                                if not self.origin:
                                    self.origin = fs.get('origin')
                                if not self.used:
                                    self.used = fs.get('used')
                                break
                        if self.name and self.pool_name:
                            break
                except (KeyError, ValueError) as e:
                    logging.warning(f'Failed to parse filesystem info: {e}')

    def get_fs_uuid(self) -> str | None:
        """Get filesystem UUID.

        Retrieves UUID from system:
        - Requires pool and filesystem names
        - UUID is stable across reboots
        - Used for unique identification

        Returns:
            Filesystem UUID or None if not found

        Example:
            ```python
            fs.get_fs_uuid()
            '123e4567-e89b-12d3-a456-426614174000'
            ```
        """
        if not self.pool_name or not self.name:
            return None

        result = self.run_command('report')
        if result.failed or not result.stdout:
            return None

        try:
            report = json.loads(result.stdout)
            for pool in report['pools']:
                if self.pool_name != pool['name']:
                    continue
                for fs in pool['filesystems']:
                    if self.name != fs['name']:
                        continue
                    return fs['uuid']
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to get filesystem UUID: {e}')

        return None

    def create(self, size: str | None = None, size_limit: str | None = None) -> bool:
        """Create filesystem.

        Creates filesystem with:
        - Optional initial size
        - Optional size limit
        - Thin provisioning enabled
        - Default mount options

        Args:
            size: Initial size (e.g. "10G")
            size_limit: Size limit (e.g. "20G")

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.create(size='10G', size_limit='20G')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        options: StratisOptions = {}
        if size:
            options['--size'] = size
        if size_limit:
            options['--size-limit'] = size_limit

        result = self.run_command(
            subcommand='filesystem',
            action='create',
            options=options,
            positional_args=[self.pool_name, self.name],
        )
        return not result.failed

    def destroy(self) -> bool:
        """Destroy filesystem.

        Removes filesystem:
        - Unmounts if mounted
        - Removes from pool
        - Deletes all data
        - Cannot be undone

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.destroy()
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='destroy',
            positional_args=[self.pool_name, self.name],
        )
        return not result.failed

    def rename(self, new_name: str) -> bool:
        """Rename filesystem.

        Changes filesystem name:
        - Updates mount points
        - Preserves data and settings
        - Updates snapshot references

        Args:
            new_name: New filesystem name

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.rename('fs2')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='rename',
            positional_args=[self.pool_name, self.name, new_name],
        )
        if not result.failed:
            self.name = new_name
        return not result.failed

    def snapshot(self, snapshot_name: str) -> StratisFilesystem | None:
        """Create filesystem snapshot.

        Creates copy-on-write snapshot:
        - Instant creation
        - Space-efficient
        - Tracks origin
        - Writable by default

        Args:
            snapshot_name: Snapshot name

        Returns:
            New filesystem instance or None if failed

        Example:
            ```python
            fs.snapshot('snap1')
            StratisFilesystem(name='snap1', ...)
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return None

        result = self.run_command(
            subcommand='filesystem',
            action='snapshot',
            positional_args=[self.pool_name, self.name, snapshot_name],
        )
        if result.failed:
            return None

        return StratisFilesystem(
            name=snapshot_name,
            pool_name=self.pool_name,
            size=self.size,
            origin=self.name,
        )

    def set_size_limit(self, limit: str) -> bool:
        """Set filesystem size limit.

        Limits filesystem growth:
        - Thin provisioning still active
        - Prevents space exhaustion
        - Can be changed later
        - Uses human-readable sizes

        Args:
            limit: Size limit (e.g. "20G")

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.set_size_limit('20G')
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='set-size-limit',
            positional_args=[self.pool_name, self.name, limit],
        )
        if not result.failed:
            self.size_limit = limit
        return not result.failed

    def unset_size_limit(self) -> bool:
        """Unset filesystem size limit.

        Removes growth limit:
        - Allows unlimited growth
        - Limited only by pool size
        - Cannot be undone (must set new limit)

        Returns:
            True if successful, False otherwise

        Example:
            ```python
            fs.unset_size_limit()
            True
            ```
        """
        if not self.pool_name or not self.name:
            logging.error('Pool name and filesystem name required')
            return False

        result = self.run_command(
            subcommand='filesystem',
            action='unset-size-limit',
            positional_args=[self.pool_name, self.name],
        )
        if not result.failed:
            self.size_limit = None
        return not result.failed

    @classmethod
    def from_report(cls, report: FilesystemReport, pool_name: str) -> StratisFilesystem | None:
        """Create filesystem from report.

        Parses report data:
        - Validates required fields
        - Converts size formats
        - Sets relationships

        Args:
            report: Filesystem report data
            pool_name: Pool name

        Returns:
            StratisFilesystem instance or None if invalid

        Example:
            ```python
            fs = StratisFilesystem.from_report(report, 'pool1')
            ```
        """
        if not report.name:
            return None

        size_bytes = None
        if report.size:
            size_bytes = size_human_2_size_bytes(report.size)
            if size_bytes is None:
                logging.warning(f'Invalid size: {report.size}, using None')

        return cls(
            name=report.name,
            pool_name=pool_name,
            size=int(size_bytes) if size_bytes is not None else None,
            uuid=report.uuid,
            size_limit=report.size_limit,
            origin=report.origin,
            used=report.used,
        )

    @classmethod
    def get_all(cls, pool_name: str | None = None) -> list[StratisFilesystem]:
        """Get all Stratis filesystems.

        Lists filesystems:
        - Optionally filtered by pool
        - Includes snapshots
        - Provides full details
        - Sorted by pool

        Args:
            pool_name: Filter by pool name

        Returns:
            List of StratisFilesystem instances

        Example:
            ```python
            StratisFilesystem.get_all('pool1')
            [StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
            ```
        """
        filesystems: list[StratisFilesystem] = []
        # Create base instance without __post_init__
        base = super().__new__(cls)
        StratisBase.__init__(base, config=StratisConfig())

        result = base.run_command('report')
        if result.failed or not result.stdout:
            return filesystems

        try:
            report = json.loads(result.stdout)
            for pool_data in report['pools']:
                current_pool = pool_data.get('name')
                if not current_pool:
                    logging.warning('Pool missing name')
                    continue
                if pool_name and pool_name != current_pool:
                    continue
                filesystems.extend(
                    [
                        fs
                        for fs_data in pool_data.get('filesystems', [])
                        if (
                            fs := cls.from_report(
                                FilesystemReport.from_dict(fs_data) or FilesystemReport(), current_pool
                            )
                        )
                    ]
                )
        except (KeyError, ValueError) as e:
            logging.warning(f'Failed to parse report: {e}')

        return filesystems

__post_init__()

Initialize filesystem.

Discovery process: 1. Initialize base class 2. Find pool and filesystem info 3. Parse size information 4. Set filesystem attributes

Source code in sts_libs/src/sts/stratis/filesystem.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def __post_init__(self) -> None:
    """Initialize filesystem.

    Discovery process:
    1. Initialize base class
    2. Find pool and filesystem info
    3. Parse size information
    4. Set filesystem attributes
    """
    # Initialize base class with default config
    super().__init__(config=StratisConfig())

    # Discover filesystem info if needed
    if not self.pool_name or not self.name:
        result = self.run_command('report')
        if result.succeeded and result.stdout:
            try:
                report = json.loads(result.stdout)
                for pool in report['pools']:
                    for fs in pool['filesystems']:
                        if not self.name or self.name == fs['name']:
                            # Set pool and filesystem names
                            if not self.pool_name:
                                self.pool_name = pool['name']
                            if not self.name:
                                self.name = fs['name']

                            # Parse size if available
                            if not self.size and 'size' in fs:
                                size_bytes = size_human_2_size_bytes(fs['size'])
                                if size_bytes is not None:
                                    self.size = int(size_bytes)

                            # Set other attributes
                            if not self.uuid:
                                self.uuid = fs.get('uuid')
                            if not self.size_limit:
                                self.size_limit = fs.get('size_limit')
                            if not self.origin:
                                self.origin = fs.get('origin')
                            if not self.used:
                                self.used = fs.get('used')
                            break
                    if self.name and self.pool_name:
                        break
            except (KeyError, ValueError) as e:
                logging.warning(f'Failed to parse filesystem info: {e}')

create(size=None, size_limit=None)

Create filesystem.

Creates filesystem with: - Optional initial size - Optional size limit - Thin provisioning enabled - Default mount options

Parameters:

Name Type Description Default
size str | None

Initial size (e.g. "10G")

None
size_limit str | None

Size limit (e.g. "20G")

None

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.create(size='10G', size_limit='20G')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def create(self, size: str | None = None, size_limit: str | None = None) -> bool:
    """Create filesystem.

    Creates filesystem with:
    - Optional initial size
    - Optional size limit
    - Thin provisioning enabled
    - Default mount options

    Args:
        size: Initial size (e.g. "10G")
        size_limit: Size limit (e.g. "20G")

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.create(size='10G', size_limit='20G')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    options: StratisOptions = {}
    if size:
        options['--size'] = size
    if size_limit:
        options['--size-limit'] = size_limit

    result = self.run_command(
        subcommand='filesystem',
        action='create',
        options=options,
        positional_args=[self.pool_name, self.name],
    )
    return not result.failed

destroy()

Destroy filesystem.

Removes filesystem: - Unmounts if mounted - Removes from pool - Deletes all data - Cannot be undone

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.destroy()
True
Source code in sts_libs/src/sts/stratis/filesystem.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def destroy(self) -> bool:
    """Destroy filesystem.

    Removes filesystem:
    - Unmounts if mounted
    - Removes from pool
    - Deletes all data
    - Cannot be undone

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.destroy()
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='destroy',
        positional_args=[self.pool_name, self.name],
    )
    return not result.failed

from_report(report, pool_name) classmethod

Create filesystem from report.

Parses report data: - Validates required fields - Converts size formats - Sets relationships

Parameters:

Name Type Description Default
report FilesystemReport

Filesystem report data

required
pool_name str

Pool name

required

Returns:

Type Description
StratisFilesystem | None

StratisFilesystem instance or None if invalid

Example
fs = StratisFilesystem.from_report(report, 'pool1')
Source code in sts_libs/src/sts/stratis/filesystem.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
@classmethod
def from_report(cls, report: FilesystemReport, pool_name: str) -> StratisFilesystem | None:
    """Create filesystem from report.

    Parses report data:
    - Validates required fields
    - Converts size formats
    - Sets relationships

    Args:
        report: Filesystem report data
        pool_name: Pool name

    Returns:
        StratisFilesystem instance or None if invalid

    Example:
        ```python
        fs = StratisFilesystem.from_report(report, 'pool1')
        ```
    """
    if not report.name:
        return None

    size_bytes = None
    if report.size:
        size_bytes = size_human_2_size_bytes(report.size)
        if size_bytes is None:
            logging.warning(f'Invalid size: {report.size}, using None')

    return cls(
        name=report.name,
        pool_name=pool_name,
        size=int(size_bytes) if size_bytes is not None else None,
        uuid=report.uuid,
        size_limit=report.size_limit,
        origin=report.origin,
        used=report.used,
    )

get_all(pool_name=None) classmethod

Get all Stratis filesystems.

Lists filesystems: - Optionally filtered by pool - Includes snapshots - Provides full details - Sorted by pool

Parameters:

Name Type Description Default
pool_name str | None

Filter by pool name

None

Returns:

Type Description
list[StratisFilesystem]

List of StratisFilesystem instances

Example
StratisFilesystem.get_all('pool1')
[StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
Source code in sts_libs/src/sts/stratis/filesystem.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
@classmethod
def get_all(cls, pool_name: str | None = None) -> list[StratisFilesystem]:
    """Get all Stratis filesystems.

    Lists filesystems:
    - Optionally filtered by pool
    - Includes snapshots
    - Provides full details
    - Sorted by pool

    Args:
        pool_name: Filter by pool name

    Returns:
        List of StratisFilesystem instances

    Example:
        ```python
        StratisFilesystem.get_all('pool1')
        [StratisFilesystem(name='fs1', ...), StratisFilesystem(name='fs2', ...)]
        ```
    """
    filesystems: list[StratisFilesystem] = []
    # Create base instance without __post_init__
    base = super().__new__(cls)
    StratisBase.__init__(base, config=StratisConfig())

    result = base.run_command('report')
    if result.failed or not result.stdout:
        return filesystems

    try:
        report = json.loads(result.stdout)
        for pool_data in report['pools']:
            current_pool = pool_data.get('name')
            if not current_pool:
                logging.warning('Pool missing name')
                continue
            if pool_name and pool_name != current_pool:
                continue
            filesystems.extend(
                [
                    fs
                    for fs_data in pool_data.get('filesystems', [])
                    if (
                        fs := cls.from_report(
                            FilesystemReport.from_dict(fs_data) or FilesystemReport(), current_pool
                        )
                    )
                ]
            )
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to parse report: {e}')

    return filesystems

get_fs_uuid()

Get filesystem UUID.

Retrieves UUID from system: - Requires pool and filesystem names - UUID is stable across reboots - Used for unique identification

Returns:

Type Description
str | None

Filesystem UUID or None if not found

Example
fs.get_fs_uuid()
'123e4567-e89b-12d3-a456-426614174000'
Source code in sts_libs/src/sts/stratis/filesystem.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def get_fs_uuid(self) -> str | None:
    """Get filesystem UUID.

    Retrieves UUID from system:
    - Requires pool and filesystem names
    - UUID is stable across reboots
    - Used for unique identification

    Returns:
        Filesystem UUID or None if not found

    Example:
        ```python
        fs.get_fs_uuid()
        '123e4567-e89b-12d3-a456-426614174000'
        ```
    """
    if not self.pool_name or not self.name:
        return None

    result = self.run_command('report')
    if result.failed or not result.stdout:
        return None

    try:
        report = json.loads(result.stdout)
        for pool in report['pools']:
            if self.pool_name != pool['name']:
                continue
            for fs in pool['filesystems']:
                if self.name != fs['name']:
                    continue
                return fs['uuid']
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to get filesystem UUID: {e}')

    return None

rename(new_name)

Rename filesystem.

Changes filesystem name: - Updates mount points - Preserves data and settings - Updates snapshot references

Parameters:

Name Type Description Default
new_name str

New filesystem name

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.rename('fs2')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def rename(self, new_name: str) -> bool:
    """Rename filesystem.

    Changes filesystem name:
    - Updates mount points
    - Preserves data and settings
    - Updates snapshot references

    Args:
        new_name: New filesystem name

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.rename('fs2')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='rename',
        positional_args=[self.pool_name, self.name, new_name],
    )
    if not result.failed:
        self.name = new_name
    return not result.failed

set_size_limit(limit)

Set filesystem size limit.

Limits filesystem growth: - Thin provisioning still active - Prevents space exhaustion - Can be changed later - Uses human-readable sizes

Parameters:

Name Type Description Default
limit str

Size limit (e.g. "20G")

required

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.set_size_limit('20G')
True
Source code in sts_libs/src/sts/stratis/filesystem.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def set_size_limit(self, limit: str) -> bool:
    """Set filesystem size limit.

    Limits filesystem growth:
    - Thin provisioning still active
    - Prevents space exhaustion
    - Can be changed later
    - Uses human-readable sizes

    Args:
        limit: Size limit (e.g. "20G")

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.set_size_limit('20G')
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='set-size-limit',
        positional_args=[self.pool_name, self.name, limit],
    )
    if not result.failed:
        self.size_limit = limit
    return not result.failed

snapshot(snapshot_name)

Create filesystem snapshot.

Creates copy-on-write snapshot: - Instant creation - Space-efficient - Tracks origin - Writable by default

Parameters:

Name Type Description Default
snapshot_name str

Snapshot name

required

Returns:

Type Description
StratisFilesystem | None

New filesystem instance or None if failed

Example
fs.snapshot('snap1')
StratisFilesystem(name='snap1', ...)
Source code in sts_libs/src/sts/stratis/filesystem.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def snapshot(self, snapshot_name: str) -> StratisFilesystem | None:
    """Create filesystem snapshot.

    Creates copy-on-write snapshot:
    - Instant creation
    - Space-efficient
    - Tracks origin
    - Writable by default

    Args:
        snapshot_name: Snapshot name

    Returns:
        New filesystem instance or None if failed

    Example:
        ```python
        fs.snapshot('snap1')
        StratisFilesystem(name='snap1', ...)
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return None

    result = self.run_command(
        subcommand='filesystem',
        action='snapshot',
        positional_args=[self.pool_name, self.name, snapshot_name],
    )
    if result.failed:
        return None

    return StratisFilesystem(
        name=snapshot_name,
        pool_name=self.pool_name,
        size=self.size,
        origin=self.name,
    )

unset_size_limit()

Unset filesystem size limit.

Removes growth limit: - Allows unlimited growth - Limited only by pool size - Cannot be undone (must set new limit)

Returns:

Type Description
bool

True if successful, False otherwise

Example
fs.unset_size_limit()
True
Source code in sts_libs/src/sts/stratis/filesystem.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def unset_size_limit(self) -> bool:
    """Unset filesystem size limit.

    Removes growth limit:
    - Allows unlimited growth
    - Limited only by pool size
    - Cannot be undone (must set new limit)

    Returns:
        True if successful, False otherwise

    Example:
        ```python
        fs.unset_size_limit()
        True
        ```
    """
    if not self.pool_name or not self.name:
        logging.error('Pool name and filesystem name required')
        return False

    result = self.run_command(
        subcommand='filesystem',
        action='unset-size-limit',
        positional_args=[self.pool_name, self.name],
    )
    if not result.failed:
        self.size_limit = None
    return not result.failed