from __future__ import annotations
import asyncio
import os
from types import TracebackType
from typing import Any, Callable, Literal
from urllib.parse import urljoin
try:
import httpx # type: ignore[import-not-found]
except ImportError:
httpx = None # type: ignore[assignment]
from .exceptions import AnticaptchaException
from .tasks import BaseTask
# Seconds ``AsyncJob.join`` sleeps between polls when ``backoff`` is off.
SLEEP_EVERY_CHECK_FINISHED = 3
# Default ceiling, in seconds, on the time ``AsyncJob.join`` waits (5 minutes).
MAXIMUM_JOIN_TIME = 60 * 5
class AsyncJob:
    """An async handle to a submitted captcha-solving task.

    Returned by :meth:`AsyncAnticaptchaClient.create_task`. Use :meth:`join`
    to wait for completion, then call one of the ``get_*`` methods to
    retrieve the solution.

    Example::

        job = await client.create_task(task)
        await job.join()
        print(job.get_solution_response())  # for ReCAPTCHA / hCaptcha
    """

    client: AsyncAnticaptchaClient
    task_id: int
    # Payload of the most recent ``getTaskResult`` call; ``None`` until the
    # task has been polled at least once.
    _last_result: dict[str, Any] | None = None

    def __init__(self, client: AsyncAnticaptchaClient, task_id: int) -> None:
        """Bind the job to the *client* that created it and to its *task_id*."""
        self.client = client
        self.task_id = task_id

    async def _update(self) -> None:
        """Fetch the current task result from the API and cache it."""
        self._last_result = await self.client.getTaskResult(self.task_id)

    def _solution(self) -> dict[str, Any]:
        """Return the ``solution`` mapping of the cached result.

        Shared by all ``get_*`` accessors so the "polled at least once"
        precondition is checked in exactly one place.

        :raises AssertionError: If the task has not been polled yet (only
            while assertions are enabled, i.e. not under ``python -O``).
        """
        assert self._last_result is not None, (
            "no task result cached yet; await join() or check_is_ready() first"
        )
        return self._last_result["solution"]

    async def check_is_ready(self) -> bool:
        """Poll the API once and return whether the task is complete.

        :returns: ``True`` if the solution is ready, ``False`` otherwise.
        """
        await self._update()
        assert self._last_result is not None
        return self._last_result["status"] == "ready"

    def get_solution_response(self) -> str:
        """Return the ``gRecaptchaResponse`` token.

        Use after solving ReCAPTCHA v2, ReCAPTCHA v3, or hCaptcha tasks.
        Call this only after :meth:`join` has returned.
        """
        return self._solution()["gRecaptchaResponse"]

    def get_solution(self) -> dict[str, Any]:
        """Return the full solution dictionary from the API response.

        Useful for task types where the solution has multiple fields
        (e.g. GeeTest returns ``challenge``, ``validate``, ``seccode``).
        Call this only after :meth:`join` has returned.
        """
        return self._solution()

    def get_token_response(self) -> str:
        """Return the ``token`` string from the solution.

        Use after solving FunCaptcha tasks.
        Call this only after :meth:`join` has returned.
        """
        return self._solution()["token"]

    def get_answers(self) -> dict[str, str]:
        """Return the ``answers`` dictionary from the solution.

        Use after solving AntiGate tasks.
        Call this only after :meth:`join` has returned.
        """
        return self._solution()["answers"]

    def get_captcha_text(self) -> str:
        """Return the recognized text from an image captcha.

        Use after solving :class:`ImageToTextTask` tasks.
        Call this only after :meth:`join` has returned.
        """
        return self._solution()["text"]

    def get_cells_numbers(self) -> list[int]:
        """Return the list of selected cell numbers from a grid captcha.

        Call this only after :meth:`join` has returned.
        """
        return self._solution()["cellNumbers"]

    async def report_incorrect_image(self) -> bool:
        """Report that an image captcha was solved incorrectly.

        :returns: ``True`` if the report was accepted.
        """
        return await self.client.reportIncorrectImage(self.task_id)

    async def report_incorrect_recaptcha(self) -> bool:
        """Report that a ReCAPTCHA was solved incorrectly.

        :returns: ``True`` if the report was accepted.
        """
        return await self.client.reportIncorrectRecaptcha(self.task_id)

    def __repr__(self) -> str:
        status = self._last_result.get("status") if self._last_result else None
        if status:
            return f"<AsyncJob task_id={self.task_id} status={status!r}>"
        return f"<AsyncJob task_id={self.task_id}>"

    async def join(
        self,
        maximum_time: int | None = None,
        on_check: Callable[[int, str | None], None] | None = None,
        backoff: bool = False,
    ) -> None:
        """Poll for task completion, sleeping asynchronously until ready or timeout.

        Elapsed time is the sum of the sleep intervals, not wall-clock time,
        so the duration of the polling requests themselves is not counted.
        The limit is checked after each sleep, which means the total wait can
        exceed *maximum_time* by up to one polling interval.

        :param maximum_time: Maximum seconds to wait. ``None`` (the default)
            means ``MAXIMUM_JOIN_TIME``; an explicit ``0`` is honoured and
            times out after the first unsuccessful poll and sleep.
        :param on_check: Optional callback invoked after each sleep with
            ``(elapsed_time, status)`` where *elapsed_time* is the total seconds
            slept so far and *status* is the status string of the last poll
            (e.g. ``"processing"``).
        :param backoff: When ``True``, use exponential backoff for polling
            intervals starting at 1 second and doubling up to a 10-second cap.
            Default ``False`` preserves the fixed 3-second interval.
        :raises AnticaptchaException: If *maximum_time* is exceeded.
        """
        elapsed_time = 0
        # ``is None`` rather than ``or``: a caller passing ``0`` must not be
        # silently upgraded to the five-minute default.
        if maximum_time is None:
            maximum_time = MAXIMUM_JOIN_TIME
        sleep_time = 1 if backoff else SLEEP_EVERY_CHECK_FINISHED

        while not await self.check_is_ready():
            await asyncio.sleep(sleep_time)
            elapsed_time += sleep_time
            if backoff:
                # Double the interval for the next round, capped at 10 seconds.
                sleep_time = min(sleep_time * 2, 10)
            if on_check is not None and self._last_result is not None:
                on_check(elapsed_time, self._last_result.get("status"))
            if elapsed_time > maximum_time:
                raise AnticaptchaException(
                    None,
                    250,
                    f"The execution time exceeded a maximum time of {maximum_time} seconds."
                    f" It takes {elapsed_time} seconds.",
                )
