fractopo package

Subpackages

Submodules

fractopo.branches_and_nodes module

Functions for extracting branches and nodes from trace maps.

branches_and_nodes is the main entrypoint.

fractopo.branches_and_nodes.additional_snapping_func(trace: LineString, idx: int, additional_snapping: list[tuple[int, Point]]) LineString

Insert points into LineStrings to make sure trace abutting trace.

E.g.

>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> idx = 0
>>> point = Point(2.25, 0.1)
>>> additional_snapping = [
...     (0, point),
... ]
>>> additional_snapping_func(trace, idx, additional_snapping).wkt
'LINESTRING (0 0, 1 0, 2 0, 2.25 0.1, 3 0)'

When idx doesn’t match -> no additional snapping

>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> idx = 1
>>> point = Point(2.25, 0.1)
>>> additional_snapping = [
...     (0, point),
... ]
>>> additional_snapping_func(trace, idx, additional_snapping).wkt
'LINESTRING (0 0, 1 0, 2 0, 3 0)'
fractopo.branches_and_nodes.angle_to_point(point: Point, nearest_point: Point, comparison_point: Point) float

Calculate the angle between two vectors.

Vectors are made from the given points: Both vectors have the same first point, nearest_point, and second point is either point or comparison_point.

Returns angle in degrees.

E.g.

>>> point = Point(1, 1)
>>> nearest_point = Point(0, 0)
>>> comparison_point = Point(-1, 1)
>>> angle_to_point(point, nearest_point, comparison_point)
90.0
>>> point = Point(1, 1)
>>> nearest_point = Point(0, 0)
>>> comparison_point = Point(-1, 2)
>>> angle_to_point(point, nearest_point, comparison_point)
71.56505117707799
fractopo.branches_and_nodes.determine_branch_identity(number_of_i_nodes: int, number_of_xy_nodes: int, number_of_e_nodes: int) str

Determine the identity of a branch.

Is based on the amount of I-, XY- and E-nodes and returns it as a string.

E.g.

>>> determine_branch_identity(2, 0, 0)
'I - I'
>>> determine_branch_identity(1, 1, 0)
'C - I'
>>> determine_branch_identity(1, 0, 1)
'I - E'
fractopo.branches_and_nodes.determine_insert_approach(nearest_point_idx: int, trace_point_dists: list[tuple[int, Point, float]], snap_threshold: float, point: Point, nearest_point: Point) tuple[int, bool]

Determine if to insert or replace point.

fractopo.branches_and_nodes.filter_non_unique_traces(traces: GeoSeries, snap_threshold: float) GeoSeries

Filter out traces that are not unique.

fractopo.branches_and_nodes.get_branch_identities(branches: GeoSeries, nodes: GeoSeries, node_identities: list, snap_threshold: float) list[str]

Determine the types of branches for a GeoSeries of branches.

i.e. C-C, C-I or I-I, + (C-E, E-E, I-E)

>>> branches = gpd.GeoSeries(
...     [
...         LineString([(1, 1), (2, 2)]),
...         LineString([(2, 2), (3, 3)]),
...         LineString([(3, 0), (2, 2)]),
...         LineString([(2, 2), (-2, 5)]),
...     ]
... )
>>> nodes = gpd.GeoSeries(
...     [
...         Point(2, 2),
...         Point(1, 1),
...         Point(3, 3),
...         Point(3, 0),
...         Point(-2, 5),
...     ]
... )
>>> node_identities = ["X", "I", "I", "I", "E"]
>>> snap_threshold = 0.001
>>> get_branch_identities(branches, nodes, node_identities, snap_threshold)
['C - I', 'C - I', 'C - I', 'C - E']
fractopo.branches_and_nodes.insert_point_to_linestring(trace: LineString, point: Point, snap_threshold: float) LineString

Insert/modify point to trace LineString.

The point location is determined to fit into the LineString without changing the geometrical order of LineString vertices (which only makes sense if LineString is sublinear.)

TODO: Does not work for 2.5D geometries (Z-coordinates). Z-coordinates will be lost.

E.g.

>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> point = Point(1.25, 0.1)
>>> insert_point_to_linestring(trace, point, 0.01).wkt
'LINESTRING (0 0, 1 0, 1.25 0.1, 2 0, 3 0)'
>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> point = Point(2.25, 0.1)
>>> insert_point_to_linestring(trace, point, 0.01).wkt
'LINESTRING (0 0, 1 0, 2 0, 2.25 0.1, 3 0)'
fractopo.branches_and_nodes.is_endpoint_close_to_boundary(endpoint: Point, areas: list[Polygon | MultiPolygon], snap_threshold: float) bool

Check if endpoint is within snap_threshold of areas boundaries.

fractopo.branches_and_nodes.node_identities_from_branches(branches: GeoSeries, areas: GeoSeries | GeoDataFrame, snap_threshold: float) tuple[list[Point], list[str]]

Resolve node identities from branch data.

