From f89924992729e201f3b6495f4410cfaf015e4c18 Mon Sep 17 00:00:00 2001
From: Ross Smith II
Date: Mon, 18 Apr 2022 18:38:34 -0700
Subject: [PATCH] Retrieve messages using threads API

---
Review notes (text between "---" and the diff is not part of the commit):
- Gmail.mode selects which list endpoint get_messages() uses; the setter
  lower-cases its argument and silently keeps the old mode for any value
  other than 'messages' or 'threads'.
- In thread mode every listed thread costs one extra threads().get() call
  to expand it into message references.
- Worker threads do not propagate exceptions to the caller; a failed worker
  can leave its slot in message_lists as None or only partially filled.

 simplegmail/gmail.py | 155 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 147 insertions(+), 8 deletions(-)

diff --git a/simplegmail/gmail.py b/simplegmail/gmail.py
index 2c9aba7..e850cd5 100644
--- a/simplegmail/gmail.py
+++ b/simplegmail/gmail.py
@@ -58,6 +58,9 @@ class Gmail(object):
     # https://developers.google.com/gmail/api/quickstart/python
     # Make sure the client secret file is in the root directory of your app.
 
+    MESSAGE_MODE = 'messages'
+    THREAD_MODE = 'threads'
+
     def __init__(
         self,
         client_secret_file: str = 'client_secret.json',
@@ -66,6 +69,7 @@ def __init__(
     ) -> None:
         self.client_secret_file = client_secret_file
         self.creds_file = creds_file
+        self._mode = self.MESSAGE_MODE
 
         try:
             # The file gmail_token.json stores the user's access and refresh
@@ -156,8 +160,7 @@ def send_message(
         )
 
         try:
-            req = self.service.users().messages().send(userId='me', body=msg)
-            res = req.execute()
+            res = self.service.users().messages().send(userId='me', body=msg).execute()
             return self._build_message_from_ref(user_id, res, 'reference')
 
         except HttpError as error:
@@ -509,20 +512,29 @@ def get_messages(
         ]
 
         try:
-            response = self.service.users().messages().list(
+            if self._mode == self.MESSAGE_MODE:
+                api = self.service.users().messages()
+            else:
+                api = self.service.users().threads()
+            response = api.list(
                 userId=user_id,
                 q=query,
                 labelIds=labels_ids,
                 includeSpamTrash=include_spam_trash
             ).execute()
 
-            message_refs = []
-            if 'messages' in response:  # ensure request was successful
-                message_refs.extend(response['messages'])
+            if self._mode == self.MESSAGE_MODE:
+                message_refs = []
+                if 'messages' in response:  # ensure request was successful
+                    message_refs.extend(response['messages'])
+            else:
+                thread_refs = []
+                if 'threads' in response:  # ensure request was successful
+                    thread_refs.extend(response['threads'])
 
             while 'nextPageToken' in response:
                 page_token = response['nextPageToken']
-                response = self.service.users().messages().list(
+                response = api.list(
                     userId=user_id,
                     q=query,
                     labelIds=labels_ids,
@@ -530,7 +542,15 @@ def get_messages(
                     pageToken=page_token
                 ).execute()
 
-                message_refs.extend(response['messages'])
+                if self._mode == self.MESSAGE_MODE:
+                    if 'messages' in response:  # ensure request was successful
+                        message_refs.extend(response['messages'])
+                else:
+                    if 'threads' in response:  # ensure request was successful
+                        thread_refs.extend(response['threads'])
+
+            if self._mode == self.THREAD_MODE:
+                message_refs = self._get_message_refs_from_thread_refs(user_id, thread_refs)
 
             return self._get_messages_from_refs(user_id, message_refs,
                                                 attachments)
@@ -572,6 +592,7 @@ def list_labels(self, user_id: str = 'me') -> List[Label]:
         labels = [Label(name=x['name'], id=x['id']) for x in res['labels']]
         return labels
 
+
     def _get_messages_from_refs(
         self,
         user_id: str,
@@ -828,6 +849,115 @@ def _evaluate_message_payload(
 
         return []
 
+    def _get_message_refs_from_thread_refs(
+        self,
+        user_id: str,
+        thread_refs: List[dict],
+        parallel: bool = True
+    ) -> List[dict]:
+        """
+        Retrieves a list of message references from a list of thread references.
+
+        Args:
+            user_id: The account the messages belong to.
+            thread_refs: A list of thread references.
+            parallel: Whether to fetch the threads in parallel. Default true.
+                When false, each thread is fetched sequentially using this
+                instance's service.
+
+
+        Returns:
+            A list of dicts containing message IDs and thread IDs.
+
+        Raises:
+            googleapiclient.errors.HttpError: There was an error executing the
+                HTTP request.
+
+        """
+
+        if not thread_refs:
+            return []
+
+        if not parallel:
+            message_refs = []
+            for ref in thread_refs:
+                message_refs.extend(self._build_message_refs_from_thread_ref(user_id, ref))
+            return message_refs
+
+        max_num_threads = 12  # empirically chosen, prevents throttling
+        target_msgs_per_thread = 10  # empirically chosen
+        num_threads = min(
+            math.ceil(len(thread_refs) / target_msgs_per_thread),
+            max_num_threads
+        )
+        batch_size = math.ceil(len(thread_refs) / num_threads)
+        message_lists = [None] * num_threads
+
+        def thread_download_batch(thread_num):
+            gmail = Gmail(_creds=self.creds)
+
+            start = thread_num * batch_size
+            end = min(len(thread_refs), (thread_num + 1) * batch_size)
+            message_lists[thread_num] = []
+            for i in range(start, end):
+                message_lists[thread_num].extend(gmail._build_message_refs_from_thread_ref(
+                    user_id, thread_refs[i]
+                ))
+        threads = [
+            threading.Thread(target=thread_download_batch, args=(i,))
+            for i in range(num_threads)
+        ]
+
+        for t in threads:
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        return [ref for refs in message_lists for ref in refs]
+
+    def _build_message_refs_from_thread_ref(
+        self,
+        user_id: str,
+        thread_ref: dict,
+    ) -> List[dict]:
+        """
+        Creates a list of message references from a thread reference.
+
+        Args:
+            user_id: The username of the account the message belongs to.
+            thread_ref: A thread reference returned from the Gmail
+                API.
+
+        Returns:
+            A list of dicts containing message IDs and thread IDs.
+
+        Raises:
+            googleapiclient.errors.HttpError: There was an error executing the
+                HTTP request.
+
+        """
+
+        try:
+            # Get thread JSON
+            thread = self.service.users().threads().get(
+                userId=user_id, id=thread_ref['id']
+            ).execute()
+
+        except HttpError as error:
+            # Pass along the error
+            raise error
+
+        else:
+            messages = []
+            for message in thread['messages']:
+                h = {
+                    "id": message['id'],
+                    "threadId": thread_ref['id']
+                }
+                messages.append(h)
+            return messages
+
     def _create_message(
         self,
         sender: str,
@@ -988,3 +1118,12 @@ def _get_alias_info(
 
         res = req.execute()
         return res
+
+    @property
+    def mode(self):
+        return self._mode
+
+    @mode.setter
+    def mode(self, value: str):
+        if value.lower() in [self.MESSAGE_MODE, self.THREAD_MODE]:
+            self._mode = value.lower()