Checkpointing utilities for pipeline state management.

Saves pipeline execution state after each completed stage so that a failed run can resume where it left off instead of starting over. This is critical for long-running database builds that may fail partway through.

Checkpoint

Represents a single checkpoint in pipeline execution.

Attributes:
  • stage_name (str) –

    Name of the completed stage

  • timestamp (str) –

    ISO format timestamp of checkpoint creation

  • metadata (dict[str, Any]) –

    Additional metadata about the checkpoint

__init__(stage_name, timestamp=None, metadata=None)

Create a checkpoint.

Parameters:
  • stage_name (str) –

    Name of the pipeline stage that completed

  • timestamp (str | None, default: None ) –

    ISO 8601 timestamp. If None, the current time is used

  • metadata (dict[str, Any] | None, default: None ) –

    Additional metadata to store with the checkpoint

to_dict()

Convert checkpoint to dictionary.

Returns:
  • dict[str, Any]

    Checkpoint as dictionary
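The constructor defaults and the dictionary round trip can be sketched as below. This is a minimal illustration, not the module's actual code: it assumes "current time" means an aware UTC timestamp from `datetime.isoformat()` and that `to_dict()` uses the attribute names as keys. The stage name `load_sequences` is hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


class Checkpoint:
    """Sketch of the documented Checkpoint record (assumed dict layout)."""

    def __init__(
        self,
        stage_name: str,
        timestamp: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        self.stage_name = stage_name
        # Assumption: "current time" is UTC, serialized as ISO 8601.
        self.timestamp = (
            timestamp if timestamp is not None
            else datetime.now(timezone.utc).isoformat()
        )
        # Copy so later changes to the caller's dict do not leak into the record.
        self.metadata = dict(metadata) if metadata else {}

    def to_dict(self) -> dict[str, Any]:
        # Keys mirror the documented attribute names (an assumption).
        return {
            "stage_name": self.stage_name,
            "timestamp": self.timestamp,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Checkpoint:
        # .get() tolerates records saved without the optional fields.
        return cls(
            stage_name=data["stage_name"],
            timestamp=data.get("timestamp"),
            metadata=data.get("metadata"),
        )


cp = Checkpoint("load_sequences", metadata={"n_records": 1200})
restored = Checkpoint.from_dict(cp.to_dict())
print(restored.stage_name, restored.metadata)  # load_sequences {'n_records': 1200}
```

Because the timestamp is stored as a string rather than a `datetime`, the dictionary is directly JSON-serializable.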

from_dict(data) classmethod

Create checkpoint from dictionary.

Parameters:
  • data (dict[str, Any]) –

    Dictionary containing checkpoint data

Returns:
  • Checkpoint

    Checkpoint reconstructed from the dictionary

CheckpointManager

Manages pipeline checkpoints for fault tolerance.

Handles saving, loading, and querying pipeline execution state to enable resuming from failures.

Attributes:
  • checkpoint_dir (Path) –

    Directory where checkpoint files are stored

  • checkpoint_file (Path) –

    Path to the main checkpoint state file

  • checkpoints (list[Checkpoint]) –

    List of all checkpoints, in the order they were saved

__init__(checkpoint_dir)

Initialize checkpoint manager.

Parameters:
  • checkpoint_dir (Path) –

    Directory to store checkpoint files

save_checkpoint(stage_name, metadata=None)

Save a checkpoint for a completed stage.

Parameters:
  • stage_name (str) –

    Name of the completed stage

  • metadata (dict[str, Any] | None, default: None ) –

    Additional metadata to store (e.g., statistics, file paths)
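One way the manager could persist state so that it survives a crash is sketched below. The file name `checkpoint_state.json`, the `{"checkpoints": [...]}` JSON layout, and the `_write_state` helper are all hypothetical, and plain dicts stand in for `Checkpoint` objects to keep the example short. The write-to-temp-then-`os.replace` pattern is a common choice because the rename is atomic, so a crash mid-write cannot leave a truncated state file.

```python
from __future__ import annotations

import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


class CheckpointManager:
    """Persistence sketch; file name and JSON layout are hypothetical."""

    STATE_FILENAME = "checkpoint_state.json"  # hypothetical

    def __init__(self, checkpoint_dir: Path) -> None:
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_file = self.checkpoint_dir / self.STATE_FILENAME
        # Plain dicts stand in for Checkpoint objects in this sketch.
        self.checkpoints: list[dict[str, Any]] = []
        # Reload any state left by a previous (possibly failed) run.
        if self.checkpoint_file.exists():
            state = json.loads(self.checkpoint_file.read_text(encoding="utf-8"))
            self.checkpoints = state.get("checkpoints", [])

    def save_checkpoint(
        self, stage_name: str, metadata: dict[str, Any] | None = None
    ) -> None:
        self.checkpoints.append({
            "stage_name": stage_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {},
        })
        self._write_state()

    def _write_state(self) -> None:
        # Write to a temp file in the same directory, then atomically rename it
        # over the state file so readers never see a half-written file.
        fd, tmp = tempfile.mkstemp(dir=self.checkpoint_dir, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                json.dump({"checkpoints": self.checkpoints}, fh, indent=2)
            os.replace(tmp, self.checkpoint_file)
        except BaseException:
            os.unlink(tmp)  # do not leave stray temp files behind
            raise


with tempfile.TemporaryDirectory() as d:
    manager = CheckpointManager(Path(d))
    manager.save_checkpoint("download", {"files": 3})
    # A fresh manager, as after a crash and restart, reloads the saved state.
    resumed = CheckpointManager(Path(d))
    resumed_stages = [cp["stage_name"] for cp in resumed.checkpoints]
    leftover_files = sorted(p.name for p in Path(d).iterdir())

print(resumed_stages)  # ['download']
```

Writing the whole state after every `save_checkpoint` call keeps the file consistent with memory at each stage boundary, which is what resuming relies on.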

get_last_checkpoint()

Get the most recent checkpoint.

Returns:
  • Checkpoint | None

    Last checkpoint, or None if no checkpoints exist

get_completed_stages()

Get list of all completed stage names.

Returns:
  • list[str]

    Names of completed stages in order

is_stage_completed(stage_name)

Check if a stage has been completed.

Parameters:
  • stage_name (str) –

    Name of the stage to check

Returns:
  • bool

    True if stage has a checkpoint, False otherwise

get_checkpoint_for_stage(stage_name)

Get the checkpoint for a specific stage.

Parameters:
  • stage_name (str) –

    Name of the stage

Returns:
  • Checkpoint | None

    Checkpoint for the stage, or None if not found
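The four query methods above reduce to simple scans over the ordered checkpoint list. The sketch below uses a hypothetical in-memory `CheckpointLog` class (persistence omitted) and a trimmed `Checkpoint` dataclass. It also assumes, since the docs do not say, that a stage checkpointed more than once reports its most recent checkpoint.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Checkpoint:
    # Trimmed stand-in for the documented Checkpoint (timestamp omitted).
    stage_name: str
    metadata: dict[str, Any] = field(default_factory=dict)


class CheckpointLog:
    """Hypothetical in-memory stand-in for CheckpointManager's query methods."""

    def __init__(self, checkpoints: list[Checkpoint] | None = None) -> None:
        self.checkpoints = list(checkpoints) if checkpoints else []

    def get_last_checkpoint(self) -> Checkpoint | None:
        return self.checkpoints[-1] if self.checkpoints else None

    def get_completed_stages(self) -> list[str]:
        # Preserves save order.
        return [cp.stage_name for cp in self.checkpoints]

    def get_checkpoint_for_stage(self, stage_name: str) -> Checkpoint | None:
        # Newest-first scan: a re-run stage reports its latest checkpoint
        # (assumed behavior for duplicates).
        for cp in reversed(self.checkpoints):
            if cp.stage_name == stage_name:
                return cp
        return None

    def is_stage_completed(self, stage_name: str) -> bool:
        return self.get_checkpoint_for_stage(stage_name) is not None


log = CheckpointLog([Checkpoint("download"), Checkpoint("parse", {"rows": 10})])
print(log.get_completed_stages())            # ['download', 'parse']
print(log.is_stage_completed("index"))       # False
print(log.get_last_checkpoint().stage_name)  # parse
```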

clear_checkpoints()

Clear all checkpoints and reset state.

clear_from_stage(stage_name)

Clear all checkpoints from a specific stage onwards.

Parameters:
  • stage_name (str) –

    Name of the stage to start clearing from
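A sketch of the truncation that `clear_from_stage` implies, using a hypothetical `truncate_from_stage` helper over stage names. Two assumptions are not confirmed by the docs: the named stage itself is cleared (so it re-runs on the next resume), and an unknown stage name leaves the list unchanged rather than raising. In the real manager the truncated list would also be written back to the checkpoint file.

```python
from __future__ import annotations


def truncate_from_stage(stage_names: list[str], stage_name: str) -> list[str]:
    """Return the stages that survive clearing `stage_name` and all later stages.

    Hypothetical helper. Assumes the named stage is cleared too and that an
    unknown name is a no-op.
    """
    try:
        cut = stage_names.index(stage_name)  # first occurrence
    except ValueError:
        return list(stage_names)
    return stage_names[:cut]


stages = ["download", "parse", "index", "validate"]
print(truncate_from_stage(stages, "index"))  # ['download', 'parse']
```

Clearing everything downstream, not just the one stage, keeps the checkpoint list consistent: later stages consumed the cleared stage's output, so their checkpoints would otherwise describe stale results.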

export_state(output_path)

Export checkpoint state to a file.

Parameters:
  • output_path (Path) –

    Path to export checkpoint state to

import_state(input_path)

Import checkpoint state from a file.

Parameters:
  • input_path (Path) –

    Path to import checkpoint state from
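Exporting and importing state could look like the following, assuming checkpoints serialize to plain dicts and the exported file uses a hypothetical `{"checkpoints": [...]}` JSON layout. The standalone `export_state` and `import_state` functions here are sketches, not the manager's methods. The import side validates the minimal shape before trusting the file, which guards against pointing it at an unrelated JSON file.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


def export_state(checkpoints: list[dict[str, Any]], output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps({"checkpoints": checkpoints}, indent=2), encoding="utf-8"
    )


def import_state(input_path: Path) -> list[dict[str, Any]]:
    state = json.loads(input_path.read_text(encoding="utf-8"))
    checkpoints = state.get("checkpoints") if isinstance(state, dict) else None
    # Minimal shape check: a list of dicts that each name a stage.
    if not isinstance(checkpoints, list) or not all(
        isinstance(cp, dict) and "stage_name" in cp for cp in checkpoints
    ):
        raise ValueError(f"{input_path} does not contain checkpoint state")
    return checkpoints


state = [{"stage_name": "download", "timestamp": "2024-01-01T00:00:00+00:00", "metadata": {}}]
with tempfile.TemporaryDirectory() as d:
    path = Path(d) / "backup" / "state.json"
    export_state(state, path)
    restored = import_state(path)

print(restored == state)  # True
```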

get_summary()

Get a human-readable summary of checkpoint state.

Returns:
  • str

    Summary of all checkpoints
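The summary layout is not specified here. The sketch below shows one plausible format built from the documented fields (stage name and timestamp), using a hypothetical `format_summary` helper over plain-dict checkpoints.

```python
from __future__ import annotations

from typing import Any


def format_summary(checkpoints: list[dict[str, Any]]) -> str:
    """Hypothetical summary format: one numbered line per completed stage."""
    if not checkpoints:
        return "No checkpoints recorded."
    lines = [f"{len(checkpoints)} completed stage(s):"]
    for i, cp in enumerate(checkpoints, start=1):
        lines.append(f"  {i}. {cp['stage_name']} at {cp['timestamp']}")
    return "\n".join(lines)


summary = format_summary([
    {"stage_name": "download", "timestamp": "2024-05-01T10:00:00+00:00"},
    {"stage_name": "parse", "timestamp": "2024-05-01T10:30:00+00:00"},
])
print(summary)
# 2 completed stage(s):
#   1. download at 2024-05-01T10:00:00+00:00
#   2. parse at 2024-05-01T10:30:00+00:00
```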

create_stage_checkpoint(checkpoint_dir, stage_name, stats=None)

Convenience function to create a checkpoint for a stage.

Parameters:
  • checkpoint_dir (Path) –

    Directory where checkpoints are stored

  • stage_name (str) –

    Name of the completed stage

  • stats (dict[str, Any] | None, default: None ) –

    Statistics or metadata about the stage

should_skip_stage(checkpoint_dir, stage_name)

Check whether a stage can be skipped because a checkpoint for it already exists.

Parameters:
  • checkpoint_dir (Path) –

    Directory where checkpoints are stored

  • stage_name (str) –

    Name of the stage to check

Returns:
  • bool

    True if stage has been completed and can be skipped
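The two convenience functions are typically paired in a driver loop: skip a stage if it is already checkpointed, otherwise run it and record a checkpoint. The sketch below is self-contained and rests on assumptions. It uses a hypothetical JSON state file in place of `CheckpointManager`, a hypothetical `run_pipeline` driver, and a `fail_at` parameter that only simulates a crash. It shows a second run resuming after a failure.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any, Callable

STATE_FILENAME = "checkpoint_state.json"  # hypothetical file name and layout


def _load(checkpoint_dir: Path) -> list[dict[str, Any]]:
    path = checkpoint_dir / STATE_FILENAME
    if not path.exists():
        return []
    return json.loads(path.read_text(encoding="utf-8"))["checkpoints"]


def create_stage_checkpoint(
    checkpoint_dir: Path, stage_name: str, stats: dict[str, Any] | None = None
) -> None:
    # Stand-in for saving via a manager; appends and rewrites the state file.
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    checkpoints = _load(checkpoint_dir)
    checkpoints.append({"stage_name": stage_name, "metadata": stats or {}})
    (checkpoint_dir / STATE_FILENAME).write_text(
        json.dumps({"checkpoints": checkpoints}), encoding="utf-8"
    )


def should_skip_stage(checkpoint_dir: Path, stage_name: str) -> bool:
    return any(cp["stage_name"] == stage_name for cp in _load(checkpoint_dir))


def run_pipeline(
    checkpoint_dir: Path,
    stages: list[tuple[str, Callable[[], dict[str, Any]]]],
    fail_at: str | None = None,  # simulates a crash, for illustration only
) -> list[str]:
    """Run stages in order, skipping checkpointed ones; return stages executed."""
    executed: list[str] = []
    for name, func in stages:
        if should_skip_stage(checkpoint_dir, name):
            continue
        if name == fail_at:
            raise RuntimeError(f"simulated failure in {name}")
        stats = func()
        executed.append(name)
        # Checkpoint only after the stage finishes successfully.
        create_stage_checkpoint(checkpoint_dir, name, stats)
    return executed


stages = [
    ("download", lambda: {"files": 2}),
    ("parse", lambda: {"records": 10}),
    ("index", lambda: {"entries": 10}),
]
with tempfile.TemporaryDirectory() as d:
    ckpt = Path(d)
    try:
        run_pipeline(ckpt, stages, fail_at="index")
    except RuntimeError:
        pass  # "download" and "parse" were checkpointed before the failure
    second_run = run_pipeline(ckpt, stages)

print(second_run)  # ['index']
```

Recording the checkpoint only after a stage returns means a stage that crashes midway is never marked complete, so it is re-run in full on the next attempt.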