>>> branches_list = [
...     LineString([(0, 0), (1, 1)]),
...     LineString([(2, 2), (1, 1)]),
...     LineString([(2, 0), (1, 1)]),
... ]
>>> area_polygon = Polygon([(-5, -5), (-5, 5), (5, 5), (5, -5)])
>>> branches = gpd.GeoSeries(branches_list)
>>> areas = gpd.GeoSeries([area_polygon])
>>> snap_threshold = 0.001
>>> nodes, identities = node_identities_from_branches(
...     branches, areas, snap_threshold
... )
>>> [node.wkt for node in nodes]
['POINT (0 0)', 'POINT (1 1)', 'POINT (2 2)', 'POINT (2 0)']
>>> identities
['I', 'Y', 'I', 'I']
fractopo.branches_and_nodes.node_identity(endpoint: Point, idx: int, areas: GeoSeries | GeoDataFrame, endpoints_geoseries: GeoSeries, endpoints_spatial_index: Any, snap_threshold: float) str

Determine node identity of endpoint.

fractopo.branches_and_nodes.report_snapping_loop(loops: int, allowed_loops: int)

Report snapping looping.

fractopo.branches_and_nodes.resolve_trace_candidates(trace: LineString, idx: int, traces_spatial_index, traces: list[LineString], snap_threshold: float) list[LineString]

Resolve spatial index intersection to actual intersection candidates.

fractopo.branches_and_nodes.simple_snap(trace: LineString, trace_candidates: list[LineString], snap_threshold: float) tuple[LineString, bool]

Modify conditionally trace to snap to any of trace_candidates.

E.g.

>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> trace_candidates = (
...     [LineString([(3.0001, -3), (3.0001, 0), (3, 3)])]
... )
>>> snap_threshold = 0.001
>>> snapped = simple_snap(trace, trace_candidates, snap_threshold)
>>> snapped[0].wkt, snapped[1]
('LINESTRING (0 0, 1 0, 2 0, 3.0001 0)', True)

Do not snap overlapping.

>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3.0002, 0)])
>>> trace_candidates = (
...     [LineString([(3.0001, -3), (3.0001, 0), (3, 3)])]
... )
>>> snap_threshold = 0.001
>>> snapped = simple_snap(trace, trace_candidates, snap_threshold)
>>> snapped[0].wkt, snapped[1]
('LINESTRING (0 0, 1 0, 2 0, 3.0002 0)', False)
fractopo.branches_and_nodes.snap_others_to_trace(idx: int, trace: LineString, snap_threshold: float, traces: list[LineString], traces_spatial_index: Any, areas: list[Polygon | MultiPolygon] | None, final_allowed_loop: bool = False) tuple[LineString, bool]

Determine whether and how to snap trace to traces.

E.g.

Trace gets new coordinates to snap other traces to it:

>>> idx = 0
>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> snap_threshold = 0.001
>>> traces = [trace, LineString([(1.5, 3), (1.5, 0.00001)])]
>>> traces_spatial_index = gpd.GeoSeries(traces).sindex
>>> areas = None
>>> snapped = snap_others_to_trace(
...     idx, trace, snap_threshold, traces, traces_spatial_index, areas
... )
>>> snapped[0].wkt, snapped[1]
('LINESTRING (0 0, 1 0, 1.5 1e-05, 2 0, 3 0)', True)

Trace itself is not snapped by snap_others_to_trace:

>>> idx = 0
>>> trace = LineString([(0, 0), (1, 0), (2, 0), (3, 0)])
>>> snap_threshold = 0.001
>>> traces = [trace, LineString([(3.0001, -3), (3.0001, 0), (3, 3)])]
>>> traces_spatial_index = gpd.GeoSeries(traces).sindex
>>> areas = None
>>> snapped = snap_others_to_trace(
...     idx, trace, snap_threshold, traces, traces_spatial_index, areas
... )
>>> snapped[0].wkt, snapped[1]
('LINESTRING (0 0, 1 0, 2 0, 3 0)', False)
fractopo.branches_and_nodes.snap_trace_simple(idx: int, trace: LineString, snap_threshold: float, traces: list[LineString], traces_spatial_index: Any, final_allowed_loop: bool = False) tuple[LineString, bool]

Determine whether and how to perform simple snap.

fractopo.branches_and_nodes.snap_trace_to_another(trace_endpoints: list[Point], another: LineString, snap_threshold: float) tuple[LineString, bool]

Add point to another trace to snap trace to end at another trace.

I.e. modifies and returns another

fractopo.branches_and_nodes.snap_traces(traces: list[LineString], snap_threshold: float, areas: list[Polygon | MultiPolygon] | None = None, final_allowed_loop=False) tuple[list[LineString], bool]

Snap traces to end exactly at other traces.

fractopo.cli module

Command-line integration of fractopo with click.

class fractopo.cli.LogLevel(*values)

Bases: Enum

Enums for log levels.

CRITICAL = 'CRITICAL'
DEBUG = 'DEBUG'
ERROR = 'ERROR'
INFO = 'INFO'
WARNING = 'WARNING'
fractopo.cli.default_network_output_paths(network_name: str, general_output: Path | None, branches_output: Path | None, nodes_output: Path | None, parameters_output: Path | None)

Determine default network output paths.

