Mountain (Flow Kernel Eroder) [Py]

This example performs the same simulation than in Mountain [Py], although here the Stream-Power bedrock channel erosion process has been re-implemented using Fastscapelib’s flow kernels.

from random import random

import fastscapelib as fs
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import numba as nb

Re-Implement the Stream-Power Law as a Flow Kernel Eroder

The class below re-implements the basic Stream-Power Law (SPL) for simulating bedrock channel erosion, using Fastscapelib’s flow kernels. It inherits from the FlowKernelEroder base class and implements the required methods (parameter and input specs, kernel function and kernel application direction).

This eroder class provides the exact same interface than SPLEroder and it should also provide the same results. It is used below as a drop-in replacement.

class SPLFlowKernelEroder(fs.FlowKernelEroder):
    """Stream-Power Law (SPL) berock channel erosion.
    
    SPL is implemented here as a Fastscapelib flow kernel eroder.
    The core erosion logic is implemented in pure-Python and
    accelerated with Numba.
    
    """
    def __init__(
        self,
        flow_graph: fs.FlowGraph,
        k_coef: float,
        area_exp: float,
        slope_exp: float,
        tolerance: float = 1e-3,
    ):
        super().__init__(flow_graph)

        self.kernel_data.bind(
            k_coef=k_coef, area_exp=area_exp, slope_exp=slope_exp, tolerance=tolerance
        )

    @staticmethod
    def param_spec():
        """Returns a dictionary with parameter names
        and their (numba) value type.
        """
        return {
            "k_coef": nb.float64,
            "area_exp": nb.float64,
            "slope_exp": nb.float64,
            "tolerance": nb.float64,
        }

    @staticmethod
    def input_spec():
        """Returns a dictionary with input variable names
        and their (numba) value type.
        """
        return {
            "elevation": nb.float64[::1],
            "drainage_area": nb.float64[::1],
            "dt": nb.float64,
        }

    @staticmethod
    def kernel_apply_dir() -> fs.FlowGraphTraversalDir:
        """Returns the kernel application direction and order."""
        return fs.FlowGraphTraversalDir.BREADTH_UPSTREAM

    @staticmethod
    def kernel_func(node):
        """The eroder flow kernel function."""
        r_count = node.receivers.count

        # skip base level node (erosion = 0)
        if r_count == 1 and node.receivers.distance[0] == 0.0:
            return

        # determine whether the current node is in a closed depression
        # of the topography (lake) and skip it if that's the case
        # (erosion = 0)
        elevation_flooded = np.finfo(np.double).max

        for r in range(r_count):
            irec_elevation_next = (
                node.receivers.elevation[r] - node.receivers.erosion[r]
            )

            if irec_elevation_next < elevation_flooded:
                elevation_flooded = irec_elevation_next

        if node.elevation <= elevation_flooded:
            return

        # compute new elevation at the current node by applying SPL
        eq_num = node.elevation
        eq_den = 1.0

        for r in range(r_count):
            irec_elevation = node.receivers.elevation[r]
            irec_elevation_next = irec_elevation - node.receivers.erosion[r]

            if irec_elevation > node.elevation:
                continue

            irec_weight = node.receivers.weight[r]
            irec_distance = node.receivers.distance[r]

            factor = (
                node.k_coef
                * node.dt
                * np.power(node.drainage_area * irec_weight, node.area_exp)
            )
            factor /= irec_distance
            eq_num += factor * irec_elevation_next
            eq_den += factor

        elevation_updated = eq_num / eq_den

        # prevent the creation of new closed depressions in the
        # topography
        if elevation_updated < elevation_flooded:
            elevation_updated = elevation_flooded + np.finfo(np.double).tiny

        # compute erosion
        node.erosion = node.elevation - elevation_updated

    def erode(
        self, elevation: np.ndarray, drainage_area: np.ndarray, dt: float
    ) -> np.ndarray:
        """Compute SPL erosion for one time step."""
        return super().erode(elevation=elevation, drainage_area=drainage_area, dt=dt)

Setup the Grid, Flow Graph and Eroders

Create a RasterGrid of 201x301 nodes with a total length of 50 km in y (rows) and 75 km in x (columns).

Set fixed value boundary conditions at all border nodes.

grid = fs.RasterGrid.from_length([201, 301], [5e4, 7.5e4], fs.NodeStatus.FIXED_VALUE)

Create a FlowGraph object with single direction flow routing and the resolution of closed depressions on the topographic surface. See Flow Routing Strategies for more examples on possible flow routing strategies.

By default, base level nodes are set from fixed value boundary conditions (all border nodes in this example).

flow_graph = fs.FlowGraph(grid, [fs.SingleFlowRouter(), fs.MSTSinkResolver()])

Setup eroder classes (bedrock channel + hillslope) with a given set of parameter values.

spl_eroder = SPLFlowKernelEroder(
    flow_graph,
    k_coef=2e-4,
    area_exp=0.4,
    slope_exp=1,
    tolerance=1e-5,
)

diffusion_eroder = fs.DiffusionADIEroder(grid, 0.01)

Setup Initial Conditions and External Forcing

Create a flat (+ random perturbations) surface topography as initial conditions. Also initialize the array for drainage area.

rng = np.random.Generator(np.random.PCG64(1234))

init_elevation = rng.uniform(0, 1, size=grid.shape)

elevation = init_elevation
drainage_area = np.empty_like(elevation)

Set upflit rate as uniform (fixed value) within the domain and to zero at all grid boundaries.

uplift_rate = np.full_like(elevation, 1e-3)
uplift_rate[[0, -1], :] = 0.
uplift_rate[:, [0, -1]] = 0.

Run the Model

Run the model for a few dozens of time steps (total simulation time: 1M years).

dt = 2e4
nsteps = 50

for step in range(nsteps):
    # uplift (no uplift at fixed elevation boundaries)
    uplifted_elevation = elevation + dt * uplift_rate
    
    # flow routing
    filled_elevation = flow_graph.update_routes(uplifted_elevation)
    
    # flow accumulation (drainage area)
    flow_graph.accumulate(drainage_area, 1.0)
    
    # apply channel erosion then hillslope diffusion
    spl_erosion = spl_eroder.erode(uplifted_elevation, drainage_area, dt)
    diff_erosion = diffusion_eroder.erode(uplifted_elevation - spl_erosion, dt)
    
    # update topography
    elevation = uplifted_elevation - spl_erosion - diff_erosion

Plot Outputs and Other Diagnostics

  • Topographic elevation

fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(elevation)
plt.colorbar();
../_images/89c062a6e8b81080c1588896d6fcefbd29e02c3824488a6cb0539b2713b443f9.png
  • Drainage area (log)

fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(np.log(drainage_area), cmap=plt.cm.Blues)
plt.colorbar();
../_images/d507cd8decbb5c8e167dd3d377c275a3aef33f7b11e4698acc8606ff013f616d.png
  • Drainage basins

colors = [(1,1,1)] + [(random(),random(),random()) for i in range(255)]
rnd_cm = matplotlib.colors.LinearSegmentedColormap.from_list('new_map', colors, N=256)
fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(flow_graph.basins(), cmap=rnd_cm);
../_images/11a7d7a73ba4cc0f3a173355a17c0725bac9120cbf34f35d40883054b0ae7979.png