Coverage for telegram_webapp_auth/auth.py: 87%

166 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-02 20:25 +0000

1"""Telegram Web App Authenticator utilities.""" 

2 

3import base64 

4import binascii 

5import dataclasses 

6import hashlib 

7import hmac 

8import json 

9import re 

10import typing 

11from datetime import datetime 

12from datetime import timedelta 

13from datetime import timezone 

14from json import JSONDecodeError 

15from urllib.parse import parse_qsl 

16 

17from cryptography.exceptions import InvalidSignature 

18from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey 

19 

20from telegram_webapp_auth.data import PROD_PUBLIC_KEY 

21from telegram_webapp_auth.data import TEST_PUBLIC_KEY 

22from telegram_webapp_auth.data import ChatType 

23from telegram_webapp_auth.data import WebAppChat 

24from telegram_webapp_auth.data import WebAppInitData 

25from telegram_webapp_auth.data import WebAppUser 

26from telegram_webapp_auth.errors import ExpiredInitDataError 

27from telegram_webapp_auth.errors import InvalidInitDataError 

28 

# Matches a non-empty run of ASCII decimal digits and nothing else.
# ``re.ASCII`` stops ``\d`` from accepting non-ASCII Unicode digits, and ``\Z`` (unlike ``$``)
# does not tolerate a trailing newline, so values such as "123\n" are rejected.
_INTEGER_RE = re.compile(r"^\d+\Z", re.ASCII)
# Names of every field declared on the WebAppInitData dataclass.
_INIT_DATA_FIELDS = {field.name for field in dataclasses.fields(WebAppInitData)}
# Declared field names without "extra"; init data keys outside this set are collected into ``extra``.
_KNOWN_INIT_DATA_FIELDS = _INIT_DATA_FIELDS - {"extra"}

32 

33 

def generate_secret_key(token: str) -> bytes:
    """Derive the Web App secret key from a Telegram bot token.

    The secret is the HMAC-SHA256 digest of the UTF-8 encoded token, keyed with the
    constant string ``WebAppData``.

    Links:
        https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

    Args:
        token: Telegram Bot Token

    Returns:
        bytes: secret key
    """
    mac = hmac.new("WebAppData".encode("utf-8"), token.encode("utf-8"), hashlib.sha256)
    return mac.digest()

49 

50 

