|
- """
- Direct Messages Archiver
- Usage:
- >>> from dmarchiver.core import Crawler
- >>> crawler = Crawler()
- >>> crawler.authenticate('username', 'password')
- >>> crawler.crawl('conversation_id')
- """
- import collections
- import datetime
- from enum import Enum
- import os
- import pickle
- import re
- import shutil
- from sys import platform
- import time
- import lxml.html
- import requests
- import traceback
- from ratelimit import limits
- import random
- from json import dump as json_dump
- API_LIMIT = 900
- API_RESET = 900
- DEFAULT_BEARER_TOKEN = 'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
- __all__ = ['Crawler']
- def expand_url(url):
- """Return the expanded URL behind a short link"""
- response = requests.get(url, allow_redirects=False)
- return response.headers['location']
- class Conversation(object):
- """This class is a representation of a complete conversation"""
- conversation_id = None
- tweets = collections.OrderedDict()
- def __init__(self, conversation_id):
- self.tweets = collections.OrderedDict()
- self.conversation_id = conversation_id
- def print_conversation(self):
- """Print the conversation in the console"""
- items = list(self.tweets.items())
- items.reverse()
- for tweet in items:
- if type(tweet[1]).__name__ == 'DirectMessage':
- irc_formatted_date = datetime.datetime.fromtimestamp(
- int(tweet[1].time_stamp)).strftime('%Y-%m-%d %H:%M:%S')
- print(
- '[{0}] <{1}> '.format(
- irc_formatted_date,
- tweet[1].author),
- end='')
- for element in tweet[1].elements:
- print('{0} '.format(element), end='')
- print('\r')
- elif type(tweet[1]).__name__ == 'DMConversationEntry':
- print('[DMConversationEntry] {0}\r'.format(tweet[1]))
- def write_conversation(self, filename, max_id):
- """Write the content of the conversation to a file"""
- file_buffer = ''
- items = list(self.tweets.items())
- items.reverse()
- for tweet in items:
- if type(tweet[1]).__name__ == 'DirectMessage':
- irc_formatted_date = datetime.datetime.fromtimestamp(
- int(tweet[1].time_stamp)).strftime('%Y-%m-%d %H:%M:%S')
- file_buffer += '[{0}] <{1}> '.format(
- irc_formatted_date, tweet[1].author)
- for element in tweet[1].elements:
-
-
- file_buffer += '{0} '.format(
- element).replace('\n', os.linesep)
-
- file_buffer = file_buffer[:-1]
-
- file_buffer += '{0}'.format(os.linesep)
- elif type(tweet[1]).__name__ == 'DMConversationEntry':
- file_buffer += '[DMConversationEntry] {0}{1}'.format(
- tweet[1], os.linesep)
-
- if len(items) > 0:
- file_buffer += '[LatestTweetID] {0}{1}'.format(
- tweet[1].tweet_id, os.linesep)
- if max_id != '0':
- with open(filename, 'rb+') as file:
- lines = file.readlines()
-
-
- lines = lines[:-1]
- file.seek(0)
- file.write(b''.join(lines))
- file.truncate()
- file_mode = "ab"
- if max_id == '0':
- file_mode = "wb"
- with open(filename, file_mode) as file:
- file.write(file_buffer.encode('UTF-8'))
- class DMConversationEntry(object):
- """This class is a representation of a DMConversationEntry.
- It could be a when a new user join the group, when
- the group is renamed or the picture updated.
- """
- tweet_id = ''
- _text = ''
- def __init__(self, tweet_id, text):
- self.tweet_id = tweet_id
- self._text = text.strip()
- def __str__(self):
- return self._text
- class DirectMessage(object):
- """This class is a representation of a Direct Message (a tweet)"""
- tweet_id = ''
- time_stamp = ''
- author = ''
- elements = []
- def __init__(self, tweet_id, time_stamp, author):
- self.tweet_id = tweet_id
- self.time_stamp = time_stamp
- self.author = author
- class DirectMessageText(object):
- """ This class is a representation of simple text message.
- This is an "element" of the Direct Message.
- """
- _text = ''
- def __init__(self, text):
- self._text = text
- def __str__(self):
- return self._text
- class DirectMessageTweet(object):
- """ This class is a representation of a quoted tweet.
- This is an "element" of the Direct Message.
- """
- _tweet_url = ''
- def __init__(self, tweet_url):
- self._tweet_url = tweet_url
- def __str__(self):
- return '[Tweet] {0}'.format(self._tweet_url)
- class MediaType(Enum):
- """ This class is a representation of the possible media types."""
- image = 1
- gif = 2
- video = 3
- sticker = 4
- unknown = 5
- class DirectMessageMedia(object):
- """ This class is a representation of a embedded media.
- This is an "element" of the Direct Message.
- """
- _media_preview_url = ''
- _media_url = ''
- _media_alt = ''
- _media_type = ''
- _media_replace_url = ''
- def __init__(self, media_url, media_preview_url, media_type, media_replace_url):
- self._media_url = media_url
- self._media_preview_url = media_preview_url
- self._media_type = media_type
- self._media_replace_url = media_replace_url
- def __repr__(self):
-
- return "{0}('{1}','{2}','{3}')".format(
- self.__class__.__name__,
- self._media_url,
- self._media_preview_url,
- self._media_replace_url)
- def __str__(self):
- if self._media_preview_url != '':
- return '[Media-{0}] {1} [Media-preview] {2}'.format(
- self._media_type.name, self._media_url, self._media_preview_url)
- else:
- return '[Media-{0}] {1}'.format(
- self._media_type.name, self._media_url)
- class Crawler(object):
- """ This class is a main component of the tool.
- It allows to create an authentication session,
- retrieve the conversation list and loop to gather all the tweets.
- """
- _twitter_base_url = 'https://twitter.com'
- _referer_url = 'https://twitter.com/messages/{}'
- _bearer_token_url = 'https://abs.twimg.com/responsive-web/client-web/main.05e1f885.js'
- _api_url = 'https://api.twitter.com'
- _user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.89 Safari/537.36'
- if platform == 'darwin':
- _user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13) AppleWebKit/603.1.13 (KHTML, like Gecko) Version/10.1 Safari/603.1.13'
- elif platform == 'linux' or platform == 'linux2':
- _user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36'
- _http_headers = {
- 'User-Agent': _user_agent}
- _login_headers = {
- 'User-Agent': _user_agent,
- 'Referer': 'https://mobile.twitter.com/login'}
- _ajax_headers = {
- 'user-agent': _user_agent,
- 'accept': '*/*',
- 'accept-encoding': 'gzip, deflate, br',
- 'referer': 'https://mobile.twitter.com',
- 'x-twitter-active-user': 'yes',
- 'origin': 'https://twitter.com',
- 'accept-language': 'en-US,en-GB;q=0.9,en;q=0.8'}
- _api_headers = {
- 'User-Agent': _user_agent,
- 'Accept': '*/*',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Accept-Language': 'en-US,en-GB;q=0.9,en;q=0.8',
- 'Origin': 'https://twitter.com',
- 'Sec-Fetch-Dest': 'empty',
- 'Sec-Fetch-Site': 'same-site',
- 'X-Twitter-Active-User': 'yes',
- 'X-Twitter-Auth-Type': 'OAuth2Session',
- 'X-Twitter-Client-Language': 'en'}
- _max_id_found = False
- _session = None
- def authenticate(self, username, password, save_session, raw_output, mfa_token=None):
- force_nojs = 'https://mobile.twitter.com/i/nojs_router?path=%2Flogin'
- login_url = 'https://mobile.twitter.com/login'
- mfa_url = 'https://mobile.twitter.com/account/login_verification'
- sessions_url = 'https://mobile.twitter.com/sessions'
- messages_url = self._twitter_base_url + '/messages'
- if save_session:
- try:
- with open('dmarchiver_session.dat', 'rb') as file:
- self._session = pickle.load(file)
- print('dmarchiver_session.dat found. Reusing a previous session, ignoring the provided credentials.')
-
- response = self._session.get(messages_url, headers=self._http_headers, allow_redirects=False)
- if response.status_code == 200:
- return
- else:
- self._session = None
- print('Previous session is invalid. Creating a new session with provided credentials.')
- except FileNotFoundError:
- print('dmarchiver_session.dat not found. Creating a new session with provided credentials.')
- if save_session is False or self._session is None:
- self._session = requests.Session()
- if raw_output:
- raw_output_file = open(
- 'authentication-{0}.txt'.format(username), 'wb')
- response = self._session.post(
- force_nojs,
- headers=self._login_headers)
- if raw_output:
- raw_output_file.write(response.content)
- raw_output_file.close()
- document = lxml.html.document_fromstring(response.content)
- authenticity_token = document.xpath(
- '//input[@name="authenticity_token"]/@value')[0]
- payload = {'session[username_or_email]': str(username),
- 'session[password]': password,
- 'authenticity_token': authenticity_token}
- response = self._session.post(
- sessions_url,
- headers=self._ajax_headers,
- params=payload)
- if mfa_token is not None and 'auth_token' not in dict(self._session.cookies):
- document = lxml.html.document_fromstring(response.content)
- challenge_id = document.xpath('//input[@name="challenge_id"]/@value')[0]
- user_id = document.xpath('//input[@name="user_id"]/@value')[0]
- payload = {
- 'challenge_type': 'Totp',
- 'user_id': user_id,
- 'platform': 'web',
- 'challenge_response': str(mfa_token),
- 'challenge_id': challenge_id,
- 'authenticity_token': authenticity_token}
- response = self._session.post(mfa_url, headers=self._ajax_headers, params=payload)
- cookies = requests.utils.dict_from_cookiejar(self._session.cookies)
- if 'auth_token' in cookies:
- print('Authentication succeedeed.{0}'.format(os.linesep))
- if save_session:
-
- with open('dmarchiver_session.dat', "wb") as file:
- pickle.dump(self._session, file)
- else:
- raise PermissionError(
- 'Your username or password was invalid. Note: DMArchiver supports multi-factor authentication (provided at command-line), but not application passwords.')
- def _get_bearer_token(self):
- try:
- response = self._session.get(self._bearer_token_url)
- return 'Bearer {}'.format(re.findall('(AAAAAA.*?)\"',str(response.content))[0])
- except:
- return 'Bearer {}'.format(DEFAULT_BEARER_TOKEN)
- def _cookie_string(self):
- cookies = dict(self._session.cookies)
- csrf_token = ''.join(random.choice('1234567890abcdef') for i in range(32))
- cookies['ct0'] = csrf_token
- self._api_headers['x-csrf-token'] = csrf_token
- self._api_headers['Authorization'] = self._get_bearer_token()
- return "; ".join([str(x)+"="+str(y) for x,y in cookies.items()])
- def get_threads(self, delay, raw_output):
- threads = []
- messages_url = self._twitter_base_url + '/messages'
- payload = {}
- first_request = False
- if raw_output:
- raw_output_file = open(
- 'conversation-list.txt', 'wb')
- while True:
- response = self._session.get(
- messages_url,
- headers=self._ajax_headers,
- params=payload)
- if raw_output:
- raw_output_file.write(response.content)
- json = response.json()
- if 'errors' in json:
- print('An error occured during the parsing of the conversions.\n')
- if json['errors'][0]['code'] == 326:
- print('''DMArchiver was identified as suspicious and your account as been temporarily locked by Twitter.
- Don\'t worry, you can unlock your account by following the intructions on the Twitter website.
- Maybe it\'s the first time you use it or maybe you have a lot of messages.
- You can unlock your account and try again, and possibly use the -d option to slow down the tool.\n''')
- print('''Twitter error details below:
- Code {0}: {1}\n'''.format(json['errors'][0]['code'], json['errors'][0]['message']))
- raise Exception('Stopping execution due to parsing error while retrieving the conversations')
- try:
- if first_request is False:
- first_request = True
- threads += json['inner']['trusted']['threads']
- if json['inner']['trusted']['has_more'] is False:
- break
- payload = {'is_trusted': 'true', 'max_entry_id': json[
- 'inner']['trusted']['min_entry_id']}
- messages_url = self._twitter_base_url + '/inbox/paginate?is_trusted=true&max_entry_id=' + \
- json['inner']['trusted']['min_entry_id']
- else:
- if json['trusted']['is_empty'] is True:
- break
- threads += json['trusted']['threads']
- if json['trusted']['has_more'] is False:
- break
- payload = {'is_trusted': 'true',
- 'max_entry_id': json['trusted']['min_entry_id']}
- messages_url = self._twitter_base_url + '/inbox/paginate?is_trusted=true&max_entry_id=' + \
- json['trusted']['min_entry_id']
- except KeyError as ex:
- print(
- 'Unable to fully parse the list of the conversations.\n \
- Maybe your account is locked or Twitter has updated the HTML code.\n \
- Use -r to get the raw output and post an issue on GitHub.\n \
- Exception: {0}'.format(str(ex)))
- break
- time.sleep(delay)
- if raw_output:
- raw_output_file.close()
- return threads
- def _get_latest_tweet_id(self, thread_id):
- filename = '{0}.txt'.format(thread_id)
- try:
- with open(filename, 'rb+') as file:
- lines = file.readlines()
- regex = r"^\[LatestTweetID\] ([0-9]+)"
- result = re.match(regex, lines[-1].decode('utf-8'))
- if result:
- print('Latest tweet ID found in previous dump. Incremental update.')
- return result.group(1)
- else:
- print(
- 'Latest tweet ID not found in previous dump. Creating a new one with incremental support.')
- except IOError:
- print(
- "Previous conversation not found. Creating a new one with incremental support.")
- return '0'
- def _get_media_url(self, variants):
- return sorted(variants, key = lambda i: i['bitrate'] if 'bitrate' in i else -1, reverse=True)[0]['url']
- def _parse_dm_media(self, type, media, tweet_id, time_stamp, download):
- media_url = ''
- media_preview_url = ''
- media_alt = ''
- media_replace_url = ''
- media_type = MediaType.unknown
- formatted_timestamp = datetime.datetime.fromtimestamp(
- int(time_stamp)).strftime('%Y%m%d-%H%M%S')
- self._session.headers.update({'Referer': 'https://twitter.com/?lang=en'})
- media_replace_url = media['expanded_url']
- if type == 'photo':
- media_url = media['media_url_https']
- media_filename_re = re.findall(r'/\d+/(.+)/(.+)$', media_url)
- media_sticker_filename_re = re.findall(
- '/stickers/stickers/(.+)$', media_url)
- if len(media_filename_re) > 0:
- media_type = MediaType.image
- media_filename = '{0}-{1}-{2}-{3}'.format(
- formatted_timestamp, tweet_id, media_filename_re[0][0], media_filename_re[0][1])
- elif len(media_sticker_filename_re) > 0:
-
- media_type = MediaType.sticker
- media_filename = 'sticker-' + media_sticker_filename_re[0]
- else:
-
- print("Unknown media type")
- if media_filename is not None and download:
- response = self._session.get(media_url, headers=self._api_headers, stream=True)
- while response.status_code == 429:
- time.sleep(60)
- response = self._session.get(media_url, headers=self._api_headers, stream=True)
- if response.status_code == 200:
- os.makedirs(
- '{0}/images'.format(self._conversation_id), exist_ok=True)
- with open('{0}/images/{1}'.format(self._conversation_id, media_filename), 'wb') as file:
- file.write(response.content)
- elif type == 'animated_gif':
- media_type = MediaType.gif
- media_preview_url = media['media_url_https']
- media_url = self._get_media_url(media['video_info']['variants'])
- media_filename_re = re.findall(r'dm_gif/(.+)/(.+)$', media_url)
- media_filename = '{0}-{1}-{2}'.format(formatted_timestamp, media_filename_re[0][
- 0], media_filename_re[0][1])
- if download:
- response = self._session.get(media_url, stream=True)
- if response.status_code == 200:
- os.makedirs(
- '{0}/mp4-gifs'.format(self._conversation_id), exist_ok=True)
- with open('{0}/mp4-gifs/{1}'.format(self._conversation_id, media_filename), 'wb') as file:
- file.write(response.content)
- elif type == 'video':
- media_type = MediaType.video
- media_preview_url = media['media_url_https']
- media_url = self._get_media_url(media['video_info']['variants'])
- media_filename = '{0}-{1}.mp4'.format(
- formatted_timestamp, tweet_id)
- if download:
- response = self._session.get(media_url, stream=True)
- if response.status_code == 200:
- os.makedirs(
- '{0}/mp4-videos'.format(self._conversation_id), exist_ok=True)
- with open('{0}/mp4-videos/{1}'.format(self._conversation_id, media_filename), 'wb') as file:
- file.write(response.content)
- else:
- print('Unknown media')
- return DirectMessageMedia(media_url, media_preview_url, media_type, media_replace_url)
- def _process_tweets(self, tweets, users, download, max_id):
- conversation_set = {}
- for tweet_container in tweets:
- try:
- for type, t in tweet_container.items():
- tweet_type = type
- tweet_id = t['id']
- tweet = t
- if tweet_id == max_id:
- self._max_id_found = True
- print('Previous tweet limit found.')
- break
- time_stamp = tweet['time'][:10]
- if tweet_type == 'conversation_name_update':
- dm_author = tweet['by_user_id']
- dm_author_name = users[dm_author]['screen_name']
- text = '{} changed the group name to {}'.format(
- dm_author_name,
- tweet['conversation_name'])
- dm_author_name = 'DMConversationEntry'
- elif tweet_type == 'join_conversation' or tweet_type == 'participants_join':
- dm_author = tweet['sender_id']
- dm_author_name = users[dm_author]['screen_name']
- joiners = [users[user['user_id']]['screen_name'] for user in tweet['participants']]
- text = '{} added {}.'.format(dm_author_name, ', '.join(joiners))
- dm_author_name = 'DMConversationEntry'
- elif tweet_type == 'leave_conversation' or tweet_type == 'participants_leave':
- leavers = [users[user['user_id']]['screen_name'] for user in tweet['participants']]
- text = '{} left.'.format(', '.join(leavers))
- dm_author_name = 'DMConversationEntry'
- elif tweet_type == 'message':
- dm_author = tweet['message_data']['sender_id']
- dm_author_name = users[dm_author]['screen_name']
- msg = tweet['message_data']
- text = msg['text']
- if 'entities' in msg and 'urls' in msg['entities']:
- for url in msg['entities']['urls']:
- text = text.replace(url['url'], url['expanded_url'])
- if 'attachment' in msg:
- for k, v in msg['attachment'].items():
- if k == 'tweet':
- element = DirectMessageTweet(v['expanded_url'])
- text = text.replace(element._tweet_url, str(element))
- else:
- element = self._parse_dm_media(k, v, tweet_id, time_stamp, download[k])
- text = text.replace(element._media_replace_url, str(element))
- else:
- raise Exception
- message = DirectMessage(tweet_id, time_stamp, dm_author_name)
- message.elements = [DirectMessageText(text)]
- except KeyboardInterrupt:
- print(
- 'Script execution interruption requested. Writing the conversation.')
- self._max_id_found = True
- break
- except Exception as ex:
- print(
- 'Unexpected error \'{0}\' for tweet \'{1}\', raw JSON will be used for the tweet.'.format(ex, tweet_id))
- traceback.print_exc()
- message = DMConversationEntry(
- tweet_id, '[ParseError] Parsing of tweet \'{0}\' failed. Raw JSON: {1}'.format(
- tweet_id, tweet))
- if message is not None:
- conversation_set[tweet_id] = message
- return conversation_set
- @limits(calls=API_LIMIT, period=API_RESET)
- def _api_call(self, url, headers, payload):
- return self._session.get(url, headers=headers, params=payload)
- def crawl(
- self,
- conversation_id,
- delay=0,
- download_images=False,
- download_gifs=False,
- download_videos=False,
- raw_output=False):
- raw_output_file = None
- if raw_output:
- raw_output_file = open(
- '{0}-raw.txt'.format(conversation_id), 'wb')
- print('{0}Starting crawl of \'{1}\''.format(
- os.linesep, conversation_id))
-
- max_id = self._get_latest_tweet_id(conversation_id)
- payload = {}
- self._conversation_id = conversation_id
- conversation = Conversation(conversation_id)
- conversation_url = '{}/1.1/dm/conversation/{}.json'.format(self._api_url, conversation_id)
- self._api_headers['referer'] = self._referer_url.format(conversation_id)
- self._api_headers['cookie'] = self._cookie_string()
- processed_tweet_counter = 0
- try:
- while True and self._max_id_found is False:
- response = self._api_call(conversation_url, self._api_headers, payload)
- json = response.json()
- if 'conversation_timeline' not in json:
- print('An error occured during the parsing of the tweets.\n')
- if json['errors'][0]['code'] == 326:
- print('''DMArchiver was identified as suspicious and your account as been temporarily locked by Twitter.
- Don\'t worry, you can unlock your account by following the intructions on the Twitter website.
- Maybe it\'s the first time you use it or maybe you have a lot of messages.
- You can unlock your account and try again, and possibly use the -d option to slow down the tool.\n''')
- print('''Twitter error details below:
- Code {0}: {1}\n'''.format(json['errors'][0]['code'], json['errors'][0]['message']))
- raise Exception('Stopping execution due to parsing error while retrieving the tweets.')
- json = json['conversation_timeline']
- payload = {'max_id': json['min_entry_id']}
- tweets = json['entries']
- users = json['users']
- if raw_output:
- json_dump(json, raw_output_file)
-
- conversation_set = self._process_tweets(
- tweets, users,
- {'photo': download_images, 'animated_gif': download_gifs, 'video': download_videos},
- max_id)
-
- for tweet_id in conversation_set:
- processed_tweet_counter += 1
- conversation.tweets[tweet_id] = conversation_set[tweet_id]
- print('Processed tweets: {0}\r'.format(
- processed_tweet_counter), end='')
- if json['status'] == 'AT_END':
- print('Begin of thread reached')
- break
- time.sleep(delay)
- except KeyboardInterrupt:
- print(
- 'Script execution interruption requested. Writing this conversation.')
- if raw_output:
- raw_output_file.close()
- print('Total processed tweets: {0}'.format(processed_tweet_counter))
-
-
- print('Writing conversation to {0}.txt'.format(
- os.path.join(os.getcwd(), conversation_id)))
- conversation.write_conversation(
- '{0}.txt'.format(conversation_id), max_id)
- self._max_id_found = False
|