fractopo.cli.describe_results(validated: ~geopandas.geodataframe.GeoDataFrame, error_column: str, console: ~rich.console.Console = <console width=80 ColorSystem.EIGHT_BIT>)

Describe validation results to stdout.

fractopo.cli.fractopo_callback(log_level: ~fractopo.cli.LogLevel = <typer.models.OptionInfo object>, version: bool = <typer.models.OptionInfo object>)

Use fractopo command-line utilities.

fractopo.cli.get_click_path_args(exists=True, **kwargs)

Get basic click path args.

fractopo.cli.info()

Print out information about fractopo installation and python environment.

fractopo.cli.make_output_dir(base_path: Path) Path

Make timestamped output dir.

fractopo.cli.network(trace_file: ~pathlib.Path = <typer.models.ArgumentInfo object>, area_file: ~pathlib.Path = <typer.models.ArgumentInfo object>, snap_threshold: float = <typer.models.OptionInfo object>, determine_branches_nodes: bool = <typer.models.OptionInfo object>, name: str | None = <typer.models.OptionInfo object>, circular_target_area: bool = <typer.models.OptionInfo object>, truncate_traces: bool = <typer.models.OptionInfo object>, censoring_area: ~pathlib.Path | None = <typer.models.OptionInfo object>, output_path: ~pathlib.Path | None = <typer.models.OptionInfo object>)

Analyze the geometry and topology of trace network.

fractopo.cli.rich_table_from_parameters(parameters: dict[str, float]) Table

Generate rich Table from network parameters.

fractopo.cli.tracevalidate(trace_file: ~pathlib.Path = <typer.models.ArgumentInfo object>, area_file: ~pathlib.Path = <typer.models.ArgumentInfo object>, allow_fix: bool = <typer.models.OptionInfo object>, summary: bool = <typer.models.OptionInfo object>, snap_threshold: float = <typer.models.OptionInfo object>, output: ~pathlib.Path | None = <typer.models.OptionInfo object>, only_area_validation: bool = <typer.models.OptionInfo object>, allow_empty_area: bool = <typer.models.OptionInfo object>)

Validate trace data delineated by target area data.

If allow_fix is True, some automatic fixing will be done to e.g. convert MultiLineStrings to LineStrings.

fractopo.fractopo_utils module

Miscellaneous utilities and scripts of fractopo.

class fractopo.fractopo_utils.LineMerge

Bases: object

Merge lines conditionally.

static conditional_linemerge(first: LineString, second: LineString, tolerance: float | int | Real, buffer_value: float | int | Real) None | LineString

Conditionally merge two LineStrings (first and second).

Merge occurs if 1) their endpoints are within buffer_value of each other and 2) their total orientations are within tolerance (degrees) of each other.

Merges by joining their coordinates. The endpoint (that is within buffer_value of endpoint of first) of the second LineString is trimmed from the resulting coordinates.

E.g. with merging:

>>> first = LineString([(0, 0), (0, 2)])
>>> second = LineString([(0, 2.001), (0, 4)])
>>> tolerance = 5
>>> buffer_value = 0.01
>>> LineMerge.conditional_linemerge(first, second, tolerance, buffer_value).wkt
'LINESTRING (0 0, 0 2, 0 4)'

Without merging:

>>> first = LineString([(0, 0), (0, 2)])
>>> second = LineString([(0, 2.1), (0, 4)])
>>> tolerance = 5
>>> buffer_value = 0.01
>>> LineMerge.conditional_linemerge(
...     first, second, tolerance, buffer_value
... ) is None
True
static conditional_linemerge_collection(traces: GeoDataFrame | GeoSeries, tolerance: float | int | Real, buffer_value: float | int | Real) tuple[list[LineString], list[int]]

Conditionally linemerge within a collection of LineStrings.

Returns the linemerged traces and the idxs of traces that were linemerged.

E.g.

>>> first = LineString([(0, 0), (0, 2)])
>>> second = LineString([(0, 2.001), (0, 4)])
>>> traces = gpd.GeoSeries([first, second])
>>> tolerance = 5
>>> buffer_value = 0.01
>>> new_traces, idx = LineMerge.conditional_linemerge_collection(
...     traces, tolerance, buffer_value
... )
>>> [trace.wkt for trace in new_traces], idx
(['LINESTRING (0 0, 0 2, 0 4)'], [0, 1])
static integrate_replacements(traces: GeoDataFrame, new_traces: list[LineString], modified_idx: list[int]) GeoDataFrame

Add linemerged and remove the parts that were linemerged.

E.g.

>>> first = LineString([(0, 0), (0, 2)])
>>> second = LineString([(0, 2.001), (0, 4)])
>>> traces = gpd.GeoDataFrame(geometry=[first, second])
>>> new_traces = [LineString([(0, 0), (0, 2), (0, 4)])]
>>> modified_idx = [0, 1]
>>> LineMerge.integrate_replacements(traces, new_traces, modified_idx)
                     geometry
0  LINESTRING (0 0, 0 2, 0 4)
static run_loop(traces: GeoDataFrame, tolerance: float | int | Real, buffer_value: float | int | Real) GeoDataFrame

Run multiple conditional linemerge iterations for GeoDataFrame.

