Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 3e41879

Browse files
Olivier Cervelloliyanhui1228
authored andcommitted
[Fixes #270] Make BackgroundThread more resilient to exceptions (#297)
1 parent 4d82d62 commit 3e41879

4 files changed

Lines changed: 97 additions & 2 deletions

File tree

opencensus/common/transports/async.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import atexit
16+
import logging
1617
import threading
1718
import time
1819

@@ -100,7 +101,15 @@ def _thread_main(self):
100101
data.extend(item)
101102

102103
if data:
103-
self.exporter.emit(data)
104+
try:
105+
self.exporter.emit(data)
106+
except Exception:
107+
logging.exception(
108+
'%s failed to emit data.'
109+
'Dropping %s objects from queue.',
110+
self.exporter.__class__.__name__,
111+
len(data))
112+
pass
104113

105114
for _ in range(len(items)):
106115
self._queue.task_done()

opencensus/trace/exporters/transports/background_thread.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import atexit
16+
import logging
1617
import threading
1718
import time
1819

@@ -101,7 +102,15 @@ def _thread_main(self):
101102
span_datas.extend(item)
102103

103104
if span_datas:
104-
self.exporter.emit(span_datas)
105+
try:
106+
self.exporter.emit(span_datas)
107+
except Exception:
108+
logging.exception(
109+
'%s failed to emit spans.'
110+
'Dropping %s spans from queue.',
111+
self.exporter.__class__.__name__,
112+
len(span_datas))
113+
pass
105114

106115
for _ in range(len(items)):
107116
self._queue.task_done()

tests/unit/common/transports/test_async.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,45 @@ def emit(self, span):
200200
# trace2 should be left in the queue because worker is terminated.
201201
self.assertEqual(worker._queue.qsize(), 1)
202202

203+
@mock.patch('logging.exception')
204+
def test__thread_main_alive_on_emit_failed(self, mock):
205+
206+
class Exporter(object):
207+
def __init__(self):
208+
self.exported = []
209+
210+
def emit(self, span):
211+
if len(self.exported) < 2:
212+
self.exported.extend(span)
213+
else:
214+
raise Exception("This exporter is broken !")
215+
216+
exporter = Exporter()
217+
worker = async._Worker(exporter, max_batch_size=2)
218+
219+
span_data0 = [mock.Mock()]
220+
span_data1 = [mock.Mock()]
221+
span_data2 = [mock.Mock()]
222+
223+
worker.enqueue(span_data0)
224+
worker.enqueue(span_data1)
225+
worker.enqueue(span_data2)
226+
worker.enqueue(async._WORKER_TERMINATOR)
227+
228+
worker._thread_main()
229+
230+
# Span 2 should throw an exception, only span 0 and 1 are left
231+
self.assertEqual(exporter.exported, span_data0 + span_data1)
232+
233+
# Logging exception should have been called on the exporter exception
234+
expected = '%s failed to emit data.Dropping %s objects from queue.'
235+
mock.assert_called_with(expected, 'Exporter', 1)
236+
237+
# Nothing should be left in the queue because worker is terminated
238+
# and the data was dropped.
239+
self.assertEqual(worker._queue.qsize(), 0)
240+
241+
203242
def test_flush(self):
204243
from six.moves import queue
205244

tests/unit/trace/exporters/transports/test_background_thread.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,44 @@ def emit(self, span):
200200
# trace2 should be left in the queue because worker is terminated.
201201
self.assertEqual(worker._queue.qsize(), 1)
202202

203+
@mock.patch('logging.exception')
204+
def test__thread_main_alive_on_emit_failed(self, mock):
205+
206+
class Exporter(object):
207+
def __init__(self):
208+
self.exported = []
209+
210+
def emit(self, span):
211+
if len(self.exported) < 2:
212+
self.exported.extend(span)
213+
else:
214+
raise Exception("This exporter is broken !")
215+
216+
exporter = Exporter()
217+
worker = background_thread._Worker(exporter, max_batch_size=2)
218+
219+
span_data0 = [mock.Mock()]
220+
span_data1 = [mock.Mock()]
221+
span_data2 = [mock.Mock()]
222+
223+
worker.enqueue(span_data0)
224+
worker.enqueue(span_data1)
225+
worker.enqueue(span_data2)
226+
worker.enqueue(background_thread._WORKER_TERMINATOR)
227+
228+
worker._thread_main()
229+
230+
# Span 2 should throw an exception, only span 0 and 1 are left
231+
self.assertEqual(exporter.exported, span_data0 + span_data1)
232+
233+
# Logging exception should have been called on the exporter exception
234+
expected = '%s failed to emit spans.Dropping %s spans from queue.'
235+
mock.assert_called_with(expected, 'Exporter', 1)
236+
237+
# Nothing should be left in the queue because worker is terminated
238+
# and the data was dropped.
239+
self.assertEqual(worker._queue.qsize(), 0)
240+
203241
def test_flush(self):
204242
from six.moves import queue
205243

0 commit comments

Comments
 (0)