Skip to content

Repayment Pipeline

The transaction repayment pipeline processes payment sources (webhook APIs + NACH/CSV files) and posts repayments to Fineract. It runs automatically every 30 minutes via Prefect.

How It Works

Steps:

  1. dbt run - refreshes all BASE models (webhook + CSV) and rebuilds CORE_TRANSACTION_DATA
  2. Anti-join - queries CORE_TRANSACTION_DATA LEFT JOIN CORE_TRANSACTION_DATA_PROCESSED to find rows not yet posted (source_key not in PROCESSED with status = 'success')
  3. POST to Fineract - for each unprocessed row, calls /api/v1/loans/external-id/{id}/transactions?command=repayment
  4. Record result - saves success/failure to CORE_TRANSACTION_DATA_PROCESSED

Two Processing Paths

PathLendersTriggered ByFilter
Automated (30-min job)Arthmate, DMI, IKF, ShivalikPrefect scheduled deploymentrecon_source IS NULL
Manual (Recon UI)GB, UGRO, VivritiOperator clicks "Reconcile"recon_source IS NOT NULL

The 30-minute job explicitly excludes recon-enabled lenders (GB, UGRO, Vivriti) using AND recon_source IS NULL. Those lenders are only posted through the Reconciliation UI.

Running the Pipeline

bash
export PREFECT_API_URL=http://127.0.0.1:4201/api

# Standard run (includes dbt refresh)
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py

# Skip dbt (you just ran it manually)
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py --skip-dbt

# Dry run - inspect the queue without posting
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py --dry-run

Deduplication

Every payment gets a deterministic source_key = md5(lender + loan_id + date + amount).

When a repayment is posted to Fineract, the result is stored in CORE_TRANSACTION_DATA_PROCESSED:

StatusMeaning
successPosted to Fineract - excluded from future runs
failedFailed - automatically retried on next run

The anti-join ensures the same payment is never posted twice.

Monitoring

Prefect Dashboard

Monitor pipeline runs at turnoprefect.incentius.net:

  • Flow run history and status
  • Task-level success/failure
  • Execution timing

Ops UI

  • Payments page - view all posted transactions with status
  • Processing Log - file processing status per lender
  • Reconciliation - recon records and their posting status

Database

sql
-- Unprocessed transactions (what the next run will pick up)
SELECT source_lender, COUNT(*)
FROM "CORE"."CORE_TRANSACTION_DATA" t
LEFT JOIN "CORE"."CORE_TRANSACTION_DATA_PROCESSED" p
  ON t.source_key = p.source_key AND p.status = 'success'
WHERE t.is_success = true
  AND t.loan_external_id IS NOT NULL
  AND p.source_key IS NULL
  AND t.recon_source IS NULL
GROUP BY source_lender;

-- Recently posted (success + failed)
SELECT source_lender, status, COUNT(*)
FROM "CORE"."CORE_TRANSACTION_DATA_PROCESSED"
GROUP BY source_lender, status
ORDER BY 1, 2;

Payment Sources

The pipeline unifies payments from all sources:

Source TypeTablesLenders
Razorpay webhooksRAW_*_RAZORPAY_APIUGRO, Vivriti
Cashfree webhooksRAW_*_CASHFREE_APIUGRO, GB
NACH CSV filesRAW_*_NACH_*Arthmate, DMI, GB, IKF, Shivalik, UGRO
Payment/Settlement CSVRAW_*_PAYMENT_*DMI, Shivalik, UGRO Razorpay, Vivriti

Turno Fineract LMS Documentation