Repayment Pipeline
The transaction repayment pipeline processes payment sources (webhook APIs + NACH/CSV files) and posts repayments to Fineract. It runs automatically every 30 minutes via Prefect.
How It Works
Steps:
- dbt run - refreshes all BASE models (webhook + CSV) and rebuilds CORE_TRANSACTION_DATA
- Anti-join - queries CORE_TRANSACTION_DATA LEFT JOIN CORE_TRANSACTION_DATA_PROCESSED to find rows not yet posted (source_key not in PROCESSED with status = 'success')
- POST to Fineract - for each unprocessed row, calls /api/v1/loans/external-id/{id}/transactions?command=repayment
- Record result - saves success/failure to CORE_TRANSACTION_DATA_PROCESSED
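The post-and-record part of the steps above can be sketched roughly as follows. This is a minimal illustration, not the actual flow in webhook_repayment_pipeline.py: the function names (`repayment_path`, `run_pipeline`) and the three injected callables are hypothetical, and only the endpoint path and the success/failed statuses come from this document.

```python
from typing import Callable, Iterable
from urllib.parse import quote


def repayment_path(loan_external_id: str) -> str:
    """Build the repayment endpoint path for a loan's external id."""
    return (
        f"/api/v1/loans/external-id/{quote(loan_external_id, safe='')}"
        "/transactions?command=repayment"
    )


def run_pipeline(
    fetch_unprocessed: Callable[[], Iterable[dict]],  # hypothetical: runs the anti-join
    post_repayment: Callable[[str, dict], None],      # hypothetical: raises on failure
    record_result: Callable[[str, str], None],        # hypothetical: writes to PROCESSED
    dry_run: bool = False,
) -> dict:
    """Post each unprocessed row and record the outcome per source_key."""
    counts = {"success": 0, "failed": 0, "queued": 0}
    for row in fetch_unprocessed():
        counts["queued"] += 1
        if dry_run:
            # Dry run: only inspect the queue, never post or record.
            continue
        try:
            post_repayment(repayment_path(row["loan_external_id"]), row)
            status = "success"
        except Exception:
            # A failed post is recorded so the next run can retry it.
            status = "failed"
        record_result(row["source_key"], status)
        counts[status] += 1
    return counts
```

Passing the three steps in as callables keeps the loop testable without a database or a live Fineract instance; the real flow may be structured differently.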
Two Processing Paths
| Path | Lenders | Triggered By | Filter |
|---|---|---|---|
| Automated (30-min job) | Arthmate, DMI, IKF, Shivalik | Prefect scheduled deployment | recon_source IS NULL |
| Manual (Recon UI) | GB, UGRO, Vivriti | Operator clicks "Reconcile" | recon_source IS NOT NULL |
The 30-minute job explicitly excludes recon-enabled lenders (GB, UGRO, Vivriti) using AND recon_source IS NULL. Those lenders are only posted through the Reconciliation UI.
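As a rough illustration of how the two paths divide the queue, the sketch below splits rows on recon_source the same way the SQL filters do. The function name `split_queue` and the sample recon_source value are hypothetical; in the pipeline itself the split is done by the recon_source IS NULL / IS NOT NULL filters in SQL.

```python
def split_queue(rows: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split rows into (automated, manual) by whether recon_source is set."""
    # recon_source IS NULL -> picked up by the 30-minute job
    automated = [r for r in rows if r.get("recon_source") is None]
    # recon_source IS NOT NULL -> posted only through the Reconciliation UI
    manual = [r for r in rows if r.get("recon_source") is not None]
    return automated, manual


# Example rows; "recon_file" is a made-up recon_source value.
sample = [
    {"source_lender": "DMI", "recon_source": None},
    {"source_lender": "GB", "recon_source": "recon_file"},
]
automated, manual = split_queue(sample)
```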
Running the Pipeline
```bash
export PREFECT_API_URL=http://127.0.0.1:4201/api

# Standard run (includes dbt refresh)
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py

# Skip dbt (you just ran it manually)
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py --skip-dbt

# Dry run - inspect the queue without posting
./venv/bin/python tasks/flows/webhook_repayment_pipeline.py --dry-run
```
Deduplication
Every payment gets a deterministic source_key = md5(lender + loan_id + date + amount).
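A minimal Python sketch of such a key is shown below. The key in the pipeline is presumably computed in the dbt models, so this is only illustrative: the function name `make_source_key`, the ISO date format, the two-decimal amount format, and the plain concatenation without a separator are all assumptions.

```python
import hashlib
from datetime import date
from decimal import Decimal


def make_source_key(lender: str, loan_id: str, paid_on: date, amount: Decimal) -> str:
    """Return a deterministic md5 hex digest for one payment."""
    # Assumed normalisation: ISO date and a fixed two-decimal amount, so the
    # same payment always serialises to the same string.
    raw = f"{lender}{loan_id}{paid_on.isoformat()}{amount:.2f}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


key = make_source_key("DMI", "LN-001", date(2024, 1, 15), Decimal("1500"))
```

Because the key depends only on these four fields, re-ingesting the same payment from a webhook or a CSV file yields the same source_key. One thing to check in the real implementation is whether a separator is used between fields, since plain concatenation can make different field combinations collide.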
After each posting attempt, the result is recorded in CORE_TRANSACTION_DATA_PROCESSED with one of two statuses:
| Status | Meaning |
|---|---|
| success | Posted to Fineract - excluded from future runs |
| failed | Failed - automatically retried on next run |
The anti-join ensures the same payment is never posted twice.
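The retry behaviour follows from joining only against success rows. The sketch below reproduces that anti-join on an in-memory SQLite database; the lowercase table names and the sample keys are stand-ins for illustration, and the real tables live in the CORE schema with more columns.

```python
import sqlite3


def unprocessed_keys(conn: sqlite3.Connection) -> list[str]:
    """Return source_keys that have no 'success' row in the processed table."""
    rows = conn.execute(
        """
        SELECT t.source_key
        FROM core_transaction_data t
        LEFT JOIN core_transaction_data_processed p
          ON t.source_key = p.source_key AND p.status = 'success'
        WHERE p.source_key IS NULL
        ORDER BY t.source_key
        """
    ).fetchall()
    return [r[0] for r in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE core_transaction_data (source_key TEXT PRIMARY KEY);
    CREATE TABLE core_transaction_data_processed (source_key TEXT, status TEXT);
    INSERT INTO core_transaction_data VALUES ('k1'), ('k2'), ('k3');
    -- k1 was posted, k2 failed, k3 has never been attempted
    INSERT INTO core_transaction_data_processed VALUES ('k1', 'success'), ('k2', 'failed');
    """
)

queue_before = unprocessed_keys(conn)  # k2 (failed) and k3 (new) are still queued

# A later run retries k2 and it succeeds; it now drops out of the queue.
conn.execute("INSERT INTO core_transaction_data_processed VALUES ('k2', 'success')")
queue_after = unprocessed_keys(conn)
```

Putting the status condition in the ON clause rather than the WHERE clause is what keeps failed rows in the queue: a failed row never matches the join, so the payment still appears as unprocessed.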
Monitoring
Prefect Dashboard
Monitor pipeline runs at turnoprefect.incentius.net:
- Flow run history and status
- Task-level success/failure
- Execution timing
Ops UI
- Payments page - view all posted transactions with status
- Processing Log - file processing status per lender
- Reconciliation - recon records and their posting status
Database
```sql
-- Unprocessed transactions (what the next run will pick up)
SELECT t.source_lender, COUNT(*)
FROM "CORE"."CORE_TRANSACTION_DATA" t
LEFT JOIN "CORE"."CORE_TRANSACTION_DATA_PROCESSED" p
  ON t.source_key = p.source_key AND p.status = 'success'
WHERE t.is_success = true
  AND t.loan_external_id IS NOT NULL
  AND p.source_key IS NULL
  AND t.recon_source IS NULL
GROUP BY t.source_lender;

-- Posting results by lender and status (success + failed)
SELECT source_lender, status, COUNT(*)
FROM "CORE"."CORE_TRANSACTION_DATA_PROCESSED"
GROUP BY source_lender, status
ORDER BY 1, 2;
```
Payment Sources
The pipeline unifies payments from all sources:
| Source Type | Tables | Lenders |
|---|---|---|
| Razorpay webhooks | RAW_*_RAZORPAY_API | UGRO, Vivriti |
| Cashfree webhooks | RAW_*_CASHFREE_API | UGRO, GB |
| NACH CSV files | RAW_*_NACH_* | Arthmate, DMI, GB, IKF, Shivalik, UGRO |
| Payment/Settlement CSV | RAW_*_PAYMENT_* | DMI, Shivalik, UGRO Razorpay, Vivriti |