This is the main entrypoint.

GeoDataFrame should contain LineStrings.

E.g.

>>> first = LineString([(0, 0), (0, 2)])
>>> second = LineString([(0, 2.001), (0, 4)])
>>> traces = gpd.GeoDataFrame(geometry=[first, second])
>>> tolerance = 5
>>> buffer_value = 0.01
>>> LineMerge.run_loop(traces, tolerance, buffer_value)
                     geometry
0  LINESTRING (0 0, 0 2, 0 4)
fractopo.fractopo_utils.remove_identical_sindex(geosrs: GeoSeries, snap_threshold: float) GeoSeries

Remove stacked nodes by using a search buffer the size of snap_threshold.

fractopo.general module

Contains general calculation and plotting tools.

class fractopo.general.Col(*values)

Bases: Enum

GeoDataFrame column names for attributes.

AZIMUTH = 'azimuth'
AZIMUTH_SET = 'azimuth_set'
LENGTH = 'length'
LENGTH_NON_WEIGHTED = 'length non-weighted'
LENGTH_SET = 'length_set'
LENGTH_WEIGHTS = 'boundary_weight'
class fractopo.general.Param(*values)

Bases: Enum

Column names for geometric and topological parameters.

The ParamInfo instances contain additional metadata.

AREA = ParamInfo(name='Area', plot_as_log=False, unit='$m^2$', needs_topology=False, aggregator=<function sum_aggregation>)
AREAL_FREQUENCY_B20 = ParamInfo(name='Areal Frequency B20', plot_as_log=True, unit='$\\frac{1}{m^2}$', needs_topology=True, aggregator=<function mean_aggregation>)
AREAL_FREQUENCY_P20 = ParamInfo(name='Areal Frequency P20', plot_as_log=True, unit='$\\frac{1}{m^2}$', needs_topology=True, aggregator=<function mean_aggregation>)
BRANCH_MAX_LENGTH = ParamInfo(name='Branch Max Length', plot_as_log=True, unit='$m$', needs_topology=True, aggregator=<function mean_aggregation>)
BRANCH_MEAN_LENGTH = ParamInfo(name='Branch Mean Length', plot_as_log=True, unit='$m$', needs_topology=True, aggregator=<function mean_aggregation>)
BRANCH_MIN_LENGTH = ParamInfo(name='Branch Min Length', plot_as_log=True, unit='$m$', needs_topology=True, aggregator=<function mean_aggregation>)
CIRCLE_COUNT = ParamInfo(name='Circle Count', plot_as_log=False, unit='-', needs_topology=False, aggregator=<function sum_aggregation>)
CONNECTIONS_PER_BRANCH = ParamInfo(name='Connections per Branch', plot_as_log=False, unit='$\\frac{1}{n}$', needs_topology=True, aggregator=<function mean_aggregation>)
CONNECTIONS_PER_TRACE = ParamInfo(name='Connections per Trace', plot_as_log=False, unit='$\\frac{1}{n}$', needs_topology=True, aggregator=<function mean_aggregation>)
CONNECTION_FREQUENCY = ParamInfo(name='Connection Frequency', plot_as_log=False, unit='$\\frac{1}{m^2}$', needs_topology=True, aggregator=<function mean_aggregation>)
DIMENSIONLESS_INTENSITY_B22 = ParamInfo(name='Dimensionless Intensity B22', plot_as_log=False, unit='-', needs_topology=True, aggregator=<function mean_aggregation>)
DIMENSIONLESS_INTENSITY_P22 = ParamInfo(name='Dimensionless Intensity P22', plot_as_log=False, unit='-', needs_topology=False, aggregator=<function mean_aggregation>)
FRACTURE_DENSITY_MAULDON = ParamInfo(name='Fracture Density (Mauldon)', plot_as_log=True, unit='$\\frac{1}{m^2}$', needs_topology=True, aggregator=<function mean_aggregation>)
FRACTURE_INTENSITY_B21 = ParamInfo(name='Fracture Intensity B21', plot_as_log=True, unit='$\\frac{m}{m^2}$', needs_topology=False, aggregator=<function mean_aggregation>)
FRACTURE_INTENSITY_MAULDON = ParamInfo(name='Fracture Intensity (Mauldon)', plot_as_log=True, unit='$\\frac{m}{m^2}$', needs_topology=True, aggregator=<function mean_aggregation>)
FRACTURE_INTENSITY_P21 = ParamInfo(name='Fracture Intensity P21', plot_as_log=True, unit='$\\frac{m}{m^2}$', needs_topology=False, aggregator=<function mean_aggregation>)
NUMBER_OF_BRANCHES = ParamInfo(name='Number of Branches', plot_as_log=False, unit='-', needs_topology=True, aggregator=<function sum_aggregation>)
NUMBER_OF_BRANCHES_TRUE = ParamInfo(name='Number of Branches (Real)', plot_as_log=False, unit='-', needs_topology=True, aggregator=<function sum_aggregation>)
NUMBER_OF_TRACES = ParamInfo(name='Number of Traces', plot_as_log=False, unit='-', needs_topology=True, aggregator=<function sum_aggregation>)
NUMBER_OF_TRACES_TRUE = ParamInfo(name='Number of Traces (Real)', plot_as_log=False, unit='-', needs_topology=True, aggregator=<function sum_aggregation>)
TRACE_MAX_LENGTH = ParamInfo(name='Trace Max Length', plot_as_log=True, unit='$m$', needs_topology=False, aggregator=<function mean_aggregation>)
TRACE_MEAN_LENGTH = ParamInfo(name='Trace Mean Length', plot_as_log=True, unit='$m$', needs_topology=False, aggregator=<function mean_aggregation>)
TRACE_MEAN_LENGTH_MAULDON = ParamInfo(name='Trace Mean Length (Mauldon)', plot_as_log=True, unit='$m$', needs_topology=True, aggregator=<function mean_aggregation>)
TRACE_MIN_LENGTH = ParamInfo(name='Trace Min Length', plot_as_log=True, unit='$m$', needs_topology=False, aggregator=<function mean_aggregation>)
class fractopo.general.ParamInfo(name: str, plot_as_log: bool, unit: str, needs_topology: bool, aggregator: Callable)

