424 lines
13 KiB
Elixir
424 lines
13 KiB
Elixir
defmodule MoundHunters.TileProcessor do
|
|
@moduledoc """
|
|
GenServer that processes tile requests: lookup tile IDs and download/convert tiles.
|
|
|
|
Processing pipeline:
|
|
1. :looking_up - Query ArcGIS for tile metadata
|
|
2. :downloading - Download ZIP from OGRIP
|
|
3. :extracting - Unzip the archive
|
|
4. :converting - Run las2mound.py to create .mound file
|
|
5. :done - Files ready in priv/tiles/
|
|
"""
|
|
use GenServer
|
|
require Logger
|
|
|
|
alias MoundHunters.OhioLidar
|
|
|
|
# Processing statuses
|
|
@type lookup_status :: :pending | {:ok, String.t()} | {:error, String.t()}
|
|
@type processing_status ::
|
|
:queued | :downloading | :extracting | :converting | :done | {:error, String.t()}
|
|
|
|
# Client API
|
|
|
|
def start_link(opts) do
|
|
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
|
end
|
|
|
|
@doc """
|
|
Request processing for a lat/lng coordinate pair.
|
|
Returns the lookup_id that can be used to poll progress.
|
|
"""
|
|
def request_tile(lat, lng) do
|
|
lookup_id = MoundHunters.Boundary.format_lookup_id(lat, lng)
|
|
GenServer.cast(__MODULE__, {:request_tile, lookup_id, lat, lng})
|
|
{:ok, lookup_id}
|
|
end
|
|
|
|
@doc """
|
|
Get the current lookup status for a coordinate pair.
|
|
"""
|
|
def get_lookup_status(lookup_id) do
|
|
case :ets.lookup(:tile_lookups, lookup_id) do
|
|
[{^lookup_id, status}] -> {:ok, status}
|
|
[] -> {:error, :not_found}
|
|
end
|
|
end
|
|
|
|
@doc """
|
|
Get the current processing status for a tile.
|
|
"""
|
|
def get_processing_status(tile_id) do
|
|
case :ets.lookup(:tile_processing, tile_id) do
|
|
[{^tile_id, status, _metadata}] -> {:ok, status}
|
|
[] -> {:error, :not_found}
|
|
end
|
|
end
|
|
|
|
@doc """
|
|
Get full metadata for a processing tile.
|
|
"""
|
|
def get_processing_metadata(tile_id) do
|
|
case :ets.lookup(:tile_processing, tile_id) do
|
|
[{^tile_id, status, metadata}] -> {:ok, status, metadata}
|
|
[] -> {:error, :not_found}
|
|
end
|
|
end
|
|
|
|
# Server callbacks
|
|
|
|
@impl true
|
|
def init(_opts) do
|
|
# Create ETS tables
|
|
:ets.new(:tile_lookups, [:set, :public, :named_table])
|
|
:ets.new(:tile_processing, [:set, :public, :named_table])
|
|
|
|
# Ensure temp directory exists
|
|
temp_dir = Application.get_env(:mound_hunters, :tile_temp_dir)
|
|
File.mkdir_p!(temp_dir)
|
|
root_output_dir = Application.get_env(:mound_hunters, :tile_output_dir)
|
|
mound_dir = Path.join(root_output_dir, "MOUND")
|
|
|
|
state = %{
|
|
las2mound_script: Application.get_env(:mound_hunters, :las2mound_script_path),
|
|
tile_output_dir: mound_dir,
|
|
tile_temp_dir: temp_dir,
|
|
processing_queue: :queue.new(),
|
|
current_job: nil
|
|
}
|
|
|
|
Logger.info("TileProcessor started")
|
|
{:ok, state}
|
|
end
|
|
|
|
@impl true
|
|
def handle_cast({:request_tile, lookup_id, lat, lng}, state) do
|
|
# Check if we already have this lookup in progress or completed
|
|
case :ets.lookup(:tile_lookups, lookup_id) do
|
|
[] ->
|
|
# New request - insert as pending and start lookup
|
|
:ets.insert(:tile_lookups, {lookup_id, :pending})
|
|
Logger.info("New tile request for #{lookup_id} (#{lat}, #{lng})")
|
|
|
|
# Start lookup asynchronously
|
|
send(self(), {:lookup_tile, lookup_id, lat, lng})
|
|
|
|
[{^lookup_id, _status}] ->
|
|
# Already in progress or completed, ignore duplicate request
|
|
Logger.debug("Duplicate tile request for #{lookup_id}, ignoring")
|
|
end
|
|
|
|
{:noreply, state}
|
|
end
|
|
|
|
@impl true
|
|
def handle_info({:lookup_tile, lookup_id, lat, lng}, state) do
|
|
Logger.info("Looking up tile for coordinates (#{lat}, #{lng})")
|
|
|
|
# First, check Mnesia to see if we already have a tile at these coordinates
|
|
case MoundHunters.Repo.get_tile_at_coords(lat, lng) do
|
|
{:ok, tile} ->
|
|
# Found in Mnesia - use cached tile_id
|
|
tile_name = tile.id
|
|
Logger.info("Found tile #{tile_name} in Mnesia (cached), skipping ArcGIS query")
|
|
|
|
# Update lookup table with success
|
|
:ets.insert(:tile_lookups, {lookup_id, {:ok, tile_name}})
|
|
|
|
# Check if tile file still exists
|
|
output_file = Path.join(state.tile_output_dir, "#{tile_name}.mound")
|
|
|
|
if File.exists?(output_file) do
|
|
Logger.info("Tile #{tile_name} already processed, marking as done")
|
|
:ets.insert(:tile_processing, {tile_name, :done, %{tile_info: tile}})
|
|
{:noreply, state}
|
|
else
|
|
# Tile in DB but file missing - this shouldn't happen, but handle it
|
|
Logger.warning("Tile #{tile_name} in Mnesia but file missing, re-querying ArcGIS")
|
|
query_arcgis_and_process(lookup_id, lat, lng, state)
|
|
end
|
|
|
|
{:error, :not_found} ->
|
|
# Not in Mnesia - query ArcGIS
|
|
Logger.debug("Tile not in Mnesia, querying ArcGIS")
|
|
query_arcgis_and_process(lookup_id, lat, lng, state)
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to query Mnesia: #{inspect(reason)}")
|
|
# Fall back to ArcGIS query
|
|
query_arcgis_and_process(lookup_id, lat, lng, state)
|
|
end
|
|
end
|
|
|
|
@impl true
|
|
def handle_info({:process_tile, tile_name, tile_info}, state) do
|
|
Logger.info("Starting processing for tile #{tile_name}")
|
|
|
|
# Update status to downloading
|
|
:ets.insert(:tile_processing, {tile_name, :downloading, %{tile_info: tile_info}})
|
|
|
|
# Download ZIP
|
|
temp_zip = Path.join(state.tile_temp_dir, "#{tile_name}.zip")
|
|
|
|
case OhioLidar.download_tile(tile_name, tile_info.county, temp_zip) do
|
|
{:ok, _bytes} ->
|
|
send(self(), {:extract_tile, tile_name, tile_info, temp_zip})
|
|
{:noreply, state}
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to download tile #{tile_name}: #{inspect(reason)}")
|
|
|
|
:ets.insert(
|
|
:tile_processing,
|
|
{tile_name, {:error, "Download failed: #{inspect(reason)}"}, %{}}
|
|
)
|
|
|
|
# Clean up
|
|
File.rm(temp_zip)
|
|
|
|
# Process next in queue
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
end
|
|
end
|
|
|
|
@impl true
|
|
def handle_info({:extract_tile, tile_name, tile_info, zip_path}, state) do
|
|
Logger.info("Extracting tile #{tile_name}")
|
|
:ets.insert(:tile_processing, {tile_name, :extracting, %{tile_info: tile_info}})
|
|
|
|
extract_dir = Path.join(state.tile_temp_dir, tile_name)
|
|
File.mkdir_p!(extract_dir)
|
|
|
|
case unzip_file(zip_path, extract_dir) do
|
|
:ok ->
|
|
# Find the .las file
|
|
case find_las_file(extract_dir) do
|
|
{:ok, las_file} ->
|
|
send(self(), {:convert_tile, tile_name, tile_info, las_file, extract_dir})
|
|
{:noreply, state}
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to find LAS file in #{extract_dir}: #{reason}")
|
|
:ets.insert(:tile_processing, {tile_name, {:error, "No LAS file found"}, %{}})
|
|
|
|
# Clean up
|
|
File.rm_rf!(extract_dir)
|
|
File.rm(zip_path)
|
|
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
end
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to extract #{zip_path}: #{inspect(reason)}")
|
|
|
|
:ets.insert(
|
|
:tile_processing,
|
|
{tile_name, {:error, "Extraction failed: #{inspect(reason)}"}, %{}}
|
|
)
|
|
|
|
# Clean up
|
|
File.rm_rf!(extract_dir)
|
|
File.rm(zip_path)
|
|
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
end
|
|
end
|
|
|
|
@impl true
|
|
def handle_info({:convert_tile, tile_name, tile_info, las_file, temp_dir}, state) do
|
|
Logger.info("Converting tile #{tile_name} to mound format")
|
|
:ets.insert(:tile_processing, {tile_name, :converting, %{tile_info: tile_info}})
|
|
|
|
output_file = Path.join(state.tile_output_dir, "#{tile_name}.mound")
|
|
File.mkdir_p!(state.tile_output_dir)
|
|
|
|
case run_las2mound(state.las2mound_script, las_file, output_file) do
|
|
:ok ->
|
|
Logger.info("Successfully converted #{tile_name}")
|
|
|
|
# Parse header to extract bounds
|
|
case MoundHunters.MoundParser.parse_header(output_file) do
|
|
{:ok, header} ->
|
|
# Convert Web Mercator coordinates to lat/lng
|
|
bounds =
|
|
MoundHunters.MoundParser.bounds_web_mercator_to_latlon(%{
|
|
min_x: header.min_x,
|
|
max_x: header.max_x,
|
|
min_y: header.min_y,
|
|
max_y: header.max_y
|
|
})
|
|
|
|
Logger.debug(
|
|
"Tile #{tile_name} bounds: lat [#{bounds.min_lat}, #{bounds.max_lat}], " <>
|
|
"lng [#{bounds.min_lng}, #{bounds.max_lng}]"
|
|
)
|
|
|
|
:ets.insert(
|
|
:tile_processing,
|
|
{tile_name, :done, %{tile_info: tile_info, header: header}}
|
|
)
|
|
|
|
# Update Mnesia with extracted bounds
|
|
MoundHunters.Repo.upsert_tile(%{
|
|
id: tile_name,
|
|
status: :ready,
|
|
min_lat: bounds.min_lat,
|
|
max_lat: bounds.max_lat,
|
|
min_lng: bounds.min_lng,
|
|
max_lng: bounds.max_lng
|
|
})
|
|
|
|
# Clean up temp files
|
|
File.rm_rf!(temp_dir)
|
|
File.rm(Path.join(state.tile_temp_dir, "#{tile_name}.zip"))
|
|
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to parse header for #{tile_name}: #{inspect(reason)}")
|
|
|
|
:ets.insert(
|
|
:tile_processing,
|
|
{tile_name, {:error, "Header parsing failed: #{inspect(reason)}"}, %{}}
|
|
)
|
|
|
|
# Keep the .mound file for debugging, but clean up temp files
|
|
File.rm_rf!(temp_dir)
|
|
File.rm(Path.join(state.tile_temp_dir, "#{tile_name}.zip"))
|
|
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
end
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to convert #{tile_name}: #{inspect(reason)}")
|
|
|
|
:ets.insert(
|
|
:tile_processing,
|
|
{tile_name, {:error, "Conversion failed: #{inspect(reason)}"}, %{}}
|
|
)
|
|
|
|
# Clean up
|
|
File.rm_rf!(temp_dir)
|
|
File.rm(Path.join(state.tile_temp_dir, "#{tile_name}.zip"))
|
|
|
|
state = process_next_in_queue(state)
|
|
{:noreply, state}
|
|
end
|
|
end
|
|
|
|
# Helper functions
|
|
|
|
defp query_arcgis_and_process(lookup_id, lat, lng, state) do
|
|
case OhioLidar.query_tile_info(lng, lat) do
|
|
{:ok, tile_info} ->
|
|
tile_name = tile_info.tile_name
|
|
Logger.info("Found tile: #{tile_name} in #{tile_info.county} county")
|
|
|
|
# Update lookup table with success
|
|
:ets.insert(:tile_lookups, {lookup_id, {:ok, tile_name}})
|
|
|
|
# Check if tile already exists
|
|
output_file = Path.join(state.tile_output_dir, "#{tile_name}.mound")
|
|
|
|
if File.exists?(output_file) do
|
|
Logger.info("Tile #{tile_name} already processed, marking as done")
|
|
:ets.insert(:tile_processing, {tile_name, :done, %{tile_info: tile_info}})
|
|
{:noreply, state}
|
|
else
|
|
# Queue for processing
|
|
state = queue_tile_for_processing(state, tile_name, tile_info)
|
|
{:noreply, state}
|
|
end
|
|
|
|
{:error, :no_tile_found} ->
|
|
Logger.warning("No tile found for coordinates (#{lat}, #{lng})")
|
|
:ets.insert(:tile_lookups, {lookup_id, {:error, "No tile found at coordinates"}})
|
|
{:noreply, state}
|
|
|
|
{:error, reason} ->
|
|
Logger.error("Failed to lookup tile: #{inspect(reason)}")
|
|
:ets.insert(:tile_lookups, {lookup_id, {:error, "Lookup failed: #{inspect(reason)}"}})
|
|
{:noreply, state}
|
|
end
|
|
end
|
|
|
|
defp queue_tile_for_processing(state, tile_name, tile_info) do
|
|
# Check if already in queue or processing
|
|
case :ets.lookup(:tile_processing, tile_name) do
|
|
[] ->
|
|
# Add to queue
|
|
:ets.insert(:tile_processing, {tile_name, :queued, %{tile_info: tile_info}})
|
|
new_queue = :queue.in({tile_name, tile_info}, state.processing_queue)
|
|
state = %{state | processing_queue: new_queue}
|
|
|
|
# If nothing currently processing, start now
|
|
if state.current_job == nil do
|
|
process_next_in_queue(state)
|
|
else
|
|
state
|
|
end
|
|
|
|
[{^tile_name, _status, _metadata}] ->
|
|
Logger.debug("Tile #{tile_name} already in processing queue")
|
|
state
|
|
end
|
|
end
|
|
|
|
defp process_next_in_queue(state) do
|
|
case :queue.out(state.processing_queue) do
|
|
{{:value, {tile_name, tile_info}}, new_queue} ->
|
|
Logger.info("Processing next tile in queue: #{tile_name}")
|
|
send(self(), {:process_tile, tile_name, tile_info})
|
|
%{state | processing_queue: new_queue, current_job: tile_name}
|
|
|
|
{:empty, _} ->
|
|
Logger.debug("Processing queue empty")
|
|
%{state | current_job: nil}
|
|
end
|
|
end
|
|
|
|
defp unzip_file(zip_path, extract_dir) do
|
|
case :zip.unzip(String.to_charlist(zip_path), cwd: String.to_charlist(extract_dir)) do
|
|
{:ok, _files} ->
|
|
:ok
|
|
|
|
{:error, reason} ->
|
|
{:error, reason}
|
|
end
|
|
end
|
|
|
|
defp find_las_file(dir) do
|
|
case File.ls(dir) do
|
|
{:ok, files} ->
|
|
las_files = Enum.filter(files, &String.ends_with?(&1, ".las"))
|
|
|
|
case las_files do
|
|
[las_file | _] -> {:ok, Path.join(dir, las_file)}
|
|
[] -> {:error, :no_las_file}
|
|
end
|
|
|
|
{:error, reason} ->
|
|
{:error, reason}
|
|
end
|
|
end
|
|
|
|
defp run_las2mound(script_path, input_las, output_mound) do
|
|
Logger.info("Running: #{script_path} #{input_las} #{output_mound}")
|
|
|
|
case System.cmd("python3", [script_path, input_las, output_mound], stderr_to_stdout: true) do
|
|
{output, 0} ->
|
|
Logger.debug("las2mound output: #{output}")
|
|
:ok
|
|
|
|
{output, exit_code} ->
|
|
Logger.error("las2mound failed with exit code #{exit_code}: #{output}")
|
|
{:error, "Exit code #{exit_code}"}
|
|
end
|
|
end
|
|
end
|