import hmac
import hashlib
import json
def generate_sha256_signature(
    payload: "dict | bytes | bytearray", secret: str
) -> str:
    """
    Generate an HMAC-SHA256 signature (hex digest) for a webhook payload.

    Parameters:
        payload (dict | bytes | bytearray): The payload of the webhook request.
            ``bytes``/``bytearray`` values are signed exactly as given, so the
            raw request body can be passed to avoid any re-serialization
            differences. Any other value is serialized with ``json.dumps``
            using its default settings and encoded as UTF-8 before signing.
        secret (str): The shared secret key (encoded as UTF-8 for the HMAC).

    Returns:
        str: The generated HMAC-SHA256 signature, prefixed with "sha256=".

    Raises:
        TypeError: If ``payload`` is neither bytes-like nor JSON serializable.
    """
    if isinstance(payload, (bytes, bytearray)):
        # Already serialized: sign the exact bytes that were received.
        payload_bytes = bytes(payload)
    else:
        # The signature only matches if both sides serialize the payload
        # byte-for-byte identically. The default json.dumps() output puts a
        # space after "," and ":" (e.g. '{"a": 1, "b": 2}'); this mirrors the
        # example in Ripio's documentation.
        # If Ripio turns out to send compact JSON (no spaces after the
        # separators), switch to:
        #   json.dumps(payload, separators=(",", ":")).encode("utf-8")
        payload_bytes = json.dumps(payload).encode("utf-8")

    hash_object = hmac.new(
        secret.encode("utf-8"),
        msg=payload_bytes,
        digestmod=hashlib.sha256,
    )
    return "sha256=" + hash_object.hexdigest()
def verify_signature(payload: dict, secret: str, signature_header: str) -> bool:
    """
    Verify that the payload was sent from a trusted source by validating its
    SHA256 signature.

    The expected signature is recomputed with ``generate_sha256_signature``
    and compared to the received one using ``hmac.compare_digest``.

    Parameters:
        payload (dict): The payload of the webhook request.
        secret (str): The shared secret key.
        signature_header (str): The signature received in the
            `Http-X-Wh-Signature-256` header.

    Returns:
        bool: True if the signature is valid, False otherwise. A header that
        is missing (None), not a string, or contains non-ASCII characters is
        treated as invalid rather than raising.
    """
    # The header is attacker-controlled; never let a malformed value raise.
    if not isinstance(signature_header, str):
        return False
    try:
        # hmac.compare_digest() rejects str arguments containing non-ASCII
        # characters with TypeError, so compare bytes instead. A genuine
        # signature ("sha256=" + hex digits) is always pure ASCII.
        received_signature = signature_header.encode("ascii")
    except UnicodeEncodeError:
        return False

    expected_signature = generate_sha256_signature(payload, secret)
    return hmac.compare_digest(
        expected_signature.encode("ascii"),
        received_signature,
    )
# Example Usage (within a web framework like Flask)
# from flask import Flask, request, abort
#
# app = Flask(__name__)
# shared_secret = "your_shared_secret"  # Replace with your actual shared secret
#
# @app.route('/webhook-receiver', methods=['POST'])
# def webhook_receiver():
#     received_signature = request.headers.get('Http-X-Wh-Signature-256')
#     payload = request.json  # Assumes the payload is JSON
#
#     if not received_signature or not payload:
#         abort(400, 'Missing signature or payload')
#
#     if verify_signature(payload, shared_secret, received_signature):
#         # Process the valid webhook payload
#         print("Signature is valid. Processing payload:", payload)
#         # Add your business logic here
#         return "Webhook received and validated.", 200
#     else:
#         # Invalid signature
#         print("Invalid signature.")
#         abort(403, 'Invalid signature. Reject the request.')