Skip to content

Commit 1d4e6a7

Browse files
author
maxime.c
committed
enable more logging
1 parent f0443aa commit 1d4e6a7

File tree

4 files changed

+39
-32
lines changed

4 files changed

+39
-32
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import concurrent
66
import logging
7-
from queue import Queue
7+
from queue import Queue, Empty
88
from typing import Iterable, Iterator, List, Optional
99

1010
from airbyte_cdk.models import AirbyteMessage
@@ -143,17 +143,23 @@ def _consume_from_queue(
143143
queue: Queue[QueueItem],
144144
concurrent_stream_processor: ConcurrentReadProcessor,
145145
) -> Iterable[AirbyteMessage]:
146-
while airbyte_message_or_record_or_exception := queue.get():
147-
yield from self._handle_item(
148-
airbyte_message_or_record_or_exception,
149-
concurrent_stream_processor,
150-
)
151-
# In the event that a partition raises an exception, anything remaining in
152-
# the queue will be missed because is_done() can raise an exception and exit
153-
# out of this loop before remaining items are consumed
154-
if queue.empty() and concurrent_stream_processor.is_done():
155-
# all partitions were generated and processed. we're done here
156-
break
146+
done = False
147+
while not done:
148+
try:
149+
while airbyte_message_or_record_or_exception := queue.get(block=True, timeout=60.0 * 5):
150+
yield from self._handle_item(
151+
airbyte_message_or_record_or_exception,
152+
concurrent_stream_processor,
153+
)
154+
# In the event that a partition raises an exception, anything remaining in
155+
# the queue will be missed because is_done() can raise an exception and exit
156+
# out of this loop before remaining items are consumed
157+
if queue.empty() and concurrent_stream_processor.is_done():
158+
# all partitions were generated and processed. we're done here
159+
done = True
160+
break
161+
except Empty:
162+
self._logger.info("No result from the queue for the past 5 minutes. Will try again...")
157163

158164
def _handle_item(
159165
self,

airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
import logging
45
import time
56
from queue import Queue
67

@@ -12,6 +13,8 @@
1213
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1314
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
1415

16+
# Module-level logger for partition generation; plain string (the original
# f-string had no placeholders).
LOGGER = logging.getLogger("airbyte.PartitionEnqueuer")
17+
1518

1619
class PartitionEnqueuer:
1720
"""
@@ -42,7 +45,9 @@ def generate_partitions(self, stream: AbstractStream) -> None:
4245
4346
This method is meant to be called in a separate thread.
4447
"""
48+
4549
try:
50+
LOGGER.info(f"Starting partition generation for stream {stream.name}")
4651
for partition in stream.generate_partitions():
4752
# Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
4853
# we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
@@ -58,7 +63,10 @@ def generate_partitions(self, stream: AbstractStream) -> None:
5863
while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
5964
time.sleep(self._sleep_time_in_seconds)
6065
self._queue.put(partition)
66+
67+
LOGGER.info(f"Partition generation complete for stream {stream.name}")
6168
self._queue.put(PartitionGenerationCompletedSentinel(stream))
6269
except Exception as e:
70+
LOGGER.info(f"Error during partition generation for stream {stream.name}")
6371
self._queue.put(StreamThreadException(e, stream.name))
6472
self._queue.put(PartitionGenerationCompletedSentinel(stream))

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
)
1515
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
1616

17+
# Module-level logger for partition reading; plain string (the original
# f-string had no placeholders).
LOGGER = logging.getLogger("airbyte.PartitionReader")
18+
1719

1820
# Since moving all the connector builder workflow to the concurrent CDK which required correct ordering
1921
# of grouping log messages onto the main write thread using the ConcurrentMessageRepository, this
@@ -73,14 +75,18 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
7375
:return: None
7476
"""
7577
try:
78+
LOGGER.info(f"Starting to read from stream {partition.stream_name()} and partition {partition.to_slice()}")
7679
if self._partition_logger:
7780
self._partition_logger.log(partition)
7881

7982
for record in partition.read():
8083
self._queue.put(record)
8184
cursor.observe(record)
8285
cursor.close_partition(partition)
86+
87+
LOGGER.info(f"Reading complete for stream {partition.stream_name()} and partition {partition.to_slice()}")
8388
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
8489
except Exception as e:
90+
LOGGER.info(f"Error while reading from {partition.stream_name()} and partition {partition.to_slice()}")
8591
self._queue.put(StreamThreadException(e, partition.stream_name()))
8692
self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import urllib
8+
import uuid
89
from pathlib import Path
910
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
1011

@@ -329,9 +330,9 @@ def _send(
329330
if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
330331
self._session.auth(request)
331332

332-
self._logger.debug(
333-
"Making outbound API request",
334-
extra={"headers": request.headers, "url": request.url, "request_body": request.body},
333+
request_id = str(uuid.uuid4())
334+
self._logger.info(
335+
f"[{request_id}] Making outbound API request to {request.url}",
335336
)
336337

337338
response: Optional[requests.Response] = None
@@ -346,23 +347,9 @@ def _send(
346347
response if response is not None else exc
347348
)
348349

349-
# Evaluation of response.text can be heavy, for example, if streaming a large response
350-
# Do it only in debug mode
351-
if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
352-
if request_kwargs.get("stream"):
353-
self._logger.debug(
354-
"Receiving response, but not logging it as the response is streamed",
355-
extra={"headers": response.headers, "status": response.status_code},
356-
)
357-
else:
358-
self._logger.debug(
359-
"Receiving response",
360-
extra={
361-
"headers": response.headers,
362-
"status": response.status_code,
363-
"body": response.text,
364-
},
365-
)
# NOTE(review): the conditional expression must be parenthesized. Without the
# parentheses, `f"..." + f"..." if exc else ""` parses as
# `(f"..." + f"...") if exc else ""`, so the log message is an EMPTY string on
# every successful (exc is None) response — i.e. the common path logs nothing.
self._logger.info(
    f"[{request_id}] Receiving response from {request.url}"
    + (f" with exception {type(exc)}" if exc else "")
)
366353

367354
# Request/response logging for declarative cdk
368355
if (

0 commit comments

Comments (0)