Skip to content

Modules

buildstock_fetch.main.fetch_bldg_ids(product, release_year, weather_file, release_version, state, upgrade_id)

Fetch a list of Building ID's

Provided a state, returns a list of building ID's for that state.

Parameters:

Name Type Description Default
product ResCom

The product type (e.g., 'resstock', 'comstock')

required
release_year ReleaseYear

The release year (e.g., '2021', '2022')

required
weather_file Weather

The weather file type (e.g., 'tmy3')

required
release_version str

The release version number (e.g., '1')

required
state str

The state to fetch building ID's for.

required

Returns:

Type Description
list[BuildingID]

A list of building ID's for the given state.

Source code in buildstock_fetch/main.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def fetch_bldg_ids(
    product: ResCom, release_year: ReleaseYear, weather_file: Weather, release_version: str, state: str, upgrade_id: str
) -> list[BuildingID]:
    """Fetch a list of Building ID's

    Provided a state, returns a list of building ID's for that state.

    Args:
        product: The product type (e.g., 'resstock', 'comstock')
        release_year: The release year (e.g., '2021', '2022')
        weather_file: The weather file type (e.g., 'tmy3')
        release_version: The release version number (e.g., '1')
        state: The state to fetch building ID's for.
        upgrade_id: The upgrade ID attached to each returned BuildingID.

    Returns:
        A list of building ID's for the given state. Empty if the metadata
        partition for the requested combination does not exist locally.

    Raises:
        InvalidProductError: If ``product`` is neither 'resstock' nor 'comstock'.
        InvalidReleaseNameError: If the assembled release name fails validation.
    """
    # Map the product to the short prefix used in release names.
    if product == "resstock":
        product_str = "res"
    elif product == "comstock":
        product_str = "com"
    else:
        raise InvalidProductError(product)

    release_name = f"{product_str}_{release_year}_{weather_file}_{release_version}"
    if not _validate_release_name(release_name):
        raise InvalidReleaseNameError(release_name)

    # Build the hive-style partition path that encodes all of our criteria.
    partition_path = (
        METADATA_DIR
        / f"product={product}"
        / f"release_year={release_year}"
        / f"weather_file={weather_file}"
        / f"release_version={release_version}"
        / f"state={state}"
    )

    # A missing partition means no buildings exist for this combination.
    if not partition_path.exists():
        return []

    # Read the parquet files in the specific partition. No further filtering
    # is needed: the partition path already pins every selection criterion.
    df = pl.read_parquet(str(partition_path))

    # Convert each row of the partition to a BuildingID object.
    return [
        BuildingID(
            bldg_id=int(row["bldg_id"]),
            release_number=release_version,
            release_year=release_year,
            res_com=product,
            weather=weather_file,
            upgrade_id=upgrade_id,
            state=state,
        )
        for row in df.iter_rows(named=True)
    ]

buildstock_fetch.main.fetch_bldg_data(bldg_ids, file_type, output_dir, max_workers=5, weather_states=None)

Download building data for a given list of building ids

Downloads the data for the given building ids and returns list of paths to the downloaded files.

Parameters:

Name Type Description Default
bldg_ids list[BuildingID]

A list of BuildingID objects to download data for.

required

Returns:

Type Description
tuple[list[Path], list[str]]

A tuple containing the list of paths to the downloaded files and the list of failed downloads.

Source code in buildstock_fetch/main.py
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
def fetch_bldg_data(
    bldg_ids: list[BuildingID],
    file_type: tuple[str, ...],
    output_dir: Path,
    max_workers: int = 5,
    weather_states: list[str] | None = None,
) -> tuple[list[Path], list[str]]:
    """Download building data for a given list of building ids

    Downloads the data for the given building ids and returns the paths to the
    downloaded files along with the identifiers of any downloads that failed.

    Args:
        bldg_ids: A list of BuildingID objects to download data for.
        file_type: The requested file type names (e.g., 'metadata',
            'load_curve_15min') to be parsed into a file-type object.
        output_dir: Directory the downloaded files are written into.
        max_workers: Maximum number of concurrent download workers.
        weather_states: States for which weather files should be downloaded.
            Treated as an empty list (no weather downloads) when None.

    Returns:
        A tuple of (paths to the downloaded files, failed download identifiers).
    """
    file_type_obj = _parse_requested_file_type(file_type)
    console = Console()

    # Initialize weather_states to empty list if None
    if weather_states is None:
        weather_states = []

    downloaded_paths: list[Path] = []
    failed_downloads: list[str] = []

    # Calculate total files to download (used only for the progress banner)
    total_files = 0
    if file_type_obj.metadata:
        unique_metadata_urls = _resolve_unique_metadata_urls(bldg_ids)
        total_files += len(unique_metadata_urls)  # Add metadata files
    if file_type_obj.load_curve_15min:
        total_files += len(bldg_ids)  # Add 15-minute load curve files
    if file_type_obj.load_curve_hourly:
        total_files += len(bldg_ids)  # Add hourly load curve files
    if file_type_obj.load_curve_daily:
        total_files += len(bldg_ids)  # Add daily load curve files
    if file_type_obj.load_curve_monthly:
        total_files += len(bldg_ids)  # Add monthly load curve files
    if file_type_obj.load_curve_annual:
        total_files += len(bldg_ids)  # Add annual load curve files
    if file_type_obj.weather:
        # Only buildings in a state with available weather data get weather files.
        available_bldg_ids = [bldg_id for bldg_id in bldg_ids if bldg_id.state in weather_states]
        total_files += len(available_bldg_ids) * len(weather_states)  # Add weather map files

    console.print(f"\n[bold blue]Starting download of {total_files} files...[/bold blue]")
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TextColumn("•"),
        DownloadColumn(),
        TextColumn("•"),
        TransferSpeedColumn(),
        TextColumn("•"),
        TimeRemainingColumn(),
        console=console,
        transient=False,
    ) as progress:
        # Downloads append into downloaded_paths / failed_downloads in place.
        _execute_downloads(
            file_type_obj,
            bldg_ids,
            output_dir,
            max_workers,
            progress,
            downloaded_paths,
            failed_downloads,
            console,
            weather_states,
        )

        # TODO: add EV related files
        # TODO: Write a function for downloading EV related files from SB's s3 bucket.
        # It should dynamically build the download url based on the release_name + state combo.
        # Make sure to follow the directory structure for downloading the files.
        if file_type_obj.trip_schedules:
            _download_trip_schedules_data(bldg_ids, output_dir, downloaded_paths)

    _print_download_summary(downloaded_paths, failed_downloads, console)

    return downloaded_paths, failed_downloads

buildstock_fetch.read.BuildStockRead

Reader class for BuildStock data downloaded with bsf.

This class provides methods to read metadata and load curve data from locally downloaded BuildStock files.

Parameters:

Name Type Description Default
data_path Path | S3Path | str

Path to the data directory (local path or S3 path).

required
release ReleaseKey | BuildstockRelease

A BuildStockRelease enum member specifying the release.

required
states USStateCode | Collection[USStateCode] | None

Optional State or list of States to filter data. If None, auto-detects states present on disk.

None
sample_n int | None

Optional number of buildings to sample.

None
random Random | int | None

Optional random state for reproducible sampling (Random instance or int seed).

None
metadata_variant Literal['standard', 'sb']

Metadata file variant to use. "standard" for metadata.parquet, "sb" for metadata-sb.parquet (Switchbox-specific). Defaults to "standard".

'standard'
Example

