
Unable to specify group ID #35

Closed · 7 of 14 tasks
slyons opened this issue Jan 25, 2019 · 20 comments

Comments

@slyons commented Jan 25, 2019

Description

Using "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" the library prevents you from specifying the group ID.

Exception in thread "main" java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateGeneralOptions(KafkaSourceProvider.scala:361)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:416)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:66)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)

How to reproduce

Run the Spark consumer sample locally.
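A minimal sketch of the failing read (Scala; the bootstrap server, topic, and security settings below are placeholders, not taken from the actual sample):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical repro sketch — server, topic, and security config are placeholders.
object GroupIdRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("group-id-repro")
      .master("local[*]")
      .getOrCreate()

    // Spark 2.4 rejects this read at option-validation time because of the
    // user-supplied "kafka.group.id" option (see the exception above).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mynamespace.servicebus.windows.net:9093")
      .option("subscribe", "mytopic")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.group.id", "$Default") // triggers the IllegalArgumentException
      .load()
  }
}
```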

Has it worked previously?

No

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • Verified that port 9093 is not blocked by firewall
  • Verified the namespace is either Standard or Dedicated tier (i.e. it is not Basic tier, which isn't supported)
  • Sample you're having trouble with: Spark consumer
  • Apache Kafka version: ?
  • Kafka client configuration: Sample default
  • Namespace and EventHub/topic name
  • Consumer or producer failure Exception in thread "main" java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
  • If consumer, partition ID, group ID $Default
  • Timestamp in UTC <REPLACE with e.g., Nov 7 2018 - 17:15:01 UTC>
  • Client ID <REPLACE with e.g., clientID=kafka-client>
  • Provide all logs (with debugging enabled if possible) or exception call stack
  • Standalone repro <REPLACE with e.g., Willing/able to send scenario to repro issue>
  • Operating system: <REPLACE with e.g., Ubuntu 16.04.5 (x64) LTS>
  • Critical issue
@basilhariri (Contributor)

Hmm, I've run into that before, but I thought it was resolved. At some point, Spark jobs used their own unique consumer groups to consume messages, but I haven't been able to find much documentation on whether that's still the case.

Could you try running without specifying the consumer group at all and let us know what happens?
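For example, just dropping the option (placeholder names again):

```scala
// Same read with no consumer group specified — Spark 2.4 then generates its
// own group id (e.g. spark-kafka-source-<uuid>-driver-0) at runtime.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mynamespace.servicebus.windows.net:9093")
  .option("subscribe", "mytopic")
  .load()
```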

@slyons (Author) commented Jan 29, 2019

It results in an error:

19/01/29 10:50:29 WARN KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: 

org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid

Which version of the Maven coordinate should I use? I can confirm that it works on Databricks but not locally.

@nstudenski

I'm encountering the same error.

@basilhariri (Contributor) commented Feb 7, 2019

The Spark Kafka connector was overhauled in Spark v2.4.0, so any version from 2.4.0 onward should (theoretically) work. If it's working on Databricks, it's worth checking which Databricks Runtime you're using and trying the version of Spark it includes.

I also found this configuration detail under kafka.group.id that I hadn't seen before. Maybe that'll help.

@basilhariri (Contributor)

Closing due to inactivity.

@magebeans

It looks like specifying the group id is intentionally disabled; the idea is that each query uses its own group id so that it doesn't interfere with other queries. The official Spark docs make this explicit, stating that group.id cannot be set and that this is intentional.

Running without a set group.id leads to the consumer creating its own group id for the job, which looks something like "spark-kafka-relation-25b94d8e-8ac8-4a1f-98cd-6356fa733983-driver-0" and is promptly rejected by the bootstrap servers as an invalid groupId. This seems like reasonable behavior, since Event Hubs returns an IllegalEntityException if you try to read from a random, non-existent consumer group.

However, this means the Kafka consumer shipped with Spark (even 2.4+) effectively doesn't work with Event Hubs for Kafka, since it fails both with and without a group.id.

@basilhariri (Contributor)

> This seems like reasonable behavior, since Event Hubs returns an IllegalEntityException if you try to read from a random, non-existent consumer group.

This is not the case for Event Hubs for Kafka, @ManasGeorge - it handles random/non-existent consumer groups the same way vanilla Kafka does (by automatically creating the consumer group).

Since the Databricks documentation I posted earlier notes that group.id can't be set before Spark v2.2, is it possible the Structured Streaming documentation you shared was not updated? I'm not exactly sure what's going on, but I can confirm that Spark v2.4+ absolutely does work with Event Hubs for Kafka.

@magebeans

Ah, it's good to know Event Hubs for Kafka handles this gracefully.
The link I posted points to the latest documentation, ostensibly for Spark 2.4. It makes sense that the documentation could be out of date, though; I looked through the latest connector code and found no sign of the 'group.id' is not supported error message. I'll try cleaning up my Maven packages to make sure an old package isn't interfering.

@slyons (Author) commented Mar 29, 2019

I'm going to have to disagree, because this problem is still happening.

Using this build configuration:

  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",

Specifying the group id gets me this error:

[error] (run-main-0) java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
[error] java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
[error] 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateGeneralOptions(KafkaSourceProvider.scala:361)
[error] 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:416)
[error] 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:66)

Running without it gets me this error:

19/03/28 13:21:18 ERROR MicroBatchExecution: Query [id = 649bf31c-1c2f-4044-816e-21d3b5ab3b7c, runId = 9c06449b-668e-4571-bcc0-be4afd5dabe3] terminated with error
org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
[error] (run-main-0) org.apache.spark.sql.streaming.StreamingQueryException: The configured groupId is invalid

Is there a different Kafka client version I have to pin? This is just running locally on my machine.
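For reference, pinning a specific kafka-clients version in sbt would look like this (the version shown is what Spark 2.4.0 pulls in transitively; this is a sketch, not a confirmed fix):

```scala
// Hypothetical sbt override — forces a specific kafka-clients version instead
// of the one resolved transitively through spark-sql-kafka-0-10.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.0.0"
```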

@sjkwak (Member) commented Mar 30, 2019

Hi @slyons, the issue is that there is a default length limit (64 bytes) on the group.id in Event Hubs for Kafka. Can you provide the Event Hubs namespace name?

@slyons (Author) commented Apr 1, 2019

@sjkwak How can there be a length limit when we can't set the group ID? That's something Spark itself generates in 2.4.

Here's more of the output that includes the rejection from the server.

19/03/31 18:23:59 INFO AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-df9d29ac-45ac-4709-81c6-8a4d320e401a--1523439773-driver-2] Discovered group coordinator sclyondelta.servicebus.windows.net:9093 (id: 2147483647 rack: null)
19/03/31 18:23:59 INFO ConsumerCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-df9d29ac-45ac-4709-81c6-8a4d320e401a--1523439773-driver-2] Revoking previously assigned partitions []
19/03/31 18:23:59 INFO AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-df9d29ac-45ac-4709-81c6-8a4d320e401a--1523439773-driver-2] (Re-)joining group
19/03/31 18:23:59 ERROR AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-df9d29ac-45ac-4709-81c6-8a4d320e401a--1523439773-driver-2] Attempt to join group failed due to fatal error: The configured groupId is invalid
19/03/31 18:23:59 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka offsets:
org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
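For reference, the generated group id in the log above is well past a 64-byte limit, which would explain the rejection:

```scala
// Group id copied verbatim from the log above — 76 characters long,
// over the 64-byte default limit mentioned by @sjkwak.
val groupId = "spark-kafka-source-df9d29ac-45ac-4709-81c6-8a4d320e401a--1523439773-driver-2"
println(groupId.length) // 76
```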

@sjkwak (Member) commented Apr 1, 2019

@slyons The limit exists on the service side. We've updated the limit for the sclyondelta namespace. Can you try again and let us know if you still see the failure?

@slyons (Author) commented Apr 1, 2019

Looks like it's still happening:

19/04/01 13:45:32 INFO AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-91ee1b45-ca1c-4eec-adb3-e3fc2496caf7-2004743647-driver-2] Discovered group coordinator sclyondelta.servicebus.windows.net:9093 (id: 2147483647 rack: null)
19/04/01 13:45:32 INFO ConsumerCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-91ee1b45-ca1c-4eec-adb3-e3fc2496caf7-2004743647-driver-2] Revoking previously assigned partitions []
19/04/01 13:45:32 INFO AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-91ee1b45-ca1c-4eec-adb3-e3fc2496caf7-2004743647-driver-2] (Re-)joining group
19/04/01 13:45:32 ERROR AbstractCoordinator: [Consumer clientId=consumer-3, groupId=spark-kafka-source-91ee1b45-ca1c-4eec-adb3-e3fc2496caf7-2004743647-driver-2] Attempt to join group failed due to fatal error: The configured groupId is invalid
19/04/01 13:45:32 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka offsets:
org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid

@slyons (Author) commented Apr 1, 2019

Looks like the ID it's trying to use is something along the lines of:

spark-kafka-source-45665dd7-756a-4580-a6bd-dfc11ed6fc69-1580028041-driver-0

@sjkwak (Member) commented Apr 1, 2019

@slyons can you try it again?

@slyons (Author) commented Apr 2, 2019

@sjkwak Looks like that did the trick! Thanks for working it through.

For other namespaces/projects, can we file a support ticket to increase the group id length limit, or will this be fixed on the platform eventually?

@sjkwak (Member) commented Apr 2, 2019

We're going to roll the change out to production clusters. In the meantime, if you run into the same issue in another namespace, yes, please open a support ticket.

@slyons (Author) commented Apr 3, 2019

Thanks, @sjkwak !

@arerlend (Contributor) commented Jul 9, 2019

Update for anyone reviewing this thread - the group id length limit has been increased to 256 characters. If you see Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets, it is in fact because Spark 2.4 does not allow the user to specify the group id manually.

See the validateGeneralOptions section here: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSourceProvider.html
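The check in question is essentially this (paraphrased from Spark 2.4's KafkaSourceProvider, not a verbatim copy):

```scala
// Paraphrase of KafkaSourceProvider.validateGeneralOptions in Spark 2.4:
// any user-supplied "kafka.group.id" option is rejected outright.
if (caseInsensitiveParams.contains("kafka.group.id")) {
  throw new IllegalArgumentException(
    "Kafka option 'group.id' is not supported as user-specified consumer " +
      "groups are not used to track offsets.")
}
```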

@marshadkhan

I am still looking for a solution to this group.id issue. It seems I have to switch back to classic Spark Streaming instead of Structured Streaming.