Bases: object

Parameter with name and metadata.

aggregator: Callable
name: str
needs_topology: bool
plot_as_log: bool
unit: str
class fractopo.general.ProcessResult(identifier: str, result: Any, error: bool)

Bases: object

Dataclass for multiprocessing result parsing.

error: bool
identifier: str
result: Any
fractopo.general.assign_branch_and_node_colors(feature_type: str) str

Determine color for each branch and node type.

Defaults to red.

>>> assign_branch_and_node_colors("C-C")
'red'
fractopo.general.avg_calc(data)

Calculate average for radial data.

TODO: Should take length into calculations. Not real average atm

fractopo.general.azimu_half(degrees: float) float

Transform azimuth from 180-360 range to range 0-180.

Parameters:

degrees – Degrees in range 0 - 360

Returns:

Degrees in range 0 - 180

fractopo.general.azimuth_to_unit_vector(azimuth: float) ndarray

Convert azimuth to unit vector.

fractopo.general.bool_arrays_sum(arr_1: ndarray, arr_2: ndarray) ndarray

Calculate integer sum of two arrays.

Resulting array consists only of integers 0, 1 and 2.

>>> arr_1 = np.array([True, False, False])
>>> arr_2 = np.array([True, True, False])
>>> bool_arrays_sum(arr_1, arr_2)
array([2, 1, 0])
>>> arr_1 = np.array([True, True])
>>> arr_2 = np.array([True, True])
>>> bool_arrays_sum(arr_1, arr_2)
array([2, 2])
fractopo.general.bounding_polygon(geoseries: GeoSeries | GeoDataFrame) Polygon

Create bounding polygon around GeoSeries.

The geoseries geometries will always be completely enveloped by the polygon. The geometries will not intersect the polygon boundary.

>>> geom = LineString([(1, 0), (1, 1), (-1, -1)])
>>> geoseries = gpd.GeoSeries([geom])
>>> poly = bounding_polygon(geoseries)
>>> poly.wkt
'POLYGON ((2 -2, 2 2, -2 2, -2 -2, 2 -2))'
>>> geoseries.intersects(poly.boundary)
0    False
dtype: bool
fractopo.general.calc_circle_area(radius: float | int | Real) float

Calculate area of circle.

>>> calc_circle_area(1.78)
9.953822163633902
fractopo.general.calc_circle_radius(area: float | int | Real) float

Calculate radius from area.

>>> calc_circle_radius(10.0)
1.7841241161527712
fractopo.general.calc_strike(dip_direction: float) float

Calculate strike from dip direction. Right-handed rule.

E.g.:

>>> calc_strike(50.0)
320.0
>>> calc_strike(180.0)
90.0
Parameters:

dip_direction – The direction of dip.

Returns:

Converted strike.

fractopo.general.check_for_wrong_geometries(traces: GeoDataFrame, area: GeoDataFrame)

Check that traces are line geometries and area contains area geometries.

fractopo.general.check_for_z_coordinates(geodata: GeoDataFrame | GeoSeries) bool

Check if geopandas data contains Z-coordinates.

fractopo.general.compare_unit_vector_orientation(vec_1: ndarray, vec_2: ndarray, threshold_angle: float | int | Real) bool

If vec_1 and vec_2 are too different in orientation, will return False.

fractopo.general.convert_list_columns(gdf: GeoDataFrame, allow: bool = True) GeoDataFrame

Convert list type columns to string.

fractopo.general.create_unit_vector(start_point: Point, end_point: Point) ndarray

Create numpy unit vector from two shapely Points.

Parameters:
  • start_point – The start point.

  • end_point – The end point.

Returns:

The unit vector that points from start_point to end_point.

fractopo.general.define_length_set(length: float, set_df: DataFrame) str

Define sets based on the length of the traces or branches.

fractopo.general.determine_azimuth(line: LineString, halved: bool) float

Calculate azimuth of given line.

If halved -> return is in range [0, 180] Else -> [0, 360]

e.g.: Accepts LineString

