Nothing Special   »   [go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: BrokenPipe Error #7333

Open
dimoschi opened this issue Feb 17, 2023 · 2 comments
Open

bug: BrokenPipe Error #7333

dimoschi opened this issue Feb 17, 2023 · 2 comments

Comments

@dimoschi
Copy link

Meltano Version

2.15.3

Python Version

3.9

Bug scope

API

Operating System

Linux - Ubuntu 20.04

Description

Meltano unexpectedly raises a BrokenPipe error. Meltano runs as a Prefect 1.4.1 task executed inside a k8s pod that uses a custom image built following the documentation.

Code

[2023-02-17 10:22:45+0000] INFO - prefect.CloudTaskRunner | Task 'ShellTask': Starting task run...
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | 2023-02-17T10:52:40.324374Z [error    ]
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | ╭───────────────────── Traceback (most recent call last) ──────────────────────╮
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/logging/output_logger.py │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ :203 in redirect_logging                                                     │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |200 │   │   │   *ignore_errors,                                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |201 │   │   )                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |202 │   │   try:                                                           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 203 │   │   │   yield                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |204 │   │   except ignored_errors:  # noqa: WPS329                         │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |205 │   │   │   raise                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |206 │   │   except RunnerError as err:                                     │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:46 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |1 in run                                                                     │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |458 │   │   │   # TODO: legacy `meltano elt` style logging should be depre │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |459 │   │   │   legacy_log_handler = self.output_logger.out("meltano", log │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |460 │   │   │   with legacy_log_handler.redirect_logging():                │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 461 │   │   │   │   await self.run_with_job()                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |462 │   │   │   │   return                                                 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |463 │   │   else:                                                          │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |464 │   │   │   logger.warning(                                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:48 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |7 in run_with_job                                                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |484 │   │                                                                  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |485 │   │   with closing(self.context.session) as session:                 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |486 │   │   │   async with job.run(session):                               │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 487 │   │   │   │   await self.execute()                                   │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |488 │                                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |489async def terminate(self, graceful: bool = False) -> None:         │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |490 │   │   """Terminate an in flight ExtractLoad execution, potentially d │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ /usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:45 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ 3 in execute                                                                 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   450 │   │   async with self._start_blocks():                               │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   451 │   │   │   await self._link_io()                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   452 │   │   │   manager = ELBExecutionManager(self)                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 453 │   │   │   await manager.run()                                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   454 │                                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   455 │   async def run(self) -> None:                                       │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   456 │   │   """Run the ELT task."""                                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ /usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:65 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ /usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:65 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ 0 in run                                                                     │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   647 │   │   stopping blocks or waiting for IO to complete as appropriate.  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   648 │   │   the blocks exit with a non 0 exit code.                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │   649 │   │   """                                                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 650 │   │   await self._wait_for_process_completion(self.elb.head)         │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |651 │   │   _check_exit_codes(                                             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |652 │   │   │   self._producer_code, self._consumer_code, self._intermedia │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |653 │   │   )                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/block/extract_load.py:71 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |5 in _wait_for_process_completion                                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |712 │   │   │   │   │   line_length_limit=self.line_length_limit,          │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |713 │   │   │   │   │   stream_buffer_size=self.stream_buffer_size,        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |714 │   │   │   │   )                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 715 │   │   │   raise output_futures_failed.exception()                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |716 │   │   else:                                                          │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |717 │   │   │   # If all the output handlers completed without raising an  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |718 │   │   │   # we still need to wait for all the underlying block proce │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/logging/utils.py:241 in  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |capture_subprocess_output                                                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |238 │   │   │   continue                                                   │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |239 │   │                                                                  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |240 │   │   for writer in line_writers:                                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 241 │   │   │   if not await _write_line_writer(writer, line):             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |242 │   │   │   │   # If the destination stream is closed, we can stop cap │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |243 │   │   │   │   return                                                 │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |244                                                                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/logging/utils.py:210 in  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |_write_line_writer                                                           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |207 │   │   │   await writer.drain()                                       │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |208 │   │   except (BrokenPipeError, ConnectionResetError):                │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |209 │   │   │   with suppress(AttributeError):  # `wait_closed` is Python  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 210 │   │   │   │   await writer.wait_closed()                             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |211 │   │   │                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |212 │   │   │   return False                                               │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |213else:                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/asyncio/streams.py:359 in wait_closed               │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |356 │   │   return self._transport.is_closing()                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |357 │                                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |358async def wait_closed(self):                                       │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 359 │   │   await self._protocol._get_close_waiter(self)                   │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |360 │                                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |361def get_extra_info(self, name, default=None):                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |362 │   │   return self._transport.get_extra_info(name, default)           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/site-packages/meltano/core/logging/utils.py:207 in  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |_write_line_writer                                                           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |204if isinstance(writer, asyncio.StreamWriter):                       │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |205 │   │   try:  # noqa: WPS229                                           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |206 │   │   │   writer.write(line)                                         │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 207 │   │   │   await writer.drain()                                       │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |208 │   │   except (BrokenPipeError, ConnectionResetError):                │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |209 │   │   │   with suppress(AttributeError):  # `wait_closed` is Python  │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |210 │   │   │   │   await writer.wait_closed()                             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/asyncio/streams.py:387 in drain                     │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |384 │   │   │   # in a loop would never call connection_lost(), so it      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |385 │   │   │   # would not see an error when the socket is closed.        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |386 │   │   │   await sleep(0)                                             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 387 │   │   await self._protocol._drain_helper()                           │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |388                                                                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |389                                                                        │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |390 class StreamReader:                                                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |/usr/local/lib/python3.9/asyncio/streams.py:197 in _drain_helper             │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │                                                                              │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |194 │   │   assert waiter is None or waiter.cancelled()                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |195 │   │   waiter = self._loop.create_future()                            │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |196 │   │   self._drain_waiter = waiter                                    │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | │ ❱ 197 │   │   await waiter                                                   │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |198 │                                                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |199def _get_close_waiter(self, stream):                               │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask |200 │   │   raise NotImplementedError                                      │
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | ╰──────────────────────────────────────────────────────────────────────────────╯
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | BrokenPipeError
[2023-02-17 10:52:40+0000] INFO - prefect.ShellTask | 
[2023-02-17 10:52:42+0000] INFO - prefect.ShellTask | Need help fixing this problem? Visit http://melta.no/ for troubleshooting steps, or to
[2023-02-17 10:52:42+0000] INFO - prefect.ShellTask | join our friendly Slack community.
[2023-02-17 10:52:42+0000] INFO - prefect.ShellTask | 
[2023-02-17 10:52:42+0000] INFO - prefect.ShellTask | 
[2023-02-17 10:52:43+0000] ERROR - prefect.ShellTask | Command failed with exit code 1
@dimoschi dimoschi added kind/Bug Something isn't working valuestream/Meltano labels Feb 17, 2023
@aaronsteers
Copy link
Contributor
aaronsteers commented Feb 17, 2023

@dimoschi - Thanks very much for raising this. Most often this is caused by a tap failing, and then stopping sending data to the target. Can you check to see if your log contains any other messages about a tap or target failing? Also, can you confirm what the CLI command that is failing here?

We have an open issue related to this specific error message not being helpful - with plans to improve and clean up our error messaging here:

cc @tayloramurphy

@lukas-gust
Copy link

I ran into a similar issue running Meltano on k8s pod. It turns out we had an OOM error which was not apparent in the pod logs, but was caught at the container level instead. Just adding info for anyone else to find.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants