"""Utils."""
import logging
import os
import os.path
import random
import re
import shutil
import sys
import tarfile
import tempfile
import urllib.request
from . import __version__ as version
from .errors import BadStatusLine
BASE_DIR = getattr(sys, "_MEIPASS", os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, "data")
log = logging.getLogger(__package__)
IPPattern = re.compile(
r"(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)"
)
IPv6Pattern = re.compile(
r"\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*"
)
IPPortPatternLine = re.compile(
r"^.*?(?P<ip>(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)).*?(?P<port>\d{2,5}).*$", # noqa
flags=re.MULTILINE,
)
IPPortPatternGlobal = re.compile(
r"(?P<ip>(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?))" # noqa
r"(?=.*?(?:(?:(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?))|(?P<port>\d{2,5})))", # noqa
flags=re.DOTALL,
)
[docs]
def get_all_ip(page):
# NOTE: IPv6 addresses support need to be tested
return set(IPPattern.findall(page) + IPv6Pattern.findall(page))
[docs]
def get_status_code(resp, start=9, stop=12):
try:
if not isinstance(resp, (bytes, str)):
raise TypeError(f"{type(resp).__name__} is not supported")
code = int(resp[start:stop])
except ValueError:
return 400 # Bad Request
else:
return code
[docs]
def parse_status_line(line):
_headers = {}
is_response = line.startswith("HTTP/")
try:
if is_response: # HTTP/1.1 200 OK
version, status, *reason = line.split()
else: # GET / HTTP/1.1
method, path, version = line.split()
except ValueError as e:
raise BadStatusLine(line) from e
_headers["Version"] = version.upper()
if is_response:
_headers["Status"] = int(status)
reason = " ".join(reason)
reason = reason.upper() if reason.lower() == "ok" else reason.title()
_headers["Reason"] = reason
else:
_headers["Method"] = method.upper()
_headers["Path"] = path
if _headers["Method"] == "CONNECT":
host, port = path.split(":")
_headers["Host"], _headers["Port"] = host, int(port)
return _headers
[docs]
def update_geoip_db():
print("The update in progress, please waite for a while...")
filename = "GeoLite2-City.tar.gz"
local_file = os.path.join(DATA_DIR, filename)
city_db = os.path.join(DATA_DIR, "GeoLite2-City.mmdb")
url = "http://geolite.maxmind.com/download/geoip/database/%s" % filename
urllib.request.urlretrieve(url, local_file)
tmp_dir = tempfile.gettempdir()
with tarfile.open(name=local_file, mode="r:gz") as tf:
for tar_info in tf.getmembers():
if tar_info.name.endswith(".mmdb"):
tf.extract(tar_info, tmp_dir)
tmp_path = os.path.join(tmp_dir, tar_info.name)
shutil.move(tmp_path, city_db)
os.remove(local_file)
if os.path.exists(city_db):
print(
"The GeoLite2-City DB successfully downloaded and now you "
"have access to detailed geolocation information of the proxy."
)
else:
print("Something went wrong, please try again later.")