Structural Geology - Automation of Data Processing
Folder Structure
Let’s assume our initial folder structure looks like this:
geology # top-level folder
├── data
│ ├── scan_positions.gpkg
│ ├── scan_1.laz
│ ├── scan_2.laz
│ └── scan_3.laz
└── scripts
The following script allows to automate data preparation and processing for the structural geology analysis workflow:
project_config.py
We use the following configuration to setup our project. Copy, adapt and save the following to a file called project_config.py:
import os, sys
# -----------------------------------------------------------------------------
# LIS Pro 3D installation path
# (make sure to adapt the SAGA_PATH if necessary!)
os.environ["SAGA_PATH"] = r"C:\Program Files\Laserdata\LIS Pro 3D"
sys.path.insert(1, os.environ["SAGA_PATH"])
# -----------------------------------------------------------------------------
# processing directories
# the main folder of the project
BASE_DIR = r"C:\...\geology"
# contains the input data
DATA_DIR = os.path.join(BASE_DIR, "data")
# this folder will hold the results once processing is finished
RESULTS_DIR = os.path.join(BASE_DIR, "results")
# -----------------------------------------------------------------------------
# create (non-existing) dirs
os.makedirs(RESULTS_DIR, exist_ok=True)
# -----------------------------------------------------------------------------
# global settings: files, tool parameters, ...
# data homogenization
PT_SPACING = 0.03
# discontinuity sets
MAX_SETS = 6 # number of sets
MIN_NORMAL_DIFF_POLES = 40.0 # minimum normal difference poles
MAX_NORMAL_DIFF_SET = 30.0 # set labelling distance
# discontinuity spacing
MIN_JOINTSIZE = 250 # minimum point count joint plane
MAX_DISTANCE = 5.0 # maximum persistence distanceFill data_processing.py
Fill data_processing.py, which contains the main processing instructions. Note that it imports the project_config:
import os
import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *
# -----------------------------------------------------------------------------
def Structural_Analysis():
# -----------------------------------------------------------------------------
# read scan positions
scan_positions = saga_api.SG_Get_Data_Manager().Add_Shapes(
os.path.join(cfg.DATA_DIR, "scan_positions.gpkg")
)
if not scan_positions:
return lis_cmd.Error_Return("Loading data failed")
# -----------------------------------------------------------------------------
# create list of point cloud files
input_files = lis_cmd.Create_Files_Input_String(cfg.DATA_DIR, ".laz")
# -----------------------------------------------------------------------------
# import point clouds
scans = []
if not lis_io.Import_LASLAZ_Files(
POINTS=scans,
FILES=input_files,
i=True,
C=True,
CLASSES="",
LAST_RETURNS=False,
RGB_RANGE="16 bit",
AOI=False,
THIN_OUT=False,
):
return lis_cmd.Error_Return("Tool execution failed")
# -----------------------------------------------------------------------------
# merge scans and add identifier
pc_raw = saga_api.SG_Get_Data_Manager().Add_PointCloud()
if not pointcloud_tools.Merge_Point_Clouds(
PC_LAYERS=scans,
PC_OUT=pc_raw,
DEL_LAYERS=True,
ADD_IDENTIFIER=True,
START_VALUE=1,
):
return lis_cmd.Error_Return("Tool execution failed")
# -----------------------------------------------------------------------------
# homogenize data
pc = saga_api.SG_Get_Data_Manager().Add_PointCloud()
if not lis_thinning.Block_Thinning_3D(
PC_IN=pc_raw,
PC_OUT=pc,
FIELD_FILTER="<not set>",
COPY_ATTR=True,
METHOD_THIN="nearest",
SPACE_HORI=cfg.PT_SPACING,
SPACE_VERT=cfg.PT_SPACING,
FILTER_COUNT=False,
METHOD_BASELINE="lowest point",
):
return lis_cmd.Error_Return("Tool execution failed")
saga_api.SG_Get_Data_Manager().Delete(pc_raw)
# -----------------------------------------------------------------------------
# establish point to scanner mapping
if not lis_tools.Map_Point_Cloud_to_Scanner(
PC_IN=pc,
PC_SCANNER=scan_positions,
FIELD_PSID="ID",
COPY_ATTR=True,
FIELD_PSID_SCANNER="Id",
FIELD_Z="Z",
ATTR_SUFFIX="",
METHOD="point source id",
DIR_SCANNER=True,
POS_SCANNER=False,
RANGE=False,
POLAR=False,
AZIMUTHAL=False,
):
return lis_cmd.Error_Return("Tool execution failed")
# -----------------------------------------------------------------------------
# surface normals
if not lis_analysis.Normal_Estimation_Robust_PCA(
PC_IN=pc,
FIELD_DIRSCAN_X="dirscan_x",
FIELD_DIRSCAN_Y="dirscan_y",
FIELD_DIRSCAN_Z="dirscan_z",
COPY_ATTR=True,
ATTR_SUFFIX="",
METHOD_NN="k nearest neighbors",
KNN=12,
MAXDISTTHRES=True,
MAXDIST=2.0,
MIN_K=5,
NOISE=0.01,
CURV_RADIUS=0.3,
):
return lis_cmd.Error_Return("Tool execution failed")
# -----------------------------------------------------------------------------
# derive discontinuity sets
tab_sets = saga_api.SG_Get_Data_Manager().Add_Table()
lut = saga_api.SG_Get_Data_Manager().Add_Table()
plot = saga_api.SG_Get_Data_Manager().Add_Grid()
if not lis_geology.Discontinuity_Sets(
PC_IN=pc,
TAB_SETS=tab_sets,
LUT=lut,
GRID_PLOT=plot,
IMAGE_PLOT=os.path.join(cfg.RESULTS_DIR, "plot.png"),
FIELD_NORM_X="normal_x",
FIELD_NORM_Y="normal_y",
FIELD_NORM_Z="normal_z",
FIELD_FILTER="<not set>",
COPY_ATTR=True,
ATTR_SUFFIX="",
RADIUS_KDE=15.0,
KERNEL_KDE="quartic kernel",
MIN_NORMAL_DIFF_POLES=cfg.MIN_NORMAL_DIFF_POLES,
MAX_SETS=cfg.MAX_SETS,
MAX_NORMAL_DIFF_SET=cfg.MAX_NORMAL_DIFF_SET,
SUPPRESS_POLES="",
LABEL_PC=True,
DIPDIR_DIP_SET=True,
DIPDIR_DIP_POINT=True,
RGB=True,
SEPARATE=True,
SEARCH_RADIUS=0.1,
MIN_POINTS=3,
PLANESIZE=True,
):
return lis_cmd.Error_Return("Tool execution failed")
lut.Save(os.path.join(cfg.RESULTS_DIR, "lut_sets.txt"))
saga_api.SG_Get_Data_Manager().Delete(tab_sets)
saga_api.SG_Get_Data_Manager().Delete(lut)
saga_api.SG_Get_Data_Manager().Delete(plot)
# -----------------------------------------------------------------------------
# calculate spacings
tab_dist = saga_api.SG_Get_Data_Manager().Add_Table()
tab_hist = saga_api.SG_Get_Data_Manager().Add_Table()
tab_orient = saga_api.SG_Get_Data_Manager().Add_Table()
if not lis_geology.Discontinuity_Spacing(
PC_IN=pc,
TAB_DIST=tab_dist,
TAB_HIST=tab_hist,
TAB_ORIENT=tab_orient,
FIELD_SETID="setid",
FIELD_NORM_X="set_normal_x",
FIELD_NORM_Y="set_normal_y",
FIELD_NORM_Z="set_normal_z",
FIELD_JOINTID="jointid",
FIELD_JOINTSIZE="jointsize",
MIN_JOINTSIZE=cfg.MIN_JOINTSIZE,
MAX_DISTANCE=cfg.MAX_DISTANCE,
BIN_SIZE=0.1,
THIN_OUT=True,
SPACING=0.3,
):
return lis_cmd.Error_Return("Tool execution failed")
pc.Save(os.path.join(cfg.RESULTS_DIR, "rock_face.sg-pts-z"))
tab_dist.Save(os.path.join(cfg.RESULTS_DIR, "distances.txt"))
tab_hist.Save(os.path.join(cfg.RESULTS_DIR, "histogram.txt"))
tab_orient.Save(os.path.join(cfg.RESULTS_DIR, "set_statistics.txt"))
saga_api.SG_Get_Data_Manager().Delete()
return True
# -----------------------------------------------------------------------------
def Create_Plots():
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# -------------------------------------------------------------------
sets = [
"set_1",
"set_2",
"set_3",
"set_4",
"set_5",
"set_6",
"set_7",
"set_8",
"set_9",
"set_10",
"set_11",
"set_12",
"set_13",
"set_14",
"set_15",
"set_16",
"set_17",
"set_18",
"set_19",
"set_20",
]
# -------------------------------------------------------------------
colors = {
"set_1": "#f03c3c",
"set_2": "#84c484",
"set_3": "#ffff10",
"set_4": "#79cce6",
"set_5": "#ee9f0e",
"set_6": "#f580ff",
"set_7": "#8080c0",
"set_8": "#808000",
"set_9": "#4fff4f",
"set_10": "#0080ff",
"set_11": "#ddd359",
"set_12": "#c654cf",
"set_13": "#ab6363",
"set_14": "#625584",
"set_15": "#248243",
"set_16": "#0000ff",
"set_17": "#ff0080",
"set_18": "#00ce00",
"set_19": "#c4ffff",
"set_20": "#c10000",
}
# -----------------------------------------------------------------------------
# cumulative frequency distribution plot
file_path = os.path.join(cfg.RESULTS_DIR, "distances.txt")
df = pd.read_csv(file_path, sep="\t")
df.columns = df.columns.str.replace("_raw", "", regex=False)
# calculate the number of derived sets
num_sets = (int)((len(df.columns) - 1) / 2)
fig, ax = plt.subplots(figsize=(10, 6))
for col in sets[:num_sets]:
x = df[col].to_numpy(dtype=float)
x = x[x > 0]
x_sorted = np.sort(x)
y = np.arange(1, len(x_sorted) + 1) / len(x_sorted) * 100
ax.plot(
x_sorted,
y,
marker="o",
linewidth=2,
markersize=5,
color=colors[col],
label=col,
)
plt.title("Cumulative Frequency Distribution")
plt.xlabel("Discontinuity Spacing")
plt.ylabel("Cumulative Frequency (%)")
ax.set_xscale("log")
ax.set_xticks([0.001, 0.01, 0.1, 1, 10])
ax.set_xticklabels(["0.001", "0.01", "0.1", "1", "10"])
ax.set_xlim(left=0.001)
ax.set_ylim(0, 100)
plt.grid(True, linestyle="--", alpha=0.5)
plt.legend(title="Sets")
plt.tight_layout()
plt.savefig(os.path.join(cfg.RESULTS_DIR, "cumulative_frequency_plot.png"))
# -----------------------------------------------------------------------------
# histogram plot
file_path = os.path.join(cfg.RESULTS_DIR, "histogram.txt")
df = pd.read_csv(file_path, sep="\t")
df.columns = df.columns.str.strip()
df["bin_center"] = (df["min"] + df["max"]) / 2
fig = plt.figure(figsize=(12, 7))
ax = fig.add_subplot(111, projection="3d")
# bar geometry
dx = 0.08 # bin width
dy = 0.5 # spacing between sets
for i, s in enumerate(sets[:num_sets]):
x = df["bin_center"].to_numpy()
y = np.full_like(x, i, dtype=float)
z = np.zeros_like(x)
counts = df[f"count_{s}"].to_numpy()
total = counts.sum()
percent = (counts / total) * 100
ax.bar3d(
x, y, z, dx, dy, percent, color=colors[s], alpha=0.75, label=s
)
plt.title("Discontinuity Spacing")
ax.set_xlabel("Discontinuity Spacing (m)")
ax.set_ylabel("Set")
ax.set_zlabel("Frequency (%)")
ax.set_yticks(range(num_sets))
ax.set_yticklabels([s.replace("set_", "") for s in sets[:num_sets]])
plt.tight_layout()
plt.savefig(os.path.join(cfg.RESULTS_DIR, "histogram_plot.png"))
# -----------------------------------------------------------------------------
if __name__ == "__main__":
starttime = lis_cmd.Start_Logging(False)
if not Structural_Analysis():
lis_cmd.Error_Exit("Processing failed", starttime)
Create_Plots()
lis_cmd.Stop_Logging(starttime)