Mountain (Flow Kernel Eroder) [Py]¶
This example performs the same simulation than in Mountain [Py], although here the Stream-Power bedrock channel erosion process has been re-implemented using Fastscapelib’s flow kernels.
from random import random
import fastscapelib as fs
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import numba as nb
Re-Implement the Stream-Power Law as a Flow Kernel Eroder¶
The class below re-implements the basic Stream-Power Law (SPL) for simulating bedrock channel erosion, using Fastscapelib’s flow kernels. It inherits from the FlowKernelEroder base class and implements the required methods (parameter and input specs, kernel function and kernel application direction).
This eroder class provides the exact same interface than SPLEroder and it should also provide the same results. It is used below as a drop-in replacement.
class SPLFlowKernelEroder(fs.FlowKernelEroder):
"""Stream-Power Law (SPL) berock channel erosion.
SPL is implemented here as a Fastscapelib flow kernel eroder.
The core erosion logic is implemented in pure-Python and
accelerated with Numba.
"""
def __init__(
self,
flow_graph: fs.FlowGraph,
k_coef: float,
area_exp: float,
slope_exp: float,
tolerance: float = 1e-3,
):
super().__init__(flow_graph)
self.kernel_data.bind(
k_coef=k_coef, area_exp=area_exp, slope_exp=slope_exp, tolerance=tolerance
)
@staticmethod
def param_spec():
"""Returns a dictionary with parameter names
and their (numba) value type.
"""
return {
"k_coef": nb.float64,
"area_exp": nb.float64,
"slope_exp": nb.float64,
"tolerance": nb.float64,
}
@staticmethod
def input_spec():
"""Returns a dictionary with input variable names
and their (numba) value type.
"""
return {
"elevation": nb.float64[::1],
"drainage_area": nb.float64[::1],
"dt": nb.float64,
}
@staticmethod
def kernel_apply_dir() -> fs.FlowGraphTraversalDir:
"""Returns the kernel application direction and order."""
return fs.FlowGraphTraversalDir.BREADTH_UPSTREAM
@staticmethod
def kernel_func(node):
"""The eroder flow kernel function."""
r_count = node.receivers.count
# skip base level node (erosion = 0)
if r_count == 1 and node.receivers.distance[0] == 0.0:
return
# determine whether the current node is in a closed depression
# of the topography (lake) and skip it if that's the case
# (erosion = 0)
elevation_flooded = np.finfo(np.double).max
for r in range(r_count):
irec_elevation_next = (
node.receivers.elevation[r] - node.receivers.erosion[r]
)
if irec_elevation_next < elevation_flooded:
elevation_flooded = irec_elevation_next
if node.elevation <= elevation_flooded:
return
# compute new elevation at the current node by applying SPL
eq_num = node.elevation
eq_den = 1.0
for r in range(r_count):
irec_elevation = node.receivers.elevation[r]
irec_elevation_next = irec_elevation - node.receivers.erosion[r]
if irec_elevation > node.elevation:
continue
irec_weight = node.receivers.weight[r]
irec_distance = node.receivers.distance[r]
factor = (
node.k_coef
* node.dt
* np.power(node.drainage_area * irec_weight, node.area_exp)
)
factor /= irec_distance
eq_num += factor * irec_elevation_next
eq_den += factor
elevation_updated = eq_num / eq_den
# prevent the creation of new closed depressions in the
# topography
if elevation_updated < elevation_flooded:
elevation_updated = elevation_flooded + np.finfo(np.double).tiny
# compute erosion
node.erosion = node.elevation - elevation_updated
def erode(
self, elevation: np.ndarray, drainage_area: np.ndarray, dt: float
) -> np.ndarray:
"""Compute SPL erosion for one time step."""
return super().erode(elevation=elevation, drainage_area=drainage_area, dt=dt)
Setup the Grid, Flow Graph and Eroders¶
Create a RasterGrid of 201x301 nodes with a total length of 50 km in y (rows) and 75 km in x (columns).
Set fixed value boundary conditions at all border nodes.
grid = fs.RasterGrid.from_length([201, 301], [5e4, 7.5e4], fs.NodeStatus.FIXED_VALUE)
Create a FlowGraph object with single direction flow routing and the resolution of closed depressions on the topographic surface. See Flow Routing Strategies for more examples on possible flow routing strategies.
By default, base level nodes are set from fixed value boundary conditions (all border nodes in this example).
flow_graph = fs.FlowGraph(grid, [fs.SingleFlowRouter(), fs.MSTSinkResolver()])
Setup eroder classes (bedrock channel + hillslope) with a given set of parameter values.
spl_eroder = SPLFlowKernelEroder(
flow_graph,
k_coef=2e-4,
area_exp=0.4,
slope_exp=1,
tolerance=1e-5,
)
diffusion_eroder = fs.DiffusionADIEroder(grid, 0.01)
Setup Initial Conditions and External Forcing¶
Create a flat (+ random perturbations) surface topography as initial conditions. Also initialize the array for drainage area.
rng = np.random.Generator(np.random.PCG64(1234))
init_elevation = rng.uniform(0, 1, size=grid.shape)
elevation = init_elevation
drainage_area = np.empty_like(elevation)
Set upflit rate as uniform (fixed value) within the domain and to zero at all grid boundaries.
uplift_rate = np.full_like(elevation, 1e-3)
uplift_rate[[0, -1], :] = 0.
uplift_rate[:, [0, -1]] = 0.
Run the Model¶
Run the model for a few dozens of time steps (total simulation time: 1M years).
dt = 2e4
nsteps = 50
for step in range(nsteps):
# uplift (no uplift at fixed elevation boundaries)
uplifted_elevation = elevation + dt * uplift_rate
# flow routing
filled_elevation = flow_graph.update_routes(uplifted_elevation)
# flow accumulation (drainage area)
flow_graph.accumulate(drainage_area, 1.0)
# apply channel erosion then hillslope diffusion
spl_erosion = spl_eroder.erode(uplifted_elevation, drainage_area, dt)
diff_erosion = diffusion_eroder.erode(uplifted_elevation - spl_erosion, dt)
# update topography
elevation = uplifted_elevation - spl_erosion - diff_erosion
Plot Outputs and Other Diagnostics¶
Topographic elevation
fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(elevation)
plt.colorbar();
Drainage area (log)
fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(np.log(drainage_area), cmap=plt.cm.Blues)
plt.colorbar();
Drainage basins
colors = [(1,1,1)] + [(random(),random(),random()) for i in range(255)]
rnd_cm = matplotlib.colors.LinearSegmentedColormap.from_list('new_map', colors, N=256)
fig, ax = plt.subplots(figsize=(12, 6))
plt.imshow(flow_graph.basins(), cmap=rnd_cm);