LIS Pro 3D Tutorials

Structural Geology - Automation of Data Processing

< Previous section

Folder Structure

Let’s assume our initial folder structure looks like this:

geology # top-level folder
├── data
│   ├── scan_positions.gpkg
│   ├── scan_1.laz
│   ├── scan_2.laz
│   └── scan_3.laz
└── scripts

The following script allows to automate data preparation and processing for the structural geology analysis workflow:

project_config.py

We use the following configuration to setup our project. Copy, adapt and save the following to a file called project_config.py:

import os, sys

# -----------------------------------------------------------------------------
# LIS Pro 3D installation path
# (make sure to adapt the SAGA_PATH if necessary!)
os.environ["SAGA_PATH"] = r"C:\Program Files\Laserdata\LIS Pro 3D"
sys.path.insert(1, os.environ["SAGA_PATH"])

# -----------------------------------------------------------------------------
# processing directories

# the main folder of the project
BASE_DIR = r"C:\...\geology"

# contains the input data
DATA_DIR = os.path.join(BASE_DIR, "data")

# this folder will hold the results once processing is finished
RESULTS_DIR = os.path.join(BASE_DIR, "results")

# -----------------------------------------------------------------------------
# create (non-existing) dirs
os.makedirs(RESULTS_DIR, exist_ok=True)


# -----------------------------------------------------------------------------
# global settings: files, tool parameters, ...

# data homogenization
PT_SPACING = 0.03

# discontinuity sets
MAX_SETS = 6  # number of sets
MIN_NORMAL_DIFF_POLES = 40.0  # minimum normal difference poles
MAX_NORMAL_DIFF_SET = 30.0  # set labelling distance

# discontinuity spacing
MIN_JOINTSIZE = 250  # minimum point count joint plane
MAX_DISTANCE = 5.0  # maximum persistence distance

Fill data_processing.py

Fill data_processing.py, which contains the main processing instructions. Note that it imports the project_config:

import os

import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *


