loadTemplateMeta($import); // Load entity configurations and handlers $this->loadEntityConfigurations(); // Only CSV/TSV supported for now if (! in_array($import->source_type, ['csv', 'txt'])) { ImportEvent::create([ 'import_id' => $import->id, 'user_id' => $user?->getAuthIdentifier(), 'event' => 'processing_skipped', 'level' => 'warning', 'message' => 'Only CSV/TXT supported in v2 processor.', ]); $import->update(['status' => 'completed', 'finished_at' => now()]); return compact('total', 'imported', 'skipped', 'invalid'); } $import->update(['status' => 'processing', 'started_at' => $started]); $filePath = $import->path; if (! Storage::disk($import->disk ?? 'local')->exists($filePath)) { throw new \RuntimeException("File not found: {$filePath}"); } // Check if this is a retry (import_rows already exist) $isRetry = ImportRow::where('import_id', $import->id)->exists(); $fullPath = Storage::disk($import->disk ?? 'local')->path($filePath); $fh = fopen($fullPath, 'r'); if (! $fh) { throw new \RuntimeException("Could not open file: {$filePath}"); } $meta = $import->meta ?? []; $hasHeader = (bool) ($meta['has_header'] ?? true); $delimiter = $meta['forced_delimiter'] ?? $meta['detected_delimiter'] ?? 
','; $mappings = $this->loadMappings($import); $header = null; $rowNum = 0; // Read header if present if ($hasHeader) { $header = fgetcsv($fh, 0, $delimiter); $rowNum++; } // If retry mode, only process failed/invalid rows if ($isRetry) { $failedRows = ImportRow::where('import_id', $import->id) ->whereIn('status', ['invalid', 'failed']) ->orderBy('row_number') ->get(); foreach ($failedRows as $importRow) { $total++; try { $rawAssoc = $importRow->raw_data; $mapped = $importRow->mapped_data; // Process entities in priority order within a transaction $context = ['import' => $import, 'user' => $user, 'import_row' => $importRow]; DB::beginTransaction(); try { $results = $this->processRow($import, $mapped, $rawAssoc, $context); // If processing succeeded, commit the transaction if ($results['status'] === 'imported' || $results['status'] === 'skipped') { DB::commit(); } else { DB::rollBack(); } } catch (\Throwable $e) { DB::rollBack(); throw $e; } // Collect entity details from results $entityData = $this->collectEntityDetails($results); $entityDetails = $entityData['details']; $hasErrors = $entityData['hasErrors']; $hasWarnings = $entityData['hasWarnings']; // Handle different result statuses if ($results['status'] === 'imported') { $imported++; $importRow->update([ 'status' => 'imported', 'entity_type' => $results['entity_type'] ?? null, 'entity_id' => $results['entity_id'] ?? null, ]); $this->createRowProcessedEvent($import, $user, $importRow->row_number, $entityDetails, $hasWarnings, $rawAssoc); } elseif ($results['status'] === 'skipped') { $skipped++; $importRow->update(['status' => 'skipped']); $this->createRowSkippedEvent($import, $user, $importRow->row_number, $entityDetails, $rawAssoc); } else { $invalid++; $importRow->update([ 'status' => 'invalid', 'errors' => $results['errors'] ?? ['Processing failed'], ]); $this->createRowFailedEvent( $import, $user, $importRow->row_number, $results['errors'] ?? 
['Processing failed'], $entityDetails, $rawAssoc ); } } catch (\Throwable $e) { $invalid++; $this->handleRowException($import, $user, $importRow->row_number, $e); } } fclose($fh); } else { // Normal mode: process all rows from CSV while (($row = fgetcsv($fh, 0, $delimiter)) !== false) { $rowNum++; $total++; try { $rawAssoc = $this->buildRowAssoc($row, $header); // Skip empty rows if ($this->rowIsEffectivelyEmpty($rawAssoc)) { $skipped++; continue; } $mapped = $this->applyMappings($rawAssoc, $mappings); $rawSha1 = sha1(json_encode($rawAssoc)); $importRow = ImportRow::create([ 'import_id' => $import->id, 'row_number' => $rowNum, 'record_type' => $this->determineRecordType($mapped), 'raw_data' => $rawAssoc, 'mapped_data' => $mapped, 'status' => 'valid', 'raw_sha1' => $rawSha1, ]); // Process entities in priority order within a transaction $context = ['import' => $import, 'user' => $user, 'import_row' => $importRow]; DB::beginTransaction(); try { $results = $this->processRow($import, $mapped, $rawAssoc, $context); // If processing succeeded, commit the transaction if ($results['status'] === 'imported' || $results['status'] === 'skipped') { DB::commit(); } else { DB::rollBack(); } } catch (\Throwable $e) { DB::rollBack(); throw $e; } // Collect entity details from results $entityData = $this->collectEntityDetails($results); $entityDetails = $entityData['details']; $hasErrors = $entityData['hasErrors']; $hasWarnings = $entityData['hasWarnings']; // Handle different result statuses if ($results['status'] === 'imported') { $imported++; $importRow->update([ 'status' => 'imported', 'entity_type' => $results['entity_type'] ?? null, 'entity_id' => $results['entity_id'] ?? 
null,
                            ]);
                            $this->createRowProcessedEvent($import, $user, $rowNum, $entityDetails, $hasWarnings, $rawAssoc);
                        } elseif ($results['status'] === 'skipped') {
                            $skipped++;
                            $importRow->update(['status' => 'skipped']);
                            $this->createRowSkippedEvent($import, $user, $rowNum, $entityDetails, $rawAssoc);
                        } else {
                            $invalid++;
                            $importRow->update([
                                'status' => 'invalid',
                                'errors' => $results['errors'] ?? ['Processing failed'],
                            ]);
                            $this->createRowFailedEvent(
                                $import,
                                $user,
                                $rowNum,
                                $results['errors'] ?? ['Processing failed'],
                                $entityDetails,
                                $rawAssoc
                            );
                        }
                    } catch (\Throwable $e) {
                        $invalid++;
                        $this->handleRowException($import, $user, $rowNum, $e);
                    }
                }
                fclose($fh);
            }

            $this->finalizeImport($import, $user, $total, $imported, $skipped, $invalid);
        } catch (\Throwable $e) {
            $this->handleFatalException($import, $user, $e);
            throw $e;
        }

        return compact('total', 'imported', 'skipped', 'invalid');
    }

    /**
     * Load entity configurations from database.
     *
     * Reads the active ImportEntity rows ordered by `priority` descending and
     * stores each one in $this->entityConfigs keyed by its canonical root.
     * A handler instance is stored in $this->handlers only when `handler_class`
     * is set and the class exists; otherwise the entity is left without a handler.
     */
    protected function loadEntityConfigurations(): void
    {
        $entities = ImportEntity::where('is_active', true)
            ->orderBy('priority', 'desc')
            ->get();

        foreach ($entities as $entity) {
            $this->entityConfigs[$entity->canonical_root] = $entity;

            // Instantiate handler if specified
            if ($entity->handler_class && class_exists($entity->handler_class)) {
                $this->handlers[$entity->canonical_root] = new $entity->handler_class($entity);
            }
        }
    }

    /**
     * Load mappings for import.
     *
     * Returns the `import_mappings` rows belonging to the given import,
     * ordered by `position`.
     */
    protected function loadMappings(Import $import)
    {
        return DB::table('import_mappings')
            ->where('import_id', $import->id)
            ->orderBy('position')
            ->get();
    }

    /**
     * Build associative array from row.
     *
     * With a header, the result is keyed by header column name; cells missing
     * from the row become null and cells beyond the header are ignored.
     * Without a header, the result is the row re-indexed from 0.
     *
     * @param array      $row    Cell values of one parsed line.
     * @param array|null $header Header cells, or null/empty when there is no header.
     */
    protected function buildRowAssoc(array $row, ?array $header): array
    {
        if ($header) {
            $result = [];
            foreach ($header as $idx => $col) {
                $result[$col] = $row[$idx] ?? null;
            }

            return $result;
        }

        // array_values() produces the same 0..n-1 keys as the former
        // array_combine(range(0, count($row) - 1), $row), but does not throw
        // for an empty row (range(0, -1) has two elements, so the counts mismatch).
        return array_values($row);
    }

    /**
     * Check if row is effectively empty.
     *
     * A row is empty when every value is null or trims to an empty string.
     */
    protected function rowIsEffectivelyEmpty(array $raw): bool
    {
        foreach ($raw as $val) {
            if (! is_null($val) && trim((string) $val) !== '') {
                return false;
            }
        }

        return true;
    }

    /**
     * Apply mappings to raw data.
     *
     * Mappings are bucketed by target field. Targets ending in ".meta" are
     * delegated to applyMetaMappings(). For every other target, each mapping
     * whose source column is set in $raw contributes its (optionally
     * transformed) value. Values sharing the same `group` option are joined
     * with newlines; a mapping without a group is always written on its own.
     *
     * @param array    $raw      Raw row keyed by source column.
     * @param iterable $mappings Mapping rows exposing target_field, source_column, transform and options (JSON string).
     *
     * @return array Nested array built through setNestedValue().
     */
    protected function applyMappings(array $raw, $mappings): array
    {
        $mapped = [];

        // Group mappings by target field to handle concatenation
        $groupedMappings = [];
        foreach ($mappings as $mapping) {
            $groupedMappings[$mapping->target_field][] = $mapping;
        }

        foreach ($groupedMappings as $targetField => $fieldMappings) {
            // Special handling for meta fields: contracts.meta or other_entity.meta
            if (str_ends_with($targetField, '.meta')) {
                $this->applyMetaMappings($mapped, $targetField, $fieldMappings, $raw);

                continue;
            }

            $valuesByGroup = [];
            $ungroupedCount = 0;

            foreach ($fieldMappings as $mapping) {
                $sourceCol = $mapping->source_column;
                if (!isset($raw[$sourceCol])) {
                    continue;
                }

                $value = $raw[$sourceCol];

                // Apply transform
                if ($mapping->transform) {
                    $value = $this->applyTransform($value, $mapping->transform);
                }

                // Get group from options
                $options = $mapping->options ? json_decode($mapping->options, true) : [];
                $group = $options['group'] ?? null;

                // Explicit groups and ungrouped values live in separate key
                // namespaces. Appending ungrouped values with auto-increment keys
                // could collide with an explicit numeric group (e.g. an ungrouped
                // value taking key 0 and then being merged with group 0).
                $bucket = $group !== null
                    ? 'g:' . $group
                    : 'u:' . $ungroupedCount++;

                $valuesByGroup[$bucket][] = $value;
            }

            // Now set the values (buckets are visited in first-seen order)
            foreach ($valuesByGroup as $values) {
                if (count($values) === 1) {
                    // Single value - set directly
                    $this->setNestedValue($mapped, $targetField, $values[0]);

                    continue;
                }

                // Multiple values in same group - concatenate with newline.
                // Only null/blank parts are dropped; empty() would also discard "0".
                $parts = array_filter(
                    $values,
                    static fn ($v): bool => $v !== null && trim((string) $v) !== ''
                );
                $concatenated = implode("\n", $parts);

                if ($concatenated !== '') {
                    $this->setNestedValue($mapped, $targetField, $concatenated);
                }
            }
        }

        return $mapped;
    }

    /**
     * Apply meta mappings with special structure: entity.meta[group][key] = {title, value, type}
     *
     * Each mapping needs a `key` option; mappings without one, or whose source
     * column is not set in $raw, are ignored. The `type` option (default
     * "string") selects how the value is coerced and `group` (default "1")
     * selects the bucket under `meta`.
     *
     * @param array  $mapped        Mapped row, modified in place.
     * @param string $targetField   Target such as "contracts.meta".
     * @param array  $fieldMappings Mapping rows for this target.
     * @param array  $raw           Raw row keyed by source column.
     */
    protected function applyMetaMappings(array &$mapped, string $targetField, array $fieldMappings, array $raw): void
    {
        // Extract entity from target field: contracts.meta -> contracts.
        // Only the trailing suffix is removed; str_replace() would also strip
        // ".meta" occurring earlier in the name (e.g. "a.metadata.meta").
        $suffix = '.meta';
        $entity = str_ends_with($targetField, $suffix)
            ? substr($targetField, 0, -strlen($suffix))
            : $targetField;

        foreach ($fieldMappings as $mapping) {
            $sourceCol = $mapping->source_column;
            if (!isset($raw[$sourceCol])) {
                continue;
            }

            $value = $raw[$sourceCol];

            // Apply transform
            if ($mapping->transform) {
                $value = $this->applyTransform($value, $mapping->transform);
            }

            // Get options
            $options = $mapping->options ? json_decode($mapping->options, true) : [];
            $metaKey = $options['key'] ?? null;
            $metaType = $options['type'] ?? 'string';
            $group = $options['group'] ?? '1';

            if (!$metaKey) {
                continue;
            }

            // Coerce value based on type
            $coerced = $value;
            if ($metaType === 'number') {
                if (is_string($coerced)) {
                    $norm = DecimalNormalizer::normalize($coerced);
                    // Keep the original text when it does not normalize to a number
                    $coerced = is_numeric($norm) ? (float) $norm : $coerced;
                }
            } elseif ($metaType === 'boolean') {
                if (is_string($coerced)) {
                    $lc = strtolower(trim($coerced));
                    if (in_array($lc, ['1', 'true', 'yes', 'y'], true)) {
                        $coerced = true;
                    } elseif (in_array($lc, ['0', 'false', 'no', 'n'], true)) {
                        $coerced = false;
                    }
                    // Unrecognised strings are stored unchanged
                } else {
                    $coerced = (bool) $coerced;
                }
            } elseif ($metaType === 'date') {
                $coerced = is_scalar($coerced) ? $this->normalizeDate((string) $coerced) : null;
            } else {
                // string or unspecified: cast scalars to string
                if (is_scalar($coerced)) {
                    $coerced = (string) $coerced;
                }
            }

            // Initialize structure if needed
            if (!isset($mapped[$entity])) {
                $mapped[$entity] = [];
            }
            if (!isset($mapped[$entity]['meta']) || !is_array($mapped[$entity]['meta'])) {
                $mapped[$entity]['meta'] = [];
            }
            if (!isset($mapped[$entity]['meta'][$group])) {
                $mapped[$entity]['meta'][$group] = [];
            }

            // Store as structure with title, value and type
            $entry = [
                'title' => $sourceCol,
                'value' => $coerced,
            ];
            if ($metaType) {
                $entry['type'] = $metaType;
            }

            // A later mapping with the same group and key replaces the earlier entry
            $mapped[$entity]['meta'][$group][$metaKey] = $entry;
        }
    }

    /**
     * Apply transform to value.
     *
     * Supported transforms (case-insensitive): trim, upper, lower, date.
     * Unknown transforms return the value unchanged, and the string transforms
     * leave non-string values untouched.
     */
    protected function applyTransform(mixed $value, string $transform): mixed
    {
        return match (strtolower($transform)) {
            'trim' => is_string($value) ? trim($value) : $value,
            // Case-fold multibyte text when the value is valid UTF-8; otherwise
            // keep the byte-wise functions so data in other encodings is not altered.
            'upper' => is_string($value)
                ? (mb_check_encoding($value, 'UTF-8') ? mb_strtoupper($value, 'UTF-8') : strtoupper($value))
                : $value,
            'lower' => is_string($value)
                ? (mb_check_encoding($value, 'UTF-8') ? mb_strtolower($value, 'UTF-8') : strtolower($value))
                : $value,
            'date' => $this->normalizeDate($value),
            default => $value,
        };
    }

    /**
     * Normalize date value.
     *
     * Returns null for empty input and when DateNormalizer::toDate() throws.
     */
    protected function normalizeDate(mixed $value): ?string
    {
        if (empty($value)) {
            return null;
        }

        try {
            return DateNormalizer::toDate((string) $value);
        } catch (\Throwable $e) {
            return null;
        }
    }

    /**
     * Set nested value in array using dot notation.
     * If the key already exists, convert to array and append the new value.
*/
    protected function setNestedValue(array &$array, string $key, mixed $value): void
    {
        $keys = explode('.', $key);
        $lastIndex = count($keys) - 1;
        $current = &$array;

        foreach ($keys as $i => $k) {
            if ($i < $lastIndex) {
                // Intermediate segment: make sure there is an array to descend into
                if (! isset($current[$k]) || ! is_array($current[$k])) {
                    $current[$k] = [];
                }
                $current = &$current[$k];

                continue;
            }

            // Final segment: first write stores the value directly
            if (! isset($current[$k])) {
                $current[$k] = $value;

                continue;
            }

            // Key already exists: convert existing single value to array if needed, then append
            if (! is_array($current[$k])) {
                $current[$k] = [$current[$k]];
            }
            $current[$k][] = $value;
        }
    }

    /**
     * Determine record type from mapped data.
     *
     * Returns the first of payment, activity, contract, account that is set
     * in the mapped row, falling back to "contact".
     */
    protected function determineRecordType(array $mapped): string
    {
        foreach (['payment', 'activity', 'contract', 'account'] as $type) {
            if (isset($mapped[$type])) {
                return $type;
            }
        }

        return 'contact';
    }

    /**
     * Process a single row through all entity handlers.
     *
     * Iterates $this->entityConfigs in stored order. For every entity that is
     * present in the mapped data and has a handler, the data is validated and
     * then processed. Validation failures and handler exceptions are recorded
     * per entity and do not stop the remaining entities, so all errors of the
     * row are collected.
     *
     * @return array On error: ['status' => 'invalid', 'errors' => string[], 'results' => array].
     *               Otherwise: 'status' is 'imported' when an entity was inserted/updated
     *               with a truthy id, else 'skipped', plus entity_type, entity_id and results.
     */
    protected function processRow(Import $import, array $mapped, array $raw, array $context): array
    {
        $entityResults = [];
        $lastEntityType = null;
        $lastEntityId = null;
        $hasErrors = false;

        // Process entities in configured priority order
        foreach ($this->entityConfigs as $root => $config) {
            // Check if this entity exists in mapped data (support aliases)
            $mappedKey = $this->findMappedKey($mapped, $root, $config);
            if (!$mappedKey || !isset($mapped[$mappedKey])) {
                continue;
            }

            $handler = $this->handlers[$root] ?? null;
            if (! $handler) {
                continue;
            }

            try {
                // Validate before processing
                $validation = $handler->validate($mapped[$mappedKey]);
                if (! $validation['valid']) {
                    $entityResults[$root] = [
                        'action' => 'invalid',
                        'errors' => $validation['errors'],
                        'level' => 'error',
                    ];
                    $hasErrors = true;

                    // Don't stop processing, continue to other entities to collect all errors
                    continue;
                }

                // Pass previous results as context
                $result = $handler->process($import, $mapped[$mappedKey], $raw, array_merge($context, $entityResults));
                $entityResults[$root] = $result;

                // Strict comparison: a loose in_array() would treat a boolean
                // true action as matching any of the action names.
                $action = $result['action'] ?? null;

                // Track last successful entity for row status
                if (in_array($action, ['inserted', 'updated'], true)) {
                    $lastEntityType = $handler->getEntityClass();
                    $lastEntityId = $result['entity']?->id ?? null;
                }

                // Post-contract actions (segment attachment, activity creation)
                if ($root === 'contract' && in_array($action, ['inserted', 'updated', 'reactivated'], true)) {
                    try {
                        $this->postContractActions($import, $result['entity'], $context);
                    } catch (\Throwable $e) {
                        // Best effort: a failing post-action is logged and does not fail the row
                        Log::warning('Post-contract action failed', [
                            'import_id' => $import->id,
                            'contract_id' => $result['entity']->id ?? null,
                            'error' => $e->getMessage(),
                        ]);
                    }
                }
            } catch (\Throwable $e) {
                $hasErrors = true;

                Log::error("Handler failed for entity {$root}", [
                    'import_id' => $import->id,
                    'error' => $e->getMessage(),
                    'trace' => $e->getTraceAsString(),
                ]);

                $entityResults[$root] = [
                    'action' => 'failed',
                    'level' => 'error',
                    'errors' => [$e->getMessage()],
                    'exception' => [
                        'message' => $e->getMessage(),
                        'file' => basename($e->getFile()),
                        'line' => $e->getLine(),
                        'class' => get_class($e),
                    ],
                ];

                // Continue to process other entities to collect all errors
                continue;
            }
        }

        // If we had errors, return invalid status
        if ($hasErrors) {
            $allErrors = [];
            foreach ($entityResults as $root => $result) {
                if (!isset($result['errors'])) {
                    continue;
                }

                // Flatten the handler's errors before joining them. A plain
                // implode() fails on a non-array value and mangles nested arrays
                // (e.g. field => [messages]); flat lists give the same text as before.
                $messages = [];
                $pending = is_array($result['errors']) ? $result['errors'] : [$result['errors']];
                while ($pending !== []) {
                    $item = array_shift($pending);
                    if (is_array($item)) {
                        array_unshift($pending, ...array_values($item));

                        continue;
                    }

                    $messages[] = ($item === null || is_scalar($item) || $item instanceof \Stringable)
                        ? (string) $item
                        : (string) json_encode($item);
                }

                $allErrors[] = "{$root}: " . implode(', ', $messages);
            }

            return [
                'status' => 'invalid',
                'errors' => $allErrors,
                'results' => $entityResults,
            ];
        }

        return [
            'status' => $lastEntityId ? 'imported' : 'skipped',
            'entity_type' => $lastEntityType,
            'entity_id' => $lastEntityId,
            'results' => $entityResults,
        ];
    }

    /**
     * Find the key in mapped data that corresponds to this canonical root.
     *
     * Lookup order: the canonical root itself, then the config's `key`, then
     * each of its aliases. Returns null when none of them is set in $mapped.
     */
    protected function findMappedKey(array $mapped, string $canonicalRoot, $config): ?string
    {
        // First check canonical_root itself
        if (isset($mapped[$canonicalRoot])) {
            return $canonicalRoot;
        }

        // Then check key (e.g., 'contracts', 'person_addresses')
        if (isset($mapped[$config->key])) {
            return $config->key;
        }

        // Then check aliases
        foreach ($config->aliases ?? [] as $alias) {
            if (isset($mapped[$alias])) {
                return $alias;
            }
        }

        return null;
    }

    /**
     * Load template meta flags for special processing modes.
     *
     * Copies the template's meta (empty array when there is no template or meta)
     * and derives the payments_import, history_import and contract_key_mode settings.
     */
    protected function loadTemplateMeta(Import $import): void
    {
        $this->templateMeta = optional($import->template)->meta ?? [];
        $this->paymentsImport = (bool) ($this->templateMeta['payments_import'] ?? false);
        $this->historyImport = (bool) ($this->templateMeta['history_import'] ?? false);
        $this->contractKeyMode = $this->templateMeta['contract_key_mode'] ?? null;
    }

    /**
     * Collect entity details from processing results.
     *
     * Turns the per-entity entries under $results['results'] into a list of
     * detail records (entity, action, count, optional message, level and, for
     * failures, errors/exception).
     *
     * @return array ['details' => list of detail arrays, 'hasErrors' => bool, 'hasWarnings' => bool]
     */
    protected function collectEntityDetails(array $results): array
    {
        $entityDetails = [];
        $hasErrors = false;
        $hasWarnings = false;

        $perEntity = (isset($results['results']) && is_array($results['results']))
            ? $results['results']
            : [];

        foreach ($perEntity as $entityKey => $result) {
            $action = $result['action'] ?? 'unknown';
            $message = $result['message'] ?? null;

            $detail = [
                'entity' => $entityKey,
                'action' => $action,
                'count' => $result['count'] ?? 1,
            ];

            if ($message) {
                $detail['message'] = $message;
            }

            // Any entry carrying errors is an error, whatever its action says
            if ($action === 'invalid' || isset($result['errors'])) {
                $detail['level'] = 'error';
                $detail['errors'] = $result['errors'] ?? [];
                $hasErrors = true;
            } elseif ($action === 'skipped') {
                $detail['level'] = 'warning';
                $hasWarnings = true;
            } else {
                $detail['level'] = 'info';
            }

            if (isset($result['exception'])) {
                $detail['exception'] = $result['exception'];
                $hasErrors = true;
            }

            $entityDetails[] = $detail;
        }

        return [
            'details' => $entityDetails,
            'hasErrors' => $hasErrors,
            'hasWarnings' => $hasWarnings,
        ];
    }

    /**
     * Create a success event for a processed row.
     *
     * The event level is "warning" when $hasWarnings is true, otherwise "info".
     */
    protected function createRowProcessedEvent(
        Import $import,
        ?Authenticatable $user,
        int $rowNum,
        array $entityDetails,
        bool $hasWarnings,
        array $rawData = []
    ): void {
        ImportEvent::create([
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => 'row_processed',
            'level' => $hasWarnings ? 'warning' : 'info',
            'message' => "Row {$rowNum} processed successfully",
            'context' => [
                'row' => $rowNum,
                'entity_details' => $entityDetails,
                'raw_data' => $rawData,
            ],
        ]);
    }

    /**
     * Create a skip event for a skipped row.
     *
     * Always recorded at "warning" level.
     */
    protected function createRowSkippedEvent(
        Import $import,
        ?Authenticatable $user,
        int $rowNum,
        array $entityDetails,
        array $rawData = []
    ): void {
        ImportEvent::create([
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => 'row_skipped',
            'level' => 'warning',
            'message' => "Row {$rowNum} skipped",
            'context' => [
                'row' => $rowNum,
                'entity_details' => $entityDetails,
                'raw_data' => $rawData,
            ],
        ]);
    }

    /**
     * Create a failure event for a failed row.
     *
     * The message lists the errors joined with ", "; the errors are also
     * stored unmodified in the event context.
     */
    protected function createRowFailedEvent(
        Import $import,
        ?Authenticatable $user,
        int $rowNum,
        array $errors,
        array $entityDetails,
        array $rawData = []
    ): void {
        ImportEvent::create([
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => 'row_failed',
            'level' => 'error',
            'message' => "Row {$rowNum} failed: " . implode(', ', $errors),
            'context' => [
                'row' => $rowNum,
                'errors' => $errors,
                'entity_details' => $entityDetails,
                'raw_data' => $rawData,
            ],
        ]);
    }

    /**
     * Handle row processing exception.
*/
    protected function handleRowException(
        Import $import,
        ?Authenticatable $user,
        int $rowNum,
        \Throwable $e
    ): void {
        // Full details, including the stack trace, go to the application log
        $logContext = [
            'import_id' => $import->id,
            'row' => $rowNum,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString(),
        ];
        Log::error('ImportServiceV2 row processing failed', $logContext);

        // The import event keeps a compact summary of the exception
        $exceptionSummary = [
            'message' => $e->getMessage(),
            'file' => $e->getFile(),
            'line' => $e->getLine(),
        ];

        $eventAttributes = [
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => 'row_failed',
            'level' => 'error',
            'message' => "Row {$rowNum} exception: {$e->getMessage()}",
            'context' => [
                'row' => $rowNum,
                'exception' => $exceptionSummary,
            ],
        ];
        ImportEvent::create($eventAttributes);
    }

    /**
     * Close out an import run: persist the final status and counters on the
     * import, then record a summary event.
     *
     * A single invalid row is enough to mark the whole import as failed.
     */
    protected function finalizeImport(
        Import $import,
        ?Authenticatable $user,
        int $total,
        int $imported,
        int $skipped,
        int $invalid
    ): void {
        // If there are any invalid rows, mark import as failed, not completed
        if ($invalid > 0) {
            $status = 'failed';
            $eventLevel = 'error';
            $eventName = 'processing_failed';
        } else {
            $status = 'completed';
            $eventLevel = 'info';
            $eventName = 'processing_completed';
        }

        $counters = [
            'status' => $status,
            'finished_at' => now(),
            'total_rows' => $total,
            'imported_rows' => $imported,
            'valid_rows' => $imported,
            'invalid_rows' => $invalid,
        ];
        $import->update($counters);

        $summary = "Processed {$total} rows: {$imported} imported, {$skipped} skipped, {$invalid} invalid";

        ImportEvent::create([
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => $eventName,
            'level' => $eventLevel,
            'message' => $summary,
        ]);
    }

    /**
     * Handle fatal processing exception.
*/
    protected function handleFatalException(
        Import $import,
        ?Authenticatable $user,
        \Throwable $e
    ): void {
        // Log first, so the failure is recorded even if the writes below fail
        $logContext = [
            'import_id' => $import->id,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString(),
        ];
        Log::error('ImportServiceV2 processing failed', $logContext);

        $failedState = ['status' => 'failed', 'finished_at' => now()];
        $import->update($failedState);

        $eventAttributes = [
            'import_id' => $import->id,
            'user_id' => $user?->getAuthIdentifier(),
            'event' => 'processing_failed',
            'level' => 'error',
            'message' => $e->getMessage(),
        ];
        ImportEvent::create($eventAttributes);
    }

    /**
     * Follow-up work after a contract row was handled.
     *
     * Driven by the import's meta:
     *  - segment_id > 0: the segment is made the only active segment of the
     *    contract and is ensured to be active on the contract's client case.
     *  - decision_id > 0: an Activity is created for the contract, optionally
     *    with action_id, and a note naming the template.
     *
     * Intended to match ImportProcessor::postContractActions() behavior.
     */
    protected function postContractActions(Import $import, $contract, array $context = []): void
    {
        $settings = $import->meta ?? [];
        $segmentId = (int) ($settings['segment_id'] ?? 0);
        $decisionId = (int) ($settings['decision_id'] ?? 0);
        $templateName = (string) ($settings['template_name'] ?? optional($import->template)->name ?? '');
        $actionId = (int) ($settings['action_id'] ?? 0);

        // Attach segment to contract as the main (active) segment if provided
        if ($segmentId > 0) {
            // Step 1: the segment must exist on the client case and be active
            $caseSegment = \DB::table('client_case_segment')
                ->where('client_case_id', $contract->client_case_id)
                ->where('segment_id', $segmentId)
                ->first();

            if ($caseSegment) {
                if (! $caseSegment->active) {
                    \DB::table('client_case_segment')
                        ->where('id', $caseSegment->id)
                        ->update(['active' => true, 'updated_at' => now()]);
                }
            } else {
                \DB::table('client_case_segment')->insert([
                    'client_case_id' => $contract->client_case_id,
                    'segment_id' => $segmentId,
                    'active' => true,
                    'created_at' => now(),
                    'updated_at' => now(),
                ]);
            }

            // Step 2: every other segment of this contract is deactivated so the
            // selected one becomes the main segment
            \DB::table('contract_segment')
                ->where('contract_id', $contract->id)
                ->where('segment_id', '!=', $segmentId)
                ->update(['active' => false, 'updated_at' => now()]);

            // Step 3: insert the contract/segment link, or re-activate an inactive one
            $contractSegment = \DB::table('contract_segment')
                ->where('contract_id', $contract->id)
                ->where('segment_id', $segmentId)
                ->first();

            if (! $contractSegment) {
                \DB::table('contract_segment')->insert([
                    'contract_id' => $contract->id,
                    'segment_id' => $segmentId,
                    'active' => true,
                    'created_at' => now(),
                    'updated_at' => now(),
                ]);
            } elseif (! $contractSegment->active) {
                \DB::table('contract_segment')
                    ->where('id', $contractSegment->id)
                    ->update(['active' => true, 'updated_at' => now()]);
            }
        }

        // Create activity if decision provided
        if ($decisionId > 0) {
            $templateSuffix = $templateName ? ': '.$templateName : '';

            \App\Models\Activity::create([
                'decision_id' => $decisionId,
                'action_id' => $actionId > 0 ? $actionId : null,
                'contract_id' => $contract->id,
                'client_case_id' => $contract->client_case_id,
                'note' => trim('Imported via template'.$templateSuffix),
            ]);
        }
    }
}