LIS Pro 3D Tutorials

Automate Data Processing

< Previous section

The Workflow

The following script will take care of the following 15 steps:

  1. Select current processing unit
  2. Load and find corresponding buildings for the selected unit
  3. Derive processing extent
  4. Get building subset from point cloud data
  5. Query DTM from raster catalogue
  6. Group building points into clumps
  7. Find planar segments in the building point cloud
  8. Calculate slope of planar patches
  9. Filter out steep and small segments
  10. Assign roof segments to buildings
  11. Generate building models
  12. Compute building statistics
  13. Join attributes from input building footprint layer
  14. Export to CityJSON
  15. Export to CityGML

Final Code

import os
import project_config as cfg
from PySAGA import saga_api, lis_cmd

from PySAGA.tools import *


@lis_cmd.proc_function_wrapper
def process_unit(unit_index):

    processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
        cfg.TILESHAPE_250
    )
    if not processing_units.is_Valid():
        return lis_cmd.Error_Return("Loading processing_units failed")

    buildings_used = saga_api.SG_Get_Data_Manager().Add_Shapes(
        cfg.BUILDING_FOOTPRINTS_USED
    )
    if not buildings_used.is_Valid():
        return lis_cmd.Error_Return(
            f"Loading building footprints {cfg.BUILDING_FOOTPRINTS_USED} failed"
        )

    # -------------------------------------------------------------------------
    # get unit to process
    proc_unit = processing_units.Get_Shape(unit_index)

    xmin = proc_unit.Get_Extent().Get_XMin()
    ymin = proc_unit.Get_Extent().Get_YMin()

    # units are referenced by their lower-left corner coordinate
    unit_id = f"{int(xmin)}_{int(ymin)}"
    lis_cmd.Message_Print(f"\nProcessing unit {unit_id} \n")

    # -------------------------------------------------------------------------
    # select current unit
    processing_units.Select(unit_index)

    # -------------------------------------------------------------------------
    # copy current unit to new layer
    current_unit = saga_api.SG_Get_Data_Manager().Add_Shapes()

    if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
        INPUT=processing_units, OUTPUT=current_unit
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # select buildings with their centroid in current processing unit
    if not shapes_tools.Select_by_Location(
        SHAPES=buildings_used,
        LOCATIONS=current_unit,
        CONDITION="have their centroid in",
        METHOD="new selection",
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # copy selected buildings to new shapes layer
    current_footprints = saga_api.SG_Get_Data_Manager().Add_Shapes()

    if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
        INPUT=buildings_used, OUTPUT=current_footprints
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # compute extent of current buildings
    current_extent = saga_api.SG_Get_Data_Manager().Add_Shapes()
    if not shapes_tools.Get_Shapes_Extents(
        SHAPES=current_footprints, EXTENTS=current_extent, OUTPUT="all shapes"
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # query building points from point cloud catalogue

    # will be filled with the loaded point cloud subset of type 'saga_api.CSG_PointCloud'
    pc_out_list = []
    if not lis_virtual.Get_Subset_from_Virtual_LASLAZ(
        AOI_SHP=current_extent,
        PC_OUT=pc_out_list,
        FILENAME=cfg.LASVF,
        FILEPATH="",
        COPY_ATTR=True,
        CONSTRAIN_QUERY=True,
        ATTR_FIELD="classification",
        VALUE_RANGE="6;6",
        FILE_COMPRESSION=True,
        ERROR_IF_EMPTY=False,
        THIN_OUT=False,
        FIELD_TILENAME="<not set>",
        AOI_XRANGE="0; 0",
        AOI_YRANGE="0; 0",
        AOI_ADD_OVERLAP=True,
        OVERLAP=cfg.OVERLAP,
        SKIP_EMPTY_AOIS=False,
        FILENAME_TILE_INFO="",
        METHOD_CLIP="lower and left boundary is inside",
        ONE_PC_PER_POLYGON=False,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    if len(pc_out_list) < 1:
        return lis_cmd.Error_Return(
            f"Processing unit {unit_id} contains no point cloud data, skipping!"
        )

    # add the point cloud dataset to the data manager for easier memory management
    building_pts = (
        saga_api.SG_Get_Data_Manager().Add(pc_out_list[0]).asPointCloud()
    )

    if building_pts.Get_Count() < 5:
        return lis_cmd.Error_Return(
            f"Processing unit {unit_id} has less than 5 points, skipping!"
        )

    # -------------------------------------------------------------------------
    # query dtm from raster catalogue

    # this list will hold a single DTM grid of type 'saga_api.CSG_Grid'
    grids_list = []
    if not io_gdal.Import_Raster(
        EXTENT_SHAPES=current_extent,
        GRIDS=grids_list,
        FILES=cfg.VRT,
        MULTIPLE="single grids",
        TRANSFORM=True,
        RESAMPLING="Nearest Neighbour",
        EXTENT="shapes extent",
        EXTENT_BUFFER=cfg.OVERLAP,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    if len(grids_list) < 1:
        return lis_cmd.Error_Return(
            f"Processing unit {unit_id} contains no grid data, skipping!"
        )

    # add the grid to the data manager for easier memory management
    dtm = saga_api.SG_Get_Data_Manager().Add(grids_list[0]).asGrid()

    # -------------------------------------------------------------------------
    # group building clumps
    if not lis_segmentation.Densitybased_Spatial_Clustering(
        PC_IN=building_pts,
        FIELD_ATTR_1="X",
        FIELD_ATTR_2="Y",
        FIELD_ATTR_3="Z",
        FIELD_CONSTRAINT="<not set>",
        COPY_ATTR=True,
        ATTR_SUFFIX="bld",
        CHOICE_DIM="3-dimensional",
        SEARCH_RADIUS=cfg.RADIUS_CLUSTER,
        MIN_POINTS=cfg.MIN_POINTS_CLUSTER,
        CLUSTERSIZE=False,
        FILTER_BY_SIZE=True,
        MIN_CLUSTERSIZE=cfg.MIN_CLUSTERSIZE,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # find planar patches
    if not lis_segmentation.Clump_Segmentation(
        PC_IN=building_pts,
        FIELD_CLUMPID="cluster_bld",
        FIELD_SEED="<not set>",
        FIELD_THRES="<not set>",
        FIELD_DIRSCAN_X="<not set>",
        COPY_ATTR=True,
        ATTR_SUFFIX="roof",
        TILESIZE=cfg.TILESIZE_SEGMENTATION,
        OVERLAP=cfg.OVERLAP_SEGMENTATION,
        SEGMENT_MINSIZE=cfg.SEGMENT_MINSIZE,
        ROBUST_DIST_THRES=cfg.ROBUST_DIST_THRES,
        RANSAC_MODEL_POINT_DISTANCE="0.02; 2.00",
        RANSAC_ITER=cfg.RANSAC_ITER,
        RANSAC_SEED=cfg.RANSAC_SEED,
        REGION_RADIUS=cfg.REGION_RADIUS,
        REGION_MINSIZE=cfg.REGION_MINSIZE,
        REGION_MAXANGLE=cfg.REGION_MAXANGLE,
        REGION_MAXOFFSET=cfg.REGION_MAXOFFSET,
        SEGSIZE=True,
        NORMAL=False,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # compute slope angle per roof segment
    if not lis_analysis.Segment_Features(
        PC_IN=building_pts,
        FIELD_SEGMENTID="segmentid_roof",
        FIELD_DIRSCAN_X="<not set>",
        COPY_ATTR=True,
        ATTR_SUFFIX="roof",
        MAX_EDGELENGTH=1.500000,
        SLOPE=True,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # filter roof segments by size and slope
    buildings_filtered = saga_api.SG_Get_Data_Manager().Add_PointCloud()

    if not lis_filtering.Attribute_Filter(
        PC_IN=building_pts,
        PC_OUT=buildings_filtered,
        FIELD_ATTR_1="segmentsize_roof",
        FIELD_ATTR_2="slope_roof",
        FIELD_ATTR_3="<not set>",
        COPY_ATTR=True,
        METHOD="drop points",
        TYPE_1="lower limit (min)",
        MIN_1=cfg.FILTER_MIN_SEGSIZE,
        TYPE_2="upper limit (max)",
        MAX_2=cfg.FILTER_MAX_SLOPE,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # assign roof surfaces to buildings
    if not lis_city_modeller.Assign_Roof_Segments_to_Buildings(
        PC_IN=buildings_filtered,
        SHP_IN=current_footprints,
        FIELD_ROOFID="segmentid_roof",
        FIELD_BLOCKED="<not set>",
        COPY_ATTR=True,
        FIELD_BUILDINGID="proc_id",
        FIELD_LABEL_FLAG="<not set>",
        ATTR_SUFFIX="",
        MIN_PERCENTAGE_INSIDE=cfg.PERCENTAGE_INSIDE,
        COVERAGE_FILTER=False,
        FLAG_SEGMENTS=False,
        REMOVE_UNCONNECTED=False,
        ID_UNDEF=-1,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # generate building models
    shp_lod2 = saga_api.SG_Get_Data_Manager().Add_Shapes()

    if not lis_city_modeller.LOD2_Building_Modeller(
        PC_IN=buildings_filtered,
        GRID_DTM=dtm,
        SHP_GROUND_PLAN=current_footprints,
        SHP_LOD2=shp_lod2,
        FIELD_BUILDINGID="proc_id",
        FIELD_ROOFID="segmentid_roof_relabel",
        FIELD_BUILDINGID_G="proc_id",
        FIELD_BEDPLATE_G="<not set>",
        MAX_SLOPE=cfg.MODEL_MAX_SLOPE,
        MIN_POINTS_ROOF=cfg.MIN_POINTS_ROOF,
        MAX_EDGELENGTH=cfg.MAX_EDGELENGTH,
        MAX_LINEDIST=cfg.MAX_LINEDIST,
        SNAP_DIST=cfg.SNAP_DIST,
        METHOD_GAP_FILLING="roof plane minimum height",
        ITERATIONS_ROOF_GAPS=cfg.ITERATIONS_ROOF_GAPS,
        MIN_BUILDING_HEIGHT=cfg.MIN_BUILDING_HEIGHT,
        RECTIFY_SMALL_POLYGONS=True,
        MAX_AREA_RECTIFY=cfg.MAX_AREA_RECTIFY,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # calculate building statistics
    models_out = saga_api.SG_Get_Data_Manager().Add_Shapes()

    if not lis_city_modeller.LOD2_Building_Statistics(
        SHP_BUILDINGS=shp_lod2,
        GRID_DTM=dtm,
        SHP_STATS=models_out,
        FIELD_BUILDINGID="proc_id",
        FIELD_SURFACECLASS="surfaceClass",
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # join attributes of input building footprint layer
    if not table_tools.Join_Attributes_from_a_Table(
        TABLE_A=models_out,
        TABLE_B=current_footprints,
        ID_A="proc_id",
        ID_B="proc_id",
        FIELDS_ALL=True,
        KEEP_ALL=True,
        CMP_CASE=True,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # save model as shapefile
    models_out.Get_Projection().Create(saga_api.CSG_String(cfg.PROJ))
    models_out_file = os.path.join(cfg.SHAPES_DIR, f"{unit_id}.shp")
    models_out.Save(models_out_file)

    # -------------------------------------------------------------------------
    # export model as cityjson file
    json_file = os.path.join(cfg.JSON_DIR, f"{unit_id}.json")

    if not lis_city_modeller.Export_Shapes_to_CityJSON(
        SHP_IN=models_out,
        FIELD_OBJECTID="proc_id",
        FIELD_SURFACECLASS="surfaceClass",
        FIELD_HEIGHT="measuredHeight",
        FIELDS_CITYOBJECT="roofHgtMax",
        FIELDS_POLYFACE="roofSlope;roofAspect",
        FIELD_EXTERNALID=cfg.EXTERNALID,
        FILE=json_file,
        VERSION="2.0.1",
        CITY_OBJECT="Building",
        LOD=2,
        EPSG=cfg.EPSG,
        DATE="2024-07-10",
        INFO_SYSTEM_EXTERNAL=cfg.EXTERNALID,
        GEOMETRY_TYPE="Solid",
        CLEAN_SOLID=True,
        SNAP_TOL=0.001,
        COORD_PRECISION=3,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # export model as citygml file
    gml_file = os.path.join(cfg.GML_DIR, f"{unit_id}.gml")

    if not lis_city_modeller.Export_Shapes_to_CityGML(
        SHP_IN=models_out,
        FIELD_OBJECTID="proc_id",
        FIELD_SURFACECLASS="surfaceClass",
        FIELD_HEIGHT="measuredHeight",
        FIELDS_CITYOBJECT="roofHgtMax",
        FIELDS_POLYFACE="roofSlope;roofAspect",
        FIELD_EXTERNALID=cfg.EXTERNALID,
        FILE=gml_file,
        CITY_OBJECT="Building",
        LOD=2,
        DATE="2024-07-10",
        EPSG=str(cfg.EPSG),
        INFO_SYSTEM_EXTERNAL=cfg.EXTERNALID,
        GEOMETRY_TYPE="Solid",
        COORD_PRECISION=3,
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # -------------------------------------------------------------------------
    # free memory ressources
    saga_api.SG_Get_Data_Manager().Delete()

    return True


if __name__ == "__main__":
    # -----------------------------------------------------------------------------
    # start the logging
    starttime = lis_cmd.Start_Logging(False)

    # -----------------------------------------------------------------------------
    # load the shapefile with the processing units
    processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
        cfg.TILESHAPE_250
    )

    if not processing_units.is_Valid():
        lis_cmd.Error_Exit(
            "Loading processing_units failed",
            starttime,
            True,
        )

    lis_cmd.Message_Print(
        f"Number of units to process: {processing_units.Get_Count()} \n\n"
    )

    # -------------------------------------------------------------------------
    # execute the process_unit function in parallel (each unit on a
    # CPU core) with the maximum number of available CPU cores
    num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()

    num_processing_units = processing_units.Get_Count()

    lis_cmd.Run_Parallel_Tasks(process_unit, num_processing_units, num_threads)

    # -------------------------------------------------------------------------
    # free memory and stop logging
    saga_api.SG_Get_Data_Manager().Delete()

    lis_cmd.Stop_Logging(starttime)

Create a Run Script

In your scripts folder, create a new Python script called “run_script.py” and add the following code to the script!

from subprocess import run
import os
import sys
import argparse

LOG_DIR = "../log"


def parse_args():
    parser = argparse.ArgumentParser(
        description="run a script with automated logging."
    )

    parser.add_argument(
        "scripts",
        type=str,
        nargs="+",
        help="the script(s) to execute, e.g. 'my_script.py' or 'my_script0.py my_script1.py my_script2.py'",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()

    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR, exist_ok=True)

    for script in args.scripts:
        scriptname, fileext = os.path.splitext(script)
        std_log = os.path.join(LOG_DIR, f"{scriptname}.std.log")
        err_log = os.path.join(LOG_DIR, f"{scriptname}.err.log")

        if fileext == ".py":
            command = f"{sys.executable} -u  {script} > {std_log} 2> {err_log}"
            print("running: " + command)
            run(command, universal_newlines=True, shell=True)
        else:
            print(f"Skipping {script} (unknown file extension)")

Run the Script Using the Runscript

Run the tool in the Miniforge Prompt using the command

python run_script.py 01_data_prepararation 02_data_processing.py

< Previous section