hmac_signed_serializer.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import hmac
  2. import hashlib
  3. import pickle
  4. import os
  5. import socket
  6. from kombu.serialization import register
  7. _local_secret_key = os.environ.get('MAXKB_HMAC_SIGNED_SERIALIZER_SECRET_KEY', 'default_hmac_signed_serializer_secret_key:' + os.getenv('MAXKB_VERSION', socket.gethostname()))
  8. try:
  9. from xpack import get_md5
  10. _local_secret_key = get_md5()
  11. except ImportError:
  12. pass
  13. def secure_dumps(obj):
  14. data = pickle.dumps(obj)
  15. signature = hmac.new(_local_secret_key.encode(), data, hashlib.sha256).digest()
  16. return signature + data
  17. def secure_loads(signed_data):
  18. if len(signed_data) < 32:
  19. raise ValueError("Invalid signed data packet")
  20. signature = signed_data[:32]
  21. payload = signed_data[32:]
  22. expected_signature = hmac.new(_local_secret_key.encode(), payload, hashlib.sha256).digest()
  23. if hmac.compare_digest(signature, expected_signature):
  24. return pickle.loads(payload)
  25. else:
  26. raise ValueError("Security Alert: Task signature mismatch! Potential tampering detected.")
  27. def register_hmac_signed_serializer():
  28. register(
  29. 'hmac_signed_serializer',
  30. secure_dumps,
  31. secure_loads,
  32. content_type='application/x-python-hmac-signed-serialize',
  33. content_encoding='binary'
  34. )