First draft taking code from Walter Huf and adding pieces of the JIRACookieAuth (early drafts, needs revision) to handle relogins. Original repo was for reference to Walter's code.master
#!/usr/bin/env python3 | |||||
# -*- coding: utf-8 -*- | |||||
# Standard library imports | |||||
import argparse | |||||
import errno | |||||
import logging | |||||
import os | |||||
import re | |||||
import threading | |||||
# Related third party imports (If you used pip/apt/yum to install) | |||||
import arrow | |||||
from bs4 import BeautifulSoup as bs | |||||
from expand_path import expand_path | |||||
import requests | |||||
from requests.auth import AuthBase | |||||
from shm_dict import SHMDict | |||||
__author__ = "Nate Bohman" | |||||
__license__ = "GPL-3" | |||||
__version__ = "1.1.0" | |||||
__maintainer__ = "Nate Bohman" | |||||
__email__ = "" | |||||
logger = logging.getLogger(__name__) | |||||
try: | |||||
unichr | |||||
except NameError: | |||||
unichr = chr | |||||
try: | |||||
unicode | |||||
except NameError: | |||||
unicode = str | |||||
DEFAULT_CFGDIR_BASE = expand_path("~/.config/dudesnude") | |||||
LOGIN_URL = {"default": "", | |||||
"guest": ""} | |||||
LOGIN_INPUT_RGX = re.compile(r'<input.*name="(?P<name>[^"]+)".*value="(?P<value>[^"]+)"', re.I) | |||||
class DudesNudeCookieAuth(AuthBase): | |||||
def __init__(self, email, password="", cookie_dir=DEFAULT_CFGDIR_BASE): | |||||
self.cookie_dict = None | |||||
self._cookie_dir = "" | |||||
self.cookie_dir = cookie_dir | |||||
self._email = "" | ||||| = email | |||||
self.password = password | |||||
self.session = requests.Session() | |||||
@property | |||||
def email(self): | |||||
return self._email | |||||
@email.setter | |||||
def email(self, value): | |||||
if self._email != value: | |||||
self._email = value | |||||
self.cookie_dict = SHMDict(name=self.cookie_file, | |||||
size=8192, is_file=True) | |||||
@property | |||||
def cookie_dir(self): | |||||
return self._cookie_dir | |||||
@cookie_dir.setter | |||||
def cookie_dir(self, value): | |||||
value = expand_path(value) | |||||
self._cookie_dir = value | |||||
try: | |||||
os.makedirs(value, mode=0o750) | |||||
except OSError as e: | |||||
if e.errno != errno.EEXIST: | |||||
raise | |||||
@property | |||||
def cookie_file(self): | |||||
return "{}/{}_cookies".format(self.cookie_dir, | ||||| | |||||
@property | |||||
def login_form_fields(self): | |||||
r = self.session.get(LOGIN_URL.get("default")) | |||||
self.cookie_dict['cookies'] = self.session.cookies | |||||
login_form_fields = {} | |||||
if | |||||
for login_input in LOGIN_INPUT_RGX.finditer(r.text): | |||||
login_input = login_input.groupdict() | |||||
login_form_fields[login_input.get("name")] = login_input.get("value") | |||||
return login_form_fields | |||||
@property | |||||
def login_data(self): | |||||
if == "guest": | |||||
return {'go': '1', 'iagree': '1', 'token': '', 'win_x': '1080', 'win_y': '720'} | |||||
else: | |||||
login_form_fields = self.login_form_fields | |||||
assert login_form_fields.get('s_token') is not None | |||||
assert self.cookie_dict['cookies'].get('PHPSESSID') is not None | |||||
login_data = { | |||||
'PHPSESSID': self.cookie_dict['cookies'].get('PHPSESSID'), | |||||
's_token': login_form_fields.get('s_token'), | |||||
'email':, | |||||
'pass': self.password, | |||||
'logon': '1', | |||||
'seeking': '0', # 0 -, 1 chat only, 2 hookups | |||||
'tz': 0, # Force GMT | |||||
'win_x': '1080', # Force desktop | |||||
'win_y': '720', # Force desktop | |||||
} | |||||
return login_data | |||||
def login(self): | |||||
with self.cookie_dict.exclusive_lock(): | |||||
if self.cookie_dict.get('last_refresh') is None: | |||||
log_message = "Creating cookie {}".format(self.cookie_file) | |||||
else: | |||||
log_message = "Updating cookie that was last refreshed {}".format( | |||||
self.cookie_dict.get('last_refresh').humanize()) | ||||| | |||||
r =, LOGIN_URL.get("default")), | |||||
data=self.login_data) | |||||
# Check for HTTP error codes | |||||
r.raise_for_status() | |||||
# Check for bad URL | |||||
if == "guest" and r.url != '': | |||||
raise Exception('Failure to log in') | |||||
if != "guest" and r.url == '': | |||||
raise Exception('Failure to log in') | |||||
self.cookie_dict['cookies'] = self.session.cookies | |||||
self.cookie_dict['last_refresh'] = arrow.utcnow() | |||||
def handle_relogin(self, r, **kwargs): | |||||
"""Handle logging back in if response URL indicates not logged in.""" | |||||
if not r.url == '': | |||||
return r | |||||
self.login() | |||||
def __call__(self, req): | |||||
"""Handle every call through requests to webpage.""" | |||||
# Login if cookies dict doesn't exist | |||||
if self.cookie_dict.get('last_refresh') is None: | |||||
self.login() | |||||
# Clean up, since we can't generate new cookies otherwise | |||||
if 'Cookie' in req.headers: | |||||
del req.headers['Cookie'] | |||||
# Add saved cookies to request | |||||
req.prepare_cookies(self.cookie_dict.get('cookies')) | |||||
# Register a method that is called after every request | |||||
# to ensure that we don't have to login. | |||||
req.register_hook('response', self.handle_relogin) | |||||
return req | |||||
class DudesNudeClient(object): | |||||
def __init__(self, email, password, config_dir=DEFAULT_CFGDIR_BASE): | |||||
self._config_dir = "" | |||||
self.config_dir = config_dir | |||||
#self.guest_session = requests.Session() | |||||
#self.guest_session.auth = DudesNudeCookieAuth(email="guest") | |||||
self.session = requests.Session() | |||||
self.session.auth = DudesNudeCookieAuth(email=email, password=password) |