class AsyncAnticaptchaClient:
    """Asynchronous client for the Anticaptcha.com API.

    Mirrors :class:`AnticaptchaClient` but all network methods are coroutines.
    Requires the ``httpx`` package — install with
    ``pip install python-anticaptcha[async]``.

    Can be used as an async context manager::

        async with AsyncAnticaptchaClient("my-api-key") as client:
            job = await client.create_task(task)
            await job.join()

    :param client_key: Your Anticaptcha API key. If omitted, the
        ``ANTICAPTCHA_API_KEY`` environment variable is used.
    :param language_pool: Language pool for workers — ``"en"`` (default)
        or ``"rn"`` (Russian).
    :param host: API hostname (default: ``"api.anti-captcha.com"``).
    :param use_ssl: Use HTTPS (default: ``True``).
    :raises ImportError: If ``httpx`` is not installed.
    :raises AnticaptchaException: If no API key is provided.
    """

    client_key = None
    # Endpoint paths, joined onto ``base_url`` for each request.
    CREATE_TASK_URL = "/createTask"
    TASK_RESULT_URL = "/getTaskResult"
    BALANCE_URL = "/getBalance"
    REPORT_IMAGE_URL = "/reportIncorrectImageCaptcha"
    REPORT_RECAPTCHA_URL = "/reportIncorrectRecaptcha"
    APP_STAT_URL = "/getAppStats"
    # Application ID sent as ``softId`` with task-creation and balance requests.
    SOFT_ID = 847
    language_pool = "en"
    # Per-request timeout in seconds, applied to every HTTP call this client makes.
    response_timeout = 5

    def __init__(
        self,
        client_key: str | None = None,
        language_pool: str = "en",
        host: str = "api.anti-captcha.com",
        use_ssl: bool = True,
    ) -> None:
        if httpx is None:
            raise ImportError(
                "httpx is required for async support. Install it with: pip install python-anticaptcha[async]"
            )
        self.client_key = client_key or os.environ.get("ANTICAPTCHA_API_KEY")
        if not self.client_key:
            raise AnticaptchaException(
                None,
                "CONFIG_ERROR",
                "API key required. Pass client_key or set ANTICAPTCHA_API_KEY env var.",
            )
        self.language_pool = language_pool
        scheme = "https" if use_ssl else "http"
        self.base_url = f"{scheme}://{host}/"
        self.session = httpx.AsyncClient()

    async def __aenter__(self) -> AsyncAnticaptchaClient:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> Literal[False]:
        # Close the session but never suppress the exception from the body.
        await self.session.aclose()
        return False

    async def close(self) -> None:
        """Close the underlying HTTP session.

        Called automatically when using the client as an async context manager.
        """
        await self.session.aclose()

    def __repr__(self) -> str:
        from urllib.parse import urlparse

        host = urlparse(self.base_url).hostname or self.base_url
        return f"<AsyncAnticaptchaClient host={host!r}>"

    async def _get_client_ip(self) -> str:
        """Return this machine's public IP as reported by ``api.myip.com``.

        The value is looked up once and cached on the instance.
        """
        if not hasattr(self, "_client_ip"):
            response = await self.session.get("https://api.myip.com", timeout=self.response_timeout)
            self._client_ip = response.json()["ip"]
        return self._client_ip

    async def _check_response(self, response: dict[str, Any]) -> None:
        """Raise :class:`AnticaptchaException` if *response* reports an error.

        For ``errorId`` 11 the error description is extended with the
        caller's probable public IP. That lookup is best-effort: if it fails,
        the original API error is still raised, just without the hint.

        :param response: Decoded JSON body of an API response.
        :raises AnticaptchaException: If ``errorId`` is present and non-zero.
        """
        error_id = response.get("errorId", False)
        if not error_id:
            return
        if error_id == 11:
            try:
                ip = await self._get_client_ip()
            except (httpx.HTTPError, KeyError, TypeError, ValueError):
                # A broken IP-lookup service must not mask the real API error.
                ip = None
            if ip is not None:
                response["errorDescription"] = "{} Your missing IP address is probably {}.".format(
                    response["errorDescription"], ip
                )
        raise AnticaptchaException(response["errorId"], response["errorCode"], response["errorDescription"])

    async def _post(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* as JSON to *path* and return the checked response.

        Single choke point for all API calls so that ``response_timeout`` and
        error checking are applied uniformly.

        :param path: Endpoint path (one of the ``*_URL`` class attributes).
        :param payload: JSON-serialisable request body.
        :returns: Decoded JSON response body.
        :raises AnticaptchaException: If the API returns an error.
        """
        reply = await self.session.post(
            urljoin(self.base_url, path),
            json=payload,
            timeout=self.response_timeout,
        )
        data = reply.json()
        await self._check_response(data)
        return data

    async def createTask(self, task: BaseTask) -> AsyncJob:
        """Submit a captcha task and return an :class:`AsyncJob` handle.

        :param task: A task instance (e.g. :class:`NoCaptchaTaskProxylessTask`).
        :returns: An :class:`AsyncJob` that can be polled with :meth:`AsyncJob.join`.
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {
            "clientKey": self.client_key,
            "task": task.serialize(),
            "softId": self.SOFT_ID,
            "languagePool": self.language_pool,
        }
        data = await self._post(self.CREATE_TASK_URL, payload)
        return AsyncJob(self, data["taskId"])

    async def getTaskResult(self, task_id: int) -> dict[str, Any]:
        """Fetch the current result/status of a task.

        :param task_id: The task ID returned when the task was created.
        :returns: Raw API response dictionary with ``status`` and ``solution`` keys.
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {"clientKey": self.client_key, "taskId": task_id}
        return await self._post(self.TASK_RESULT_URL, payload)

    async def getBalance(self) -> float:
        """Return the current account balance in USD.

        :returns: Account balance as a float (e.g. ``3.50``).
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {
            "clientKey": self.client_key,
            "softId": self.SOFT_ID,
        }
        data = await self._post(self.BALANCE_URL, payload)
        return data["balance"]

    async def getAppStats(self, soft_id: int, mode: str) -> dict[str, Any]:
        """Retrieve application statistics.

        :param soft_id: Application ID.
        :param mode: Statistics mode (e.g. ``"errors"``, ``"views"``, ``"downloads"``).
        :returns: Raw API response dictionary with statistics data.
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {"clientKey": self.client_key, "softId": soft_id, "mode": mode}
        return await self._post(self.APP_STAT_URL, payload)

    async def reportIncorrectImage(self, task_id: int) -> bool:
        """Report that an image captcha was solved incorrectly.

        Use this to get a refund and improve solver accuracy.

        :param task_id: The task ID of the incorrectly solved task.
        :returns: ``True`` if the report was accepted.
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {"clientKey": self.client_key, "taskId": task_id}
        data = await self._post(self.REPORT_IMAGE_URL, payload)
        return bool(data.get("status", False))

    async def reportIncorrectRecaptcha(self, task_id: int) -> bool:
        """Report that a ReCAPTCHA was solved incorrectly.

        Use this to get a refund and improve solver accuracy.

        :param task_id: The task ID of the incorrectly solved task.
        :returns: ``True`` if the report was accepted.
        :raises AnticaptchaException: If the API returns an error.
        """
        payload = {"clientKey": self.client_key, "taskId": task_id}
        data = await self._post(self.REPORT_RECAPTCHA_URL, payload)
        return data["status"] == "success"

    # Snake_case aliases
    #: Alias for :meth:`createTask`.
    create_task = createTask
    #: Alias for :meth:`getTaskResult`.
    get_task_result = getTaskResult
    #: Alias for :meth:`getBalance`.
    get_balance = getBalance
    #: Alias for :meth:`getAppStats`.
    get_app_stats = getAppStats
    #: Alias for :meth:`reportIncorrectImage`.
    report_incorrect_image = reportIncorrectImage
    #: Alias for :meth:`reportIncorrectRecaptcha`.
    report_incorrect_recaptcha = reportIncorrectRecaptcha