>>> determine_azimuth(LineString([(0, 0), (1, 1)]), True)
45.0
>>> determine_azimuth(LineString([(0, 0), (0, 1)]), True)
0.0
>>> determine_azimuth(LineString([(0, 0), (-1, -1)]), False)
225.0
>>> determine_azimuth(LineString([(0, 0), (-1, -1)]), True)
45.0
Parameters:
  • line – The line of which azimuth is determined.

  • halved – Whether to return result in range [0, 180] (halved=True) or [0, 360] (halved=False).

Returns:

The determined azimuth.

fractopo.general.determine_boundary_intersecting_lines(line_gdf: GeoDataFrame, area_gdf: GeoDataFrame, snap_threshold: float) tuple[ndarray, ndarray]

Determine lines that intersect any target area boundary.

fractopo.general.determine_regression_azimuth(line: LineString) float

Determine azimuth of line LineString with linear regression.

A scikit-learn LinearRegression is fitted to the x, y coordinates of the given and the azimuth of the fitted linear line is returned.

The azimuth is returned in range [0, 180].

E.g.

>>> line = LineString([(0, 0), (1, 1), (2, 2), (3, 3)])
>>> determine_regression_azimuth(line)
45.0
>>> line = LineString([(-1, -5), (3, 3)])
>>> round(determine_regression_azimuth(line), 3)
26.565
>>> line = LineString([(0, 0), (0, 3)])
>>> determine_regression_azimuth(line)
0.0
Parameters:

line – The line of which azimuth is determined.

Returns:

The determined azimuth in range [0, 180].

Raises:

ValueError – When LinearRegression returns unexpected coefficients.

fractopo.general.determine_set(value: float | int | Real, value_ranges: Sequence[tuple[float | int | Real, float | int | Real]], set_names: Sequence[str], loop_around: bool) str

Determine which named value range, if any, value is within.

loop_around defines behavior expected for radial data i.e. when value range can loop back around e.g. [160, 50]

E.g.

>>> determine_set(10.0, ((0, 20), (30, 160)), ("0-20", "30-160"), False)
'0-20'

Example with

>>> determine_set(50.0, ((0, 20), (160, 60)), ("0-20", "160-60"), True)
'160-60'
Parameters:
  • value – Value to determine set of.

  • value_ranges – Ranges of each set.

  • set_names – Names of each set.

  • loop_around – Whether the sets loop around. This is the case for radial data such as azimuths but not the case for length data.

Returns:

Set string in which value belongs.

Raises:

ValueError – When set value ranges overlap.

fractopo.general.determine_valid_intersection_points(intersection_geoms: GeoSeries) list[Point]

Filter intersection points between trace candidates and geom.

Only allows Point geometries as intersections. LineString intersections would be possible if geometries are stacked.

Parameters:

intersection_geoms – GeoSeries of intersection (Point) geometries.

Returns:

The valid intersections (Points).

fractopo.general.determine_valid_intersection_points_no_vnode(trace_candidates: GeoSeries, geom: LineString) list[Point]

Filter intersection points between trace candidates and geom with no vnodes.

V-node intersections are validated by looking at the endpoints. If V-nodes were kept as intersection points the VNodeValidator could not find V-node errors.

fractopo.general.dissolve_multi_part_traces(traces: GeoDataFrame | GeoSeries) GeoDataFrame | GeoSeries

Dissolve MultiLineStrings in GeoDataFrame or GeoSeries.

Copies all attribute data of rows with MultiLineStrings to new LineStrings.

fractopo.general.extend_bounds(min_x: float | int | Real, min_y: float | int | Real, max_x: float | int | Real, max_y: float | int | Real, extend_amount: float | int | Real) tuple[float | int | Real, float | int | Real, float | int | Real, float | int | Real]

Extend bounds by addition and reduction.

>>> extend_bounds(0, 0, 10, 10, 10)
(-10, -10, 20, 20)
fractopo.general.fallback_aggregation(values) str

Fallback aggregation where values are simply joined into a string.

fractopo.general.flatten_tuples(list_of_tuples: list[tuple[Any, ...]]) tuple[list[int], list[Any]]

Flatten collection of tuples and return index references.

Indexes are from original tuple groupings.

E.g.

>>> tuples = [(1, 1, 1), (2, 2, 2, 2), (3,)]
>>> flatten_tuples(tuples)
([0, 0, 0, 1, 1, 1, 1, 2], [1, 1, 1, 2, 2, 2, 2, 3])
fractopo.general.focus_plot_to_bounds(ax: Axes, total_bounds: ndarray) Axes

Focus plot to given bounds.

fractopo.general.geom_bounds(geom: LineString | Polygon | MultiPolygon) tuple[float | int | Real, float | int | Real, float | int | Real, float | int | Real]

Get LineString or Polygon bounds.

>>> geom_bounds(LineString([(-10, -10), (10, 10)]))
(-10.0, -10.0, 10.0, 10.0)
fractopo.general.get_next_point_in_trace(trace: LineString, point: Point) Point

Determine next coordinate point towards middle of LineString from point.

fractopo.general.get_trace_coord_points(trace: LineString) list[Point]

