Implementation Log: US-07 Background Job Processing
Feature: US-07 Async Worker Pipeline — Single-Attempt Background Processing
Summary
US-07 delivers the background job consumer that takes an asset from PROCESSING to either UPLOADED (success) or FAILED (terminal failure). The implementation adds a real Redis queue consumer, a worker container, and all the application/domain/infrastructure pieces needed for reliable single-attempt processing. Retries and dead-letter handling remain deferred to US-08.
Architecture Decisions
- Queue contract: Non-destructive dequeue via an atomic Lua
RPOP+LPUSHinto a:reservedlist. Jobs are only removed from the reserved list after an explicitacknowledge()(HANDLED or DISCARD) or returned to the main queue viarelease()(RETRY or infrastructure failure). A visibility-timeout recovery pass runs before eachreserveNext()call to handle crashed workers. - Delivery semantics in Infrastructure: The application service (
HandleAssetProcessingJobService) returns domain vocabulary only (AssetProcessingJobOutcome). The translation to queue delivery actions (HANDLED/DISCARD/RETRY) lives exclusively inAssetProcessingJobHandlingResultinsideInfrastructure\Processing\. - Worker loop resilience:
AssetProcessingWorkerLoopapplies exponential backoff (250 ms base, 5 s ceiling) and hard fail-fast after 5 consecutive infrastructure failures (logscriticalthen re-throws). The failure counter resets on any successful poll or processed job. - Terminal status cache: Written best-effort after MySQL persistence succeeds. Uses
setExwith TTL = 300 + jitter (0–30 s) per project thundering-herd convention. - Sanitized logging: Rejected payloads (malformed or invalid asset ID) log only
payloadLengthandpayloadSha256— never the raw payload.
Files Added
src/Application/Asset/Command/HandleAssetProcessingJobCommand.php— command DTO carrying the parsedAssetIdsrc/Application/Asset/HandleAssetProcessingJobService.php— application use case: validate state, process, persist, cachesrc/Application/Asset/AssetProcessorInterface.php— application port for the actual processing operationsrc/Application/Asset/AssetTerminalStatusCacheInterface.php— application port for terminal status cachingsrc/Application/Asset/Result/AssetProcessingJobOutcome.php— domain-vocabulary outcome enumsrc/Application/Asset/Result/HandleAssetProcessingJobResult.php— application result DTO (no delivery semantics)src/Application/Asset/Result/TerminalStatusCacheStoreResult.php— cache-write result DTOsrc/Application/Asset/Exception/RetryableAssetProcessingException.php— retryable processor failuresrc/Application/Asset/Exception/TerminalAssetProcessingException.php— terminal processor failuresrc/Infrastructure/Processing/AssetProcessingJobHandlerInterface.php— infrastructure handler contractsrc/Infrastructure/Processing/AssetProcessingJobConsumerInterface.php— infrastructure consumer contractsrc/Infrastructure/Processing/AssetProcessingJobHandlingOutcome.php— infrastructure-level outcome enum (mirrors application enum with RETRYABLE_PROCESSING_FAILURE added)src/Infrastructure/Processing/AssetProcessingJobHandlingResult.php— infrastructure result DTO includingAssetProcessingJobDeliverysrc/Infrastructure/Processing/AssetProcessingJobDelivery.php— delivery enum (HANDLED/DISCARD/RETRY)src/Infrastructure/Processing/ReservedAssetProcessingJob.php— reserved job VO withacknowledge()/discard()/release()closuressrc/Infrastructure/Processing/AssetProcessingJobWorker.php— thin adapter: parse payload → call service → translate result; sanitized loggingsrc/Infrastructure/Processing/AssetProcessingWorkerLoop.php— polling loop with backoff/fail-fast and ack/release routingsrc/Infrastructure/Processing/RedisJobQueueConsumer.php— reservation-based Redis consumer using atomic Lua scriptssrc/Infrastructure/Processing/RedisAssetTerminalStatusCache.php— Redis terminal status cache with TTL jittersrc/Infrastructure/Processing/PassThroughAssetProcessor.php— minimal concrete processor (validates state and proof; no file transforms)src/Infrastructure/Processing/Exception/RedisJobQueueConsumerException.php— typed consumer failuresrc/Infrastructure/Processing/Exception/RedisAssetTerminalStatusCacheException.php— typed cache failurebin/asset-processing-worker.php— CLI entrypoint: bootstraps services, starts loop
Files Modified
src/Application/Asset/CompleteUploadService.php— dispatch-failure compensation: if job dispatch fails after savingPROCESSING, asset is restored toPENDINGand re-savedsrc/Domain/Asset/Asset.php— addedrestorePending()(for dispatch compensation);markFailed()clears completion proofsrc/Infrastructure/Persistence/MySQLAssetRepository.php— extended allowed status transitions to includePROCESSING → PENDING|UPLOADED|FAILEDsrc/Infrastructure/Processing/RedisJobQueuePublisher.php— lazy Redis connection via closure; replacesMockAssetProcessingJobDispatcherin the HTTP pathpublic/index.php— wiresRedisJobQueuePublisherinstead of the mock dispatcherdocker-compose.yaml— addsworkerservice (bin/asset-processing-worker.php) depending ondbandredis
Tests Added/Modified
tests/Unit/Application/Asset/HandleAssetProcessingJobServiceTest.php— malformed payload, invalid ID, unknown asset, non-PROCESSING skip, success, terminal failure, stale-write recovery, cache failure pathstests/Unit/Infrastructure/Processing/AssetProcessingJobWorkerTest.php— payload parsing, logging levels, delivery values for all outcomestests/Unit/Infrastructure/Processing/AssetProcessingWorkerLoopTest.php— ack on HANDLED, discard on DISCARD, release on RETRY, release on handler exception, backoff, fail-fasttests/Unit/Infrastructure/Processing/RedisAssetTerminalStatusCacheTest.php— store success, store failuretests/Unit/Infrastructure/Processing/RedisJobQueueConsumerTest.php— reserve, acknowledge, release, expired-job recoverytests/Unit/Infrastructure/Processing/PassThroughAssetProcessorTest.php— validates state guard and proof guardtests/Unit/Application/Asset/CompleteUploadServiceTest.php— dispatch-failure compensation sequencetests/Unit/Domain/Asset/AssetTest.php—PROCESSING → UPLOADED,PROCESSING → PENDING(restore),markFailedclears prooftests/Integration/Infrastructure/Persistence/MySQLAssetRepositoryTest.php—PROCESSING → UPLOADED,PROCESSING → PENDING,PROCESSING → FAILED