Coverage for telegram_webapp_auth/auth.py: 87%
166 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-02 20:25 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-02 20:25 +0000
1"""Telegram Web App Authenticator utilities."""
3import base64
4import binascii
5import dataclasses
6import hashlib
7import hmac
8import json
9import re
10import typing
11from datetime import datetime
12from datetime import timedelta
13from datetime import timezone
14from json import JSONDecodeError
15from urllib.parse import parse_qsl
17from cryptography.exceptions import InvalidSignature
18from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
20from telegram_webapp_auth.data import PROD_PUBLIC_KEY
21from telegram_webapp_auth.data import TEST_PUBLIC_KEY
22from telegram_webapp_auth.data import ChatType
23from telegram_webapp_auth.data import WebAppChat
24from telegram_webapp_auth.data import WebAppInitData
25from telegram_webapp_auth.data import WebAppUser
26from telegram_webapp_auth.errors import ExpiredInitDataError
27from telegram_webapp_auth.errors import InvalidInitDataError
# Strict non-negative decimal integer. ``[0-9]`` (rather than ``\d``) rejects
# non-ASCII Unicode digits, and ``\Z`` (rather than ``$``) rejects a trailing
# newline, both of which ``int()`` would otherwise silently accept.
_INTEGER_RE = re.compile(r"^[0-9]+\Z")

# Names of every field declared on the WebAppInitData dataclass.
_INIT_DATA_FIELDS = {field.name for field in dataclasses.fields(WebAppInitData)}

# Declared fields other than "extra"; init data keys outside this set are
# collected into the ``extra`` mapping when the init data is serialized.
_KNOWN_INIT_DATA_FIELDS = _INIT_DATA_FIELDS - {"extra"}
def generate_secret_key(token: str) -> bytes:
    """Derive the Mini App secret key from a Telegram bot token.

    The key is the HMAC-SHA256 digest of the UTF-8 encoded bot token, keyed
    with the constant string ``WebAppData``.

    Links:
        https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

    Args:
        token: Telegram Bot Token

    Returns:
        bytes: the raw 32-byte secret key
    """
    return hmac.new(b"WebAppData", token.encode("utf-8"), hashlib.sha256).digest()