from buildstock_fetch.read import BuildStockRead bsr = BuildStockRead( ... data_path="./data", ... release="res_2024_tmy_2", ... states="NY", ... ) metadata = bsr.read_metadata(upgrades=["0", "1"])

Source code in buildstock_fetch/read.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@final
class BuildStockRead:
    """Reader class for BuildStock data downloaded with bsf.

    This class provides methods to read metadata and load curve data
    from locally downloaded BuildStock files.

    Args:
        data_path: Path to the data directory (local path or S3 path).
        release: A BuildStockRelease enum member specifying the release.
        states: Optional State or list of States to filter data.
            If None, auto-detects states present on disk.
        sample_n: Optional number of buildings to sample.
        random: Optional random state for reproducible sampling (Random instance or int seed).
        metadata_variant: Metadata file variant to use. "standard" for metadata.parquet,
            "sb" for metadata-sb.parquet (Switchbox-specific). Defaults to "standard".

    Example:
        >>> from buildstock_fetch.read import BuildStockRead
        >>> bsr = BuildStockRead(
        ...     data_path="./data",
        ...     release="res_2024_tmy_2",
        ...     states="NY",
        ... )
        >>> metadata = bsr.read_metadata(upgrades=["0", "1"])

    """

    def __init__(
        self,
        data_path: Path | S3Path | str,
        release: ReleaseKey | BuildstockRelease,
        states: USStateCode | Collection[USStateCode] | None = None,
        sample_n: int | None = None,
        random: Random | int | None = None,
        metadata_variant: Literal["standard", "sb"] = "standard",
    ) -> None:
        # Normalize the data path into either an S3Path or a local Path.
        self.data_path = S3Path(cast(str, data_path)) if is_s3_path(data_path) else Path(cast(str, data_path))
        # Accept either a release object or a release key string.
        self.release = release if isinstance(release, BuildstockRelease) else BuildstockReleases.load()[release]

        # Normalize states: None (auto-detect), a single code, or a collection.
        self.states: list[USStateCode] | None
        if states is None:
            self.states = None
        elif is_valid_state_code(states):
            self.states = [states]
        else:
            self.states = list(states)

        # Normalize the random state: fresh RNG, provided instance, or int seed.
        if random is None:
            self.random = Random()
        elif isinstance(random, Random):
            self.random = random
        else:
            self.random = Random(random)

        self.sample_n = sample_n
        self.metadata_variant = metadata_variant

    @cached_property
    def downloaded_metadata(self) -> DownloadedData:
        """Metadata files found on disk for this release and state filter."""
        return DownloadedData(
            filter_downloads(
                self.data_path,
                release_key=(self.release.key,),
                state=self.states,
                file_type="metadata",
            )
        )

    @cached_property
    def sampled_buildings(self) -> frozenset[int] | None:
        """Sampled building IDs drawn from metadata, or None if sampling is off.

        Raises:
            MetadataNotFoundError: If no metadata parquet files (of the chosen
                variant) are available for the release/states.
        """
        if self.sample_n is None:
            return None
        metadata_files = self.downloaded_metadata.filter(file_type="metadata", suffix=".parquet")
        if not metadata_files:
            raise MetadataNotFoundError(self.release, self.states)

        # Select only the chosen metadata variant
        metadata_filename = "metadata-sb.parquet" if self.metadata_variant == "sb" else "metadata.parquet"
        metadata_files = DownloadedData(f for f in metadata_files if f.filename == metadata_filename)
        if not metadata_files:
            raise MetadataNotFoundError(self.release, self.states)

        # Get unique states and find minimum upgrade per state
        # This ensures sampling from all states when multiple states are requested
        states = {f.state for f in metadata_files}
        files_to_read = []
        for state in states:
            state_files = [f for f in metadata_files if f.state == state]
            min_upgrade = min(state_files, key=lambda f: int(f.upgrade)).upgrade
            files_to_read.extend([f for f in state_files if f.upgrade == min_upgrade])

        df = pl.scan_parquet([str(f.file_path) for f in files_to_read]).select("bldg_id").collect()
        all_building_ids = cast(list[int], df["bldg_id"].unique().to_list())

        # If the requested sample exceeds the population, return everything.
        if self.sample_n > len(all_building_ids):
            logging.getLogger(__name__).info(
                f"sample_n ({self.sample_n}) exceeds available buildings ({len(all_building_ids)}). "
                + "Returning all buildings without sampling."
            )
            return frozenset(all_building_ids)

        return frozenset(self.random.sample(all_building_ids, self.sample_n))

    def read_metadata(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read metadata parquet files as a lazy frame."""
        return self.read_parquets("metadata", upgrades, building_ids)

    def read_load_curve_15min(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read 15-minute load curve parquet files as a lazy frame."""
        return self.read_parquets("load_curve_15min", upgrades, building_ids)

    def read_load_curve_hourly(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read hourly load curve parquet files as a lazy frame."""
        return self.read_parquets("load_curve_hourly", upgrades, building_ids)

    def read_load_curve_daily(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read daily load curve parquet files as a lazy frame."""
        return self.read_parquets("load_curve_daily", upgrades, building_ids)

    def read_load_curve_monthly(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read monthly load curve parquet files as a lazy frame."""
        return self.read_parquets("load_curve_monthly", upgrades, building_ids)

    def read_load_curve_annual(
        self, upgrades: str | Collection[str] | None = None, building_ids: Collection[int] | None = None
    ) -> pl.LazyFrame:
        """Read annual load curve parquet files as a lazy frame."""
        return self.read_parquets("load_curve_annual", upgrades, building_ids)

    def read_parquets(
        self,
        file_type: FileType,
        upgrades: str | Collection[str] | None = None,
        building_ids: Collection[int] | None = None,
    ) -> pl.LazyFrame:
        """Read parquet files of the given type, filtered and returned lazily.

        Raises:
            FileTypeNotAvailableError: If the release does not provide ``file_type``.
        """
        # BUGFIX: previously this checked `"metadata" not in self.release.file_types`
        # for every file type, so an unavailable load-curve type was never rejected
        # (and metadata absence blocked all reads). Check the requested type instead.
        if file_type not in self.release.file_types:
            raise FileTypeNotAvailableError(self.release, file_type)

        upgrades = self._validate_upgrades(file_type, upgrades)

        # We use different reading strategies based on file type:
        # - Metadata files: Must use diagonal concat because upgrade 0 metadata files have missing
        #   upgrade columns compared to other upgrades, creating schema mismatches which would cause
        #   scan_parquet with globbed files paths to fail. Diagonal concat handles this by filling
        #   missing columns with nulls.
        # - Load curve files: Can use a single glob pattern with hive partitioning since schemas
        #   are consistent across upgrades.

        if file_type == "metadata":
            lf = self._read_metadata_with_diagonal_concat(file_type)
        else:
            lf = pl.scan_parquet(str(self.data_path / self.release.key / file_type) + "/")

        # Apply all filters using Polars (states, upgrades, building_ids, sampled_buildings)
        # Polars automatically infers state/upgrade columns from hive partitioning
        lf = self._apply_filters(lf, upgrades, building_ids)

        return lf

    def _read_metadata_with_diagonal_concat(self, file_type: FileType) -> pl.LazyFrame:
        """Read all metadata files and concat diagonally, then filters will be applied."""
        # Get all metadata files (not filtered by state/upgrade - we'll filter with Polars)
        files = self.downloaded_metadata.filter(file_type=file_type, suffix=".parquet")

        # For metadata files, filter to chosen variant
        if file_type == "metadata":
            metadata_filename = "metadata-sb.parquet" if self.metadata_variant == "sb" else "metadata.parquet"
            files = DownloadedData(f for f in files if f.filename == metadata_filename)

        file_paths = [str(file.file_path) for file in files]

        if not file_paths:
            return pl.LazyFrame()

        # Scan each file with hive partitioning and concat diagonally
        # This handles schema mismatches where different upgrades have different columns
        # (e.g., upgrade 0 metadata files have missing upgrade columns)
        lazy_frames = [pl.scan_parquet(file_path, hive_partitioning=True) for file_path in file_paths]
        # Type checker doesn't know concat returns LazyFrame when given LazyFrames
        result = pl.concat(lazy_frames, how="diagonal")
        return cast(pl.LazyFrame, result)

    def _apply_filters(
        self,
        lf: pl.LazyFrame,
        upgrades: frozenset[UpgradeID] | None = None,
        building_ids: Collection[int] | None = None,
    ) -> pl.LazyFrame:
        """Apply all filters (states, upgrades, building_ids, sampled_buildings) using Polars.

        Filter order: Partition filters (state/upgrade) are applied first as they can prune
        entire files/partitions via hive partitioning. Row-level filters (bldg_id) are applied
        after and use predicate pushdown with row group statistics. Polars query optimizer
        collects all predicates automatically, but ordering partition filters first helps
        ensure they're recognized for file pruning.
        """

        # Apply state and upgrade partition filters first - these can prune entire files/partitions
        # See: https://pola.rs/posts/predicate-pushdown-query-optimizer/

        # Apply state filter if states are specified
        if self.states is not None:
            lf = lf.filter(pl.col("state").is_in(self.states))

        # Apply upgrade filter if upgrades are specified
        # Hive partitions treats upgrades as int64
        if upgrades:
            upgrade_values = [int(upgrade) for upgrade in upgrades]
            lf = lf.filter(pl.col("upgrade").is_in(upgrade_values))

        # Apply row-level filters - combine multiple bldg_id filters for efficiency
        building_id_sets = []
        if building_ids is not None:
            building_id_sets.append(set(building_ids))
        if self.sampled_buildings is not None:
            building_id_sets.append(set(self.sampled_buildings))

        if building_id_sets:
            # Intersect all building ID sets and apply single filter
            combined_ids = set.intersection(*building_id_sets) if len(building_id_sets) > 1 else building_id_sets[0]
            # NOTE(review): when the intersection is empty no filter is applied, so all
            # rows pass through rather than none — confirm this is intentional.
            if combined_ids:
                lf = lf.filter(pl.col("bldg_id").is_in(list(combined_ids)))

        return lf

    def _available_upgrades(self, file_type: FileType) -> frozenset[UpgradeID]:
        """Return the set of upgrade IDs available on disk for ``file_type``."""
        if file_type == "metadata":
            return self.downloaded_metadata.filter(state=self.states, file_type=file_type).upgrades()

        state_upgrade_ids = []
        target_path = self.data_path / self.release.key / file_type
        if not target_path.exists():
            raise NoUpgradesFoundError(self.release)
        for state_path in target_path.iterdir():
            # Note: This is a little weird if multiple states. Currently returns intersection,
            # so only upgrades available in all states are returned.
            if self.states is None or state_path.name.removeprefix("state=") in self.states:
                state_upgrade_ids.append({u.name.removeprefix("upgrade=") for u in state_path.iterdir()})
        return frozenset(normalize_upgrade_id(_) for _ in set.intersection(*state_upgrade_ids))

    def _validate_upgrades(
        self, file_type: FileType, upgrades: str | Collection[str] | None = None
    ) -> frozenset[UpgradeID]:
        """Normalize and validate requested upgrades against release and disk.

        Returns the requested upgrades (normalized) or, when none are requested,
        all upgrades available on disk for ``file_type``.
        """
        # Normalize: a single string becomes a one-element list; collections are
        # normalized element-wise; None stays None (meaning "all available").
        if isinstance(upgrades, str):
            upgrades = [normalize_upgrade_id(upgrades)]
        elif upgrades is not None:
            upgrades = [normalize_upgrade_id(_) for _ in upgrades]

        # We shouldn't raise an error here - an empty list may be passed intentionally
        if upgrades is not None and not upgrades:
            logging.getLogger(__name__).info("Empty upgrades list got passed into validate_upgrades")
            return frozenset()

        if upgrades and (invalid_upgrades := [_ for _ in upgrades if _ not in self.release.upgrades]):
            raise InvalidUpgradeForRelease(self.release, *cast(tuple[UpgradeID, ...], tuple(invalid_upgrades)))

        available_upgrades = self._available_upgrades(file_type)
        if not available_upgrades:
            raise NoUpgradesFoundError(self.release)

        if upgrades and (missing_upgrades := [_ for _ in upgrades if _ not in available_upgrades]):
            raise UpgradeNotFoundError(
                self.release, available_upgrades, *cast(tuple[UpgradeID, ...], tuple(missing_upgrades))
            )

        if upgrades:
            return frozenset(cast(Collection[UpgradeID], upgrades))
        return frozenset(available_upgrades)

buildstock_fetch.mixed_upgrade.MixedUpgradeScenario

Class for orchestrating multi-year adoption trajectories across multiple upgrade scenarios.

This class enables defining and reading heterogeneous upgrade mixes where buildings progressively adopt different upgrades over time. Buildings are sampled once from a baseline upgrade, then allocated to different upgrades according to adoption fractions per year. Monotonic adoption is enforced: buildings can only move from baseline to an upgrade, never backwards.

Parameters:

Name Type Description Default
data_path Path | S3Path | str

Path to the data directory (local path or S3 path).

required
scenario_name str

Name of the pathway scenario. Will be used to create subdirectories when writing out the metadata and load curves.

required
release ReleaseKey | BuildstockRelease

A BuildstockRelease or release key string specifying the release.

required
states USStateCode | Collection[USStateCode] | None

Optional state code or list of state codes to filter data. If None, auto-detects states present on disk.

None
sample_n int | None

Optional number of buildings to sample from baseline.

None
random Random | int | None

Optional Random instance or seed for reproducible sampling and allocation.

None
scenario dict[int, list[float]] | None

Dict mapping upgrade IDs to adoption fractions per year. Example: {4: [0.06, 0.18, 0.30], 8: [0.04, 0.12, 0.20]} represents 3 years where upgrade 4 grows from 6% to 30% adoption and upgrade 8 grows from 4% to 20% adoption.

None
Example

from buildstock_fetch.mixed_upgrade import MixedUpgradeScenario from buildstock_fetch.scenarios import uniform_adoption scenario = uniform_adoption( ... upgrade_ids=[4, 8], ... weights={4: 0.6, 8: 0.4}, ... adoption_trajectory=[0.1, 0.3, 0.5], ... ) mus = MixedUpgradeScenario( ... data_path="./data", ... scenario_name="rapid_adoption", ... release="res_2024_tmy3_2", ... states="NY", ... sample_n=1000, ... random=42, ... scenario=scenario, ... ) metadata = mus.read_metadata().collect() mus.export_scenario_to_cairo("./scenario.csv") mus.save_metadata_parquet() # writes to disk or S3 mus.save_hourly_load_parquet() # writes to disk or S3

Source code in buildstock_fetch/mixed_upgrade.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
@final
class MixedUpgradeScenario:
    """Class for orchestrating multi-year adoption trajectories across multiple upgrade scenarios.

    This class enables defining and reading heterogeneous upgrade mixes where buildings
    progressively adopt different upgrades over time. Buildings are sampled once from a
    baseline upgrade, then allocated to different upgrades according to adoption fractions
    per year. Monotonic adoption is enforced: buildings can only move from baseline to
    an upgrade, never backwards.

    Args:
        data_path: Path to the data directory (local path or S3 path).
        scenario_name: Name of the pathway scenario. Used to create subdirectories when writing out the metadata and load curves.
        release: A BuildstockRelease or release key string specifying the release.
        states: Optional state code or list of state codes to filter data.
            If None, auto-detects states present on disk.
        sample_n: Optional number of buildings to sample from baseline.
        random: Optional Random instance or seed for reproducible sampling and allocation.
        scenario: Dict mapping upgrade IDs to adoption fractions per year.
            Example: {4: [0.06, 0.18, 0.30], 8: [0.04, 0.12, 0.20]}
            represents 3 years where upgrade 4 grows from 6% to 30% adoption
            and upgrade 8 grows from 4% to 20% adoption.

    Example:
        >>> from buildstock_fetch.mixed_upgrade import MixedUpgradeScenario
        >>> from buildstock_fetch.scenarios import uniform_adoption
        >>> scenario = uniform_adoption(
        ...     upgrade_ids=[4, 8],
        ...     weights={4: 0.6, 8: 0.4},
        ...     adoption_trajectory=[0.1, 0.3, 0.5],
        ... )
        >>> mus = MixedUpgradeScenario(
        ...     data_path="./data",
        ...     scenario_name="rapid_adoption",
        ...     release="res_2024_tmy3_2",
        ...     states="NY",
        ...     sample_n=1000,
        ...     random=42,
        ...     scenario=scenario,
        ... )
        >>> metadata = mus.read_metadata().collect()
        >>> mus.export_scenario_to_cairo("./scenario.csv")
        >>> mus.save_metadata_parquet() # writes to disk or S3
        >>> mus.save_hourly_load_parquet() # writes to disk or S3
    """

    def __init__(
        self,
        data_path: Path | S3Path | str,
        scenario_name: str,
        release: ReleaseKey | BuildstockRelease,
        states: USStateCode | Collection[USStateCode] | None = None,
        sample_n: int | None = None,
        random: Random | int | None = None,
        scenario: dict[int, list[float]] | None = None,
    ) -> None:
        if scenario is None:
            raise ScenarioParameterRequiredError()

        validate_scenario(scenario)
        self.scenario = scenario
        self.scenario_name = scenario_name
        self._upgrade_ids = tuple(scenario)
        self.num_years = len(next(iter(scenario.values())))

        self.release = release if isinstance(release, BuildstockRelease) else BuildstockReleases.load()[release]

        if states is None:
            self.states: list[USStateCode] | None = None
        elif is_valid_state_code(states):
            self.states = [states]
        else:
            self.states = list(states)

        self.random = random if isinstance(random, Random) else Random(random)

        self.sample_n = sample_n
        self._available_bldg_ids_cache: dict[str, dict[int, frozenset[int]]] = {}

        # Normalize the path first so BuildStockRead recognizes S3 URLs in Path objects
        normalized_data_path = _resolve_path(data_path)

        # Baseline reader handles state detection and optional sampling.
        self.baseline_reader = BuildStockRead(
            data_path=normalized_data_path,
            release=self.release,
            states=self.states,
            sample_n=sample_n,
            random=self.random,
            metadata_variant="standard",  # Always use standard variant for mixed upgrades
        )
        # Reuse the normalized path from BuildStockRead (supports both local and S3)
        self.data_path = self.baseline_reader.data_path

        sampled = self.baseline_reader.sampled_buildings
        if sampled is not None:
            self.sampled_bldgs: frozenset[int] = sampled
        else:
            # No sampling: fall back to full baseline bldg_id list.
            metadata_files = self.downloaded_metadata.filter(suffix=".parquet", upgrade=normalize_upgrade_id("0"))
            if not metadata_files:
                raise BaselineMetadataNotFoundError()
            first_file = min(metadata_files, key=lambda x: x.file_path)
            df = pl.scan_parquet(str(first_file.file_path)).select("bldg_id").collect()
            all_bldg_ids = df["bldg_id"].unique().to_list()
            self.sampled_bldgs = frozenset(all_bldg_ids)

        if not self.sampled_bldgs:
            raise NoBuildingsAvailableError()

        num_upgrades = len(scenario)
        print(f"Sampled {len(self.sampled_bldgs)} buildings from baseline (upgrade 0)")
        print(f"Materialized {self.num_years} years of adoption across {num_upgrades} upgrades")

    @cached_property
    def downloaded_metadata(self) -> DownloadedData:
        """Cached property for metadata file discovery only."""
        return DownloadedData(
            filter_downloads(
                self.data_path,
                release_key=(self.release.key,),
                state=self.states,
                file_type="metadata",
            )
        )

    @cached_property
    def _allocation_plan(self) -> tuple[list[int], dict[int, list[int]]]:
        """Shuffle sampled buildings once and precompute per-upgrade adopter lists.

        Returns:
            A tuple of (shuffled building list, dict mapping upgrade ID to the
            ordered list of buildings that ever adopt that upgrade). Yearly
            adopter sets are prefixes of these lists, which guarantees
            monotonic adoption across years.
        """
        bldgs = list(self.sampled_bldgs)
        self.random.shuffle(bldgs)
        allocations: dict[int, list[int]] = {uid: [] for uid in self._upgrade_ids}
        next_idx = 0
        total = len(bldgs)

        for year_idx in range(self.num_years):
            for uid in self._upgrade_ids:
                target = int(self.scenario[uid][year_idx] * total)
                new = target - len(allocations[uid])
                if new > 0:
                    # Assign contiguous slices once; yearly targets take prefixes.
                    end = next_idx + new
                    allocations[uid].extend(bldgs[next_idx:end])
                    next_idx = end

        return bldgs, allocations

    @cached_property
    def materialized_scenario(self) -> dict[int, dict[int, int]]:
        """Materialize building allocations for all years in the scenario.

        Returns:
            Dict mapping year index to a dict of {bldg_id: upgrade_id}, where
            upgrade 0 means the building is still on baseline in that year.
        """
        bldgs, allocations = self._allocation_plan
        total = len(bldgs)
        materialized: dict[int, dict[int, int]] = {}

        for year_idx in range(self.num_years):
            # Start from baseline, then overlay upgrades for this year.
            year_map = dict.fromkeys(bldgs, 0)
            for uid in self._upgrade_ids:
                target = int(self.scenario[uid][year_idx] * total)
                if target:
                    for bldg_id in allocations[uid][:target]:
                        year_map[bldg_id] = uid
            materialized[year_idx] = year_map

        return materialized

    def _validate_years(self, years: list[int] | None) -> list[int]:
        """Validate and resolve years parameter.

        Args:
            years: List of year indices, or None for all years.

        Returns:
            List of valid year indices.

        Raises:
            ValueError: If any year index is out of range.
        """
        if years is None:
            return list(range(self.num_years))

        for year_idx in years:
            if not 0 <= year_idx < self.num_years:
                raise YearOutOfRangeError(year_idx, self.num_years - 1)

        return years

    def _scan_available_upgrades(self, file_type: FileType) -> frozenset[UpgradeID]:
        """Scan parquet directories to discover available upgrades for a file type.

        Uses directory structure scanning rather than file system traversal for performance.

        Args:
            file_type: Type of data to check (e.g., 'load_curve_hourly').

        Returns:
            Set of available upgrade IDs found in the directory structure.
            Empty if the data directory for this file type does not exist.
        """
        base_path = self.data_path / self.release.key / file_type

        # A missing directory means no data at all; report "nothing available"
        # so callers raise ScenarioDataNotFoundError rather than an opaque
        # FileNotFoundError from iterdir().
        if not base_path.exists():
            return frozenset()

        available_upgrades = set()
        for state_dir in base_path.iterdir():
            if self.states is None or state_dir.name.removeprefix("state=") in self.states:
                for upgrade_dir in state_dir.iterdir():
                    upgrade_id = upgrade_dir.name.removeprefix("upgrade=")
                    available_upgrades.add(normalize_upgrade_id(upgrade_id))

        return frozenset(available_upgrades)

    def _validate_data_availability(self, file_type: FileType) -> None:
        """Validate that all required upgrade data exists on disk.

        Args:
            file_type: Type of data to check (e.g., 'metadata', 'load_curve_15min').

        Raises:
            ScenarioDataNotFoundError: If data for any scenario upgrade is missing.
        """
        required_upgrades = {normalize_upgrade_id(str(uid)) for uid in self._upgrade_ids} | {normalize_upgrade_id("0")}

        if file_type == "metadata":
            # Use file system discovery for metadata
            missing_upgrades = required_upgrades - self.downloaded_metadata.upgrades()
        else:
            # Use parquet directory scanning for load curves
            available_upgrades = self._scan_available_upgrades(file_type)
            missing_upgrades = required_upgrades - available_upgrades

        if missing_upgrades:
            raise ScenarioDataNotFoundError(file_type, missing_upgrades)

    def _available_bldg_ids_by_upgrade(self, file_type: FileType) -> dict[int, frozenset[int]]:
        """Discover which building IDs have per-building files for each needed upgrade.

        Only applies to per-building load curve file types; other file types
        (e.g. metadata) return an empty dict, which disables missing-ID warnings.
        Results are cached per file type.
        """
        per_building_types = {
            "load_curve_15min",
            "load_curve_hourly",
            "load_curve_daily",
            "load_curve_monthly",
            "load_curve_annual",
        }
        if file_type not in per_building_types:
            return {}

        cached = self._available_bldg_ids_cache.get(file_type)
        if cached is not None:
            return cached

        needed_upgrades = {0, *self._upgrade_ids}
        available: dict[int, set[int]] = {uid: set() for uid in needed_upgrades}

        # Scan parquet directory structure directly; a missing directory yields
        # empty availability sets instead of raising FileNotFoundError.
        base_path = self.data_path / self.release.key / file_type
        if base_path.exists():
            for state_dir in base_path.iterdir():
                if self.states is None or state_dir.name.removeprefix("state=") in self.states:
                    for upgrade_dir in state_dir.iterdir():
                        try:
                            upgrade_id = int(upgrade_dir.name.removeprefix("upgrade="))
                            if upgrade_id not in available:
                                continue

                            # List files and extract building IDs from filenames
                            for file_path in upgrade_dir.glob("*.parquet"):
                                match = re.match(r"^(\d+)-", file_path.name)
                                if match:
                                    bldg_id = int(match.group(1))
                                    available[upgrade_id].add(bldg_id)
                        except (ValueError, AttributeError):
                            continue

        cached = {uid: frozenset(ids) for uid, ids in available.items()}
        self._available_bldg_ids_cache[file_type] = cached
        return cached

    def _warn_on_missing_ids(
        self,
        file_type: FileType,
        upgrade_id: int,
        year_idx: int,
        requested_ids: list[int],
        available_ids: frozenset[int],
        sample_size: int = 10,
    ) -> None:
        """Log a warning when requested building IDs have no file on disk.

        Args:
            file_type: File type being read.
            upgrade_id: Upgrade being read.
            year_idx: Scenario year index (for log context).
            requested_ids: Building IDs the scenario wants to read.
            available_ids: Building IDs actually present on disk.
            sample_size: How many example missing IDs to include in the message;
                0 suppresses the example list.
        """
        if not requested_ids or not available_ids:
            return
        missing = [bldg_id for bldg_id in requested_ids if bldg_id not in available_ids]
        if not missing:
            return
        logger = logging.getLogger(__name__)
        logger.warning(
            "Missing %s files for upgrade %s in year_idx %s: %s of %s requested IDs not found%s",
            file_type,
            upgrade_id,
            year_idx,
            len(missing),
            len(requested_ids),
            f" (example IDs: {missing[:sample_size]})" if sample_size else "",
        )

    def _read_for_upgrade(
        self,
        file_type: FileType,
        upgrade_id: int,
        bldg_ids: Iterable[int],
        year_idx: int,
        available_ids_by_upgrade: dict[int, frozenset[int]],
    ) -> pl.LazyFrame | None:
        """Read data for a specific upgrade and set of building IDs.

        Returns None when there is nothing to read (empty building list).
        The returned LazyFrame always carries `upgrade_id` and `year` columns.
        """
        bldg_ids_list = list(bldg_ids)
        if not bldg_ids_list:
            return None
        if available_ids_by_upgrade:
            available_ids = available_ids_by_upgrade.get(upgrade_id, frozenset())
            self._warn_on_missing_ids(file_type, upgrade_id, year_idx, bldg_ids_list, available_ids)
        lf = self.baseline_reader.read_parquets(file_type, upgrades=str(upgrade_id), building_ids=bldg_ids_list)
        if file_type == "metadata":
            lf = lf.rename({"upgrade": "upgrade_id"})
        else:
            lf = lf.with_columns(pl.lit(upgrade_id).alias("upgrade_id"))
        return lf.with_columns(pl.lit(year_idx).alias("year"))

    def _process_year(
        self,
        file_type: FileType,
        year_idx: int,
        bldgs: list[int],
        allocations: dict[int, list[int]],
        total: int,
        available_ids_by_upgrade: dict[int, frozenset[int]],
    ) -> pl.LazyFrame | None:
        """Process data for a single year, reading all upgrades and baseline."""
        year_dfs: list[pl.LazyFrame] = []
        total_adopted = 0

        for uid in self._upgrade_ids:
            target = int(self.scenario[uid][year_idx] * total)
            total_adopted += target
            lf = self._read_for_upgrade(file_type, uid, allocations[uid][:target], year_idx, available_ids_by_upgrade)
            if lf is not None:
                year_dfs.append(lf)

        # Baseline = remaining buildings after all upgrade allocations.
        lf = self._read_for_upgrade(file_type, 0, bldgs[total_adopted:], year_idx, available_ids_by_upgrade)
        if lf is not None:
            year_dfs.append(lf)

        if year_dfs:
            return pl.concat(year_dfs, how="vertical_relaxed")
        return None

    def _read_data_for_scenario(self, file_type: FileType, years: list[int] | None = None) -> pl.LazyFrame:
        """Read data across upgrades and years without full materialization."""
        years = sorted(self._validate_years(years))
        self._validate_data_availability(file_type)
        available_ids_by_upgrade = self._available_bldg_ids_by_upgrade(file_type)
        # Build order + per-upgrade adopter lists once; slice per year.
        bldgs, allocations = self._allocation_plan
        total = len(bldgs)

        all_year_dfs: list[pl.LazyFrame] = []
        for year_idx in years:
            year_df = self._process_year(file_type, year_idx, bldgs, allocations, total, available_ids_by_upgrade)
            if year_df is not None:
                all_year_dfs.append(year_df)

        if not all_year_dfs:
            return pl.LazyFrame({"bldg_id": [], "upgrade_id": [], "year": []})

        return pl.concat(all_year_dfs, how="vertical_relaxed")

    def read_metadata(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read metadata for specified years in the scenario.

        Returns a LazyFrame containing metadata for all buildings and years in the
        scenario. Each row represents one building in one year.

        Args:
            years: List of year indices to include (0-indexed), or None for all years.
                Example: [0, 1, 2] or None

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID (from sampled baseline)
                - upgrade_id: Upgrade ID for this building in this year (0 or scenario upgrade)
                - year: Year index (0-indexed)
                - ...: Original metadata columns (e.g., in.state, in.vintage, etc.)

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If metadata for scenario upgrades is not on disk.

        Example:
            >>> # Read metadata for all years
            >>> metadata = mus.read_metadata()
            >>> df = metadata.collect()
            >>>
            >>> # Read metadata for specific years
            >>> metadata_early = mus.read_metadata(years=[0, 1])
        """
        return self._read_data_for_scenario("metadata", years)

    def read_load_curve_15min(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read 15-minute load curve data for specified years in the scenario.

        Args:
            years: List of year indices to include, or None for all years.

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID
                - upgrade_id: Upgrade ID
                - year: Year index
                - timestamp: Timestamp of the load data
                - ...: Energy columns (e.g., out.electricity.total.energy_consumption)

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
        """
        return self._read_data_for_scenario("load_curve_15min", years)

    def read_load_curve_hourly(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read hourly load curve data for specified years in the scenario.

        Args:
            years: List of year indices to include, or None for all years.

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID
                - upgrade_id: Upgrade ID
                - year: Year index
                - timestamp: Timestamp of the load data
                - ...: Energy columns

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
        """
        return self._read_data_for_scenario("load_curve_hourly", years)

    def read_load_curve_daily(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read daily load curve data for specified years in the scenario.

        Args:
            years: List of year indices to include, or None for all years.

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID
                - upgrade_id: Upgrade ID
                - year: Year index
                - timestamp: Timestamp of the load data
                - ...: Energy columns

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
        """
        return self._read_data_for_scenario("load_curve_daily", years)

    def read_load_curve_monthly(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read monthly load curve data for specified years in the scenario.

        Args:
            years: List of year indices to include, or None for all years.

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID
                - upgrade_id: Upgrade ID
                - year: Year index
                - timestamp: Timestamp of the load data
                - ...: Energy columns

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
        """
        return self._read_data_for_scenario("load_curve_monthly", years)

    def read_load_curve_annual(self, years: list[int] | None = None) -> pl.LazyFrame:
        """Read annual load curve data for specified years in the scenario.

        Args:
            years: List of year indices to include, or None for all years.

        Returns:
            A Polars LazyFrame with columns:
                - bldg_id: Building ID
                - upgrade_id: Upgrade ID
                - year: Year index
                - ...: Annual energy totals

        Raises:
            ValueError: If any year index is out of range.
            ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
        """
        return self._read_data_for_scenario("load_curve_annual", years)

    def export_scenario_to_cairo(self, output_path: str | Path | S3Path) -> None:
        """Export scenario to CAIRO-compatible CSV format.

        Creates a CSV file with one row per building and one column per year.
        Each cell contains the upgrade ID for that building in that year.

        Args:
            output_path: Path where the CSV file should be written (local or S3).

        Output format:
            bldg_id,year_0,year_1,year_2
            405821,0,0,0
            612547,0,4,4
            789234,0,0,8

        Raises:
            ScenarioDataNotFoundError: If data for scenario upgrades is not on disk.

        Example:
            >>> mus.export_scenario_to_cairo("./scenario.csv")
            Exported scenario for 1000 buildings across 3 years to ./scenario.csv
        """
        self._validate_data_availability("metadata")
        bldgs, allocations = self._allocation_plan
        sorted_bldg_ids = sorted(self.sampled_bldgs)
        # Index once for stable, low-overhead column fills.
        idx = {bid: i for i, bid in enumerate(sorted_bldg_ids)}
        total = len(bldgs)

        year_cols = {f"year_{year_idx}": [0] * len(sorted_bldg_ids) for year_idx in range(self.num_years)}
        for year_idx in range(self.num_years):
            col = year_cols[f"year_{year_idx}"]
            # Fill each year column from the precomputed adopter slices.
            for uid in self._upgrade_ids:
                target = int(self.scenario[uid][year_idx] * total)
                if target:
                    for bldg_id in allocations[uid][:target]:
                        col[idx[bldg_id]] = uid

        df = pl.DataFrame({"bldg_id": sorted_bldg_ids, **year_cols})
        resolved = _resolve_path(output_path)
        df.write_csv(str(resolved))

        print(f"Exported scenario for {len(self.sampled_bldgs)} buildings across {self.num_years} years to {resolved}")

    def _resolve_scenario_root(self, path: Path | S3Path | str | None) -> Path | S3Path:
        """Resolve scenario root directory based on optional base path."""
        base_path = self.data_path / self.release.key if path is None else _resolve_path(path)
        return base_path / "mixed_upgrade" / self.scenario_name

    def save_metadata_parquet(self, path: Path | S3Path | str | None = None) -> None:
        """Save mixed upgrade metadata to a partitioned parquet dataset.

        Output structure:
            <path>/mixed_upgrade/<scenario_name>/year=<year_int>/metadata.parquet

        Args:
            path: Optional base path to write to. Defaults to data_path/release.
        """
        scenario_root = self._resolve_scenario_root(path)
        for year_idx in range(self.num_years):
            year_dir = scenario_root / f"year={year_idx}"
            # S3 "directories" need no mkdir; only create local directories.
            if isinstance(year_dir, Path):
                year_dir.mkdir(parents=True, exist_ok=True)
            output_file = year_dir / "metadata.parquet"
            self.read_metadata(years=[year_idx]).sink_parquet(str(output_file))

    def save_hourly_load_parquet(self, path: Path | S3Path | str | None = None) -> None:
        """Save mixed upgrade hourly load curves to partitioned parquet datasets.

        Output structure:
            <path>/mixed_upgrade/<scenario_name>/year=<year_int>/<bldg_id>-<upgrade_id>.parquet

        Args:
            path: Optional base path to write to. Defaults to the release path.
        """
        scenario_root = self._resolve_scenario_root(path)
        for year_idx in range(self.num_years):
            year_dir = scenario_root / f"year={year_idx}"
            # S3 "directories" need no mkdir; only create local directories.
            if isinstance(year_dir, Path):
                year_dir.mkdir(parents=True, exist_ok=True)
            df = self.read_load_curve_hourly(years=[year_idx]).collect()
            if df.is_empty():
                continue
            for key, group in df.partition_by(["bldg_id", "upgrade_id"], as_dict=True).items():
                bldg_id, upgrade_id = key
                output_file = year_dir / f"{int(bldg_id)}-{int(upgrade_id)}.parquet"
                group.write_parquet(str(output_file))

downloaded_metadata cached property

Cached property for metadata file discovery only.

materialized_scenario cached property

Materialize building allocations for all years in the scenario.

export_scenario_to_cairo(output_path)

Export scenario to CAIRO-compatible CSV format.

Creates a CSV file with one row per building and one column per year. Each cell contains the upgrade ID for that building in that year.

Parameters:

Name Type Description Default
output_path str | Path | S3Path

Path where the CSV file should be written (local or S3).

required
Output format

bldg_id,year_0,year_1,year_2
405821,0,0,0
612547,0,4,4
789234,0,0,8

Raises:

Type Description
ScenarioDataNotFoundError

If data for scenario upgrades is not on disk.

Example

mus.export_scenario_to_cairo("./scenario.csv")
Exported scenario for 1000 buildings across 3 years to ./scenario.csv

Source code in buildstock_fetch/mixed_upgrade.py
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
def export_scenario_to_cairo(self, output_path: str | Path | S3Path) -> None:
    """Export scenario to CAIRO-compatible CSV format.

    Creates a CSV file with one row per building and one column per year.
    Each cell contains the upgrade ID for that building in that year.

    Args:
        output_path: Path where the CSV file should be written (local or S3).

    Output format:
        bldg_id,year_0,year_1,year_2
        405821,0,0,0
        612547,0,4,4
        789234,0,0,8

    Raises:
        ScenarioDataNotFoundError: If data for scenario upgrades is not on disk.

    Example:
        >>> mus.export_scenario_to_cairo("./scenario.csv")
        Exported scenario for 1000 buildings across 3 years to ./scenario.csv
    """
    self._validate_data_availability("metadata")
    bldgs, allocations = self._allocation_plan
    sorted_bldg_ids = sorted(self.sampled_bldgs)
    # Index once for stable, low-overhead column fills.
    idx = {bid: i for i, bid in enumerate(sorted_bldg_ids)}
    total = len(bldgs)

    year_cols = {f"year_{year_idx}": [0] * len(sorted_bldg_ids) for year_idx in range(self.num_years)}
    for year_idx in range(self.num_years):
        col = year_cols[f"year_{year_idx}"]
        # Fill each year column from the precomputed adopter slices.
        for uid in self._upgrade_ids:
            target = int(self.scenario[uid][year_idx] * total)
            if target:
                for bldg_id in allocations[uid][:target]:
                    col[idx[bldg_id]] = uid

    df = pl.DataFrame({"bldg_id": sorted_bldg_ids, **year_cols})
    resolved = _resolve_path(output_path)
    df.write_csv(str(resolved))

    print(f"Exported scenario for {len(self.sampled_bldgs)} buildings across {self.num_years} years to {resolved}")

read_load_curve_15min(years=None)

Read 15-minute load curve data for specified years in the scenario.

Parameters:

Name Type Description Default
years list[int] | None

List of year indices to include, or None for all years.

None

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns: - bldg_id: Building ID - upgrade_id: Upgrade ID - year: Year index - timestamp: Timestamp of the load data - ...: Energy columns (e.g., out.electricity.total.energy_consumption)

Raises:

Type Description
ValueError

If any year index is out of range.

ScenarioDataNotFoundError

If load curve data for scenario upgrades is not on disk.

Source code in buildstock_fetch/mixed_upgrade.py
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def read_load_curve_15min(self, years: list[int] | None = None) -> pl.LazyFrame:
    """Read 15-minute load curve data for specified years in the scenario.

    Args:
        years: List of year indices to include, or None for all years.

    Returns:
        A Polars LazyFrame with columns:
            - bldg_id: Building ID
            - upgrade_id: Upgrade ID
            - year: Year index
            - timestamp: Timestamp of the load data
            - ...: Energy columns (e.g., out.electricity.total.energy_consumption)

    Raises:
        ValueError: If any year index is out of range.
        ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
    """
    return self._read_data_for_scenario("load_curve_15min", years)

read_load_curve_annual(years=None)

Read annual load curve data for specified years in the scenario.

Parameters:

Name Type Description Default
years list[int] | None

List of year indices to include, or None for all years.

None

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns: - bldg_id: Building ID - upgrade_id: Upgrade ID - year: Year index - ...: Annual energy totals

Raises:

Type Description
ValueError

If any year index is out of range.

ScenarioDataNotFoundError

If load curve data for scenario upgrades is not on disk.

Source code in buildstock_fetch/mixed_upgrade.py
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def read_load_curve_annual(self, years: list[int] | None = None) -> pl.LazyFrame:
    """Read annual load curve data for specified years in the scenario.

    Args:
        years: List of year indices to include, or None for all years.

    Returns:
        A Polars LazyFrame with columns:
            - bldg_id: Building ID
            - upgrade_id: Upgrade ID
            - year: Year index
            - ...: Annual energy totals

    Raises:
        ValueError: If any year index is out of range.
        ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
    """
    return self._read_data_for_scenario("load_curve_annual", years)

read_load_curve_daily(years=None)

Read daily load curve data for specified years in the scenario.

Parameters:

Name Type Description Default
years list[int] | None

List of year indices to include, or None for all years.

None

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns: - bldg_id: Building ID - upgrade_id: Upgrade ID - year: Year index - timestamp: Timestamp of the load data - ...: Energy columns

Raises:

Type Description
ValueError

If any year index is out of range.

ScenarioDataNotFoundError

If load curve data for scenario upgrades is not on disk.

Source code in buildstock_fetch/mixed_upgrade.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
def read_load_curve_daily(self, years: list[int] | None = None) -> pl.LazyFrame:
    """Read daily load curve data for specified years in the scenario.

    Args:
        years: List of year indices to include, or None for all years.

    Returns:
        A Polars LazyFrame with columns:
            - bldg_id: Building ID
            - upgrade_id: Upgrade ID
            - year: Year index
            - timestamp: Timestamp of the load data
            - ...: Energy columns

    Raises:
        ValueError: If any year index is out of range.
        ScenarioDataNotFoundError: If load curve data for scenario upgrades is not on disk.
    """
    return self._read_data_for_scenario("load_curve_daily", years)

read_load_curve_hourly(years=None)

Read hourly load curve data for specified years in the scenario.

Parameters:

Name Type Description Default
years list[int] | None

List of year indices to include, or None for all years.

None

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns: - bldg_id: Building ID - upgrade_id: Upgrade ID - year: Year index - timestamp: Timestamp of the load data - ...: Energy columns

Raises:

Type Description
ValueError

If any year index is out of range.

ScenarioDataNotFoundError

If load curve data for scenario upgrades is not on disk.

Source code in buildstock_fetch/mixed_upgrade.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def read_load_curve_hourly(self, years: list[int] | None = None) -> pl.LazyFrame:
    """Read the scenario's hourly load curve data.

    Args:
        years: Year indices to include; ``None`` selects all years.

    Returns:
        A Polars LazyFrame with columns:
            - bldg_id: Building ID
            - upgrade_id: Upgrade ID
            - year: Year index
            - timestamp: Timestamp of the load data
            - ...: Energy columns

    Raises:
        ValueError: If a requested year index is out of range.
        ScenarioDataNotFoundError: If load curve data for scenario upgrades
            is not on disk.
    """
    # Delegate to the shared per-scenario reader, selecting the hourly table.
    return self._read_data_for_scenario("load_curve_hourly", years)

read_metadata(years=None)

Read metadata for specified years in the scenario.

Returns a LazyFrame containing metadata for all buildings and years in the scenario. Each row represents one building in one year.

Parameters:

Name Type Description Default
years list[int] | None

List of year indices to include (0-indexed), or None for all years. Example: [0, 1, 2] or None

None

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns: - bldg_id: Building ID (from sampled baseline) - upgrade_id: Upgrade ID for this building in this year (0 or scenario upgrade) - year: Year index (0-indexed) - ...: Original metadata columns (e.g., in.state, in.vintage, etc.)

Raises:

Type Description
ValueError

If any year index is out of range.

ScenarioDataNotFoundError

If metadata for scenario upgrades is not on disk.

Example

Read metadata for all years

metadata = mus.read_metadata()
df = metadata.collect()

Read metadata for specific years

metadata_early = mus.read_metadata(years=[0, 1])

Source code in buildstock_fetch/mixed_upgrade.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
def read_metadata(self, years: list[int] | None = None) -> pl.LazyFrame:
    """Read scenario metadata for the requested years.

    Produces one row per building per year, covering every building and
    year in the scenario.

    Args:
        years: Year indices to include (0-indexed); ``None`` selects all
            years. Example: [0, 1, 2] or None

    Returns:
        A Polars LazyFrame with columns:
            - bldg_id: Building ID (from sampled baseline)
            - upgrade_id: Upgrade ID for this building in this year (0 or scenario upgrade)
            - year: Year index (0-indexed)
            - ...: Original metadata columns (e.g., in.state, in.vintage, etc.)

    Raises:
        ValueError: If a requested year index is out of range.
        ScenarioDataNotFoundError: If metadata for scenario upgrades is not on disk.

    Example:
        >>> # Read metadata for all years
        >>> metadata = mus.read_metadata()
        >>> df = metadata.collect()
        >>>
        >>> # Read metadata for specific years
        >>> metadata_early = mus.read_metadata(years=[0, 1])
    """
    # Delegate to the shared per-scenario reader, selecting the metadata table.
    return self._read_data_for_scenario("metadata", years)

save_hourly_load_parquet(path=None)

Save mixed upgrade hourly load curves to partitioned parquet datasets.

Output structure

&lt;path&gt;/mixed_upgrade/&lt;scenario_name&gt;/year=&lt;year_int&gt;/&lt;bldg_id&gt;-&lt;upgrade_id&gt;.parquet

Parameters:

Name Type Description Default
path Path | S3Path | str | None

Optional base path to write to. Defaults to the release path.

None
Source code in buildstock_fetch/mixed_upgrade.py
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
def save_hourly_load_parquet(self, path: Path | S3Path | str | None = None) -> None:
    """Write the scenario's hourly load curves as partitioned parquet files.

    Output structure:
        <path>/mixed_upgrade/<scenario_name>/year=<year_int>/<bldg_id>-<upgrade_id>.parquet

    Args:
        path: Optional base path to write to. Defaults to the release path.
    """
    root = self._resolve_scenario_root(path)
    for year in range(self.num_years):
        partition_dir = root / f"year={year}"
        # Only local filesystem paths need the directory created up front;
        # object-store paths do not have real directories.
        if isinstance(partition_dir, Path):
            partition_dir.mkdir(parents=True, exist_ok=True)
        frame = self.read_load_curve_hourly(years=[year]).collect()
        if frame.is_empty():
            continue
        # One parquet file per (building, upgrade) pair within the year.
        grouped = frame.partition_by(["bldg_id", "upgrade_id"], as_dict=True)
        for (bldg_id, upgrade_id), part in grouped.items():
            target = partition_dir / f"{int(bldg_id)}-{int(upgrade_id)}.parquet"
            part.write_parquet(str(target))

save_metadata_parquet(path=None)

Save mixed upgrade metadata to a partitioned parquet dataset.

Output structure

&lt;path&gt;/&lt;scenario_name&gt;/year=&lt;year_int&gt;/metadata.parquet

Parameters:

Name Type Description Default
path Path | S3Path | str | None

Optional base path to write to. Defaults to data_path/release/mixed_upgrade.

None
Source code in buildstock_fetch/mixed_upgrade.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
def save_metadata_parquet(self, path: Path | S3Path | str | None = None) -> None:
    """Write the scenario's metadata as a year-partitioned parquet dataset.

    Output structure:
        <path>/<scenario_name>/year=<year_int>/metadata.parquet

    Args:
        path: Optional base path to write to. Defaults to data_path/release/mixed_upgrade.
    """
    root = self._resolve_scenario_root(path)
    for year in range(self.num_years):
        partition_dir = root / f"year={year}"
        # Only local filesystem paths need the directory created up front;
        # object-store paths do not have real directories.
        if isinstance(partition_dir, Path):
            partition_dir.mkdir(parents=True, exist_ok=True)
        target = partition_dir / "metadata.parquet"
        # Stream the lazy frame straight to parquet without collecting.
        self.read_metadata(years=[year]).sink_parquet(str(target))

buildstock_fetch.scenarios.uniform_adoption(upgrade_ids, weights, adoption_trajectory)

Generate a scenario from total adoption trajectory and fixed upgrade weights.

This helper function distributes a total adoption trajectory across multiple upgrades according to fixed weights. For example, if 30% of buildings adopt in year 1, and upgrade 4 has weight 0.6, then 18% of buildings will adopt upgrade 4 in year 1.

Parameters:

Name Type Description Default
upgrade_ids list[int]

List of upgrade IDs to include in the scenario.

required
weights dict[int, float]

Per-upgrade share of total adopters. Must sum to 1.0 (±1e-6). Example: {4: 0.6, 8: 0.4} means 60% choose upgrade 4, 40% choose upgrade 8.

required
adoption_trajectory list[float]

Total adoption fraction per year. Must be non-decreasing. Example: [0.1, 0.3, 0.5] means 10%, 30%, 50% total adoption over 3 years.

required

Returns:

Type Description
dict[int, list[float]]

Scenario dict mapping upgrade IDs to per-year adoption fractions.

Raises:

Type Description
InvalidScenarioError

If weights don't sum to 1.0, contain invalid values, or if adoption_trajectory is invalid.

Example

scenario = uniform_adoption(upgrade_ids=[4, 8], weights={4: 0.6, 8: 0.4}, adoption_trajectory=[0.1, 0.3, 0.5])
scenario
{4: [0.06, 0.18, 0.30], 8: [0.04, 0.12, 0.20]}

Source code in buildstock_fetch/scenarios.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
def uniform_adoption(
    upgrade_ids: list[int],
    weights: dict[int, float],
    adoption_trajectory: list[float],
) -> dict[int, list[float]]:
    """Build a scenario by splitting a total adoption trajectory across upgrades.

    The total adoption fraction for each year is apportioned among the
    upgrades according to fixed weights. For example, with 30% total adoption
    in year 1 and a weight of 0.6 on upgrade 4, upgrade 4 receives 18% of
    buildings in year 1.

    Args:
        upgrade_ids: List of upgrade IDs to include in the scenario.
        weights: Per-upgrade share of total adopters. Must sum to 1.0 (±1e-6).
            Example: {4: 0.6, 8: 0.4} means 60% choose upgrade 4, 40% choose upgrade 8.
        adoption_trajectory: Total adoption fraction per year. Must be non-decreasing.
            Example: [0.1, 0.3, 0.5] means 10%, 30%, 50% total adoption over 3 years.

    Returns:
        Scenario dict mapping upgrade IDs to per-year adoption fractions.

    Raises:
        InvalidScenarioError: If weights don't sum to 1.0, contain invalid values,
            or if adoption_trajectory is invalid.

    Example:
        >>> scenario = uniform_adoption(
        ...     upgrade_ids=[4, 8],
        ...     weights={4: 0.6, 8: 0.4},
        ...     adoption_trajectory=[0.1, 0.3, 0.5],
        ... )
        >>> scenario
        {4: [0.06, 0.18, 0.30], 8: [0.04, 0.12, 0.20]}
    """
    # Reject malformed inputs before doing any arithmetic.
    _validate_upgrade_ids_param(upgrade_ids)
    _validate_weights_param(weights, upgrade_ids)
    _validate_adoption_trajectory(adoption_trajectory)

    # Each upgrade's per-year fraction is its weight times the total adoption.
    scenario: dict[int, list[float]] = {
        uid: [weights[uid] * total for total in adoption_trajectory] for uid in upgrade_ids
    }

    # Confirm the assembled scenario satisfies the scenario invariants.
    validate_scenario(scenario)

    return scenario