# -----------------------------------------------------------------------------
def Structural_Analysis():

    # -----------------------------------------------------------------------------
    # read scan positions

    scan_positions = saga_api.SG_Get_Data_Manager().Add_Shapes(
        os.path.join(cfg.DATA_DIR, "scan_positions.gpkg")
    )

    if not scan_positions:
        return lis_cmd.Error_Return("Loading data failed")

    # -----------------------------------------------------------------------------
    # create list of point cloud files

    input_files = lis_cmd.Create_Files_Input_String(cfg.DATA_DIR, ".laz")

    # -----------------------------------------------------------------------------
    # import point clouds

    scans = []

    if not lis_io.Import_LASLAZ_Files(
        POINTS=scans,
        FILES=input_files,
        i=True,
        C=True,
        CLASSES="",
        LAST_RETURNS=False,
        RGB_RANGE="16 bit",
        AOI=False,
        THIN_OUT=False,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    # -----------------------------------------------------------------------------
    # merge scans and add identifier

    pc_raw = saga_api.SG_Get_Data_Manager().Add_PointCloud()

    if not pointcloud_tools.Merge_Point_Clouds(
        PC_LAYERS=scans,
        PC_OUT=pc_raw,
        DEL_LAYERS=True,
        ADD_IDENTIFIER=True,
        START_VALUE=1,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    # -----------------------------------------------------------------------------
    # homogenize data

    pc = saga_api.SG_Get_Data_Manager().Add_PointCloud()

    if not lis_thinning.Block_Thinning_3D(
        PC_IN=pc_raw,
        PC_OUT=pc,
        FIELD_FILTER="<not set>",
        COPY_ATTR=True,
        METHOD_THIN="nearest",
        SPACE_HORI=cfg.PT_SPACING,
        SPACE_VERT=cfg.PT_SPACING,
        FILTER_COUNT=False,
        METHOD_BASELINE="lowest point",
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    saga_api.SG_Get_Data_Manager().Delete(pc_raw)

    # -----------------------------------------------------------------------------
    # establish point to scanner mapping

    if not lis_tools.Map_Point_Cloud_to_Scanner(
        PC_IN=pc,
        PC_SCANNER=scan_positions,
        FIELD_PSID="ID",
        COPY_ATTR=True,
        FIELD_PSID_SCANNER="Id",
        FIELD_Z="Z",
        ATTR_SUFFIX="",
        METHOD="point source id",
        DIR_SCANNER=True,
        POS_SCANNER=False,
        RANGE=False,
        POLAR=False,
        AZIMUTHAL=False,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    # -----------------------------------------------------------------------------
    # surface normals

    if not lis_analysis.Normal_Estimation_Robust_PCA(
        PC_IN=pc,
        FIELD_DIRSCAN_X="dirscan_x",
        FIELD_DIRSCAN_Y="dirscan_y",
        FIELD_DIRSCAN_Z="dirscan_z",
        COPY_ATTR=True,
        ATTR_SUFFIX="",
        METHOD_NN="k nearest neighbors",
        KNN=12,
        MAXDISTTHRES=True,
        MAXDIST=2.0,
        MIN_K=5,
        NOISE=0.01,
        CURV_RADIUS=0.3,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    # -----------------------------------------------------------------------------
    # derive discontinuity sets

    tab_sets = saga_api.SG_Get_Data_Manager().Add_Table()
    lut = saga_api.SG_Get_Data_Manager().Add_Table()
    plot = saga_api.SG_Get_Data_Manager().Add_Grid()

    if not lis_geology.Discontinuity_Sets(
        PC_IN=pc,
        TAB_SETS=tab_sets,
        LUT=lut,
        GRID_PLOT=plot,
        IMAGE_PLOT=os.path.join(cfg.RESULTS_DIR, "plot.png"),
        FIELD_NORM_X="normal_x",
        FIELD_NORM_Y="normal_y",
        FIELD_NORM_Z="normal_z",
        FIELD_FILTER="<not set>",
        COPY_ATTR=True,
        ATTR_SUFFIX="",
        RADIUS_KDE=15.0,
        KERNEL_KDE="quartic kernel",
        MIN_NORMAL_DIFF_POLES=cfg.MIN_NORMAL_DIFF_POLES,
        MAX_SETS=cfg.MAX_SETS,
        MAX_NORMAL_DIFF_SET=cfg.MAX_NORMAL_DIFF_SET,
        SUPPRESS_POLES="",
        LABEL_PC=True,
        DIPDIR_DIP_SET=True,
        DIPDIR_DIP_POINT=True,
        RGB=True,
        SEPARATE=True,
        SEARCH_RADIUS=0.1,
        MIN_POINTS=3,
        PLANESIZE=True,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    lut.Save(os.path.join(cfg.RESULTS_DIR, "lut_sets.txt"))

    saga_api.SG_Get_Data_Manager().Delete(tab_sets)
    saga_api.SG_Get_Data_Manager().Delete(lut)
    saga_api.SG_Get_Data_Manager().Delete(plot)

    # -----------------------------------------------------------------------------
    # calculate spacings

    tab_dist = saga_api.SG_Get_Data_Manager().Add_Table()
    tab_hist = saga_api.SG_Get_Data_Manager().Add_Table()
    tab_orient = saga_api.SG_Get_Data_Manager().Add_Table()

    if not lis_geology.Discontinuity_Spacing(
        PC_IN=pc,
        TAB_DIST=tab_dist,
        TAB_HIST=tab_hist,
        TAB_ORIENT=tab_orient,
        FIELD_SETID="setid",
        FIELD_NORM_X="set_normal_x",
        FIELD_NORM_Y="set_normal_y",
        FIELD_NORM_Z="set_normal_z",
        FIELD_JOINTID="jointid",
        FIELD_JOINTSIZE="jointsize",
        MIN_JOINTSIZE=cfg.MIN_JOINTSIZE,
        MAX_DISTANCE=cfg.MAX_DISTANCE,
        BIN_SIZE=0.1,
        THIN_OUT=True,
        SPACING=0.3,
    ):
        return lis_cmd.Error_Return("Tool execution failed")

    pc.Save(os.path.join(cfg.RESULTS_DIR, "rock_face.sg-pts-z"))
    tab_dist.Save(os.path.join(cfg.RESULTS_DIR, "distances.txt"))
    tab_hist.Save(os.path.join(cfg.RESULTS_DIR, "histogram.txt"))
    tab_orient.Save(os.path.join(cfg.RESULTS_DIR, "set_statistics.txt"))

    saga_api.SG_Get_Data_Manager().Delete()

    return True


# -----------------------------------------------------------------------------
def Create_Plots():

    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt

    # -------------------------------------------------------------------
    sets = [
        "set_1",
        "set_2",
        "set_3",
        "set_4",
        "set_5",
        "set_6",
        "set_7",
        "set_8",
        "set_9",
        "set_10",
        "set_11",
        "set_12",
        "set_13",
        "set_14",
        "set_15",
        "set_16",
        "set_17",
        "set_18",
        "set_19",
        "set_20",
    ]

    # -------------------------------------------------------------------
    colors = {
        "set_1": "#f03c3c",
        "set_2": "#84c484",
        "set_3": "#ffff10",
        "set_4": "#79cce6",
        "set_5": "#ee9f0e",
        "set_6": "#f580ff",
        "set_7": "#8080c0",
        "set_8": "#808000",
        "set_9": "#4fff4f",
        "set_10": "#0080ff",
        "set_11": "#ddd359",
        "set_12": "#c654cf",
        "set_13": "#ab6363",
        "set_14": "#625584",
        "set_15": "#248243",
        "set_16": "#0000ff",
        "set_17": "#ff0080",
        "set_18": "#00ce00",
        "set_19": "#c4ffff",
        "set_20": "#c10000",
    }

    # -----------------------------------------------------------------------------
    # cumulative frequency distribution plot

    file_path = os.path.join(cfg.RESULTS_DIR, "distances.txt")

    df = pd.read_csv(file_path, sep="\t")
    df.columns = df.columns.str.replace("_raw", "", regex=False)

    # calculate the number of derived sets
    num_sets = (int)((len(df.columns) - 1) / 2)

    fig, ax = plt.subplots(figsize=(10, 6))

    for col in sets[:num_sets]:

        x = df[col].to_numpy(dtype=float)
        x = x[x > 0]
        x_sorted = np.sort(x)

        y = np.arange(1, len(x_sorted) + 1) / len(x_sorted) * 100

        ax.plot(
            x_sorted,
            y,
            marker="o",
            linewidth=2,
            markersize=5,
            color=colors[col],
            label=col,
        )

    plt.title("Cumulative Frequency Distribution")
    plt.xlabel("Discontinuity Spacing")
    plt.ylabel("Cumulative Frequency (%)")

    ax.set_xscale("log")
    ax.set_xticks([0.001, 0.01, 0.1, 1, 10])
    ax.set_xticklabels(["0.001", "0.01", "0.1", "1", "10"])
    ax.set_xlim(left=0.001)
    ax.set_ylim(0, 100)

    plt.grid(True, linestyle="--", alpha=0.5)

    plt.legend(title="Sets")

    plt.tight_layout()

    plt.savefig(os.path.join(cfg.RESULTS_DIR, "cumulative_frequency_plot.png"))

    # -----------------------------------------------------------------------------
    # histogram plot

    file_path = os.path.join(cfg.RESULTS_DIR, "histogram.txt")

    df = pd.read_csv(file_path, sep="\t")
    df.columns = df.columns.str.strip()

    df["bin_center"] = (df["min"] + df["max"]) / 2

    fig = plt.figure(figsize=(12, 7))
    ax = fig.add_subplot(111, projection="3d")

    # bar geometry
    dx = 0.08  # bin width
    dy = 0.5  # spacing between sets

    for i, s in enumerate(sets[:num_sets]):

        x = df["bin_center"].to_numpy()
        y = np.full_like(x, i, dtype=float)
        z = np.zeros_like(x)

        counts = df[f"count_{s}"].to_numpy()
        total = counts.sum()
        percent = (counts / total) * 100

        ax.bar3d(
            x, y, z, dx, dy, percent, color=colors[s], alpha=0.75, label=s
        )

    plt.title("Discontinuity Spacing")
    ax.set_xlabel("Discontinuity Spacing (m)")
    ax.set_ylabel("Set")
    ax.set_zlabel("Frequency (%)")

    ax.set_yticks(range(num_sets))
    ax.set_yticklabels([s.replace("set_", "") for s in sets[:num_sets]])

    plt.tight_layout()

    plt.savefig(os.path.join(cfg.RESULTS_DIR, "histogram_plot.png"))


# -----------------------------------------------------------------------------
if __name__ == "__main__":

    starttime = lis_cmd.Start_Logging(False)

    if not Structural_Analysis():
        lis_cmd.Error_Exit("Processing failed", starttime)

    Create_Plots()

    lis_cmd.Stop_Logging(starttime)

< Previous section