
MapAsync reprocesses messages on processor restart #368

Closed · 2 tasks
sm003ash opened this issue Sep 18, 2024 · 10 comments · Fixed by #380

sm003ash commented Sep 18, 2024

Description

Confluent.SchemaRegistry.Serdes.Avro nuget version: 2.2.0
Streamiz.Kafka.Net nuget version: 1.5.1
Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro nuget version: 1.5.1

MapAsyncBug.zip

I have attached a Visual Studio solution. In it, a producer writes events to an input topic, and a consumer uses the Streamiz library to implement a simple topology: input topic -> MapAsync() -> output topic. This is a simplification of the topology we run in production. The MapAsync() creates two internal topics, one for requests and one for responses.
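
For reference, here is a minimal sketch of that topology, assuming the Streamiz 1.5.x async API; topic names and the mapper body are placeholders, and the exact MapAsync delegate signature is approximated rather than copied from the attached solution:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Streamiz.Kafka.Net;

var builder = new StreamBuilder();

builder.Stream<string, string>("input-topic")   // serdes fall back to the StreamConfig defaults
    .MapAsync(async (record, _) =>
    {
        // Stand-in for the external call the production topology performs.
        await Task.Delay(10);
        return KeyValuePair.Create(record.Key, record.Value);
    })
    .To("output-topic");

var topology = builder.Build();
```

It is this MapAsync() step that backs the internal request/response topics seen in the logs (e.g. SessionManager-ProcessIoTOperationalEvents-response), processed by the external task.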

In production we have seen this issue: when the consumer is restarted as part of an upgrade, on startup the records from the response topic appear to be resubmitted to the output topic. Here are the logs:

Sep 11, 2024 17:35:23.105
"external-task[1] Record persisted: (timestamp 1726076122716) topic=[SessionManager-ProcessIoTOperationalEvents-response] partition=[[5]] offset=[2393]"

Sep 11, 2024 17:35:23.211

"stream-task[2|5]|processor[KSTREAM-SINK-0000000007]- Process<String,Value> message with key ddf1a263-653a-4f4f-9bf8-c1128225fa08 and Kafka.DeviceSessionEvents.Value with record metadata [topic:SessionManager-ProcessIoTOperationalEvents-response|partition:5|offset:2393]"

"stream-task[2|5]|processor[KSTREAM-FILTER-0000000006]- Process<String,Value> message with key ddf1a263-653a-4f4f-9bf8-c1128225fa08 and Kafka.DeviceSessionEvents.Value with record metadata [topic:SessionManager-ProcessIoTOperationalEvents-response|partition:5|offset:2393]"

"stream-task[2|5] - recordQueue [record-queue-SessionManager-ProcessIoTOperationalEvents-response-2-5] Record polled. (Record info [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:[5]|Offset:2393])"

"stream-task[2|5] Start processing one record [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:5|Offset:2393|Timestamp:1726076122716]"

Sep 11, 2024 17:35:23.212
"stream-task[2|5] Completed processing one record [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:5|Offset:2393|Timestamp:1726076122716]"

Sep 11, 2024 17:35:23.262
"stream-task[2|5] Record persisted: (timestamp 1726076122716) topic=[IoTDeviceSessionEvents] partition=[[5]] offset=[2393]"

POST RESTART - notice the same offset 2393

Sep 11, 2024 17:41:45.507
"stream-task[2|5] - recordQueue [record-queue-SessionManager-ProcessIoTOperationalEvents-response-2-5] Record polled. (Record info [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:[5]|Offset:2393])"

Another set of logs for offset 2395:

Sep 11, 2024 17:35:23.225
"external-task[1] Record persisted: (timestamp 1726076122716) topic=[SessionManager-ProcessIoTOperationalEvents-response] partition=[[5]] offset=[2395]"

Sep 11, 2024 17:35:23.381
"stream-task[2|5] - recordQueue [record-queue-SessionManager-ProcessIoTOperationalEvents-response-2-5] Record polled. (Record info [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:[5]|Offset:2395])"

"stream-task[2|5] Start processing one record [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:5|Offset:2395|Timestamp:1726076122716]"

"stream-task[2|5]|processor[KSTREAM-FILTER-0000000006]- Process<String,Value> message with key ddf1a263-653a-4f4f-9bf8-c1128225fa08 and Kafka.DeviceSessionEvents.Value with record metadata [topic:SessionManager-ProcessIoTOperationalEvents-response|partition:5|offset:2395]"

"stream-task[2|5]|processor[KSTREAM-SINK-0000000007]- Process<String,Value> message with key ddf1a263-653a-4f4f-9bf8-c1128225fa08 and Kafka.DeviceSessionEvents.Value with record metadata [topic:SessionManager-ProcessIoTOperationalEvents-response|partition:5|offset:2395]"

"stream-task[2|5] Record persisted: (timestamp 1726076122716) topic=[IoTDeviceSessionEvents] partition=[[5]] offset=[2395]"

"stream-task[2|5] Completed processing one record [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:5|Offset:2395|Timestamp:1726076122716]"

POST RESTART
Sep 11, 2024 17:41:45.512
"stream-task[2|5] Completed processing one record [Topic:SessionManager-ProcessIoTOperationalEvents-response|Partition:5|Offset:2395|Timestamp:1726076122716]"

For partition 5, you can see that 9 events were duplicated: the log-end offsets of MapAsync's response topic and the output topic differ by 9 (11089 vs 11098):

SessionManager-ProcessIoTOperationalEvents-response:

| Partition | Log-end-offset |
|-----------|----------------|
| 0         | 1017           |
| 1         | 10356          |
| 2         | 1835           |
| 3         | 6833           |
| 4         | 5650           |
| 5         | 11089          |

IoTDeviceSessionEvents:

| Partition | Log-end-offset |
|-----------|----------------|
| 0         | 1017           |
| 1         | 10356          |
| 2         | 1835           |
| 3         | 6833           |
| 4         | 5650           |
| 5         | 11098          |

We have a Kafka sink connector that syncs data from the output topic to a Postgres table. The GUIDs generated when creating output events are unique, and the corresponding column has a UNIQUE constraint. We noticed that this constraint was being violated, and that's how we found this issue.

How to reproduce

This is hard to reproduce. The attached logs are probably your best bet for analyzing the issue, but you can also try running the Docker Compose cluster and periodically restarting the consumer app.

Checklist

Please provide the following information:

- [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [x] A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").To("an-another-topic");)
- [x] Streamiz.Kafka.Net nuget version: 1.5.1
- [ ] Apache Kafka version.
- [ ] Client configuration.
- [x] Operating system: Docker Linux containers.
- [x] Provide logs (in debug mode, i.e. log4net and StreamConfig.Debug, as necessary in configuration).
- [x] Critical issue.
sm003ash (Author)

In another instance, after a restart, we saw a huge spike in the number of messages processed from the internal response topic.
[image attached]

sm003ash changed the title from "Critical Bug: MapAsync reprocesses messages on processor restart" to "MapAsync reprocesses messages on processor restart" on Sep 18, 2024
sm003ash (Author)

Attached are the shutdown-related logs.
exportedLogRecords.CSV

LGouellec (Owner)

Hey @sm003ash ,

Thanks for reporting this issue. I will have a look and get back to you as soon as possible.

Kr,

sm003ash (Author) commented Sep 19, 2024

Thanks. Note that we are not setting EnableIdempotence to true; we are using the default at-least-once semantics.
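
For context, the configuration is essentially the library defaults; a rough sketch of what that looks like with the Streamiz StreamConfig (application id and bootstrap servers are placeholders, and nothing idempotence- or exactly-once-related is set):

```csharp
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "SessionManager",
    BootstrapServers = "localhost:9092"
    // EnableIdempotence is not set, and the processing guarantee is left
    // at its default (at-least-once).
};
```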

LGouellec (Owner)

Hi @sm003ash ,

Indeed, it is hard to reproduce, but I think the issue is that the StreamThread and the ExternalStreamThread stop asynchronously, and in some edge cases the StreamThread shuts down before the ExternalStreamThread.

I will try to apply a fix. Would you be able to test from a dedicated branch?
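
To make the suspected edge case concrete, here is a purely illustrative sketch (class and worker names are hypothetical and do not reflect Streamiz internals or the eventual fix in #380): if both workers only react to the same cancellation signal, nothing stops the stream worker from finishing its shutdown before the external worker; having the stream worker wait for the external worker enforces the ordering the diagnosis implies.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class OrderedShutdownDemo
{
    static async Task Main()
    {
        using var cts = new CancellationTokenSource();

        // Stand-in for the ExternalStreamThread: on cancellation it still has
        // pending async results to flush before it can stop.
        Task externalWorker = Task.Run(async () =>
        {
            while (!cts.IsCancellationRequested)
                await Task.Delay(50);
            await Task.Delay(100); // simulate flushing pending responses
            Console.WriteLine("external worker stopped");
        });

        // Stand-in for the StreamThread: it waits for the external worker
        // before completing its own shutdown, so it can never stop first.
        Task streamWorker = Task.Run(async () =>
        {
            while (!cts.IsCancellationRequested)
                await Task.Delay(50);
            await externalWorker;
            Console.WriteLine("stream worker stopped");
        });

        await Task.Delay(200);
        cts.Cancel();
        await Task.WhenAll(externalWorker, streamWorker);
    }
}
```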

sm003ash (Author)

Yes, I can test it from a dedicated branch. Thanks!

LGouellec added this to the 1.7.0 milestone on Oct 1, 2024
LGouellec linked a pull request on Oct 21, 2024 that will close this issue
LGouellec (Owner)

Hey @sm003ash,

Can you run this branch: 368-mapasync-reprocesses-messages-on-processor-restart? Let me know if it fixes your issue.

sm003ash (Author)

Sure thing. I will test it for a few days and get back to you.

sm003ash (Author)

Hi @LGouellec, the fix seems to be working. I haven't been able to reproduce the issue.

LGouellec (Owner)

All good! I plan to include this fix in 1.7.0, coming soon.