class TelegramAuthenticator:
    """Telegram Web App Authenticator.

    Validates Mini App init data either with the bot's HMAC secret
    (``validate``) or with Telegram's Ed25519 public key
    (``validate_third_party``), and converts it into ``WebAppInitData``.
    """

    def __init__(self, secret: typing.Optional[bytes] = None) -> None:
        """Initialize the authenticator with a secret key.

        Args:
            secret: secret key generated from the Telegram Bot Token. It is
                only read by ``validate``; ``validate_third_party`` does not
                use it, so it may be omitted for third-party validation.
        """
        self._secret = secret

    @staticmethod
    def __parse_init_data(data: str) -> dict[str, str]:
        """Convert init_data string into dictionary.

        Args:
            data: the query string passed by the webapp

        Returns:
            dict[str, str]: decoded key/value pairs of the query string

        Raises:
            InvalidInitDataError: if the string is empty, cannot be parsed,
                or contains an empty or duplicate key.
        """
        if not data:
            raise InvalidInitDataError("Init Data cannot be empty")

        try:
            parsed_items = parse_qsl(data, keep_blank_values=True, strict_parsing=True)
        except ValueError as err:
            raise InvalidInitDataError("Cannot parse init data") from err

        parsed_data: dict[str, str] = {}
        for key, value in parsed_items:
            if not key:
                raise InvalidInitDataError("Init data contains an empty key")
            # Duplicates are rejected rather than merged so that the checked
            # string and the parsed values can never disagree.
            if key in parsed_data:
                raise InvalidInitDataError(f"Init data contains duplicate key: {key}")
            parsed_data[key] = value

        if not parsed_data:
            raise InvalidInitDataError("Init Data cannot be empty")

        return parsed_data

    @staticmethod
    def __parse_json(data: str, field_name: str) -> dict[str, typing.Any]:
        """Convert JSON string value from WebAppInitData to Python dictionary.

        Links:
            https://core.telegram.org/bots/webapps#webappinitdata

        Args:
            data: JSON document to decode
            field_name: name of the init data field, used in error messages

        Returns:
            dict[str, typing.Any]: the decoded JSON object

        Raises:
            InvalidInitDataError: if the JSON string cannot be decoded or is
                not a JSON object.
        """
        try:
            parsed_json = json.loads(data)
        except JSONDecodeError as err:
            raise InvalidInitDataError(f"Cannot decode {field_name} data") from err

        if not isinstance(parsed_json, dict):
            raise InvalidInitDataError(f"{field_name} data must be a JSON object")

        return parsed_json

    @staticmethod
    def __parse_int(value: typing.Optional[str], field_name: str, required: bool = True) -> typing.Optional[int]:
        """Parse a non-negative decimal integer field from init data.

        Args:
            value: raw field value, or None when the field is absent
            field_name: name of the init data field, used in error messages
            required: whether a missing or empty value is an error

        Returns:
            typing.Optional[int]: the parsed integer, or None when the value
            is missing or empty and ``required`` is False

        Raises:
            InvalidInitDataError: if a required value is missing or empty, or
                if the value is not a plain ASCII decimal integer.
        """
        if value is None or value == "":
            if required:
                raise InvalidInitDataError(f"Init data does not contain {field_name}")
            return None

        # isascii() rejects Unicode digits that \d (and int()) would accept;
        # fullmatch() rejects a trailing newline that "$" would let through.
        if not (value.isascii() and _INTEGER_RE.fullmatch(value)):
            raise InvalidInitDataError(f"Invalid {field_name}")

        try:
            return int(value)
        except ValueError as err:
            # int() refuses digit strings above the interpreter's integer
            # string conversion length limit; report that as invalid init data
            # instead of leaking a bare ValueError to the caller.
            raise InvalidInitDataError(f"Invalid {field_name}") from err

    @staticmethod
    def __build_data_check_string(init_data: dict[str, str], excluded_keys: set[str]) -> str:
        """Build a Telegram data-check-string from parsed init data.

        Args:
            init_data: parsed init data
            excluded_keys: keys that must not take part in the check string

        Returns:
            str: ``key=value`` pairs sorted by key and joined with newlines
        """
        return "\n".join(f"{key}={init_data[key]}" for key in sorted(init_data) if key not in excluded_keys)

    def _validate(self, hash_: str, init_data: str) -> bool:
        """Validates the data received from the Telegram web app, using the method from Telegram documentation.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

        Args:
            hash_: hash from init data
            init_data: data-check-string built from the init data

        Returns:
            bool: Validation result

        Raises:
            ValueError: if the authenticator was created without a secret
        """
        if self._secret is None:
            raise ValueError("Telegram bot secret is required for validate()")

        expected_hash = hmac.new(self._secret, init_data.encode("utf-8"), hashlib.sha256).hexdigest()
        # Constant-time comparison of the computed and the supplied hash.
        return hmac.compare_digest(expected_hash, hash_)

    @staticmethod
    def __ed25519_verify(
        public_key: Ed25519PublicKey,
        signature: bytes,
        message: bytes,
    ) -> bool:
        """Verify the Ed25519 signature of the message using the public key.

        Args:
            public_key: public key
            signature: signature
            message: original message in bytes format

        Returns:
            bool: True if the signature is valid, False otherwise
        """
        try:
            public_key.verify(signature, message)
        except InvalidSignature:
            return False
        return True

    @staticmethod
    def _check_expiry(auth_date: int, expr_in: typing.Optional[timedelta]) -> None:
        """Check that auth_date is a valid timestamp and, optionally, not expired.

        Args:
            auth_date: Unix timestamp taken from the init data
            expr_in: maximum allowed age of the init data; None disables the
                expiry check

        Raises:
            InvalidInitDataError: if auth_date cannot be converted to a datetime
            ExpiredInitDataError: if the init data is older than ``expr_in``
        """
        try:
            auth_dt = datetime.fromtimestamp(auth_date, tz=timezone.utc)
        except (OSError, OverflowError, ValueError) as err:
            raise InvalidInitDataError("Invalid auth_date") from err

        if expr_in is None:
            return

        if datetime.now(tz=timezone.utc) - auth_dt > expr_in:
            raise ExpiredInitDataError

    @staticmethod
    def __decode_signature(val: str) -> bytes:
        """Decode a base64-encoded signature, appending padding if necessary.

        Args:
            val: a base64 string using the URL-safe ``-`` and ``_`` characters,
                with or without trailing ``=`` padding

        Returns:
            bytes: decoded signature

        Raises:
            InvalidInitDataError: if the value is not valid base64
        """
        # Restore the "=" padding up to a multiple of four characters.
        padded_v = val + "=" * (-len(val) % 4)

        try:
            return base64.b64decode(padded_v.encode("ascii"), altchars=b"-_", validate=True)
        except (binascii.Error, UnicodeEncodeError) as err:
            raise InvalidInitDataError("Invalid signature encoding") from err

    @staticmethod
    def __make_user(data: typing.Optional[str], field_name: str) -> typing.Optional[WebAppUser]:
        """Parse a WebAppUser field from init data.

        Args:
            data: JSON document of the field, or None/empty when absent
            field_name: name of the init data field, used in error messages

        Returns:
            typing.Optional[WebAppUser]: the parsed user, or None when the
            field is absent or empty

        Raises:
            InvalidInitDataError: if the JSON is invalid or its keys are not
                accepted by ``WebAppUser``
        """
        if not data:
            return None

        user_data = TelegramAuthenticator.__parse_json(data, field_name)
        try:
            return WebAppUser(**user_data)
        except TypeError as err:
            raise InvalidInitDataError(f"Invalid {field_name} data") from err

    @staticmethod
    def __make_chat(data: typing.Optional[str]) -> typing.Optional[WebAppChat]:
        """Parse a WebAppChat field from init data.

        Args:
            data: JSON document of the ``chat`` field, or None/empty when absent

        Returns:
            typing.Optional[WebAppChat]: the parsed chat, or None when the
            field is absent or empty

        Raises:
            InvalidInitDataError: if the JSON is invalid, the chat type is
                unknown, or the keys are not accepted by ``WebAppChat``
        """
        if not data:
            return None

        chat_data = TelegramAuthenticator.__parse_json(data, "chat")
        chat_type = chat_data.get("type")
        if chat_type is not None:
            try:
                chat_data["type"] = ChatType(chat_type)
            except ValueError as err:
                raise InvalidInitDataError("Invalid chat type") from err

        try:
            return WebAppChat(**chat_data)
        except TypeError as err:
            raise InvalidInitDataError("Invalid chat data") from err

    def __serialize_init_data(self, init_data_dict: typing.Dict[str, typing.Any]) -> WebAppInitData:
        """Serialize the init data dictionary into WebAppInitData object.

        Empty string values of optional fields are converted to None. Keys
        that are not known ``WebAppInitData`` fields are passed through in
        ``extra``.

        Args:
            init_data_dict: the init data dictionary

        Returns:
            WebAppInitData: the serialized WebAppInitData object

        Raises:
            InvalidInitDataError: if any field has an invalid value
        """
        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        can_send_after = self.__parse_int(init_data_dict.get("can_send_after"), "can_send_after", required=False)

        chat_type_raw = init_data_dict.get("chat_type")
        chat_type = None
        if chat_type_raw:
            try:
                chat_type = ChatType(chat_type_raw)
            except ValueError as err:
                raise InvalidInitDataError("Invalid chat_type") from err

        extra = {key: value for key, value in init_data_dict.items() if key not in _KNOWN_INIT_DATA_FIELDS}

        return WebAppInitData(
            # A required field never parses to None, so the cast is safe.
            auth_date=typing.cast(int, auth_date),
            hash=init_data_dict.get("hash") or None,
            signature=init_data_dict.get("signature") or None,
            query_id=init_data_dict.get("query_id") or None,
            user=self.__make_user(init_data_dict.get("user"), "user"),
            receiver=self.__make_user(init_data_dict.get("receiver"), "receiver"),
            chat=self.__make_chat(init_data_dict.get("chat")),
            chat_type=chat_type,
            chat_instance=init_data_dict.get("chat_instance") or None,
            start_param=init_data_dict.get("start_param") or None,
            can_send_after=can_send_after,
            extra=extra,
        )

    def validate_third_party(
        self,
        init_data: str,
        bot_id: int,
        expr_in: typing.Optional[timedelta] = None,
        is_test: bool = False,
    ) -> WebAppInitData:
        """Validates the data for Third-Party Use, using the method from Telegram documentation.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-for-third-party-use

        Args:
            init_data: init data from mini app
            bot_id: Telegram Bot ID
            expr_in: time delta to check if the token is expired
            is_test: true if the init data was issued in Telegram test environment

        Returns:
            WebAppInitData: parsed init data object

        Raises:
            InvalidInitDataError: if the init data is invalid
            ExpiredInitDataError: if the init data is expired
        """
        init_data_dict = self.__parse_init_data(init_data)
        data_check_string = self.__build_data_check_string(init_data_dict, excluded_keys={"hash", "signature"})
        data_check_string = f"{bot_id}:WebAppData\n{data_check_string}"

        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        self._check_expiry(typing.cast(int, auth_date), expr_in)

        signature = init_data_dict.get("signature") or ""
        if not signature:
            raise InvalidInitDataError("Init data does not contain signature")

        public_key = TEST_PUBLIC_KEY if is_test else PROD_PUBLIC_KEY

        signature_bytes = self.__decode_signature(signature)
        data_check_string_bytes = data_check_string.encode("utf-8")
        if not self.__ed25519_verify(public_key, signature_bytes, data_check_string_bytes):
            raise InvalidInitDataError("Invalid data")

        return self.__serialize_init_data(init_data_dict)

    def validate(
        self,
        init_data: str,
        expr_in: typing.Optional[timedelta] = None,
    ) -> WebAppInitData:
        """Validates the data received via the Mini App. Returns a parsed init data object if is valid.

        Links:
            https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app

        Args:
            init_data: init data from mini app
            expr_in: time delta to check if the token is expired

        Returns:
            WebAppInitData: parsed init data object

        Raises:
            InvalidInitDataError: if the init data is invalid
            ExpiredInitDataError: if the init data is expired
            ValueError: if the authenticator was created without a secret
        """
        init_data_dict = self.__parse_init_data(init_data)
        data_check_string = self.__build_data_check_string(init_data_dict, excluded_keys={"hash"})
        hash_ = init_data_dict.get("hash") or ""
        if not hash_:
            raise InvalidInitDataError("Init data does not contain hash")

        auth_date = self.__parse_int(init_data_dict.get("auth_date"), "auth_date")
        self._check_expiry(typing.cast(int, auth_date), expr_in)

        if not self._validate(hash_, data_check_string):
            raise InvalidInitDataError("Invalid data")

        return self.__serialize_init_data(init_data_dict)