class TelegramAuthenticator:
    """Telegram Web App Authenticator.

    Validates Mini App init data either with the bot secret (``validate``) or with
    Telegram's Ed25519 public key (``validate_third_party``) and parses the validated
    query string into a ``WebAppInitData`` object.
    """

    def __init__(self, secret: typing.Optional[bytes] = None) -> None:
        """Initialize the authenticator with a secret key.

        Args:
            secret: secret key generated from the Telegram Bot Token. It may be omitted
                when only ``validate_third_party`` is used; ``validate`` requires it.
        """
        self._secret = secret

    @staticmethod
    def __parse_init_data(data: str) -> dict[str, str]:
        """Convert init_data string into dictionary.

        Args:
            data: the query string passed by the webapp

        Returns:
            dict[str, str]: decoded key/value pairs in the order they appear in ``data``

        Raises:
            InvalidInitDataError: if the string is empty, cannot be parsed as a query
                string, or contains an empty or duplicate key.
        """
        if not data:
            raise InvalidInitDataError("Init Data cannot be empty")

        try:
            # strict_parsing turns malformed pairs into ValueError instead of silently dropping them.
            parsed_items = parse_qsl(data, keep_blank_values=True, strict_parsing=True)
        except ValueError as err:
            raise InvalidInitDataError("Cannot parse init data") from err

        parsed_data: dict[str, str] = {}
        for key, value in parsed_items:
            if not key:
                raise InvalidInitDataError("Init data contains an empty key")
            # Duplicates are rejected rather than resolved: the signed data-check-string
            # has exactly one value per key.
            if key in parsed_data:
                raise InvalidInitDataError(f"Init data contains duplicate key: {key}")
            parsed_data[key] = value

        # Defensive guard: nothing usable was parsed.
        if not parsed_data:
            raise InvalidInitDataError("Init Data cannot be empty")

        return parsed_data

    @staticmethod
    def __parse_json(data: str, field_name: str) -> dict[str, typing.Any]:
        """Convert JSON string value from WebAppInitData to Python dictionary.

        Links:
            https://core.telegram.org/bots/webapps#webappinitdata

        Args:
            data: JSON document taken from an init data field
            field_name: name of the field, used in error messages

        Returns:
            dict[str, typing.Any]: the decoded JSON object

        Raises:
            InvalidInitDataError: if the JSON string cannot be decoded or does not
                contain a JSON object at the top level.
        """
        try:
            parsed_json = json.loads(data)
        except JSONDecodeError as err:
            raise InvalidInitDataError(f"Cannot decode {field_name} data") from err

        if not isinstance(parsed_json, dict):
            raise InvalidInitDataError(f"{field_name} data must be a JSON object")

        return parsed_json

    @staticmethod
    def __parse_int(value: typing.Optional[str], field_name: str, required: bool = True) -> typing.Optional[int]:
        """Parse an integer field from init data.

        Args:
            value: raw field value, or None when the field is absent
            field_name: name of the field, used in error messages
            required: whether a missing or empty value is an error

        Returns:
            typing.Optional[int]: the parsed integer, or None when the value is missing
            or empty and ``required`` is False

        Raises:
            InvalidInitDataError: if a required value is missing or empty, or the value
                is not a valid non-negative integer.
        """
        if value is None or value == "":
            if required:
                raise InvalidInitDataError(f"Init data does not contain {field_name}")
            return None

        if not _INTEGER_RE.match(value):
            raise InvalidInitDataError(f"Invalid {field_name}")

        try:
            return int(value)
        except ValueError as err:
            # CPython limits the length of str -> int conversions, so an all-digit but
            # extremely long value still raises ValueError. The value is untrusted and is
            # parsed before the hash/signature check, so report it as invalid init data
            # instead of leaking the raw ValueError.
            raise InvalidInitDataError(f"Invalid {field_name}") from err

    @staticmethod
    def __build_data_check_string(init_data: dict[str, str], excluded_keys: set[str]) -> str:
        """Build a Telegram data-check-string from parsed init data.

        Args:
            init_data: parsed init data
            excluded_keys: keys that must not be part of the string (e.g. the hash itself)

        Returns:
            str: ``key=value`` pairs sorted by key and joined with newlines
        """
        return "\n".join(
            f"{key}={value}"
            for key, value in sorted(init_data.items(), key=lambda item: item[0])
            if key not in excluded_keys
        )

    def _validate(self, hash_: str, init_data: str) -> bool:
        """Validates the data received from the Telegram web app, using the method from Telegram documentation.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

        Args:
            hash_: hash from init data
            init_data: data-check-string built from the init data

        Returns:
            bool: Validation result

        Raises:
            ValueError: if the authenticator was created without a secret.
        """
        if self._secret is None:
            raise ValueError("Telegram bot secret is required for validate()")

        # hmac.compare_digest raises TypeError for str arguments containing non-ASCII
        # characters. The hash is untrusted input, and a non-ASCII value can never equal
        # a hexadecimal digest, so reject it up front.
        if not hash_.isascii():
            return False

        expected_hash = hmac.new(self._secret, init_data.encode("utf-8"), hashlib.sha256).hexdigest()
        return hmac.compare_digest(expected_hash, hash_)

    @staticmethod
    def __ed25519_verify(
        public_key: Ed25519PublicKey,
        signature: bytes,
        message: bytes,
    ) -> bool:
        """Verify the Ed25519 signature of the message using the public key.

        Args:
            public_key: public key
            signature: signature
            message: original message in bytes format

        Returns:
            bool: True if the signature is valid, False otherwise
        """
        try:
            public_key.verify(signature, message)
        except InvalidSignature:
            return False
        return True

    @staticmethod
    def _check_expiry(auth_date: int, expr_in: typing.Optional[timedelta]) -> None:
        """Check that auth_date is a valid timestamp and, if requested, not expired.

        Args:
            auth_date: Unix timestamp taken from the init data
            expr_in: maximum allowed age of the init data; None disables the expiry check

        Raises:
            InvalidInitDataError: if auth_date cannot be converted to a datetime.
            ExpiredInitDataError: if the init data is older than ``expr_in``.
        """
        try:
            auth_dt = datetime.fromtimestamp(auth_date, tz=timezone.utc)
        except (OSError, OverflowError, ValueError) as err:
            raise InvalidInitDataError("Invalid auth_date") from err

        if expr_in is not None:
            now = datetime.now(tz=timezone.utc)
            # NOTE(review): only the age is compared, so an auth_date in the future is
            # not reported as expired - confirm this is intended.
            if now - auth_dt > expr_in:
                raise ExpiredInitDataError

    @staticmethod
    def __decode_signature(val: str) -> bytes:
        """Decode a base64-encoded signature, appending padding if necessary.

        The URL-safe alphabet (``-`` and ``_``) is used for decoding.

        Args:
            val: a base64-encoded string, with or without trailing padding

        Returns:
            bytes: decoded signature

        Raises:
            InvalidInitDataError: if the value is not valid base64.
        """
        # Restore the "=" padding so the length is a multiple of four.
        padded_v = val + "=" * ((4 - len(val) % 4) % 4)

        try:
            return base64.b64decode(padded_v.encode("ascii"), altchars=b"-_", validate=True)
        except (binascii.Error, UnicodeEncodeError) as err:
            raise InvalidInitDataError("Invalid signature encoding") from err

    @staticmethod
    def __make_user(data: typing.Optional[str], field_name: str) -> typing.Optional[WebAppUser]:
        """Parse a WebAppUser field from init data.

        Args:
            data: JSON document describing the user, or None/empty when absent
            field_name: name of the field, used in error messages

        Returns:
            typing.Optional[WebAppUser]: the parsed user, or None when ``data`` is empty

        Raises:
            InvalidInitDataError: if the JSON is invalid or WebAppUser rejects its keys.
        """
        if not data:
            return None

        user_data = TelegramAuthenticator.__parse_json(data, field_name)
        try:
            return WebAppUser(**user_data)
        except TypeError as err:
            raise InvalidInitDataError(f"Invalid {field_name} data") from err

    @staticmethod
    def __make_chat(data: typing.Optional[str]) -> typing.Optional[WebAppChat]:
        """Parse a WebAppChat field from init data.

        Args:
            data: JSON document describing the chat, or None/empty when absent

        Returns:
            typing.Optional[WebAppChat]: the parsed chat, or None when ``data`` is empty

        Raises:
            InvalidInitDataError: if the JSON is invalid, the chat type is unknown, or
                WebAppChat rejects its keys.
        """
        if not data:
            return None

        chat_data = TelegramAuthenticator.__parse_json(data, "chat")
        chat_type = chat_data.get("type")
        if chat_type is not None:
            try:
                # Replace the raw value with the enum member before building WebAppChat.
                chat_data["type"] = ChatType(chat_type)
            except ValueError as err:
                raise InvalidInitDataError("Invalid chat type") from err

        try:
            return WebAppChat(**chat_data)
        except TypeError as err:
            raise InvalidInitDataError("Invalid chat data") from err

    def __serialize_init_data(self, init_data_dict: dict[str, typing.Any]) -> WebAppInitData:
        """Serialize the init data dictionary into WebAppInitData object.

        Args:
            init_data_dict: the init data dictionary

        Returns:
            WebAppInitData: the serialized WebAppInitData object

        Raises:
            InvalidInitDataError: if any field has an invalid value.
        """
        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        can_send_after = self.__parse_int(init_data_dict.get("can_send_after"), "can_send_after", required=False)

        chat_type_raw = init_data_dict.get("chat_type")
        chat_type = None
        if chat_type_raw:
            try:
                chat_type = ChatType(chat_type_raw)
            except ValueError as err:
                raise InvalidInitDataError("Invalid chat_type") from err

        # Keys that are not known WebAppInitData fields are preserved in ``extra``.
        extra = {key: value for key, value in init_data_dict.items() if key not in _KNOWN_INIT_DATA_FIELDS}

        # ``or None`` normalizes blank string values to None.
        return WebAppInitData(
            auth_date=typing.cast(int, auth_date),
            hash=init_data_dict.get("hash") or None,
            signature=init_data_dict.get("signature") or None,
            query_id=init_data_dict.get("query_id") or None,
            user=self.__make_user(init_data_dict.get("user"), "user"),
            receiver=self.__make_user(init_data_dict.get("receiver"), "receiver"),
            chat=self.__make_chat(init_data_dict.get("chat")),
            chat_type=chat_type,
            chat_instance=init_data_dict.get("chat_instance") or None,
            start_param=init_data_dict.get("start_param") or None,
            can_send_after=can_send_after,
            extra=extra,
        )

    def validate_third_party(
        self,
        init_data: str,
        bot_id: int,
        expr_in: typing.Optional[timedelta] = None,
        is_test: bool = False,
    ) -> WebAppInitData:
        """Validates the data for Third-Party Use, using the method from Telegram documentation.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-for-third-party-use

        Args:
            init_data: init data from mini app
            bot_id: Telegram Bot ID
            expr_in: time delta to check if the token is expired
            is_test: true if the init data was issued in Telegram test environment

        Returns:
            WebAppInitData: parsed init data object

        Raises:
            InvalidInitDataError: if the init data is invalid
            ExpiredInitDataError: if the init data is expired
        """
        init_data_dict = self.__parse_init_data(init_data)
        # Neither the hash nor the signature itself is part of the signed payload.
        data_check_string = self.__build_data_check_string(init_data_dict, excluded_keys={"hash", "signature"})
        data_check_string = f"{bot_id}:WebAppData\n{data_check_string}"

        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        self._check_expiry(typing.cast(int, auth_date), expr_in)

        signature = init_data_dict.get("signature") or ""
        if not signature:
            raise InvalidInitDataError("Init data does not contain signature")

        if is_test:
            public_key = TEST_PUBLIC_KEY
        else:
            public_key = PROD_PUBLIC_KEY

        signature_bytes = self.__decode_signature(signature)
        data_check_string_bytes = data_check_string.encode("utf-8")
        if not self.__ed25519_verify(public_key, signature_bytes, data_check_string_bytes):
            raise InvalidInitDataError("Invalid data")

        return self.__serialize_init_data(init_data_dict)

    def validate(
        self,
        init_data: str,
        expr_in: typing.Optional[timedelta] = None,
    ) -> WebAppInitData:
        """Validates the data received via the Mini App. Returns a parsed init data object if is valid.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

        Args:
            init_data: init data from mini app
            expr_in: time delta to check if the token is expired

        Returns:
            WebAppInitData: parsed init data object

        Raises:
            InvalidInitDataError: if the init data is invalid
            ExpiredInitDataError: if the init data is expired
            ValueError: if the authenticator was created without a secret
        """
        init_data_dict = self.__parse_init_data(init_data)
        # The hash itself is not part of the payload it authenticates.
        data_check_string = self.__build_data_check_string(init_data_dict, excluded_keys={"hash"})
        hash_ = init_data_dict.get("hash") or ""
        if not hash_:
            raise InvalidInitDataError("Init data does not contain hash")

        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        self._check_expiry(typing.cast(int, auth_date), expr_in)

        if not self._validate(hash_, data_check_string):
            raise InvalidInitDataError("Invalid data")

        return self.__serialize_init_data(init_data_dict)