Get all coordinate Points of a LineString.

>>> trace = LineString([(0, 0), (2, 0), (3, 0)])
>>> coord_points = get_trace_coord_points(trace)
>>> print([p.wkt for p in coord_points])
['POINT (0 0)', 'POINT (2 0)', 'POINT (3 0)']
fractopo.general.get_trace_endpoints(trace: LineString) tuple[Point, Point]

Return endpoints (shapely.geometry.Point) of a given LineString.

fractopo.general.intersection_count_to_boundary_weight(intersection_count: int) int

Get actual weight factor for boundary intersection count.

>>> intersection_count_to_boundary_weight(2)
0
>>> intersection_count_to_boundary_weight(0)
1
>>> intersection_count_to_boundary_weight(1)
2
fractopo.general.is_azimuth_close(first: float | int | Real, second: float | int | Real, tolerance: float | int | Real, halved: bool = True) bool

Determine are azimuths first and second within tolerance.

Takes into account the radial nature of azimuths.

>>> is_azimuth_close(0, 179, 15)
True
>>> is_azimuth_close(166, 179, 15)
True
>>> is_azimuth_close(20, 179, 15)
False
Parameters:
  • first – First azimuth value to compare.

  • second – Second azimuth value to compare.

  • tolerance – Tolerance for closeness.

  • halved – Are the azimuths azial (i.e. halved=True) or vectors.

fractopo.general.is_empty_area(area: GeoDataFrame, traces: GeoDataFrame)

Check if any traces intersect the area(s) in area GeoDataFrame.

fractopo.general.is_set(value: float | int | Real, value_range: tuple[float | int | Real, float | int | Real], loop_around: bool) bool

Determine if value fits within the given value_range.

If the value range has the possibility of looping around loop_around can be set to true.

