defmodule MoundHunters.TileProcessor do @moduledoc """ GenServer that processes tile requests: lookup tile IDs and download/convert tiles. Processing pipeline: 1. :looking_up - Query ArcGIS for tile metadata 2. :downloading - Download ZIP from OGRIP 3. :extracting - Unzip the archive 4. :converting - Run las2mound.py to create .mound file 5. :done - Files ready in priv/tiles/ """ use GenServer require Logger alias MoundHunters.OhioLidar # Processing statuses @type lookup_status :: :pending | {:ok, String.t()} | {:error, String.t()} @type processing_status :: :queued | :downloading | :extracting | :converting | :done | {:error, String.t()} # Client API def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end @doc """ Request processing for a lat/lng coordinate pair. Returns the lookup_id that can be used to poll progress. """ def request_tile(lat, lng) do lookup_id = MoundHunters.Boundary.format_lookup_id(lat, lng) GenServer.cast(__MODULE__, {:request_tile, lookup_id, lat, lng}) {:ok, lookup_id} end @doc """ Get the current lookup status for a coordinate pair. """ def get_lookup_status(lookup_id) do case :ets.lookup(:tile_lookups, lookup_id) do [{^lookup_id, status}] -> {:ok, status} [] -> {:error, :not_found} end end @doc """ Get the current processing status for a tile. """ def get_processing_status(tile_id) do case :ets.lookup(:tile_processing, tile_id) do [{^tile_id, status, _metadata}] -> {:ok, status} [] -> {:error, :not_found} end end @doc """ Get full metadata for a processing tile. """ def get_processing_metadata(tile_id) do case :ets.lookup(:tile_processing, tile_id) do [{^tile_id, status, metadata}] -> {:ok, status, metadata} [] -> {:error, :not_found} end end # Server callbacks @impl true def init(_opts) do # Create ETS tables :ets.new(:tile_lookups, [:set, :public, :named_table]) :ets.new(:tile_processing, [:set, :public, :named_table]) # Ensure temp directory exists temp_dir = Application.get_env(:mound_hunters, :tile_temp_dir) File.mkdir_p!(temp_dir) state = %{ las2mound_script: Application.get_env(:mound_hunters, :las2mound_script_path), tile_output_dir: Application.get_env(:mound_hunters, :tile_output_dir), tile_temp_dir: temp_dir, processing_queue: :queue.new(), current_job: nil } Logger.info("TileProcessor started") {:ok, state} end @impl true def handle_cast({:request_tile, lookup_id, lat, lng}, state) do # Check if we already have this lookup in progress or completed case :ets.lookup(:tile_lookups, lookup_id) do [] -> # New request - insert as pending and start lookup :ets.insert(:tile_lookups, {lookup_id, :pending}) Logger.info("New tile request for #{lookup_id} (#{lat}, #{lng})") # Start lookup asynchronously send(self(), {:lookup_tile, lookup_id, lat, lng}) [{^lookup_id, _status}] -> # Already in progress or completed, ignore duplicate request Logger.debug("Duplicate tile request for #{lookup_id}, ignoring") end {:noreply, state} end @impl true def handle_info({:lookup_tile, lookup_id, lat, lng}, state) do Logger.info("Looking up tile for coordinates (#{lat}, #{lng})") case OhioLidar.query_tile_info(lng, lat) do {:ok, tile_info} -> tile_name = tile_info.tile_name Logger.info("Found tile: #{tile_name} in #{tile_info.county} county") # Update lookup table with success :ets.insert(:tile_lookups, {lookup_id, {:ok, tile_name}}) # Check if tile already exists output_file = Path.join(state.tile_output_dir, "#{tile_name}.mound") if File.exists?(output_file) do Logger.info("Tile #{tile_name} already processed, marking as done") :ets.insert(:tile_processing, {tile_name, :done, %{tile_info: tile_info}}) else # Queue for processing state = queue_tile_for_processing(state, tile_name, tile_info) {:noreply, state} end {:error, :no_tile_found} -> Logger.warning("No tile found for coordinates (#{lat}, #{lng})") :ets.insert(:tile_lookups, {lookup_id, {:error, "No tile found at coordinates"}}) {:noreply, state} {:error, reason} -> Logger.error("Failed to lookup tile: #{inspect(reason)}") :ets.insert(:tile_lookups, {lookup_id, {:error, "Lookup failed: #{inspect(reason)}"}}) {:noreply, state} end end @impl true def handle_info({:process_tile, tile_name, tile_info}, state) do Logger.info("Starting processing for tile #{tile_name}") # Update status to downloading :ets.insert(:tile_processing, {tile_name, :downloading, %{tile_info: tile_info}}) # Download ZIP temp_zip = Path.join(state.tile_temp_dir, "#{tile_name}.zip") case OhioLidar.download_tile(tile_name, tile_info.county, temp_zip) do {:ok, _bytes} -> send(self(), {:extract_tile, tile_name, tile_info, temp_zip}) {:noreply, state} {:error, reason} -> Logger.error("Failed to download tile #{tile_name}: #{inspect(reason)}") :ets.insert(:tile_processing, {tile_name, {:error, "Download failed: #{inspect(reason)}"}, %{}}) # Clean up File.rm(temp_zip) # Process next in queue state = process_next_in_queue(state) {:noreply, state} end end @impl true def handle_info({:extract_tile, tile_name, tile_info, zip_path}, state) do Logger.info("Extracting tile #{tile_name}") :ets.insert(:tile_processing, {tile_name, :extracting, %{tile_info: tile_info}}) extract_dir = Path.join(state.tile_temp_dir, tile_name) File.mkdir_p!(extract_dir) case unzip_file(zip_path, extract_dir) do :ok -> # Find the .las file case find_las_file(extract_dir) do {:ok, las_file} -> send(self(), {:convert_tile, tile_name, tile_info, las_file, extract_dir}) {:noreply, state} {:error, reason} -> Logger.error("Failed to find LAS file in #{extract_dir}: #{reason}") :ets.insert(:tile_processing, {tile_name, {:error, "No LAS file found"}, %{}}) # Clean up File.rm_rf!(extract_dir) File.rm(zip_path) state = process_next_in_queue(state) {:noreply, state} end {:error, reason} -> Logger.error("Failed to extract #{zip_path}: #{inspect(reason)}") :ets.insert(:tile_processing, {tile_name, {:error, "Extraction failed: #{inspect(reason)}"}, %{}}) # Clean up File.rm_rf!(extract_dir) File.rm(zip_path) state = process_next_in_queue(state) {:noreply, state} end end @impl true def handle_info({:convert_tile, tile_name, tile_info, las_file, temp_dir}, state) do Logger.info("Converting tile #{tile_name} to mound format") :ets.insert(:tile_processing, {tile_name, :converting, %{tile_info: tile_info}}) output_file = Path.join(state.tile_output_dir, "#{tile_name}.mound") File.mkdir_p!(state.tile_output_dir) case run_las2mound(state.las2mound_script, las_file, output_file) do :ok -> Logger.info("Successfully converted #{tile_name}") :ets.insert(:tile_processing, {tile_name, :done, %{tile_info: tile_info}}) # Update Mnesia MoundHunters.Repo.upsert_tile(%{ id: tile_name, status: :ready, min_lat: nil, # TODO: Extract from tile_info or LAS bounds max_lat: nil, min_lng: nil, max_lng: nil }) # Clean up temp files File.rm_rf!(temp_dir) File.rm(Path.join(state.tile_temp_dir, "#{tile_name}.zip")) state = process_next_in_queue(state) {:noreply, state} {:error, reason} -> Logger.error("Failed to convert #{tile_name}: #{inspect(reason)}") :ets.insert(:tile_processing, {tile_name, {:error, "Conversion failed: #{inspect(reason)}"}, %{}}) # Clean up File.rm_rf!(temp_dir) File.rm(Path.join(state.tile_temp_dir, "#{tile_name}.zip")) state = process_next_in_queue(state) {:noreply, state} end end # Helper functions defp queue_tile_for_processing(state, tile_name, tile_info) do # Check if already in queue or processing case :ets.lookup(:tile_processing, tile_name) do [] -> # Add to queue :ets.insert(:tile_processing, {tile_name, :queued, %{tile_info: tile_info}}) new_queue = :queue.in({tile_name, tile_info}, state.processing_queue) state = %{state | processing_queue: new_queue} # If nothing currently processing, start now if state.current_job == nil do process_next_in_queue(state) else state end [{^tile_name, _status, _metadata}] -> Logger.debug("Tile #{tile_name} already in processing queue") state end end defp process_next_in_queue(state) do case :queue.out(state.processing_queue) do {{:value, {tile_name, tile_info}}, new_queue} -> Logger.info("Processing next tile in queue: #{tile_name}") send(self(), {:process_tile, tile_name, tile_info}) %{state | processing_queue: new_queue, current_job: tile_name} {:empty, _} -> Logger.debug("Processing queue empty") %{state | current_job: nil} end end defp unzip_file(zip_path, extract_dir) do case :zip.unzip(String.to_charlist(zip_path), cwd: String.to_charlist(extract_dir)) do {:ok, _files} -> :ok {:error, reason} -> {:error, reason} end end defp find_las_file(dir) do case File.ls(dir) do {:ok, files} -> las_files = Enum.filter(files, &String.ends_with?(&1, ".las")) case las_files do [las_file | _] -> {:ok, Path.join(dir, las_file)} [] -> {:error, :no_las_file} end {:error, reason} -> {:error, reason} end end defp run_las2mound(script_path, input_las, output_mound) do Logger.info("Running: #{script_path} #{input_las} #{output_mound}") case System.cmd("python3", [script_path, input_las, output_mound], stderr_to_stdout: true) do {output, 0} -> Logger.debug("las2mound output: #{output}") :ok {output, exit_code} -> Logger.error("las2mound failed with exit code #{exit_code}: #{output}") {:error, "Exit code #{exit_code}"} end end end