encrypt.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import os
  4. from typing import Any
  5. from cryptography.hazmat.backends.openssl import backend
  6. from cryptography.hazmat.primitives import padding
  7. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  8. from itsdangerous import URLSafeSerializer
  9. from common.log import log
  10. class AESCipher:
  11. def __init__(self, key: bytes | str):
  12. """
  13. :param key: 密钥,16/24/32 bytes 或 16 进制字符串
  14. """
  15. self.key = key if isinstance(key, bytes) else bytes.fromhex(key)
  16. def encrypt(self, plaintext: bytes | str) -> bytes:
  17. """
  18. AES 加密
  19. :param plaintext: 加密前的明文
  20. :return:
  21. """
  22. if not isinstance(plaintext, bytes):
  23. plaintext = str(plaintext).encode('utf-8')
  24. iv = os.urandom(16)
  25. cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=backend)
  26. encryptor = cipher.encryptor()
  27. padder = padding.PKCS7(cipher.algorithm.block_size).padder() # type: ignore
  28. padded_plaintext = padder.update(plaintext) + padder.finalize()
  29. ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
  30. return iv + ciphertext
  31. def decrypt(self, ciphertext: bytes | str) -> str:
  32. """
  33. AES 解密
  34. :param ciphertext: 解密前的密文, bytes 或 16 进制字符串
  35. :return:
  36. """
  37. ciphertext = ciphertext if isinstance(ciphertext, bytes) else bytes.fromhex(ciphertext)
  38. iv = ciphertext[:16]
  39. ciphertext = ciphertext[16:]
  40. cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=backend)
  41. decryptor = cipher.decryptor()
  42. unpadder = padding.PKCS7(cipher.algorithm.block_size).unpadder() # type: ignore
  43. padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
  44. plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()
  45. return plaintext.decode('utf-8')
  46. class Md5Cipher:
  47. @staticmethod
  48. def encrypt(plaintext: bytes | str) -> str:
  49. """
  50. MD5 加密
  51. :param plaintext: 加密前的明文
  52. :return:
  53. """
  54. import hashlib
  55. md5 = hashlib.md5()
  56. if not isinstance(plaintext, bytes):
  57. plaintext = str(plaintext).encode('utf-8')
  58. md5.update(plaintext)
  59. return md5.hexdigest()
  60. class ItsDCipher:
  61. def __init__(self, key: bytes | str):
  62. """
  63. :param key: 密钥,16/24/32 bytes 或 16 进制字符串
  64. """
  65. self.key = key if isinstance(key, bytes) else bytes.fromhex(key)
  66. def encrypt(self, plaintext: Any) -> str:
  67. """
  68. ItsDangerous 加密 (可能失败,如果 plaintext 无法序列化,则会加密为 MD5)
  69. :param plaintext: 加密前的明文
  70. :return:
  71. """
  72. serializer = URLSafeSerializer(self.key)
  73. try:
  74. ciphertext = serializer.dumps(plaintext)
  75. except Exception as e:
  76. log.error(f'ItsDangerous encrypt failed: {e}')
  77. ciphertext = Md5Cipher.encrypt(plaintext)
  78. return ciphertext
  79. def decrypt(self, ciphertext: str) -> Any:
  80. """
  81. ItsDangerous 解密 (可能失败,如果 ciphertext 无法反序列化,则解密失败, 返回原始密文)
  82. :param ciphertext: 解密前的密文
  83. :return:
  84. """
  85. serializer = URLSafeSerializer(self.key)
  86. try:
  87. plaintext = serializer.loads(ciphertext)
  88. except Exception as e:
  89. log.error(f'ItsDangerous decrypt failed: {e}')
  90. plaintext = ciphertext
  91. return plaintext