
DataCube Class#

*Figure: Datacube diagram.*

pyramids.datacube.Datacube #

DataCube.

Source code in pyramids/datacube.py
class Datacube:
    """DataCube."""

    files: List[str]
    """List of geotiff files' names."""

    data: np.ndarray

    def __init__(
        self,
        src: Dataset,
        time_length: int,
        files: List[str] = None,
    ):
        """Construct Datacube object."""
        self._base = src
        self._files = files
        self._time_length = time_length

    def __str__(self):
        """__str__."""
        message = f"""
            Files: {len(self.files)}
            Cell size: {self._base.cell_size}
            EPSG: {self._base.epsg}
            Dimension: {self.rows} * {self.columns}
            Mask: {self._base.no_data_value[0]}
        """
        return message

    def __repr__(self):
        """__repr__."""
        message = f"""
            Files: {len(self.files)}
            Cell size: {self._base.cell_size}
            EPSG: {self._base.epsg}
            Dimension: {self.rows} * {self.columns}
            Mask: {self._base.no_data_value[0]}
        """
        return message

    @property
    def base(self) -> Dataset:
        """base.

        Base Dataset
        """
        return self._base

    @property
    def files(self):
        """Files."""
        return self._files

    @property
    def time_length(self) -> int:
        """Length of the dataset."""
        return self._time_length

    @property
    def rows(self):
        """Number of rows."""
        return self._base.rows

    @property
    def shape(self):
        """Number of rows."""
        return self.time_length, self.rows, self.columns

    @property
    def columns(self):
        """Number of columns."""
        return self._base.columns

    @classmethod
    def create_cube(cls, src: Dataset, dataset_length: int) -> "Datacube":
        """Create Datacube.

            - Create a Datacube from a sample raster.

        Args:
            src (Dataset):
                Raster object.
            dataset_length (int):
                Length of the dataset.

        Returns:
            Datacube: Datacube object.
        """
        return cls(src, dataset_length)

    @classmethod
    def read_multiple_files(
        cls,
        path: Union[str, List[str]],
        with_order: bool = False,
        regex_string: str = r"\d{4}.\d{2}.\d{2}",
        date: bool = True,
        file_name_data_fmt: str = None,
        start: str = None,
        end: str = None,
        fmt: str = "%Y-%m-%d",
        extension: str = ".tif",
    ) -> "Datacube":
        r"""read_multiple_files.

            - Read rasters from a folder (or list of files) and create a 3D array with the same 2D dimensions as the
              first raster and length equal to the number of files.

            - All rasters should have the same dimensions.
            - If you want to read the rasters with a certain order, the raster file names should contain a date
              that follows a consistent format (YYYY.MM.DD / YYYY-MM-DD or YYYY_MM_DD), e.g. "MSWEP_1979.01.01.tif".

        Args:
            path (str | List[str]):
                Path of the folder that contains all the rasters, or a list containing the paths of the rasters to read.
            with_order (bool):
                True if the raster names follow a certain order. Then the raster names should have a date that follows
                the same format (YYYY.MM.DD / YYYY-MM-DD or YYYY_MM_DD). For example:

                ```python
                >>> "MSWEP_1979.01.01.tif"
                >>> "MSWEP_1979.01.02.tif"
                >>> ...
                >>> "MSWEP_1979.01.20.tif"

                ```

            regex_string (str):
                A regex string used to locate the date in the file names. Default is r"\d{4}.\d{2}.\d{2}". For example:

                ```python
                >>> fname = "MSWEP_YYYY.MM.DD.tif"
                >>> regex_string = r"\d{4}.\d{2}.\d{2}"
                ```

                - Or:

                ```python
                >>> fname = "MSWEP_YYYY_M_D.tif"
                >>> regex_string = r"\d{4}_\d{1}_\d{1}"
                ```

                - If there is a number at the beginning of the name:

                ```python
                >>> fname = "1_MSWEP_YYYY_M_D.tif"
                >>> regex_string = r"\d+"
                ```

            date (bool):
                True if the number in the file name is a date. Default is True.
            file_name_data_fmt (str):
                Format of the date in the file names, needed if you want to read them ordered. Default is None. For example:

                ```python
                >>> "MSWEP_YYYY.MM.DD.tif"
                >>> file_name_data_fmt = "%Y.%m.%d"
                ```

            start (str):
                Start date if you want to read the input rasters for a specific period only and not all rasters. If
                not given, all rasters in the given path will be read.
            end (str):
                End date if you want to read the input rasters for a specific period only. If not given, all rasters in
                the given path will be read.
            fmt (str):
                Format of the given date in the start/end parameter.
            extension (str):
                The extension of the files you want to read from the given path. Default is ".tif".

        Returns:
            Datacube:
                Instance of the Datacube class.

        Examples:
            - Read all rasters in a folder:

              ```python
              >>> from pyramids.datacube import Datacube
              >>> raster_folder = "examples/GIS/data/raster-folder"
              >>> prec = Datacube.read_multiple_files(raster_folder)

              ```

            - Read from a pre-collected list without ordering:

              ```python
              >>> import glob
              >>> search_criteria = "*.tif"
              >>> file_list = glob.glob(os.path.join(raster_folder, search_criteria))
              >>> prec = Datacube.read_multiple_files(file_list, with_order=False)

              ```
        """
        if not isinstance(path, str) and not isinstance(path, list):
            raise TypeError(f"path input should be string/list type, given{type(path)}")

        if isinstance(path, str):
            # check whether the path exists
            if not os.path.exists(path):
                raise FileNotFoundError("The path you have provided does not exist")
            # get a list of all files
            files = os.listdir(path)
            files = [i for i in files if i.endswith(extension)]
            # files = glob.glob(os.path.join(path, "*.tif"))
            # check whether there are files or not inside the folder
            if len(files) < 1:
                raise FileNotFoundError("The path you have provided is empty")
        else:
            files = path[:]

        # to sort the files in the same order as the first number in the name
        if with_order:
            match_str_fn = lambda x: re.search(regex_string, x)
            list_dates = list(map(match_str_fn, files))

            if None in list_dates:
                raise ValueError(
                    "The date format/separator given does not match the file names"
                )
            if date:
                if file_name_data_fmt is None:
                    raise ValueError(
                        f"To read the rasters with a certain order (with_order = {with_order}), you have to "
                        f"provide the file_name_data_fmt parameter (given: {file_name_data_fmt})"
                    )
                fn = lambda x: dt.datetime.strptime(x.group(), file_name_data_fmt)
            else:
                fn = lambda x: int(x.group())
            list_dates = list(map(fn, list_dates))

            df = pd.DataFrame()
            df["files"] = files
            df["date"] = list_dates
            df.sort_values("date", inplace=True, ignore_index=True)
            files = df.loc[:, "files"].values

        if start is not None or end is not None:
            if date:
                start = dt.datetime.strptime(start, fmt)
                end = dt.datetime.strptime(end, fmt)

            files = (
                df.loc[start <= df["date"], :]
                .loc[df["date"] <= end, "files"]
                .values
            )

        if not isinstance(path, list):
            # add the path to all the files
            files = [f"{path}/{i}" for i in files]
        # read the first raster as the base dataset; the cube length equals the
        # number of rasters in the folder
        sample = Dataset.read_file(files[0])

        return cls(sample, len(files), files)

    def open_datacube(self, band: int = 0) -> None:
        """Open the datacube.

        Read values from the given band as arrays for all files.

        Args:
            band (int): Index of the band you want to read. Default is 0.

        Returns:
            None: Loads values into the internal 3D array [time, rows, cols] in-place.
        """
        # check the given band number
        if not hasattr(self, "base"):
            raise ValueError(
                "please use the read_multiple_files method to get the files (tiff/ascii) in the"
                "dataset directory"
            )
        if band > self.base.band_count - 1:
            raise ValueError(
                f"the raster has only {self.base.band_count} check the given band number"
            )
        # initialize the array with NaN
        self._values = np.ones(
            (
                self.time_length,
                self.base.rows,
                self.base.columns,
            )
        )
        self._values[:, :, :] = np.nan

        for i, file_i in enumerate(self.files):
            # read the tif file
            raster_i = gdal.Open(f"{file_i}")
            self._values[i, :, :] = raster_i.GetRasterBand(band + 1).ReadAsArray()

    @property
    def values(self) -> np.ndarray:
        """Values.

        - The attribute where the dataset array is stored.
        - The 3D numpy array: [dataset length, rows, cols] / [dataset length, lons, lats].
        """
        return self._values

    @values.setter
    def values(self, val):
        """Values.

        - Setting the array does not allow a dimension different from the one defined when
        creating the dataset.
        """
        # if the attribute is defined before check the dimension
        if hasattr(self, "values"):
            if self._values.shape != val.shape:
                raise ValueError(
                    f"The dimension of the new data: {val.shape}, differs from the dimension of the "
                    f"original dataset: {self._values.shape}, please redefine the base Dataset and "
                    f"dataset_length first"
                )

        self._values = val

    @values.deleter
    def values(self):
        self._values = None

    def __getitem__(self, key):
        """Getitem."""
        if not hasattr(self, "values"):
            raise AttributeError("Please use the read_dataset method to read the data")
        return self._values[key, :, :]

    def __setitem__(self, key, value: np.ndarray):
        """Setitem."""
        if not hasattr(self, "values"):
            raise AttributeError("Please use the read_dataset method to read the data")
        self._values[key, :, :] = value

    def __len__(self):
        """Length of the Datacube."""
        return self._values.shape[0]

    def __iter__(self):
        """Iterate over the Datacube."""
        return iter(self._values[:])

    def head(self, n: int = 5):
        """First 5 Datasets."""
        return self._values[:n, :, :]

    def tail(self, n: int = -5):
        """Last 5 Datasets."""
        return self._values[n:, :, :]

    def first(self):
        """First Dataset."""
        return self._values[0, :, :]

    def last(self):
        """Last Dataset."""
        return self._values[-1, :, :]

    def iloc(self, i) -> Dataset:
        """iloc.

            - Access dataset array using index.

        Args:
            i (int):
                Index of the dataset to access.

        Returns:
            Dataset: Dataset object.
        """
        if not hasattr(self, "values"):
            raise DatasetNoFoundError("please read the dataset first")
        arr = self._values[i, :, :]
        dst = gdal.GetDriverByName("MEM").CreateCopy("", self.base.raster, 0)
        dst.GetRasterBand(1).WriteArray(arr)
        return Dataset(dst)

    def plot(self, band: int = 0, exclude_value: Any = None, **kwargs: Any) -> "ArrayGlyph":
        r"""Read Array.

            - read the values stored in a given band.

        Args:
            band (int):
                The band you want to get its data. Default is 0.
            exclude_value (Any):
                Value to exclude from the plot. Default is None.
            **kwargs:
                | Parameter                  | Type                  | Description |
                |----------------------------|-----------------------|-------------|
                | points                     | array                 | 3-column array: col 1 = value to display, col 2 = row index, col 3 = column index. Columns 2 and 3 indicate the location of the point. |
                | point_color                | str                   | Color of the points. |
                | point_size                 | Any                   | Size of the points. |
                | pid_color                  | str                   | Color of the annotation of the point. Default is blue. |
                | pid_size                   | Any                   | Size of the point annotation. |
                | figsize                    | tuple, optional       | Figure size. Default is `(8, 8)`. |
                | title                      | str, optional         | Title of the plot. Default is `'Total Discharge'`. |
                | title_size                 | int, optional         | Title size. Default is `15`. |
                | orientation                | str, optional         | Orientation of the color bar (`horizontal` or `vertical`). Default is `'vertical'`. |
                | rotation                   | number, optional      | Rotation of the color bar label. Default is `-90`. |
                | cbar_length                | float, optional       | Ratio to control the height of the color bar. Default is `0.75`. |
                | ticks_spacing              | int, optional         | Spacing in the color bar ticks. Default is `2`. |
                | cbar_label_size            | int, optional         | Size of the color bar label. Default is `12`. |
                | cbar_label                 | str, optional         | Label of the color bar. Default is `'Discharge m³/s'`. |
                | color_scale                | int, optional         | Color scaling mode (default = `1`): 1 = normal scale, 2 = power scale, 3 = SymLogNorm scale, 4 = PowerNorm scale, 5 = BoundaryNorm scale. |
                | gamma                      | float, optional       | Value needed for `color_scale=2`. Default is `1/2`. |
                | line_threshold             | float, optional       | Value needed for `color_scale=3`. Default is `0.0001`. |
                | line_scale                 | float, optional       | Value needed for `color_scale=3`. Default is `0.001`. |
                | bounds                     | list                  | Discrete bounds for `color_scale=4`. Default is `None`. |
                | midpoint                   | float, optional       | Value needed for `color_scale=5`. Default is `0`. |
                | cmap                       | str, optional         | Color map style. Default is `'coolwarm_r'`. |
                | display_cell_value         | bool                  | Whether to display the values of the cells as text. |
                | num_size                   | int, optional         | Size of the numbers plotted on top of each cell. Default is `8`. |
                | background_color_threshold | float \| int, optional| Threshold for deciding number color: if value > threshold → black; else white. If `None`, uses `max_value/2`. Default is `None`. |


        Returns:
            ArrayGlyph: A plotting/animation handle (from cleopatra.ArrayGlyph).
        """
        import_cleopatra(
            "The current function uses the cleopatra package for plotting, please install it manually. For more "
            "info check https://github.com/Serapieum-of-alex/cleopatra"
        )
        from cleopatra.array_glyph import ArrayGlyph

        data = self.values

        exclude_value = (
            [self.base.no_data_value[band], exclude_value]
            if exclude_value is not None
            else [self.base.no_data_value[band]]
        )

        cleo = ArrayGlyph(data, exclude_value=exclude_value)
        time = list(range(self.time_length))
        cleo.animate(time, **kwargs)
        return cleo

    def to_file(
        self, path: Union[str, List[str]], driver: str = "geotiff", band: int = 0
    ):
        """Save to geotiff format.

            saveRaster saves a raster to a path

        Args:
            path (Union[str, List[str]]):
                a path includng the name of the raster and extention.
            driver (str):
                driver = "geotiff".
            band (int):
                band index, needed only in case of ascii drivers. Default is 1.

        Examples:
            - Save the datacube to a folder:

              ```python
              >>> cube = Datacube.read_multiple_files("path/to/folder")
              >>> cube.open_datacube()
              >>> cube.to_file("examples/GIS/data/save_raster_test")

              ```
        """
        ext = CATALOG.get_extension(driver)

        if isinstance(path, str):
            if not Path(path).exists():
                Path(path).mkdir(parents=True, exist_ok=True)
            path = [f"{path}/{i}.{ext}" for i in range(self.time_length)]
        else:
            if not len(path) == self.time_length:
                raise ValueError(
                    f"Length of the given paths: {len(path)} does not equal number of rasters in the data cube: {self.time_length}"
                )
            if not Path(path[0]).parent.exists():
                Path(path[0]).parent.mkdir(parents=True, exist_ok=True)

        for i in range(self.time_length):
            src = self.iloc(i)
            src.to_file(path[i], band=band)

    def to_crs(
        self,
        to_epsg: int = 3857,
        method: str = "nearest neighbor",
        maintain_alignment: bool = False,
    ) -> None:
        """to_epsg.

            - to_epsg reprojects a raster to any projection (default the WGS84 web mercator projection,
            without resampling) The function returns a GDAL in-memory file object, where you can ReadAsArray etc.

        Args:
            to_epsg (int):
                Reference number to the new projection (https://epsg.io/)
                (default 3857 the reference no of WGS84 web mercator).
            method (str):
                Resampling technique. Default is "nearest neighbor". See https://gisgeography.com/raster-resampling/.
                "nearest neighbor" for nearest neighbor, "cubic" for cubic convolution, "bilinear" for bilinear.
            maintain_alignment (bool):
                True to maintain the number of rows and columns of the raster the same after reprojection.
                Default is False.

        Returns:
            None: Updates the datacube values and base in place after reprojection.

        Examples:
            - Reproject the datacube to EPSG:3857:

              ```python
              >>> cube = Datacube.read_multiple_files("path/to/folder")
              >>> cube.open_datacube()
              >>> cube.to_crs(to_epsg=3857)

              ```
        """
        for i in range(self.time_length):
            src = self.iloc(i)
            dst = src.to_crs(
                to_epsg, method=method, maintain_alignment=maintain_alignment
            )
            arr = dst.read_array()
            if i == 0:
                # create the array
                array = (
                    np.ones(
                        (
                            self.time_length,
                            arr.shape[0],
                            arr.shape[1],
                        )
                    )
                    * np.nan
                )
            array[i, :, :] = arr

        self._values = array
        # use the last reprojected dataset as the new base
        self._base = dst

    def crop(
        self, mask: Union[Dataset, str], inplace: bool = False, touch: bool = True
    ) -> Union[None, "Datacube"]:
        """crop.

            crop matches the location of nodata value from src raster to dst raster. Mask is where the NoDatavalue will
            be taken and the location of this value. src_dir is path to the folder where rasters exist where we need to
            put the NoDataValue of the mask in RasterB at the same locations.

        Args:
            mask (Dataset | str):
                Dataset object of the mask raster to crop the rasters (to get the NoData value and its location in the
                array). Mask should include the name of the raster and the extension like "data/dem.tif", or you can
                read the mask raster using gdal and use it as the first parameter to the function.
            inplace (bool):
                True to make the changes in place.
            touch (bool):
                Include the cells that touch the polygon, not only those that lie entirely inside the polygon mask.
                Default is True.

        Returns:
            Union[None, "Datacube"]: If inplace is False, a new Datacube with the mask's NoData locations applied
            to every raster; otherwise None, and the changes are made in place.

        Examples:
            - Crop the datacube using a DEM mask:

              ```python
              >>> dem = Dataset.read_file("examples/GIS/data/acc4000.tif")
              >>> cube = Datacube.read_multiple_files("examples/GIS/data/aligned_rasters/")
              >>> cube.open_datacube()
              >>> cropped = cube.crop(dem)

              ```
        """
        for i in range(self.time_length):
            src = self.iloc(i)
            dst = src.crop(mask, touch=touch)
            arr = dst.read_array()
            if i == 0:
                # create the array
                array = (
                    np.ones(
                        (self.time_length, arr.shape[0], arr.shape[1]),
                    )
                    * np.nan
                )

            array[i, :, :] = arr

        if inplace:
            self._values = array
            # use the last cropped dataset as the new base
            self._base = dst
        else:
            dataset = Datacube(dst, time_length=self.time_length)
            dataset._values = array
            return dataset

    # # TODO: merge ReprojectDataset and ProjectRaster they are almost the same
    # # TODO: still needs to be tested
    # @staticmethod
    # def to_epsg(
    #         src: gdal.Datacube,
    #         to_epsg: int = 3857,
    #         cell_size: int = [],
    #         method: str = "Nearest",
    #
    # ) -> gdal.Datacube:
    #     """to_epsg.
    #
    #         - to_epsg reprojects and resamples a folder of rasters to any projection
    #         (default the WGS84 web mercator projection, without resampling)
    #
    #     Parameters
    #     ----------
    #     src: [gdal dataset]
    #         gdal dataset object (src=gdal.Open("dem.tif"))
    #     to_epsg: [integer]
    #          reference number to the new projection (https://epsg.io/)
    #         (default 3857 the reference no of WGS84 web mercator )
    #     cell_size: [integer]
    #          number to resample the raster cell size to a new cell size
    #         (default empty so raster will not be resampled)
    #     method: [String]
    #         resampling technique default is "Nearest"
    #         https://gisgeography.com/raster-resampling/
    #         "Nearest" for nearest neighbor,"cubic" for cubic convolution,
    #         "bilinear" for bilinear
    #
    #     Returns
    #     -------
    #     raster: [gdal Datacube]
    #          a GDAL in-memory file object, where you can ReadAsArray etc.
    #     """
    #     if not isinstance(src, gdal.Datacube):
    #         raise TypeError(
    #             "src should be read using gdal (gdal dataset please read it using gdal"
    #             f" library) given {type(src)}"
    #         )
    #     if not isinstance(to_epsg, int):
    #         raise TypeError(
    #             "please enter correct integer number for to_epsg more information "
    #             f"https://epsg.io/, given {type(to_epsg)}"
    #         )
    #     if not isinstance(method, str):
    #         raise TypeError(
    #             "please enter correct method more information see " "documentation "
    #         )
    #
    #     if cell_size:
    #         assert isinstance(cell_size, int) or isinstance(
    #             cell_size, float
    #         ), "please enter an integer or float cell size"
    #
    #     if method == "Nearest":
    #         method = gdal.GRA_NearestNeighbour
    #     elif method == "cubic":
    #         method = gdal.GRA_Cubic
    #     elif method == "bilinear":
    #         method = gdal.GRA_Bilinear
    #
    #     src_proj = src.GetProjection()
    #     src_gt = src.GetGeoTransform()
    #     src_x = src.RasterXSize
    #     src_y = src.RasterYSize
    #     dtype = src.GetRasterBand(1).DataType
    #     # spatial ref
    #     src_sr = osr.SpatialReference(wkt=src_proj)
    #     src_epsg = src_sr.GetAttrValue("AUTHORITY", 1)
    #
    #     # destination
    #     # spatial ref
    #     dst_epsg = osr.SpatialReference()
    #     dst_epsg.ImportFromEPSG(to_epsg)
    #     # transformation factors
    #     tx = osr.CoordinateTransformation(src_sr, dst_epsg)
    #
    #     # in case the source crs is GCS and longitude is in the west hemisphere, gdal
    #     # reads longitude from 0 to 360 and the transformation factor won't work with
    #     # values greater than 180
    #     if src_epsg == "4326" and src_gt[0] > 180:
    #         lng_new = src_gt[0] - 360
    #         # transform the right upper corner point
    #         (ulx, uly, ulz) = tx.TransformPoint(lng_new, src_gt[3])
    #         # transform the right lower corner point
    #         (lrx, lry, lrz) = tx.TransformPoint(
    #             lng_new + src_gt[1] * src_x, src_gt[3] + src_gt[5] * src_y
    #         )
    #     else:
    #         # transform the right upper corner point
    #         (ulx, uly, ulz) = tx.TransformPoint(src_gt[0], src_gt[3])
    #         # transform the right lower corner point
    #         (lrx, lry, lrz) = tx.TransformPoint(
    #             src_gt[0] + src_gt[1] * src_x, src_gt[3] + src_gt[5] * src_y
    #         )
    #
    #     if not cell_size:
    #         # the result raster has the same pixel size as the source
    #         # check if the coordinate system is GCS convert the distance from angular to metric
    #         if src_epsg == "4326":
    #             coords_1 = (src_gt[3], src_gt[0])
    #             coords_2 = (src_gt[3], src_gt[0] + src_gt[1])
    #             #            pixel_spacing=geopy.distance.vincenty(coords_1, coords_2).m
    #             pixel_spacing = FeatureCollection.GCSDistance(coords_1, coords_2)
    #         else:
    #             pixel_spacing = src_gt[1]
    #     else:
    #         # if src_epsg.GetAttrValue('AUTHORITY', 1) != "4326":
    #         #     assert (cell_size > 1), "please enter cell size greater than 1"
    #         # if the user input a cell size resample the raster
    #         pixel_spacing = cell_size
    #
    #     # create a new raster
    #     cols = int(np.round(abs(lrx - ulx) / pixel_spacing))
    #     rows = int(np.round(abs(uly - lry) / pixel_spacing))
    #     dst = Dataset._create_dataset(cols, rows, 1, dtype, driver="MEM")
    #
    #     # new geotransform
    #     new_geo = (ulx, pixel_spacing, src_gt[2], uly, src_gt[4], -pixel_spacing)
    #     # set the geotransform
    #     dst.SetGeoTransform(new_geo)
    #     # set the projection
    #     dst.SetProjection(dst_epsg.ExportToWkt())
    #     # set the no data value
    #     no_data_value = src.GetRasterBand(1).GetNoDataValue()
    #     dst = Dataset._set_no_data_value(dst, no_data_value)
    #     # perform the projection & resampling
    #     gdal.ReprojectImage(
    #         src, dst, src_sr.ExportToWkt(), dst_epsg.ExportToWkt(), method
    #     )
    #
    #     return dst

    def align(self, alignment_src: Dataset) -> None:
        """matchDataAlignment.

        This function matches the coordinate system and the number of rows and columns between two rasters. Raster A
        is the source of the coordinate system, number of rows, number of columns, and cell size. The result will be
        a raster with the same structure as Raster A but with values from Raster B using nearest neighbor interpolation.

        Args:
            alignment_src (Dataset):
                Dataset to use as the spatial template (CRS, rows, columns).

        Returns:
            None:
                Updates the datacube values in place to match the alignment of alignment_src.

        Examples:
            - Align all rasters in the datacube to a DEM raster:

              ```python
              >>> dem = Dataset.read_file("01GIS/inputs/4000/acc4000.tif")
              >>> cube = Datacube.read_multiple_files("02Precipitation/CHIRPS/Daily/")
              >>> cube.open_datacube()
              >>> cube.align(dem)

              ```
        """
        if not isinstance(alignment_src, Dataset):
            raise TypeError("alignment_src input should be a Dataset object")

        for i in range(self.time_length):
            src = self.iloc(i)
            dst = src.align(alignment_src)
            arr = dst.read_array()
            if i == 0:
                # create the array
                array = (
                    np.ones(
                        (self.time_length, arr.shape[0], arr.shape[1]),
                    )
                    * np.nan
                )

            array[i, :, :] = arr

        self._values = array
        # use the last aligned dataset as the new base
        self._base = dst

    @staticmethod
    def merge(
        src: List[str],
        dst: str,
        no_data_value: Union[float, int, str] = "0",
        init: Union[float, int, str] = "nan",
        n: Union[float, int, str] = "nan",
    ) -> None:
        """merge.

            Merges a group of rasters into one raster.

        Args:
            src (List[str]):
                List of paths to all input rasters.
            dst (str):
                Path to the output raster.
            no_data_value (float | int | str):
                Assign a specified nodata value to output bands.
            init (float | int | str):
                Pre-initialize the output image bands with these values. However, it is not
                marked as the nodata value in the output file. If only one value is given, the same value is used
                in all the bands.
            n (float | int | str):
                Ignore pixels from files being merged in with this pixel value.

        Returns:
            None
        """
        # run the command
        # cmd = "gdal_merge.py -o merged_image_1.tif"
        # subprocess.call(cmd.split() + file_list)
        # vrt = gdal.BuildVRT("merged.vrt", file_list)
        # src = gdal.Translate("merged_image.tif", vrt)

        parameters = (
            ["", "-o", dst]
            + src
            + [
                "-co",
                "COMPRESS=LZW",
                "-init",
                str(init),
                "-a_nodata",
                str(no_data_value),
                "-n",
                str(n),
            ]
        )  # '-separate'
        gdal_merge.main(parameters)

    def apply(self, ufunc: Callable) -> None:
        """apply.

        Apply a function on each raster in the datacube.

        Args:
            ufunc (Callable):
                Callable universal function (builtin or user defined). See
                https://numpy.org/doc/stable/reference/ufuncs.html
                To create a ufunc from a normal function: https://numpy.org/doc/stable/reference/generated/numpy.frompyfunc.html

        Returns:
            None

        Examples:
            - Apply a simple modulo operation to each value:

              ```python
              >>> def func(val):
              ...     return val % 2
              >>> ufunc = np.frompyfunc(func, 1, 1)
              >>> dataset.apply(ufunc)

              ```
        """
        if not callable(ufunc):
            raise TypeError("The Second argument should be a function")
        arr = self.values
        no_data_value = self.base.no_data_value[0]
        # execute the function on each raster
        arr[~np.isclose(arr, no_data_value, rtol=0.001)] = ufunc(
            arr[~np.isclose(arr, no_data_value, rtol=0.001)]
        )

    def overlay(
        self,
        classes_map,
        exclude_value: Union[float, int] = None,
    ) -> Dict[float, List[float]]:
        """Overlay.

        Args:
            classes_map (Dataset):
                Dataset object for the raster that has classes to overlay with.
            exclude_value (float | int, optional):
                Values to exclude from extracted values. Defaults to None.

        Returns:
            Dict[float, List[float]]:
                Dictionary with the distinct values of the classes map as keys; for each key, a list of all the
                intersected values from the datacube rasters.
        """
        values = {}
        for i in range(self.time_length):
            src = self.iloc(i)
            dict_i = src.overlay(classes_map, exclude_value)

            # these are the distinct values from the BaseMap which are keys in the
            # values dict with each one having a list of values
            classes = list(dict_i.keys())

            for class_i in classes:
                if class_i not in values.keys():
                    values[class_i] = list()

                values[class_i] = values[class_i] + dict_i[class_i]

        return values

files instance-attribute #

List of geotiff files' names.

data instance-attribute #

The 3D numpy array where the datacube values are stored.

base property #

base.

Base Dataset

time_length property #

Length of the dataset.

rows property #

Number of rows.

shape property #

Shape of the datacube (time_length, rows, columns).

columns property #

Number of columns.

values deletable property writable #

Values.

  • The attribute where the dataset array is stored.
  • The 3D numpy array: [dataset length, rows, cols] / [dataset length, lons, lats].
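A quick sketch of the shape guard on the setter (assuming `cube` has already been created with `read_multiple_files` and opened with `open_datacube`):

```python
>>> import numpy as np
>>> new_values = np.zeros_like(cube.values)
>>> cube.values = new_values        # same shape as the existing array: accepted
>>> cube.values = new_values[:-1]   # different shape: raises ValueError
```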

__init__(src, time_length, files=None) #

Construct Datacube object.


__str__() #

Return a string summary of the datacube (files, cell size, EPSG, dimensions, no-data value).


__repr__() #

Return the string representation of the datacube.


create_cube(src, dataset_length) classmethod #

Create Datacube.

- Create a Datacube from a sample raster.

Parameters:

  • src (Dataset): Raster object. Required.
  • dataset_length (int): Length of the dataset. Required.

Returns:

  • Datacube: Datacube object.
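A minimal sketch (the raster path below is illustrative):

```python
>>> from pyramids.dataset import Dataset
>>> from pyramids.datacube import Datacube
>>> sample = Dataset.read_file("examples/GIS/data/acc4000.tif")
>>> cube = Datacube.create_cube(sample, dataset_length=10)
```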


read_multiple_files(path, with_order=False, regex_string='\\d{4}.\\d{2}.\\d{2}', date=True, file_name_data_fmt=None, start=None, end=None, fmt='%Y-%m-%d', extension='.tif') classmethod #

read_multiple_files.

- Read rasters from a folder (or list of files) and create a 3D array with the same 2D dimensions as the
  first raster and length equal to the number of files.

- All rasters should have the same dimensions.
- If you want to read the rasters with a certain order, the raster file names should contain a date
  that follows a consistent format (YYYY.MM.DD / YYYY-MM-DD or YYYY_MM_DD), e.g. "MSWEP_1979.01.01.tif".

Parameters:

  • path (str | List[str]): Path of the folder that contains all the rasters, or a list containing the paths of the rasters to read. Required.

  • with_order (bool): True if the raster names follow a certain order. Then the raster names should have a date that follows the same format (YYYY.MM.DD / YYYY-MM-DD or YYYY_MM_DD). Default is False. For example:

    ```python
    >>> "MSWEP_1979.01.01.tif"
    >>> "MSWEP_1979.01.02.tif"
    >>> ...
    >>> "MSWEP_1979.01.20.tif"
    ```

  • regex_string (str): A regex string used to locate the date in the file names. Default is r"\d{4}.\d{2}.\d{2}". For example:

    ```python
    >>> fname = "MSWEP_YYYY.MM.DD.tif"
    >>> regex_string = r"\d{4}.\d{2}.\d{2}"
    ```

    Or:

    ```python
    >>> fname = "MSWEP_YYYY_M_D.tif"
    >>> regex_string = r"\d{4}_\d{1}_\d{1}"
    ```

    If there is a number at the beginning of the name:

    ```python
    >>> fname = "1_MSWEP_YYYY_M_D.tif"
    >>> regex_string = r"\d+"
    ```

  • date (bool): True if the number in the file name is a date. Default is True.

  • file_name_data_fmt (str): Format of the date in the file names, needed if you want to read them ordered. Default is None. For example:

    ```python
    >>> "MSWEP_YYYY.MM.DD.tif"
    >>> file_name_data_fmt = "%Y.%m.%d"
    ```

  • start (str): Start date if you want to read the input rasters for a specific period only and not all rasters. If not given, all rasters in the given path will be read. Default is None.

  • end (str): End date if you want to read the input rasters for a specific period only. If not given, all rasters in the given path will be read. Default is None.

  • fmt (str): Format of the given date in the start/end parameter. Default is "%Y-%m-%d".

  • extension (str): The extension of the files you want to read from the given path. Default is ".tif".

Returns:

  • Datacube: Instance of the Datacube class.

Examples:

  • Read all rasters in a folder:

    ```python
    >>> from pyramids.datacube import Datacube
    >>> raster_folder = "examples/GIS/data/raster-folder"
    >>> prec = Datacube.read_multiple_files(raster_folder)
    ```

  • Read from a pre-collected list without ordering:

    ```python
    >>> import glob
    >>> import os
    >>> search_criteria = "*.tif"
    >>> file_list = glob.glob(os.path.join(raster_folder, search_criteria))
    >>> prec = Datacube.read_multiple_files(file_list, with_order=False)
    ```
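A fuller sketch combining ordered reading with a date window (the folder and file-name pattern below are illustrative assumptions, following the MSWEP_YYYY.MM.DD.tif convention used above):

```python
>>> from pyramids.datacube import Datacube
>>> cube = Datacube.read_multiple_files(
...     "examples/GIS/data/raster-folder",  # hypothetical folder of MSWEP_YYYY.MM.DD.tif files
...     with_order=True,
...     regex_string=r"\d{4}.\d{2}.\d{2}",
...     file_name_data_fmt="%Y.%m.%d",
...     start="1979-01-01",
...     end="1979-01-10",
...     fmt="%Y-%m-%d",
... )
>>> cube.time_length  # number of files that fall inside the window
```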

open_datacube(band=0) #

Open the datacube.

Read values from the given band as arrays for all files.

Parameters:

  • band (int): Index of the band you want to read. Default is 0.

Returns:

  • None: Loads values into the internal 3D array [time, rows, cols] in-place.
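For example (a sketch, reusing the `prec` cube read above):

```python
>>> prec = Datacube.read_multiple_files("examples/GIS/data/raster-folder")
>>> prec.open_datacube(band=0)
>>> prec.values.shape  # (time_length, rows, columns)
```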


__getitem__(key) #

Getitem.


__setitem__(key, value) #

Setitem.


__len__() #

Length of the Datacube.


__iter__() #

Iterate over the Datacube.

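Together, these methods make the datacube behave like a sequence of 2D arrays (a sketch, assuming `cube` has been opened with `open_datacube`):

```python
>>> first = cube[0]        # 2D array of the first time step
>>> cube[0] = first * 2    # write a 2D array back into the first slot
>>> len(cube)              # number of time steps
>>> for arr in cube:       # iterate over the 2D arrays
...     print(arr.shape)
```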

head(n=5) #

Return the first `n` arrays (default 5).


tail(n=-5) #

Return the last `n` arrays (default: the last 5).


first() #

First Dataset.


last() #

Last Dataset.


iloc(i) #

iloc.

- Access dataset array using index.

Parameters:

  • i (int): Index of the dataset to access. Required.

Returns:

  • Dataset: Dataset object.
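For example (a sketch; `Dataset.to_file` writes the returned dataset to disk, and the output path is hypothetical):

```python
>>> ds = cube.iloc(0)             # Dataset wrapping the first 2D array
>>> ds.to_file("first_step.tif")
```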


plot(band=0, exclude_value=None, **kwargs) #

Plot the datacube.

  • Animate the values stored in a given band across the time dimension.

Parameters:

  • band (int): The band you want to get its data. Default is 0.

  • exclude_value (Any): Value to exclude from the plot. Default is None.

  • **kwargs (Any):

    | Parameter | Type | Description |
    |---|---|---|
    | points | array | 3-column array: col 1 = value to display, col 2 = row index, col 3 = column index. Columns 2 and 3 indicate the location of the point. |
    | point_color | str | Color of the points. |
    | point_size | Any | Size of the points. |
    | pid_color | str | Color of the annotation of the point. Default is blue. |
    | pid_size | Any | Size of the point annotation. |
    | figsize | tuple, optional | Figure size. Default is `(8, 8)`. |
    | title | str, optional | Title of the plot. Default is `'Total Discharge'`. |
    | title_size | int, optional | Title size. Default is `15`. |
    | orientation | str, optional | Orientation of the color bar (`horizontal` or `vertical`). Default is `'vertical'`. |
    | rotation | number, optional | Rotation of the color bar label. Default is `-90`. |
    | cbar_length | float, optional | Ratio to control the height of the color bar. Default is `0.75`. |
    | ticks_spacing | int, optional | Spacing in the color bar ticks. Default is `2`. |
    | cbar_label_size | int, optional | Size of the color bar label. Default is `12`. |
    | cbar_label | str, optional | Label of the color bar. Default is `'Discharge m³/s'`. |
    | color_scale | int, optional | Color scaling mode (default = `1`): 1 = normal scale, 2 = power scale, 3 = SymLogNorm scale, 4 = PowerNorm scale, 5 = BoundaryNorm scale. |
    | gamma | float, optional | Value needed for `color_scale=2`. Default is `1/2`. |
    | line_threshold | float, optional | Value needed for `color_scale=3`. Default is `0.0001`. |
    | line_scale | float, optional | Value needed for `color_scale=3`. Default is `0.001`. |
    | bounds | list | Discrete bounds for `color_scale=4`. Default is `None`. |
    | midpoint | float, optional | Value needed for `color_scale=5`. Default is `0`. |
    | cmap | str, optional | Color map style. Default is `'coolwarm_r'`. |
    | display_cell_value | bool | Whether to display the values of the cells as text. |
    | num_size | int, optional | Size of the numbers plotted on top of each cell. Default is `8`. |
    | background_color_threshold | float \| int, optional | Threshold for deciding number color: if value > threshold → black; else white. If `None`, uses `max_value/2`. Default is `None`. |

Returns:

  • ArrayGlyph: A plotting/animation handle (from cleopatra.ArrayGlyph).
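A minimal sketch (the keyword arguments are forwarded to cleopatra's `animate`; the values below are illustrative):

```python
>>> glyph = cube.plot(band=0, title="Precipitation", cmap="Blues")
```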

Source code in pyramids/datacube.py
def plot(self, band: int = 0, exclude_value: Any = None, **kwargs: Any) -> "ArrayGlyph":
    r"""Read Array.

        - read the values stored in a given band.

    Args:
        band (int):
            The band you want to get its data. Default is 0.
        exclude_value (Any):
            Value to exclude from the plot. Default is None.
        **kwargs:
            | Parameter                  | Type                  | Description |
            |----------------------------|-----------------------|-------------|
            | points                     | array                 | 3-column array: col 1 = value to display, col 2 = row index, col 3 = column index. Columns 2 and 3 indicate the location of the point. |
            | point_color                | str                   | Color of the points. |
            | point_size                 | Any                   | Size of the points. |
            | pid_color                  | str                   | Color of the annotation of the point. Default is blue. |
            | pid_size                   | Any                   | Size of the point annotation. |
            | figsize                    | tuple, optional       | Figure size. Default is `(8, 8)`. |
            | title                      | str, optional         | Title of the plot. Default is `'Total Discharge'`. |
            | title_size                 | int, optional         | Title size. Default is `15`. |
            | orientation                | str, optional         | Orientation of the color bar (`horizontal` or `vertical`). Default is `'vertical'`. |
            | rotation                   | number, optional      | Rotation of the color bar label. Default is `-90`. |
            | cbar_length                | float, optional       | Ratio to control the height of the color bar. Default is `0.75`. |
            | ticks_spacing              | int, optional         | Spacing in the color bar ticks. Default is `2`. |
            | cbar_label_size            | int, optional         | Size of the color bar label. Default is `12`. |
            | cbar_label                 | str, optional         | Label of the color bar. Default is `'Discharge m³/s'`. |
            | color_scale                | int, optional         | Color scaling mode (default = `1`): 1 = normal scale, 2 = power scale, 3 = SymLogNorm scale, 4 = PowerNorm scale, 5 = BoundaryNorm scale. |
            | gamma                      | float, optional       | Value needed for `color_scale=2`. Default is `1/2`. |
            | line_threshold             | float, optional       | Value needed for `color_scale=3`. Default is `0.0001`. |
            | line_scale                 | float, optional       | Value needed for `color_scale=3`. Default is `0.001`. |
            | bounds                     | list                  | Discrete bounds for `color_scale=4`. Default is `None`. |
            | midpoint                   | float, optional       | Value needed for `color_scale=5`. Default is `0`. |
            | cmap                       | str, optional         | Color map style. Default is `'coolwarm_r'`. |
            | display_cell_value         | bool                  | Whether to display the values of the cells as text. |
            | num_size                   | int, optional         | Size of the numbers plotted on top of each cell. Default is `8`. |
            | background_color_threshold | float \| int, optional| Threshold for deciding number color: if value > threshold → black; else white. If `None`, uses `max_value/2`. Default is `None`. |


    Returns:
        ArrayGlyph: A plotting/animation handle (from cleopatra.ArrayGlyph).
    """
    import_cleopatra(
        "The current funcrion uses cleopatra package to for plotting, please install it manually, for more info "
        "check https://github.com/Serapieum-of-alex/cleopatra"
    )
    from cleopatra.array_glyph import ArrayGlyph

    data = self.values

    exclude_value = (
        [self.base.no_data_value[band], exclude_value]
        if exclude_value is not None
        else [self.base.no_data_value[band]]
    )

    cleo = ArrayGlyph(data, exclude_value=exclude_value)
    time = list(range(self.time_length))
    cleo.animate(time, **kwargs)
    return cleo
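
A minimal usage sketch, assuming cube is a Datacube whose values have already been read and that the optional cleopatra package is installed; the keyword arguments shown are a small subset of the table above:

>>> glyph = cube.plot(band=0, exclude_value=0, title="Total Discharge", cmap="coolwarm_r")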

to_file(path, driver='geotiff', band=0) #

Save the datacube to disk.

Writes each raster in the datacube to a separate file, using the extension of the given driver.

Parameters:

Name Type Description Default
path Union[str, List[str]]

A directory in which each raster is saved as "{i}.{ext}", or a list of file paths (one per raster).

required
driver str

driver = "geotiff".

'geotiff'
band int

Band index, needed only for ASCII drivers. Default is 0.

0

Examples:

  • Save a datacube (here cube, already read) to a folder; one file is written per time step:
>>> output_path = "examples/GIS/data/save_raster_test"
>>> cube.to_file(output_path)
Source code in pyramids/datacube.py
def to_file(
    self, path: Union[str, List[str]], driver: str = "geotiff", band: int = 0
):
    """Save to geotiff format.

        saveRaster saves a raster to a path

    Args:
        path (Union[str, List[str]]):
            A directory in which each raster is saved as "{i}.{ext}", or a list of file paths
            (one per raster).
        driver (str):
            Driver name that determines the output format and file extension. Default is "geotiff".
        band (int):
            Band index, needed only for ASCII drivers. Default is 0.

    Examples:
        - Save a datacube (here cube, already read) to a folder; one file is written per time step:

          ```python
          >>> output_path = "examples/GIS/data/save_raster_test"
          >>> cube.to_file(output_path)

          ```
    """
    ext = CATALOG.get_extension(driver)

    if isinstance(path, str):
        if not Path(path).exists():
            Path(path).mkdir(parents=True, exist_ok=True)
        path = [f"{path}/{i}.{ext}" for i in range(self.time_length)]
    else:
        if not len(path) == self.time_length:
            raise ValueError(
                f"Length of the given paths: {len(path)} does not equal number of rasters in the data cube: {self.time_length}"
            )
        if not Path(path[0]).parent.exists():
            Path(path[0]).parent.mkdir(parents=True, exist_ok=True)

    for i in range(self.time_length):
        src = self.iloc(i)
        src.to_file(path[i], band=band)
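
A sketch of the two accepted forms of path (directory and file names are illustrative), assuming cube is a Datacube that has already been read:

>>> cube.to_file("outputs/cube")  # directory: writes outputs/cube/0.<ext>, 1.<ext>, ... one file per time step
>>> paths = [f"outputs/step_{i}.tif" for i in range(cube.time_length)]
>>> cube.to_file(paths)           # explicit list: one path per raster; length must equal time_length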

to_crs(to_epsg=3857, method='nearest neighbor', maintain_alignment=False) #

to_crs.

- Reproject every raster in the datacube to a new projection (default EPSG:3857, the WGS84 web
mercator projection). The datacube values and base dataset are updated in place.

Parameters:

Name Type Description Default
to_epsg int

Reference number to the new projection (https://epsg.io/) (default 3857 the reference no of WGS84 web mercator).

3857
method str

Resampling technique. Default is "nearest neighbor". See https://gisgeography.com/raster-resampling/: "nearest neighbor" for nearest neighbor, "cubic" for cubic convolution, "bilinear" for bilinear.

'nearest neighbor'
maintain_alignment bool

True to maintain the number of rows and columns of the raster the same after reprojection. Default is False.

False

Returns:

Name Type Description
None None

Updates the datacube values and base in place after reprojection.

Examples:

  • Reproject a datacube (here cube, already read) to EPSG:3857 in place:
>>> cube.to_crs(to_epsg=3857)
Source code in pyramids/datacube.py
def to_crs(
    self,
    to_epsg: int = 3857,
    method: str = "nearest neighbor",
    maintain_alignment: bool = False,
) -> None:
    """to_epsg.

        - to_epsg reprojects a raster to any projection (default the WGS84 web mercator projection,
        without resampling) The function returns a GDAL in-memory file object, where you can ReadAsArray etc.

    Args:
        to_epsg (int):
            Reference number to the new projection (https://epsg.io/)
            (default 3857 the reference no of WGS84 web mercator).
        method (str):
            Resampling technique. Default is "nearest neighbor". See https://gisgeography.com/raster-resampling/:
            "nearest neighbor" for nearest neighbor, "cubic" for cubic convolution, "bilinear" for bilinear.
        maintain_alignment (bool):
            True to maintain the number of rows and columns of the raster the same after reprojection.
            Default is False.

    Returns:
        None: Updates the datacube values and base in place after reprojection.

    Examples:
        - Reproject a datacube (here cube, already read) to EPSG:3857 in place:

          ```python
          >>> cube.to_crs(to_epsg=3857)

          ```
    """
    for i in range(self.time_length):
        src = self.iloc(i)
        dst = src.to_crs(
            to_epsg, method=method, maintain_alignment=maintain_alignment
        )
        arr = dst.read_array()
        if i == 0:
            # create the array
            array = (
                np.ones(
                    (
                        self.time_length,
                        arr.shape[0],
                        arr.shape[1],
                    )
                )
                * np.nan
            )
        array[i, :, :] = arr

    self._values = array
    # use the last reprojected raster as the new base dataset
    self._base = dst
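
A minimal usage sketch (assumes cube is a Datacube that has already been read); the method returns None and updates cube.values and cube.base in place:

>>> cube.to_crs(to_epsg=4326, method="bilinear", maintain_alignment=True)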

crop(mask, inplace=False, touch=True) #

crop.

Crop every raster in the datacube with a mask: the NoData value of the mask, and its locations, are
copied into each raster of the datacube, so cells that are NoData in the mask become NoData everywhere.

Parameters:

Name Type Description Default
mask Union[Dataset, str]

Dataset object of the mask raster (used to get the NoData value and its locations in the array), or a path to the mask raster including the name and extension, e.g. "data/dem.tif".

required
inplace bool

True to make the changes in place.

False
touch bool

Include the cells that touch the polygon, not only those that lie entirely inside the polygon mask. Default is True.

True

Returns:

Type Description
Union[None, Datacube]

If inplace is False, a new Datacube whose rasters keep their original values but with NoData at the locations where the mask is NoData; if inplace is True, the datacube is updated in place and None is returned.

Examples:

  • Crop a datacube (here cube, already read) using a DEM mask:
>>> from pyramids.dataset import Dataset
>>> mask = Dataset.read_file("examples/GIS/data/acc4000.tif")
>>> cropped = cube.crop(mask)
Source code in pyramids/datacube.py
def crop(
    self, mask: Union[Dataset, str], inplace: bool = False, touch: bool = True
) -> Union[None, "Datacube"]:
    """crop.

        crop matches the location of nodata value from src raster to dst raster. Mask is where the NoDatavalue will
        be taken and the location of this value. src_dir is path to the folder where rasters exist where we need to
        put the NoDataValue of the mask in RasterB at the same locations.

    Args:
        mask (Union[Dataset, str]):
            Dataset object of the mask raster (used to get the NoData value and its locations in the
            array), or a path to the mask raster including the name and extension, e.g. "data/dem.tif".
        inplace (bool):
            True to make the changes in place.
        touch (bool):
            Include the cells that touch the polygon, not only those that lie entirely inside the polygon mask.
            Default is True.

    Returns:
        Union[None, "Datacube"]: New rasters have the values from rasters in B_input_path with the NoDataValue in
        the same locations as raster A.

    Examples:
        - Crop a datacube (here cube, already read) using a DEM mask:

          ```python
          >>> from pyramids.dataset import Dataset
          >>> mask = Dataset.read_file("examples/GIS/data/acc4000.tif")
          >>> cropped = cube.crop(mask)

          ```
    """
    for i in range(self.time_length):
        src = self.iloc(i)
        dst = src.crop(mask, touch=touch)
        arr = dst.read_array()
        if i == 0:
            # create the array
            array = (
                np.ones(
                    (self.time_length, arr.shape[0], arr.shape[1]),
                )
                * np.nan
            )

        array[i, :, :] = arr

    if inplace:
        self._values = array
        # use the last cropped raster as the new base dataset
        self._base = dst
    else:
        dataset = Datacube(dst, time_length=self.time_length)
        dataset._values = array
        return dataset
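
A sketch contrasting the copying and in-place behaviour (the mask path is illustrative), assuming cube is a Datacube that has already been read:

>>> from pyramids.dataset import Dataset
>>> mask = Dataset.read_file("examples/GIS/data/acc4000.tif")
>>> cropped = cube.crop(mask)       # returns a new Datacube; cube is unchanged
>>> cube.crop(mask, inplace=True)   # updates cube in place and returns None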

align(alignment_src) #

align.

Match the coordinate system and the number of rows and columns of every raster in the datacube to those of a template dataset. alignment_src supplies the coordinate system, number of rows, number of columns, and cell size; the result is a datacube with the same structure as alignment_src but with the original values resampled by nearest neighbor interpolation.

Parameters:

Name Type Description Default
alignment_src Dataset

Dataset to use as the spatial template (CRS, rows, columns).

required

Returns:

Name Type Description
None None

Updates the datacube values in place to match the alignment of alignment_src.

Examples:

  • Align all rasters in a datacube (here cube, already read) to a DEM raster:
>>> from pyramids.dataset import Dataset
>>> dem = Dataset.read_file("01GIS/inputs/4000/acc4000.tif")
>>> cube.align(dem)
Source code in pyramids/datacube.py
def align(self, alignment_src: Dataset) -> None:
    """matchDataAlignment.

    This function matches the coordinate system and the number of rows and columns between two rasters. Raster A
    is the source of the coordinate system, number of rows, number of columns, and cell size. The result will be
    a raster with the same structure as Raster A but with values from Raster B using nearest neighbor interpolation.

    Args:
        alignment_src (Dataset):
            Dataset to use as the spatial template (CRS, rows, columns).

    Returns:
        None:
            Updates the datacube values in place to match the alignment of alignment_src.

    Examples:
        - Align all rasters in a datacube (here cube, already read) to a DEM raster:

          ```python
          >>> from pyramids.dataset import Dataset
          >>> dem = Dataset.read_file("01GIS/inputs/4000/acc4000.tif")
          >>> cube.align(dem)

          ```
    """
    if not isinstance(alignment_src, Dataset):
        raise TypeError("alignment_src input should be a Dataset object")

    for i in range(self.time_length):
        src = self.iloc(i)
        dst = src.align(alignment_src)
        arr = dst.read_array()
        if i == 0:
            # create the array
            array = (
                np.ones(
                    (self.time_length, arr.shape[0], arr.shape[1]),
                )
                * np.nan
            )

        array[i, :, :] = arr

    self._values = array
    # use the last aligned raster as the new base dataset
    self._base = dst
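
A minimal usage sketch (the template path is illustrative), assuming cube is a Datacube that has already been read; after the call, cube.values has the rows and columns of the template:

>>> from pyramids.dataset import Dataset
>>> template = Dataset.read_file("examples/GIS/data/acc4000.tif")
>>> cube.align(template)
>>> cube.values.shape   # (time_length, template rows, template columns)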

merge(src, dst, no_data_value='0', init='nan', n='nan') staticmethod #

merge.

Merges a group of rasters into one raster.

Parameters:

Name Type Description Default
src List[str]

List of paths to all input rasters.

required
dst str

Path to the output raster.

required
no_data_value float | int | str

Assign a specified nodata value to output bands.

'0'
init float | int | str

Pre-initialize the output image bands with these values. However, it is not marked as the nodata value in the output file. If only one value is given, the same value is used in all the bands.

'nan'
n float | int | str

Ignore pixels that have this value in the input files while merging.

'nan'

Returns:

Type Description
None

None

Source code in pyramids/datacube.py
@staticmethod
def merge(
    src: List[str],
    dst: str,
    no_data_value: Union[float, int, str] = "0",
    init: Union[float, int, str] = "nan",
    n: Union[float, int, str] = "nan",
) -> None:
    """merge.

        Merges a group of rasters into one raster.

    Args:
        src (List[str]):
            List of paths to all input rasters.
        dst (str):
            Path to the output raster.
        no_data_value (float | int | str):
            Assign a specified nodata value to output bands.
        init (float | int | str):
            Pre-initialize the output image bands with these values. However, it is not
            marked as the nodata value in the output file. If only one value is given, the same value is used
            in all the bands.
        n (float | int | str):
            Ignore pixels that have this value in the input files while merging.

    Returns:
        None
    """
    # run the command
    # cmd = "gdal_merge.py -o merged_image_1.tif"
    # subprocess.call(cmd.split() + file_list)
    # vrt = gdal.BuildVRT("merged.vrt", file_list)
    # src = gdal.Translate("merged_image.tif", vrt)

    parameters = (
        ["", "-o", dst]
        + src
        + [
            "-co",
            "COMPRESS=LZW",
            "-init",
            str(init),
            "-a_nodata",
            str(no_data_value),
            "-n",
            str(n),
        ]
    )  # '-separate'
    gdal_merge.main(parameters)
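
A minimal usage sketch (folder and file names are illustrative); merge is a staticmethod, so it is called on the class itself:

>>> from glob import glob
>>> from pyramids.datacube import Datacube
>>> files = sorted(glob("examples/GIS/data/rasters/*.tif"))
>>> Datacube.merge(files, "examples/GIS/data/merged.tif", no_data_value="0")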

apply(ufunc) #

apply.

Apply a function on each raster in the datacube.

Parameters:

Name Type Description Default
ufunc Callable

Callable universal function (builtin or user defined). See https://numpy.org/doc/stable/reference/ufuncs.html To create a ufunc from a normal function: https://numpy.org/doc/stable/reference/generated/numpy.frompyfunc.html

required

Returns:

Type Description
None

None

Examples:

  • Apply a simple modulo operation to each value:
>>> def func(val):
...     return val % 2
>>> ufunc = np.frompyfunc(func, 1, 1)
>>> dataset.apply(ufunc)
Source code in pyramids/datacube.py
def apply(self, ufunc: Callable) -> None:
    """apply.

    Apply a function on each raster in the datacube.

    Args:
        ufunc (Callable):
            Callable universal function (builtin or user defined). See
            https://numpy.org/doc/stable/reference/ufuncs.html
            To create a ufunc from a normal function: https://numpy.org/doc/stable/reference/generated/numpy.frompyfunc.html

    Returns:
        None

    Examples:
        - Apply a simple modulo operation to each value:

          ```python
          >>> def func(val):
          ...     return val % 2
          >>> ufunc = np.frompyfunc(func, 1, 1)
          >>> dataset.apply(ufunc)

          ```
    """
    if not callable(ufunc):
        raise TypeError("The Second argument should be a function")
    arr = self.values
    no_data_value = self.base.no_data_value[0]
    # apply the function to every cell that is not the no-data value
    arr[~np.isclose(arr, no_data_value, rtol=0.001)] = ufunc(
        arr[~np.isclose(arr, no_data_value, rtol=0.001)]
    )
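
A minimal usage sketch, assuming cube is a Datacube whose values have already been read; built-in NumPy ufuncs are callables, so they can be passed directly without np.frompyfunc:

>>> import numpy as np
>>> cube.apply(np.sqrt)   # replaces every non-NoData cell with its square root, in place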

overlay(classes_map, exclude_value=None) #

Overlay.

Parameters:

Name Type Description Default
classes_map Dataset

Dataset object for the raster that has classes to overlay with.

required
exclude_value float | int

Values to exclude from extracted values. Defaults to None.

None

Returns:

Type Description
Dict[float, List[float]]

Dict[float, List[float]]: Dictionary whose keys are the distinct class values in classes_map, and whose value for each key is a list of all the datacube values that intersect that class.

Source code in pyramids/datacube.py
def overlay(
    self,
    classes_map,
    exclude_value: Union[float, int] = None,
) -> Dict[float, List[float]]:
    """Overlay.

    Args:
        classes_map (Dataset):
            Dataset object for the raster that has classes to overlay with.
        exclude_value (float | int, optional):
            Values to exclude from extracted values. Defaults to None.

    Returns:
        Dict[float, List[float]]:
            Dictionary whose keys are the distinct class values in classes_map, and whose value for each
            key is a list of all the datacube values that intersect that class.
    """
    values = {}
    for i in range(self.time_length):
        src = self.iloc(i)
        dict_i = src.overlay(classes_map, exclude_value)

        # these are the distinct values from the BaseMap which are keys in the
        # values dict with each one having a list of values
        classes = list(dict_i.keys())

        for class_i in classes:
            if class_i not in values.keys():
                values[class_i] = list()

            values[class_i] = values[class_i] + dict_i[class_i]

    return values
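
A minimal usage sketch (the classes raster path is illustrative), assuming cube is a Datacube that has already been read:

>>> from pyramids.dataset import Dataset
>>> classes = Dataset.read_file("examples/GIS/data/landuse.tif")
>>> extracted = cube.overlay(classes)
>>> sorted(extracted.keys())   # the distinct class values found across all time steps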