Version: MarketPulse

Data Scraping Procedure


1. Concepts & Terminology

Term | Meaning
--- | ---
Scraping Batch (scrapingbatch) | The top-level orchestration record. One batch = one scrape job + multiple refine (enrichment) jobs.
Scraping Job (scrapingjob) | Phase 1 job. Fetches a raw list of vehicle listings from a marketplace (Craigslist or Facebook Marketplace). One per batch.
Raw Listing (rawvehiclelisting) | Minimal data captured in Phase 1: title, price, URL, thumbnail, location, mileage, year, make, model. Globally deduplicated by (sourceplatform, postid).
Refine Job (refinejob) | Phase 2 job. Deep-scrapes a single listing URL and enriches it with AI extraction. One refine job is created per unique URL that needs enrichment after Phase 1.
Refined Listing (refinedvehiclelisting) | Full structured vehicle record produced after Phase 2. Linked 1-to-1 with a raw listing.
Dealer Data Mapping (dealerdatamapping) | Links a dealer to a refined listing. Drives billing and export.
Crawler | Python service that receives scraping requests via RabbitMQ and performs the actual browser-based scraping using Crawl4AI + Playwright.
ManagementService | .NET ASP.NET Core service. Owns the orchestration logic, DB writes, and queue management.

2. Status Enums

ScrapingBatchStatus

Value | Meaning
--- | ---
Queued | Created, not yet dispatched to crawler.
Fetching | Phase 1 in progress; the raw scraping job is running.
Enriching | Phase 1 complete; refine jobs are being processed.
Completed | All phases done, no failures.
PartiallyCompleted | Some jobs succeeded, some failed.
Failed | All jobs failed.
Cancelled | Cancelled by a user or admin before reaching a terminal state.

ScrapingJobStatus (Phase 1)

Value | Meaning
--- | ---
Queued | Created, not yet dispatched to crawler.
InProgress | CrawlFilterRequested has been published (to the {batchId} queue in the current architecture; to the shared scraping-job queue in the planned one) and the crawler is working.
Completed | Raw listings received and processed.
Failed | Crawler returned failure or retry budget exhausted.
Cancelled | Batch was cancelled before or during this job.

RefineJobStatus (Phase 2)

Value | Meaning
--- | ---
Queued | Created and published to the batch queue; awaiting crawler.
InProgress | Crawler has acknowledged it and is working.
Completed | Enrichment succeeded.
PartialCompleted | Legacy status, treated as done during finalization.
Failed | Enrichment failed after all retries.
Cancelled | Batch was cancelled; refine job will not run.

RefineResult (per listing outcome)

Value | Meaning
--- | ---
Inserted | New refined listing was created.
Updated | Existing refined listing was updated (data changed).
Skipped | Raw data unchanged; no new refined listing needed (dealer mapping only).
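The status values above map directly onto enums. What follows is a minimal Python sketch. It assumes the stored string values match the names in the tables. The terminal and pending sets mirror the cancellation validation and finalization gates described later. The helper names are hypothetical.

```python
from enum import Enum


class ScrapingBatchStatus(Enum):
    QUEUED = "Queued"
    FETCHING = "Fetching"
    ENRICHING = "Enriching"
    COMPLETED = "Completed"
    PARTIALLY_COMPLETED = "PartiallyCompleted"
    FAILED = "Failed"
    CANCELLED = "Cancelled"


class RefineJobStatus(Enum):
    QUEUED = "Queued"
    IN_PROGRESS = "InProgress"
    COMPLETED = "Completed"
    PARTIAL_COMPLETED = "PartialCompleted"  # legacy value, counted as done
    FAILED = "Failed"
    CANCELLED = "Cancelled"


# Final batch states: no further transitions are expected.
TERMINAL_BATCH_STATUSES = frozenset({
    ScrapingBatchStatus.COMPLETED,
    ScrapingBatchStatus.PARTIALLY_COMPLETED,
    ScrapingBatchStatus.FAILED,
    ScrapingBatchStatus.CANCELLED,
})

# Refine jobs in these states keep a batch from being finalized.
PENDING_REFINE_STATUSES = frozenset({RefineJobStatus.QUEUED, RefineJobStatus.IN_PROGRESS})


def is_terminal(status: ScrapingBatchStatus) -> bool:
    return status in TERMINAL_BATCH_STATUSES


def is_pending(status: RefineJobStatus) -> bool:
    return status in PENDING_REFINE_STATUSES
```

Values read from the database can be parsed by their string form, for example `ScrapingBatchStatus("Enriching")`.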

3. Database Tables

scrapingbatch
├─ scrapingbatchid (PK)
├─ dealerid (FK → dealermaster)
├─ sourceid, dealerscrapingsourcesid
├─ status, scrapemode (Manual / Scheduled)
├─ iscancelled, cancelledat, cancelledby
├─ requestedat, rawcompletedat, refinecompletedat
└─ scrapecode (unique display code)

scrapingjob
├─ scrapingjobid (PK)
├─ scrapingbatchid (FK → scrapingbatch)
├─ status, retrycount
├─ isrefineneeded, totalrawdata
├─ newrawdatacount, updatedrawdatacount, skippedrawdatacount
├─ totalrefinerequested, completedrefinecount, failedrefinecount
└─ requestedat

refinejob
├─ refinejobid (PK)
├─ scrapingjobid (FK → scrapingjob)
├─ scrapingbatchid (FK → scrapingbatch)
├─ rawlistingid (FK → rawvehiclelisting)
├─ url
├─ status, retrycount
└─ requestedat

rawvehiclelisting
├─ rawlistingid (PK)
├─ (sourceplatform, postid) — UNIQUE dedup key
├─ title, price, location, mileage, year, make, model, thumbnail, url
├─ firstseenjobid, lastseenjobid
└─ scrapedat

refinedvehiclelisting
├─ refinedlistingid (PK)
├─ rawlistingid (FK → rawvehiclelisting, UNIQUE)
├─ refinejobid (FK → refinejob)
├─ attributes (JSONB — all AI-extracted vehicle fields)
├─ sellertype (private / dealer)
├─ refineresult (Inserted / Updated / Skipped)
└─ createdat, updatedat

dealerdatamapping
├─ dealerid + refinedlistingid — composite PK
├─ scrapingjobid
└─ isexported
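Repeated scrapes stay idempotent because of two constraints: the (sourceplatform, postid) unique key and the composite primary key on dealerdatamapping. The minimal sketch below shows that behaviour using an in-memory SQLite database as a stand-in for the production database. The JSONB column suggests PostgreSQL, which accepts the same INSERT ... ON CONFLICT form. Only a subset of columns is shown, and upsert_raw and map_dealer are hypothetical helpers.

```python
import sqlite3

# Subset of the rawvehiclelisting and dealerdatamapping columns listed above.
SCHEMA = """
CREATE TABLE rawvehiclelisting (
    rawlistingid   INTEGER PRIMARY KEY,
    sourceplatform TEXT NOT NULL,
    postid         TEXT NOT NULL,
    title          TEXT,
    price          INTEGER,
    url            TEXT,
    UNIQUE (sourceplatform, postid)           -- global dedup key
);
CREATE TABLE dealerdatamapping (
    dealerid         INTEGER NOT NULL,
    refinedlistingid INTEGER NOT NULL,
    scrapingjobid    TEXT,
    isexported       INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (dealerid, refinedlistingid)  -- composite PK
);
"""


def upsert_raw(conn, platform, post_id, title, price, url):
    """Insert a raw listing, or refresh its fields if (platform, postid) already exists."""
    conn.execute(
        """
        INSERT INTO rawvehiclelisting (sourceplatform, postid, title, price, url)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (sourceplatform, postid)
        DO UPDATE SET title = excluded.title, price = excluded.price, url = excluded.url
        """,
        (platform, post_id, title, price, url),
    )


def map_dealer(conn, dealer_id, refined_listing_id, scraping_job_id):
    """Link a dealer to a refined listing; repeating the same pair is a no-op."""
    conn.execute(
        """
        INSERT INTO dealerdatamapping (dealerid, refinedlistingid, scrapingjobid)
        VALUES (?, ?, ?)
        ON CONFLICT (dealerid, refinedlistingid) DO NOTHING
        """,
        (dealer_id, refined_listing_id, scraping_job_id),
    )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
upsert_raw(conn, "cgl", "123", "2018 Honda Civic", 12000, "https://example.org/123")
upsert_raw(conn, "cgl", "123", "2018 Honda Civic", 11500, "https://example.org/123")  # re-scrape
map_dealer(conn, 7, 1, "job-a")
map_dealer(conn, 7, 1, "job-b")  # same (dealer, listing) pair: ignored
```

After these calls, the table holds one raw listing with the updated price and one dealer mapping that still carries the first job ID.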

4. Queue Architecture

Current Architecture (production)

RabbitMQ Queues / Exchanges

├── batch-scraping-topic [Topic Exchange]
│ └── {batchId} [Durable queue, one per active batch]
│ Routing key: batch.{batchId}
│ Contains: CrawlFilterRequested (Phase 1) + CrawlRequested (Phase 2)

├── job-registry [Durable queue — direct]
│ Contains: BatchQueueRegistered notifications (batchId, queueName)
│ Publisher: ManagementService (on batch creation)
│ Subscriber: Not yet consumed (planned for future dynamic subscription)

├── crawler-crawl-requests [Fanout, FastStream subscriber]
│ Contains: CrawlRequested (Phase 2 enrichment)

├── crawler-crawl-filter-requests [Fanout, FastStream subscriber]
│ Contains: CrawlFilterRequested (Phase 1 raw scraping)

└── crawl_status_data [Fanout, MassTransit subscriber]
Contains: CrawlStatusData responses from crawler → ManagementService

Note: In the current implementation, both Phase 1 and Phase 2 messages are routed through the per-batch {batchId} queue on the .NET side, but the crawler subscribes to the generic fanout queues (crawler-crawl-filter-requests, crawler-crawl-requests). The batch queue exists so that a batch's pending messages are isolated and can be dropped on cancellation. ManagementService publishes to the job-registry queue, but the crawler does not consume it yet.
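Per-batch isolation relies on standard AMQP topic-exchange matching. A message published to batch-scraping-topic with routing key batch.{batchId} reaches only the queue bound with that exact key. The sketch below reimplements the matching rule in plain Python for illustration: `*` matches exactly one dot-separated word, and `#` matches zero or more. This is not broker code, and the helper names are hypothetical. A UUID contains hyphens but no dots, so batch.{batchId} is always a two-word key.

```python
from typing import List


def routing_key_for(batch_id: str) -> str:
    """Routing key used when publishing to the batch-scraping-topic exchange."""
    return f"batch.{batch_id}"


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic rule: '*' matches one word, '#' matches zero or more words."""
    return _match(binding_key.split("."), routing_key.split("."))


def _match(pattern: List[str], words: List[str]) -> bool:
    if not pattern:
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' can absorb any number of words, including none.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == "*" or head == words[0]:
        return _match(rest, words[1:])
    return False
```

Each per-batch queue is bound with its own exact key, so one batch's queue never matches another batch's routing key. A wildcard binding such as batch.* would instead receive every batch's messages.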

Planned Architecture (in design)

scraping-job  [Durable queue — shared, all Phase 1 jobs]
job-registry [Durable queue — only batchIds with active refine work]
{batchId} queue [Durable queue — per-batch, Phase 2 only; created lazily after Phase 1 success]

See the architecture redesign plan for full details.


5. Message Contracts

All contracts live in ContractLibrary/Messages/.

CrawlFilterRequested — Phase 1 request

Published by: ScrapingOrchestrationService.ExecuteScrapingPipeline
Consumed by: Python crawler handle_crawl_filter_requested

{
  "provider": "cgl",
  "location_id": null,
  "radius": 50,
  "zip_code": 90210,
  "min_price": 5000,
  "max_price": 30000,
  "min_year": 2015,
  "max_year": 2024,
  "min_miles": null,
  "max_miles": 80000,
  "makes": ["Toyota", "Honda"]
}

The AMQP MessageId property is set to scrapingJobId, which serves as both the deduplication ID and the correlation ID.

CrawlRequested — Phase 2 request

Published by: ScrapingOrchestrationService.ProcessRawBatch
Consumed by: Python crawler handle_crawl_requested

{
  "id": "<refineJobId>",
  "url": "https://craigslist.org/cto/d/listing/123",
  "urls": null,
  "priority": 0,
  "requestedAt": "2026-06-03T12:00:00Z"
}

The AMQP MessageId property is set to refineJobId.

CrawlStatusData — crawler response

Published by: Python crawler publish_crawl_status
Consumed by: .NET CrawlStatusConsumer

{
  "id": "<scrapingJobId or refineJobId>",
  "url": "https://...",
  "result": "<JSON string — raw listings array or enriched vehicle object>",
  "error": null,
  "status": "SUCCESS",
  "completedAt": "2026-06-03T12:01:00Z"
}

Valid status values: STARTED | IN_PROGRESS | SUCCESS | FAILURE
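The result field is a JSON document encoded as a string inside the JSON payload. Consumers therefore decode twice: once for the message and once for result. Below is a minimal Python sketch of that contract. The dataclass and its validation are illustrative and are not the .NET CrawlStatusConsumer model.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

VALID_STATUSES = frozenset({"STARTED", "IN_PROGRESS", "SUCCESS", "FAILURE"})


@dataclass
class CrawlStatusData:
    id: str                       # scrapingJobId (Phase 1) or refineJobId (Phase 2)
    status: str
    url: Optional[str] = None
    result: Optional[str] = None  # JSON encoded as a string, not a nested object
    error: Optional[str] = None
    completedAt: Optional[str] = None

    def __post_init__(self) -> None:
        if self.status not in VALID_STATUSES:
            raise ValueError(f"unknown status: {self.status!r}")

    def decoded_result(self) -> Any:
        """Second decoding step: a list of raw listings or one enriched object."""
        return json.loads(self.result) if self.result else None


def parse_status(body: str) -> CrawlStatusData:
    """First decoding step: the message body itself."""
    return CrawlStatusData(**json.loads(body))
```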

MassTransit Envelope Format

All messages on the batch queue and the fanout exchanges use this envelope, which matches the format the Python mt_decoder expects:

{
  "messageId": "<uuid>",
  "messageType": ["urn:message:MarketPulse.Contracts.Messages:CrawlFilterRequested"],
  "message": { /* payload */ },
  "headers": {},
  "sentTime": "2026-06-03T12:00:00.000Z"
}
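The minimal Python sketch below wraps and unwraps this envelope with the standard library. It assumes the short message type is the last colon-separated segment of the first messageType URN. The function names are hypothetical, and this is not the crawler's actual mt_decoder.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple, Union

URN_PREFIX = "urn:message:MarketPulse.Contracts.Messages:"


def wrap_envelope(payload: Dict[str, Any], message_type: str,
                  message_id: Optional[str] = None) -> str:
    """Build a MassTransit-style envelope around a contract payload."""
    sent = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    envelope = {
        "messageId": message_id or str(uuid.uuid4()),
        "messageType": [URN_PREFIX + message_type],
        "message": payload,
        "headers": {},
        "sentTime": sent.replace("+00:00", "Z"),  # e.g. 2026-06-03T12:00:00.000Z
    }
    return json.dumps(envelope)


def unwrap_envelope(body: Union[str, bytes]) -> Tuple[str, str, Dict[str, Any]]:
    """Return (messageId, short message type, payload) from a raw message body."""
    envelope = json.loads(body)
    message_type = envelope["messageType"][0].rsplit(":", 1)[-1]
    return envelope["messageId"], message_type, envelope["message"]
```

For a Phase 1 request, the caller would pass the scrapingJobId as message_id, following the MessageId convention described above.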

BatchQueueRegistered — registry notification

Published by: RabbitMqBatchQueueService.CreateBatchQueueAsync
Queue: job-registry

{
  "batchId": "<uuid>"
}

6. Phase 1 — Raw Scraping

Entry Points

  • Manual trigger: POST /api/v1/scraping/trigger → ScrapingController → ExecuteScrapingPipeline
  • Scheduled trigger: Hangfire recurring job → SchedulingService → ExecuteScrapingPipeline

Step-by-Step

ExecuteScrapingPipeline(dealerId, dealerScrapingSourcesId, scrapeMode)

├─ 1. Load scraping source config (provider, location IDs, source name)
├─ 2. Load dealer details (geography: radius, zip; vehicle filters: price, year, miles, makes)
├─ 3. Resolve provider (Craigslist → "cgl" | FacebookMarketplace → "fbm")

├─ 4. Create ScrapingBatch in DB
│ status = Fetching
│ scrapecode = unique human-readable code

├─ 5a. CreateBatchQueueAsync(batchId)
│ - Declare topic exchange "batch-scraping-topic" (idempotent)
│ - Declare durable queue "{batchId}"
│ - Bind queue to exchange with routing key "batch.{batchId}"
│ - Publish BatchQueueRegistered to "job-registry"

├─ 5b. Create ScrapingJob in DB
│ status = Queued
│ jobcode = unique human-readable code

├─ 5c. Publish CrawlFilterRequested to "{batchId}" queue
│ messageId = scrapingJobId

└─ 5d. Update ScrapingJob status = InProgress
Return (OK, scrapingBatchId) to caller
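Steps 2 and 3 turn the dealer's settings into the CrawlFilterRequested payload shown in section 5. This minimal sketch assumes the dealer filters arrive as a plain dict with the hypothetical keys used below. The real service loads them from the dealer and scraping-source records.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional

# Step 3: marketplace name -> crawler provider code.
PROVIDER_CODES = {"Craigslist": "cgl", "FacebookMarketplace": "fbm"}


@dataclass
class CrawlFilterRequested:
    provider: str
    radius: int
    zip_code: int
    location_id: Optional[str] = None  # type assumed; null in the example payload
    min_price: Optional[int] = None
    max_price: Optional[int] = None
    min_year: Optional[int] = None
    max_year: Optional[int] = None
    min_miles: Optional[int] = None
    max_miles: Optional[int] = None
    makes: List[str] = field(default_factory=list)


def resolve_provider(source_name: str) -> str:
    try:
        return PROVIDER_CODES[source_name]
    except KeyError:
        raise ValueError(f"unsupported scraping source: {source_name}") from None


def build_filter_request(source_name: str, dealer: Dict[str, Any]) -> Dict[str, Any]:
    """Combine geography and vehicle filters (hypothetical dealer keys) into the Phase 1 payload."""
    request = CrawlFilterRequested(
        provider=resolve_provider(source_name),
        radius=dealer["radius"],
        zip_code=dealer["zip_code"],
        location_id=dealer.get("location_id"),
        min_price=dealer.get("min_price"),
        max_price=dealer.get("max_price"),
        min_year=dealer.get("min_year"),
        max_year=dealer.get("max_year"),
        min_miles=dealer.get("min_miles"),
        max_miles=dealer.get("max_miles"),
        makes=list(dealer.get("makes", [])),
    )
    return asdict(request)
```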

Crawler Phase 1 Execution

handle_crawl_filter_requested(CrawlFilterRequested)

├─ Publish CrawlStatusData(status=STARTED, id=scrapingJobId)
├─ Build search URL (ZipBasedUrlResolver resolves canonical regional URL for provider + zip)
├─ Select task queue:
│ FacebookMarketplace → secure_crawl_task_queue (GoLogin remote browser)
│ Craigslist → crawl_task_queue (local Playwright or Camoufox)

└─ start_filter_crawl(scrapingJobId, crawlData)
- Crawl4AI fetches listing results page
- Extracts raw listing objects (postId, title, price, url, thumbnail, location, year, make, model, mileage)
- Publishes CrawlStatusData(status=SUCCESS, result=JSON_array_of_raw_listings)

ManagementService Phase 1 Result Handling

CrawlStatusConsumer.HandleSuccess receives the CrawlStatusData message:

  1. Looks up scrapingJobId in DB and confirms it is a scraping job (not a refine job) in InProgress status
  2. Checks IsCancelled; if the batch was cancelled, the result is discarded (see 10. Cancellation)
  3. Deserializes result as List<RawScrapedListing>
  4. Calls ProcessRawBatch(scrapingJobId, rawListings)

ProcessRawBatch Logic

ProcessRawBatch(scrapingJobId, rawListings)

├─ 1. Load scraping job and parent batch from DB

├─ 2. Pre-filter: skip listings with null/empty postId or sourcePlatform

├─ 3. Batch-fetch existing raw listings by (platform, postId)

├─ 4. Classify each listing:
│ NEW → toInsert list
│ CHANGED → toUpdate list (raw fields differ from existing)
│ UNCHANGED → skipped list

├─ 5. Bulk INSERT new listings + Bulk UPDATE changed listings in DB

├─ 5.5 Cross-dealer mapping for unchanged raw listings:
│ For each unchanged raw listing that already has a refined listing:
│ - If THIS dealer is not yet mapped to it → upsert DealerDataMapping + log transaction
│ For each unchanged raw listing with NO refined listing yet:
│ - Add its URL to extraItemsToEnrich (so it is not permanently skipped)

├─ 6. Build enrich list = new + changed + skipped-with-no-refined-entry (deduped by URL)
│ isRefineNeeded = enrich list is not empty

├─ 7. Update ScrapingJob counters (totalRaw, new, updated, skipped, isRefineNeeded)

├─ 8. If isRefineNeeded:
│ a. Create RefineJobEntity per URL (status=Queued)
│ b. BulkInsert all RefineJobs in DB
│ c. Update ScrapingJob.totalRefineRequested
│ d. Publish CrawlRequested per RefineJob to the "{batchId}" queue

└─ 9. MarkScrapingJobsAsCompletedAndBatchAsEnriching(batchId)
- Check if all scraping jobs in batch are done
- If yes: mark batch rawCompletedAt, transition status to Enriching
- Call CheckAndFinalizeBatch (will return early if refine jobs still pending)
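Steps 2 to 6 amount to a single classification pass. The simplified in-memory sketch below rests on three assumptions. Listings are dicts keyed by the crawler's field names. `existing` holds the stored raw listings keyed by (platform, postId). `has_refined` is the set of keys that already have a refined listing. Dealer mapping, DB writes, and counters are left out, and classify_raw_batch is a hypothetical name.

```python
from typing import Dict, List, Set, Tuple

Key = Tuple[str, str]
# Raw fields compared to decide NEW / CHANGED / UNCHANGED (from the rawvehiclelisting table).
RAW_FIELDS = ("title", "price", "location", "mileage", "year", "make", "model", "thumbnail", "url")


def classify_raw_batch(incoming: List[dict], existing: Dict[Key, dict], has_refined: Set[Key]):
    """Return (to_insert, to_update, skipped, enrich_urls)."""
    to_insert, to_update, skipped, extra_to_enrich = [], [], [], []
    for item in incoming:
        platform, post_id = item.get("sourcePlatform"), item.get("postId")
        if not platform or not post_id:
            continue  # step 2: cannot be deduplicated without both keys
        key = (platform, post_id)
        stored = existing.get(key)
        if stored is None:
            to_insert.append(item)  # NEW
        elif any(stored.get(f) != item.get(f) for f in RAW_FIELDS):
            to_update.append(item)  # CHANGED
        else:
            skipped.append(item)  # UNCHANGED
            if key not in has_refined:
                extra_to_enrich.append(item)  # step 5.5: never refined, so enrich it now

    # Step 6: new + changed + unchanged-but-unrefined, deduplicated by URL, order kept.
    enrich_urls, seen = [], set()
    for item in to_insert + to_update + extra_to_enrich:
        url = item.get("url")
        if url and url not in seen:
            seen.add(url)
            enrich_urls.append(url)
    return to_insert, to_update, skipped, enrich_urls
```

In this sketch, isRefineNeeded is `bool(enrich_urls)`, and step 8 creates one refine job per URL.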

7. Phase 2 — Enrichment (Refine)

Trigger

Each CrawlRequested message published in step 8d above triggers one Phase 2 cycle. With N URLs needing enrichment, N messages are published and N concurrent crawl tasks are spawned in the crawler.

Crawler Phase 2 Execution

handle_crawl_requested(CrawlRequested)

├─ Publish CrawlStatusData(status=STARTED, id=refineJobId, url=url)
├─ Select task queue:
│ Facebook URL → secure_crawl_task_queue (GoLogin remote browser)
│ Craigslist URL → crawl_task_queue (local Playwright or Camoufox)

└─ start_crawl(refineJobId, url)
- Crawl4AI fetches the individual listing detail page
- AI extraction produces an EnrichedVehicleListing (make, model, year, mileage,
price, condition, vin, description, seller info, images, attributes JSONB)
- Publishes CrawlStatusData(status=SUCCESS, result=JSON_enriched_object, url=url)
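Both crawler handlers pick a task queue by marketplace: from the provider code in Phase 1 and from the listing URL in Phase 2. The sketch below is minimal. Matching on the facebook.com and craigslist.org host suffixes, and raising an error for any other host, are assumptions rather than confirmed crawler rules. select_task_queue is a hypothetical name.

```python
from typing import Optional
from urllib.parse import urlparse

SECURE_QUEUE = "secure_crawl_task_queue"  # GoLogin remote browser
LOCAL_QUEUE = "crawl_task_queue"          # local Playwright or Camoufox


def _is_host(host: str, domain: str) -> bool:
    return host == domain or host.endswith("." + domain)


def select_task_queue(provider: Optional[str] = None, url: Optional[str] = None) -> str:
    if provider is not None:  # Phase 1: CrawlFilterRequested.provider
        queues = {"fbm": SECURE_QUEUE, "cgl": LOCAL_QUEUE}
        if provider not in queues:
            raise ValueError(f"unknown provider: {provider}")
        return queues[provider]
    host = (urlparse(url or "").hostname or "").lower()  # Phase 2: CrawlRequested.url
    if _is_host(host, "facebook.com"):
        return SECURE_QUEUE
    if _is_host(host, "craigslist.org"):
        return LOCAL_QUEUE
    raise ValueError(f"unsupported listing host: {host or url!r}")
```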

ManagementService Phase 2 Result Handling

CrawlStatusConsumer.HandleSuccess:

  1. Looks up jobId as a scraping job; none is found (or the job is not InProgress), so the message is treated as a Phase 2 result
  2. Looks up refine job by refineJobId
  3. Validates refineJob.Url == message.Url — mismatch triggers failure
  4. Deserializes result as EnrichedVehicleListing
  5. Calls ProcessSingleRefineResult(refineJobId, enrichedItem)
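The same consumer receives results from both phases, so HandleSuccess decides what to do by lookup order. Below is a minimal sketch of that decision, with dicts standing in for the repositories. The returned action names, and the decision to discard unknown IDs, are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ScrapingJob:
    id: str
    status: str
    is_cancelled: bool = False


@dataclass
class RefineJob:
    id: str
    url: str
    status: str


def route_success(job_id: str, message_url: Optional[str],
                  scraping_jobs: Dict[str, ScrapingJob],
                  refine_jobs: Dict[str, RefineJob]) -> str:
    scraping_job = scraping_jobs.get(job_id)
    if scraping_job is not None and scraping_job.status == "InProgress":
        # Phase 1 result; a cancelled batch's result is discarded.
        return "discard" if scraping_job.is_cancelled else "process_raw_batch"
    refine_job = refine_jobs.get(job_id)
    if refine_job is None:
        return "discard"  # assumption: unknown IDs are ignored
    if refine_job.url != message_url:
        return "fail_url_mismatch"  # URL mismatch is treated as a failure
    return "process_single_refine_result"
```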

ProcessSingleRefineResult Logic

ProcessSingleRefineResult(refineJobId, enriched)

├─ 1. Load refine job from DB (idempotency guard: skip if already Completed, Failed, or Cancelled)
├─ 2. Load parent scraping job + batch

├─ 3. Lookup existing refined listing for rawListingId

├─ Case A — No existing refined listing (NEW):
│ INSERT RefinedVehicleListing (refineResult=Inserted)
│ If sellerType != "dealer": upsert DealerDataMapping + log transaction

├─ Case B — Existing refined listing + data changed (UPDATE):
│ UPDATE RefinedVehicleListing (refineResult=Updated)
│ INSERT RefinedDataChangelog entries for each changed field
│ If sellerType != "dealer": upsert DealerDataMapping + log transaction

├─ Case C — Existing refined listing + data unchanged (NO-OP):
│ Only upsert DealerDataMapping if this dealer is not already mapped

├─ Case D — enriched == null (FAILURE):
│ enrichmentFailed = true

├─ 4. Mark refine job: Completed or Failed

├─ 5. IncrementRefineResponse on parent ScrapingJob (thread-safe atomic counter)

└─ 6. If responseCount >= totalRefineRequested:
→ CheckAndFinalizeBatch(batchId)
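Cases A to D can be expressed as a pure decision over the stored attributes and the enriched object. The minimal sketch below applies the step 1 idempotency guard first. It also computes the field-level changes that Case B would write to RefinedDataChangelog. It assumes attributes are flat dicts, and the function name and return shape are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple

Change = Tuple[str, Any, Any]  # (field, old value, new value)
DONE_STATUSES = frozenset({"Completed", "Failed", "Cancelled"})


def plan_refine_result(job_status: str,
                       existing: Optional[Dict[str, Any]],
                       enriched: Optional[Dict[str, Any]],
                       seller_type: Optional[str]) -> Tuple[str, List[Change], bool]:
    """Return (case, changelog, map_dealer) for one refine result."""
    if job_status in DONE_STATUSES:
        return "ignore", [], False  # step 1: idempotency guard
    if enriched is None:
        return "D", [], False  # enrichment failed
    if existing is None:
        return "A", [], seller_type != "dealer"  # insert a new refined listing
    changes = [
        (name, existing.get(name), enriched.get(name))
        for name in sorted(set(existing) | set(enriched))
        if existing.get(name) != enriched.get(name)
    ]
    if changes:
        return "B", changes, seller_type != "dealer"  # update + changelog entries
    return "C", [], True  # unchanged: the mapping upsert is a no-op if already mapped
```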

8. Batch Finalization

CheckAndFinalizeBatch(batchId) is called:

  • After every Phase 1 raw job completes (even with zero listings)
  • After every Phase 2 refine job completes or fails
  • At the end of each ScrapingRetryService run (see 9. Failure Handling & Retry)
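N refine results are processed concurrently. The increment in step 5 and the comparison in step 6 of ProcessSingleRefineResult must therefore be a single atomic operation; otherwise two handlers could both see the final count, or neither could. The minimal in-process sketch below uses a lock. The production counter lives on the ScrapingJob row, where an atomic SQL update could play the same role. RefineResponseCounter and simulate are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class RefineResponseCounter:
    """Hypothetical in-process version of IncrementRefineResponse."""

    def __init__(self, total_refine_requested: int) -> None:
        self.total = total_refine_requested
        self.count = 0
        self._lock = threading.Lock()

    def increment(self) -> bool:
        """Count one response; True means this caller should run CheckAndFinalizeBatch."""
        with self._lock:  # increment and compare as one atomic step
            self.count += 1
            return self.count >= self.total


def simulate(total: int, workers: int = 8) -> List[bool]:
    """Deliver `total` responses from several threads and collect each caller's answer."""
    counter = RefineResponseCounter(total)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda _: counter.increment(), range(total)))
```

In this simulation only the response that brings the count to the total gets True. With the `>=` comparison, any later duplicate response would also get True.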

Gates (early exits)

  1. If any scraping jobs are still pending → return (batch not done with Phase 1 yet)
  2. If any refine jobs are still pending (Queued / InProgress) → return (Phase 2 still running)

Finalization Formula

rawCompletedJobs = totalRawJobs - rawFailedJobs
needRefine = rawCompletedJobs - noRefineNeeded
doneRefine = completedRefine + partialCompletedRefine + failedRefine

if needRefine == 0:
    → Completed (if any raw jobs succeeded)
    → Failed (if all raw jobs failed)

elif all refine paths failed (completedRefine == 0 && partialCompletedRefine == 0):
    → Failed

elif rawFailedJobs == 0 && partialCompletedRefine == 0 && failedRefine == 0:
    → Completed (clean success)

else:
    → PartiallyCompleted (mixed outcomes)
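Below is a direct Python transcription of the gates and the formula, as a sketch. Parameter names follow the formula, and the two pending counts stand in for the gate queries. No branch reads doneRefine, so it is left out.

```python
from typing import Optional


def final_batch_status(pending_raw_jobs: int, pending_refine_jobs: int,
                       total_raw_jobs: int, raw_failed_jobs: int, no_refine_needed: int,
                       completed_refine: int, partial_completed_refine: int,
                       failed_refine: int) -> Optional[str]:
    """Return the final batch status, or None while work is still pending."""
    # Gates: finalize only when no scraping or refine jobs are pending.
    if pending_raw_jobs > 0 or pending_refine_jobs > 0:
        return None

    raw_completed_jobs = total_raw_jobs - raw_failed_jobs
    need_refine = raw_completed_jobs - no_refine_needed

    if need_refine == 0:
        return "Completed" if raw_completed_jobs > 0 else "Failed"
    if completed_refine == 0 and partial_completed_refine == 0:
        return "Failed"  # all refine paths failed
    if raw_failed_jobs == 0 and partial_completed_refine == 0 and failed_refine == 0:
        return "Completed"  # clean success
    return "PartiallyCompleted"  # mixed outcomes
```

For example, a batch with one successful raw job whose refine jobs finished as 3 completed and 2 failed falls through to the last branch: `final_batch_status(0, 0, 1, 0, 0, 3, 0, 2)` returns "PartiallyCompleted".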

After Finalization

  • scrapingbatch.refinecompletedat is set
  • scrapingbatch.status is updated to the final status
  • DeleteBatchQueueAsync(batchId) is called — drops the per-batch RabbitMQ queue and all remaining messages

9. Failure Handling & Retry

Inline Retry (immediate, within the message handler)

Triggered by HandleFailure (Phase 1) or HandleSingleRefineFailure (Phase 2) when a job's first failure arrives:

Job Type | Retry Budget | Action on first failure | Action once retries are exhausted
--- | --- | --- | ---
Scraping job (Phase 1) | Scraping:MaxRetryCount (default 1) | Re-publish CrawlFilterRequested to batch queue, increment retryCount | Mark job Failed → finalize batch
Refine job (Phase 2) | Scraping:MaxRetryCount (default 1) | Re-publish CrawlRequested via IPublishEndpoint.Publish, reset status to Queued | Mark job Failed → finalize batch

Background Retry Service (ScrapingRetryService)

Runs every Scraping:SchedulerIntervalMinutes minutes (default 5 min) via Hangfire.

Detects batches in Fetching, Enriching, or Queued status and processes four tracks per batch:

Track | Target | Condition | Action
--- | --- | --- | ---
A | Scraping jobs | status=Failed, retrycount=0 | Re-publish CrawlFilterRequested, increment retryCount
B | Refine jobs | status=Failed, retrycount=0 | Re-publish CrawlRequested, increment retryCount
C | Scraping jobs | Stuck in Queued/InProgress > Scraping:ForceFailHoursThreshold hours (default 24h) | Force-fail job + all its refine jobs
D | Refine jobs | Stuck in Queued/InProgress > the same threshold | Force-fail refine job

After processing all tracks: calls CheckAndFinalizeBatch.
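Both retry paths reduce to small decisions on a job's status, retry count, and age. This minimal sketch assumes job age is measured from requestedat. It uses the documented defaults: MaxRetryCount 1 and ForceFailHoursThreshold 24. The function names and return values are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def inline_retry_action(retry_count: int, max_retry_count: int = 1) -> str:
    """HandleFailure / HandleSingleRefineFailure: re-publish until the budget is spent."""
    return "republish" if retry_count < max_retry_count else "mark_failed"


def retry_track(kind: str, status: str, retry_count: int, requested_at: datetime,
                now: Optional[datetime] = None, force_fail_hours: int = 24) -> Optional[str]:
    """ScrapingRetryService: which track (A to D) applies to one job, or None."""
    now = now or datetime.now(timezone.utc)
    if status == "Failed" and retry_count == 0:
        return "A" if kind == "scraping" else "B"  # re-publish, increment retryCount
    stuck = now - requested_at > timedelta(hours=force_fail_hours)
    if status in ("Queued", "InProgress") and stuck:
        return "C" if kind == "scraping" else "D"  # force-fail
    return None
```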

CrawlStatus Signal Flow

Crawler publishes CrawlStatusData

├─ STARTED → ManagementService confirms job is running (log only for Phase 1; Queued→InProgress for Phase 2)
├─ IN_PROGRESS → Same as STARTED (handles late delivery)
├─ SUCCESS → ProcessRawBatch (Phase 1) OR ProcessSingleRefineResult (Phase 2)
└─ FAILURE → HandleFailure (Phase 1) OR HandleSingleRefineFailure (Phase 2)
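The signal flow is effectively a dispatch on (status, phase). In the minimal sketch below, the table values are the handler names above. One behaviour is an assumption: a late STARTED or IN_PROGRESS for a refine job that has already finished leaves its status unchanged, which is consistent with "handles late delivery".

```python
from typing import Optional, Tuple

HANDLERS = {
    ("SUCCESS", 1): "ProcessRawBatch",
    ("SUCCESS", 2): "ProcessSingleRefineResult",
    ("FAILURE", 1): "HandleFailure",
    ("FAILURE", 2): "HandleSingleRefineFailure",
}


def dispatch(status: str, phase: int, job_status: str) -> Tuple[Optional[str], str]:
    """Return (handler to call or None, new job status)."""
    if status in ("STARTED", "IN_PROGRESS"):
        # Phase 1 only logs; Phase 2 moves Queued -> InProgress and never moves backwards.
        if phase == 2 and job_status == "Queued":
            return None, "InProgress"
        return None, job_status
    try:
        return HANDLERS[(status, phase)], job_status
    except KeyError:
        raise ValueError(f"unexpected status {status!r} for phase {phase}") from None
```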

10. Cancellation

Trigger

POST /api/v1/scraping/batches/{batchId}/cancel → ScrapingController → CancelScrapingBatch

Validation

  • Batch must exist
  • Batch must not already be cancelled (IsCancelled == false)
  • Batch must not be in a terminal state (Completed / PartiallyCompleted / Failed)

Atomic Transaction

TransactionScope {
    1. MarkScrapingJobsCancelledByBatchId(batchId)
       → Sets all ScrapingJob.IsCancelled = true for this batch

    2. MarkAllRefineJobsCancelledForBatch(batchId)
       → Sets all RefineJob.IsCancelled = true for this batch

    3. UpdateBatchCancellationInfo(batchId, userId)
       → Sets ScrapingBatch.IsCancelled = true, CancelledAt, CancelledBy

    transaction.Complete()
}

// Outside transaction (non-blocking):
4. DeleteBatchQueueAsync(batchId)
   → Deletes "{batchId}" queue including all pending messages
   → ifUnused=false, ifEmpty=false (force delete)
   → 404 is swallowed (the queue may already be gone, so it is safe to ignore)
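The validation, the transaction, and the queue deletion can be sketched end to end with an in-memory SQLite database and a fake broker. This sketch assumes iscancelled columns on scrapingjob and refinejob, as the pseudocode implies. FakeBroker, QueueNotFound, and cancel_batch are hypothetical stand-ins, not the .NET TransactionScope or the RabbitMQ client.

```python
import sqlite3

TERMINAL = ("Completed", "PartiallyCompleted", "Failed")


class QueueNotFound(Exception):
    """Stand-in for the broker's 404 NOT_FOUND on queue delete."""


class FakeBroker:
    """Hypothetical in-memory broker: queue name -> list of pending messages."""

    def __init__(self, queues):
        self.queues = dict(queues)

    def delete_queue(self, name, if_unused=False, if_empty=False):
        if name not in self.queues:
            raise QueueNotFound(name)
        del self.queues[name]  # pending messages are dropped with the queue


def cancel_batch(conn: sqlite3.Connection, broker: FakeBroker, batch_id: str, user_id: str) -> None:
    row = conn.execute(
        "SELECT status, iscancelled FROM scrapingbatch WHERE scrapingbatchid = ?",
        (batch_id,),
    ).fetchone()
    if row is None:
        raise LookupError(f"batch {batch_id} not found")
    status, is_cancelled = row
    if is_cancelled:
        raise ValueError("batch is already cancelled")
    if status in TERMINAL:
        raise ValueError(f"batch is already {status}")

    with conn:  # one transaction: commit on success, rollback on any error
        conn.execute("UPDATE scrapingjob SET iscancelled = 1 WHERE scrapingbatchid = ?", (batch_id,))
        conn.execute("UPDATE refinejob SET iscancelled = 1 WHERE scrapingbatchid = ?", (batch_id,))
        conn.execute(
            "UPDATE scrapingbatch SET iscancelled = 1, cancelledat = CURRENT_TIMESTAMP, "
            "cancelledby = ? WHERE scrapingbatchid = ?",
            (user_id, batch_id),
        )

    # Outside the transaction: force-delete the queue; a missing queue is fine.
    try:
        broker.delete_queue(batch_id, if_unused=False, if_empty=False)
    except QueueNotFound:
        pass
```

Keeping the queue deletion after the commit means a broker error cannot roll back the cancellation flags.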

What Happens to In-Flight Work

Scenario | Behaviour
--- | ---
Phase 1 scraping job is queued but not yet picked up | Crawler picks it up and runs. ManagementService receives CrawlStatusData(SUCCESS) but checks IsCancelled before calling ProcessRawBatch, so the result is discarded and no refine jobs are created.
Phase 1 is already running in crawler | Crawler completes and publishes result. ManagementService discards it (same check). With the lazy queue creation of the updated architecture (see 11. Full Flow Diagram), no batch queue and no job-registry entry are created.
Phase 2 refine job is in batch queue | DeleteBatchQueueAsync drops the queue + all messages. Crawler never receives them.
Phase 2 refine job is already in-flight (fetched by crawler but not yet ACKed) | Crawler finishes and publishes result. ManagementService receives it, but the idempotency guard in ProcessSingleRefineResult skips jobs already in Cancelled status.

Known trade-off: Cancelling a batch after Phase 1 has been dispatched but before the result comes back means the crawler will perform the scraping work unnecessarily. An explicit cancellation message to the crawler is planned for a future iteration.


11. Full Flow Diagram (Updated Architecture)

Happy Path

┌──────────────────────────────────────────────────────────────────┐
│ Trigger (Manual API or Scheduled Job) │
└────────────────┬─────────────────────────────────────────────────┘
                 │
                 ▼
┌──────────────────────────────────────────────────────────────────┐
│ ExecuteScrapingPipeline │
│ 1. Create ScrapingBatch (Fetching) │
│ 2. Create ScrapingJob (InProgress) │
│ 3. Publish CrawlFilterRequested → [scraping-job] queue │
│ (NO per-batch queue created yet) │
└────────────────┬─────────────────────────────────────────────────┘
                 │
                 ▼ RabbitMQ
┌──────────────────────────────────────────────────────────────────┐
│ Crawler: handle_crawl_filter_requested │
│ • Extract raw listings │
│ • Publish CrawlStatusData(SUCCESS, rawListings) → ManagementService│
└────────────────┬─────────────────────────────────────────────────┘
                 │
                 ▼ RabbitMQ (CrawlStatusData)
┌──────────────────────────────────────────────────────────────────┐
│ CrawlStatusConsumer → ProcessRawBatch │
│ • Dedup by (platform, postId) │
│ • Classify: NEW / CHANGED / UNCHANGED │
│ • Bulk INSERT/UPDATE │
│ • If refine items exist: │
│ └─ Create RefineJobEntity per URL │
│ └─ CreateBatchQueueAsync(batchId) ← LAZY, only if needed │
│ └─ Publish CrawlRequested × N → [{batchId}] │
│ └─ Publish batchId → [job-registry] │
│ • Set batch status = Enriching │
└────────────────┬─────────────────────────────────────────────────┘
                 │
   ┌─────────────┼─────────────┐
   ▼             ▼             ▼   (concurrent, N refine jobs)
 URL 1         URL 2   ...   URL N
   │             │             │
   ▼             ▼             ▼
[Crawler processes each URL from {batchId} queue]
   │             │             │
   └─────────────┼─────────────┘
                 │
                 ▼ RabbitMQ (per-URL CrawlStatusData)
┌──────────────────────────────────────────────────────────────────┐
│ CrawlStatusConsumer → ProcessSingleRefineResult × N │
│ • INSERT / UPDATE / MAP refined listings │
│ • Increment responseCount │
│ • If responseCount >= totalRefineRequested: │
│ └─ CheckAndFinalizeBatch(batchId) │
│ └─ Evaluate final status │
│ └─ DeleteBatchQueueAsync(batchId) │
└──────────────────────────────────────────────────────────────────┘

RESULT: Batch → Completed / PartiallyCompleted / Failed

Cancellation Flow

User → POST /api/v1/scraping/batches/{batchId}/cancel
│
▼
CancelScrapingBatch (atomic transaction)
├─ Mark all ScrapingJobs.IsCancelled = true
├─ Mark all RefineJobs.IsCancelled = true
├─ Mark ScrapingBatch.IsCancelled = true
└─ commit
│
▼ (outside transaction, non-blocking)
DeleteBatchQueueAsync(batchId)
├─ Drop {batchId} queue + all pending Phase 2 messages
└─ Queue deletion is the enforcement mechanism

BEHAVIOR:
┌─ If Phase 1 result arrives after cancel:
│ └─ CrawlStatusConsumer checks IsCancelled → silently discard (known trade-off)
│
└─ If Phase 2 result arrives after cancel:
├─ ProcessSingleRefineResult loads refineJob
├─ Idempotency guard: job.Status == Cancelled → skip processing
└─ No data written

Job-Registry & Dynamic Subscription (Python side)

[job-registry] queue (RabbitMQ)
│
▼
Crawler: handle_batch_queue_registered (FastStream, manual ACK)
├─ Extract batchId from message
├─ Spawn asyncio.create_task(drain_batch_queue(batchId, delivery_tag))
└─ Do NOT ACK yet

drain_batch_queue(batchId, delivery_tag) — runs concurrently
├─ Get raw aio-pika channel from FastStream broker
├─ Attempt passive declare [{batchId} queue]
├─ If 404 (queue not found):
│ └─ Batch already done or cancelled
│ └─ ACK job-registry message immediately → entry removed
│ └─ Task exits
│
└─ If queue exists:
├─ Consume messages from [{batchId} queue]
├─ Route each CrawlRequested to crawl_task_queue
├─ When queue is deleted by .NET (during finalization/cancellation):
│ └─ Consumer receives queue-deleted event
│ └─ ACK job-registry message → entry removed
│ └─ Task exits

└─ Multiple batches run as independent concurrent tasks (no blocking)
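Below is a runnable asyncio sketch of the drain pattern. It assumes an in-memory stand-in for the broker. A passive declare raises an error when the queue is missing. Deleting a queue drops its messages and wakes the consumer with a sentinel, which plays the role of the queue-deleted event. InMemoryBroker and QueueNotFound are hypothetical; the real crawler works against an aio-pika channel obtained from FastStream.

```python
import asyncio
from typing import Dict, List, Optional


class QueueNotFound(Exception):
    """Stand-in for the 404 NOT_FOUND that a passive declare gets for a missing queue."""


class InMemoryBroker:
    """Hypothetical broker stand-in holding per-batch queues and recording ACKs."""

    def __init__(self) -> None:
        self.queues: Dict[str, asyncio.Queue] = {}
        self.acked: List[int] = []

    def passive_declare(self, name: str) -> asyncio.Queue:
        if name not in self.queues:
            raise QueueNotFound(name)
        return self.queues[name]

    def delete_queue(self, name: str) -> None:
        queue = self.queues.pop(name)
        while not queue.empty():
            queue.get_nowait()  # deleting a queue drops its pending messages
        queue.put_nowait(None)  # sentinel stands in for the queue-deleted event

    def ack(self, delivery_tag: int) -> None:
        self.acked.append(delivery_tag)


async def drain_batch_queue(broker: InMemoryBroker, batch_id: str, delivery_tag: int,
                            crawl_task_queue: asyncio.Queue) -> None:
    try:
        queue = broker.passive_declare(batch_id)
    except QueueNotFound:
        broker.ack(delivery_tag)  # batch already finished or cancelled
        return
    while True:
        message: Optional[dict] = await queue.get()
        if message is None:  # queue deleted by ManagementService
            break
        await crawl_task_queue.put(message)  # hand the CrawlRequested to the crawl workers
    broker.ack(delivery_tag)  # removes the job-registry entry


async def demo():
    broker = InMemoryBroker()
    crawl_tasks: asyncio.Queue = asyncio.Queue()
    broker.queues["batch-1"] = asyncio.Queue()
    for url in ("u1", "u2"):
        broker.queues["batch-1"].put_nowait({"url": url})
    # Two registry messages: one live batch, one whose queue is already gone.
    live = asyncio.create_task(drain_batch_queue(broker, "batch-1", 101, crawl_tasks))
    gone = asyncio.create_task(drain_batch_queue(broker, "batch-0", 100, crawl_tasks))
    await asyncio.sleep(0)  # let both tasks start and drain what is available
    broker.delete_queue("batch-1")  # finalization or cancellation on the .NET side
    await asyncio.gather(live, gone)
    urls = [crawl_tasks.get_nowait()["url"] for _ in range(crawl_tasks.qsize())]
    return sorted(broker.acked), urls
```

Each registry message gets its own task, so a batch that never ends does not block the others. Its registry entry is simply held unacknowledged until that batch's queue disappears.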