I have code snippet of Kafka Consumer which i have developed with the help of this and here it is:
static void Main(string[] args) { string bootstrapServers = "localhost:9092"; string schemaRegistryUrl = "Production163:8081"; string topicName = "player"; string groupName = "avro-generic-example-group"; //CancellationTokenSource cts = new CancellationTokenSource(); //var consumeTask = Task.Run(() => //{ using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl })) using (var consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName }) .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync()) .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync()) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .Build()) { consumer.Subscribe(topicName); try { while (true) { try { var consumeResult = consumer.Consume();//This line getting hange in unity Avro.Field f; //consumeResult.Value consumeResult.Value.Schema.TryGetField("favorite_number", out f); Console.WriteLine(consumeResult.Value["favorite_number"]); Console.WriteLine(consumeResult.Value["name"]); } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // commit final offsets and leave the group. consumer.Close(); } } //}); } This is example is running fine in my console application. Now i want to move this example into Unity environment, for this reason i have make these changes. 1. Wrap the code inside coroutine, so it simultanously run with main unity thread. 2. Comment out catch block as i can't use them yeild return
void Start() { StartCoroutine(Main()); } IEnumerator Main() { string bootstrapServers = "localhost:9092"; string schemaRegistryUrl = "Production163:8081"; string topicName = "player"; string groupName = "avro-generic-example-group"; //CancellationTokenSource cts = new CancellationTokenSource(); //var consumeTask = Task.Run(() => //{ using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl })) using ( var consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName }) .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync()) .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync()) .SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}")) .Build()) { Debug.Log("subscribe"); consumer.Subscribe(topicName); //try //{ while (true) { //try //{ //CancellationTokenSource cts = new CancellationTokenSource(); //cts.Token if (consumer == null) { Debug.Log("Consumer is null"); } var consumeResult = consumer.Consume();//TimeSpan.FromMilliseconds(50000) yield return consumeResult; Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}"); Debug.Log(consumeResult.Value.Schema); Debug.Log(consumeResult.Value.Schema["favorite_number"].GetProperty("favorite_number")); Debug.Log(consumeResult.Value.Schema["favorite_number"]); Avro.Field f; //consumeResult.Value consumeResult.Value.Schema.TryGetField("favorite_number", out f); Debug.Log(consumeResult.Value["favorite_number"]); Debug.Log(consumeResult.Value["name"]); //consumeResult.Message.Value.TryGetValue("favorite_number"); //Debug.Log(f.); yield return new WaitForSeconds(1); //} //catch (ConsumeException e) //{ // Debug.Log($"Consume error: {e.Error.Reason}"); //} } //} //catch (OperationCanceledException) //{ // // commit final offsets and leave the group. // consumer.Close(); //} } //}); Debug.Log("Main end game."); } Now with above code is my unity player get hanged. I try to debug and found that the problem is, in this line
var consumeResult = consumer.Consume(); The documentation suggest that there are two variants. One with TimeSpan and other with CancellationToken
Poll for new messages / events. Blocks until a consume result is available or the operation has been cancelled.