>>> is_set(5, (0, 10), False)
True
>>> is_set(5, (175, 15), True)
True
Parameters:
  • value – Value to determine.

  • Tuple[float, (value_range) – The range of values.

  • loop_around – Whether the range loops around. This is the case for radial data such as azimuths but not the case for length data.

Returns:

Is it within range.

fractopo.general.line_intersection_to_points(first: LineString, second: LineString) list[Point]

Perform shapely intersection between two LineStrings.

Enforces only Point returns.

fractopo.general.match_crs(first: GeoSeries | GeoDataFrame, second: GeoSeries | GeoDataFrame) tuple[GeoSeries | GeoDataFrame, GeoSeries | GeoDataFrame]

Match crs between two geopandas data structures.

>>> first = gpd.GeoSeries([Point(1, 1)], crs="EPSG:3067")
>>> second = gpd.GeoSeries([Point(1, 1)])
>>> m_first, m_second = match_crs(first, second)
>>> m_first.crs == m_second.crs
True
fractopo.general.mean_aggregation(values, weights) float | int | Real

Aggregate by calculating mean.

fractopo.general.mls_to_ls(multilinestrings: list[MultiLineString]) list[LineString]

Flattens a list of multilinestrings to a list of linestrings.

>>> multilinestrings = [
...     MultiLineString(
...         [
...             LineString([(1, 1), (2, 2), (3, 3)]),
...             LineString([(1.9999, 2), (-2, 5)]),
...         ]
...     ),
...     MultiLineString(
...         [
...             LineString([(1, 1), (2, 2), (3, 3)]),
...             LineString([(1.9999, 2), (-2, 5)]),
...         ]
...     ),
... ]
>>> result_linestrings = mls_to_ls(multilinestrings)
>>> print([ls.wkt for ls in result_linestrings])
['LINESTRING (1 1, 2 2, 3 3)', 'LINESTRING (1.9999 2, -2 5)',
'LINESTRING (1 1, 2 2, 3 3)', 'LINESTRING (1.9999 2, -2 5)']
fractopo.general.multiprocess(function_to_call: ~collections.abc.Callable, keyword_arguments: ~collections.abc.Sequence, arguments_identifier=<function <lambda>>, repeats: int = 0) list[ProcessResult]

Process function calls in parallel.

Returns result as a list where the error is appended when execution fails.

fractopo.general.numpy_to_python_type(value: Any)

Convert numpy dtype variable to Python type, if possible.

fractopo.general.point_to_point_unit_vector(point: Point, other_point: Point) ndarray

Create unit vector from point to other point.

>>> point = Point(0, 0)
>>> other_point = Point(1, 1)
>>> point_to_point_unit_vector(point, other_point)
array([0.70710678, 0.70710678])
fractopo.general.point_to_xy(point: Point) tuple[float, float]

Get x and y coordinates of Point.

fractopo.general.prepare_geometry_traces(trace_series: GeoSeries) PreparedGeometry

Prepare trace_series geometries for a faster spatial analysis.

Assumes geometries are LineStrings which are consequently collected into a single MultiLineString which is prepared with shapely.prepared.prep.

>>> traces = gpd.GeoSeries(
...     [LineString([(0, 0), (1, 1)]), LineString([(0, 1), (0, -1)])]
... )
>>> prepare_geometry_traces(traces).context.wkt
'MULTILINESTRING ((0 0, 1 1), (0 1, 0 -1))'
fractopo.general.r2_scorer(y_true: ndarray, y_predicted: ndarray) float

Score fit with r2 metric.

Changes the scoring to be best at a value of 0.

fractopo.general.raise_determination_error(attribute: str, verb: str = 'determining', determine_target: str = 'topology (=branches and nodes)')

Raise AttributeError if attribute cannot be determined.

>>> try:
...     raise_determination_error("parameters")
...     assert False
... except AttributeError as exc:
...     print(f"{str(exc)[0:20]}...")
...
Cannot determine par...
fractopo.general.random_points_within(poly: Polygon, num_points: int) list[Point]

Get random points within Polygon.

>>> from pprint import pprint
>>> random.seed(10)
>>> poly = box(0, 0, 1, 1)
>>> result = random_points_within(poly, 2)
>>> pprint([point.within(poly) for point in result])
[True, True]
fractopo.general.read_geofile(path: Path) GeoDataFrame

Read a filepath for a GeoDataFrame representable geo-object.

Parameters:

pathpathlib.Path to a GeoDataFrame representable spatial file.

Returns:

geopandas.GeoDataFrame read from the file.

Raises:

TypeError – If the file could not be parsed as a GeoDataFrame by geopandas.

fractopo.general.remove_duplicate_caseinsensitive_columns(columns) set

Remove duplicate columns case-insensitively.

fractopo.general.remove_z_coordinates(geometry: BaseGeometry | BaseMultipartGeometry) Any

Remove z-coordinates from a geometry.

A shapely geometry should be provided. Output will always be the same type of geometry with the z-coordinates removed.

Parameters:

geometry – Shapely geometry

Returns:

Shapely geometry with the same geometry type

fractopo.general.remove_z_coordinates_from_geodata(geodata: GeoDataFrame | GeoSeries) GeoDataFrame | GeoSeries

Remove Z-coordinates from geometries in geopandasgeodata.

fractopo.general.replace_coord_in_trace(trace: LineString, index: int, replacement: Point) LineString

Replace coordinate Point of LineString at index with replacement Point.

fractopo.general.resolve_split_to_ls(geom: LineString, splitter: LineString) list[LineString]

Resolve split between two LineStrings to only LineString results.

fractopo.general.safe_buffer(geom: Point | LineString | Polygon, radius: float, **kwargs) Polygon

Get type checked Polygon buffer.

>>> result = safe_buffer(Point(0, 0), 1)
>>> isinstance(result, Polygon), round(result.area, 3)
(True, 3.137)
fractopo.general.sanitize_name(name: str) str

Return only alphanumeric parts of name string.

fractopo.general.save_fig(fig: Figure, results_dir: Path, name: str) list[Path]

Save figure as svg image to results dir.

fractopo.general.sd_calc(data)

Calculate standard deviation for radial data.

TODO: Wrong results atm. Needs to take into account real length, not just orientation of unit vector. Calculates standard deviation for radial data (degrees)

E.g.

>>> sd_calc(np.array([2, 5, 8]))
(3.0, 5.00)
Parameters:

data (np.ndarray) – Array of degrees

Returns:

Standard deviation

fractopo.general.silent_output(name: str)

General method to silence output from general func.

fractopo.general.spatial_index_intersection(spatial_index: Any, coordinates: tuple[float | int | Real, float | int | Real, float | int | Real, float | int | Real] | tuple[float | int | Real, float | int | Real]) list[int]

Type-checked spatial index intersection.

TODO: Maybe use spatial_index.quary_bulk for faster execution? TODO: Maybe use a predicate (e.g., “contains”) to specify operation?

fractopo.general.sum_aggregation(values, **_) float | int | Real

Aggregate by calculating sum.

fractopo.general.total_bounds(geodata: GeoSeries | GeoDataFrame) tuple[float, float, float, float]

Get total bounds of geodataset.

>>> geodata = gpd.GeoSeries([Point(-10, 10), Point(10, 10)])
>>> total_bounds(geodata)
(-10.0, 10.0, 10.0, 10.0)
fractopo.general.within_bounds(x: float | int | Real, y: float | int | Real, min_x: float | int | Real, min_y: float | int | Real, max_x: float | int | Real, max_y: float | int | Real) bool

Are x and y within the bounds.

>>> within_bounds(1, 1, 0, 0, 2, 2)
True
fractopo.general.wrap_silence(func)

Wrap function to capture and silence its output.

Used primarily to silence output from powerlaw functions and methods.

fractopo.general.write_geodata(gdf: GeoDataFrame, path: Path, driver: str = 'GeoJSON', allow_list_column_transform: bool = False)

Write geodata with driver.

Default is GeoJSON.

fractopo.general.write_geodataframe(geodataframe: GeoDataFrame, name: str, results_dir: Path)

Save geodataframe as GeoPackage, GeoJSON and shapefile.

fractopo.general.write_topodata(gdf: GeoDataFrame, output_path: Path)

Write branch or nodes GeoDataFrame to output_path.

fractopo.general.zip_equal(*iterables)

Zip iterables of only equal lengths.

Module contents

fractopo.

Fracture Network Analysis