I'm using the Confluent cp-all-in-one project configuration from here: https://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one/docker-compose.yml
I'm POSTing a message to http://localhost:8082/topics/zuum-positions with the following Avro body:
{
  "key_schema": "{\"type\":\"string\"}",
  "value_schema": "{ \"type\":\"record\",\"name\":\"Position\",\"fields\":[ { \"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{ \"name\":\"lon\",\"type\":\"double\"}]}",
  "records": [
    {
      "key": "22",
      "value": { "lat": 43.33, "lon": 43.33, "loadId": 22 }
    }
  ]
}

I have correctly added the following headers to the above POST request:

Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json
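For reference, here is a minimal Python sketch of how the same request body and headers can be built without hand-escaping the schema strings. The function name `build_avro_request` and the constant names are my own and purely illustrative; the sketch only constructs the payload and does not send it.

```python
import json

# Header values taken from the request described above.
AVRO_CONTENT_TYPE = "application/vnd.kafka.avro.v2+json"
ACCEPT = "application/vnd.kafka.v2+json"

# The same schemas as in the POST body, written as plain Python objects.
KEY_SCHEMA = {"type": "string"}
VALUE_SCHEMA = {
    "type": "record",
    "name": "Position",
    "fields": [
        {"name": "loadId", "type": "double"},
        {"name": "lat", "type": "double"},
        {"name": "lon", "type": "double"},
    ],
}


def build_avro_request(key, value):
    """Return (headers, body) for a single-record Avro POST (hypothetical helper)."""
    body = {
        # The schemas are embedded as JSON-encoded strings, so they are
        # serialized once here and once more as part of the outer body.
        "key_schema": json.dumps(KEY_SCHEMA),
        "value_schema": json.dumps(VALUE_SCHEMA),
        "records": [{"key": key, "value": value}],
    }
    headers = {"Content-Type": AVRO_CONTENT_TYPE, "Accept": ACCEPT}
    return headers, json.dumps(body)


if __name__ == "__main__":
    headers, payload = build_avro_request(
        "22", {"lat": 43.33, "lon": 43.33, "loadId": 22}
    )
    print(headers)
    print(payload)
```

Letting `json.dumps` do the escaping removes one possible source of malformed schema strings, although the exception in my case points elsewhere.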
When I send this request, I see the following exception in the Docker logs:
Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}.
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro:
connect | at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
connect | Caused by: java.net.ConnectException: Connection refused (Connection refused)
connect | at java.net.PlainSocketImpl.socketConnect(Native Method)
connect | at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
connect | at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
connect | at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
connect | at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect | at java.net.Socket.connect(Socket.java:589)
connect | at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
connect | at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
connect | at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
connect | at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
connect | at sun.net.www.http.HttpClient.New(HttpClient.java:339)
connect | at sun.net.www.http.HttpClient.New(HttpClient.java:357)
connect | at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
connect | at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
connect | at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
connect | at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
connect | at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
connect | at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
connect | at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
connect | at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
connect | at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
connect | at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
connect | at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
connect | at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
connect | at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
connect | at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
connect | at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)

I've spent hours on this and cannot find the reason. Usually, this error occurs when Connect cannot reach the Schema Registry, but I've kept their configuration from here: https://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one/docker-compose.yml#L77
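The stack trace shows the failure happening while the deserializer looks up schema id 61 over HTTP. One way to reproduce that lookup by hand is the rough sketch below, which requests the schema by id through the Schema Registry REST endpoint `/schemas/ids/{id}`. The function names are hypothetical, and the registry URL passed in is an assumption: it has to be whatever URL the Connect worker is actually configured with, and the check is only meaningful when run from where the worker runs (for example inside the connect container).

```python
import urllib.request


def schema_by_id_url(registry_url, schema_id):
    """Build the lookup URL for a schema id (hypothetical helper)."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def can_fetch_schema(registry_url, schema_id, timeout=5.0):
    """Try to fetch the schema; return (ok, detail) instead of raising."""
    url = schema_by_id_url(registry_url, schema_id)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return True, f"HTTP {resp.status}"
    except OSError as exc:
        # URLError, HTTPError (e.g. 404), refused connections and timeouts
        # are all OSError subclasses, so every failure mode ends up here.
        return False, f"{type(exc).__name__}: {exc}"
```

For example, `can_fetch_schema("http://schema-registry:8081", 61)` would be the call to try if the worker is configured with that hostname, which is an assumption on my part. A refused connection here would match the `ConnectException` in the log.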
Can you please help?