[HN Gopher] OpenTelemetry at Scale: Using Kafka to handle bursty...
___________________________________________________________________
 
OpenTelemetry at Scale: Using Kafka to handle bursty traffic
 
Author : pranay01
Score  : 138 points
Date   : 2023-10-22 18:58 UTC (14 hours ago)
 
web link (signoz.io)
w3m dump (signoz.io)
 
| francoismassot wrote:
| I've heard several times that Kafka is put in front of
| Elasticsearch clusters to handle traffic bursts. You can also use
| Redpanda, Pulsar, NATS, and other distributed queues.
| 
| One thing that is also very interesting with Kafka is that you
| can achieve exactly-once semantics without too much effort: keep
| track of the partition positions in your own database and
| carefully acknowledge them only when you are sure the data is
| safely stored in your DB. That's what we did with our engine
| Quickwit; so far it's the most efficient way to index data into
| it.
| 
| One obvious drawback with Kafka is that it's one more piece to
| maintain... and it's not a small one.
 
  | pranay01 wrote:
  | Have you done/seen any benchmarks between Redpanda/NATS and
  | Kafka for this use case?
  | 
  | Some folks in the SigNoz community have also suggested NATS for
  | this, but I haven't done a deep dive into benchmarks/features
  | yet.
 
    | francoismassot wrote:
    | Unfortunately no :/
 
  | radicality wrote:
  | Exactly-once semantics of what, specifically? Or do you mean
  | at-least-once?
 
    | francoismassot wrote:
    | Exactly-once semantics between Kafka and the observability
    | engine.
 
  | richieartoul wrote:
  | You have to do a bit more than that if you want exactly-once
  | end-to-end (i.e. if Kafka itself can contain duplicates). One
  | of my former colleagues did a good write-up on how Husky does
  | it: https://www.datadoghq.com/blog/engineering/husky-deep-dive/
 
    | francoismassot wrote:
    | Yeah, I was only talking about exactly-once semantics between
    | Kafka and Quickwit.
 
  | viraptor wrote:
  | > exactly-once semantic without too much efforts: by keeping
  | track of the positions of partitions in your own database and
  | carefully acknowledging them when you are sure data is safely
  | stored in your db
  | 
  | That's not really "exactly once". What happens when your system
  | dies after it made sure the data is safely stored in the db and
  | before ack-ing?
 
    | Svenskunganka wrote:
    | Depending on how you use the database, it is. If you write
    | the data as well as the offset to the DB in the same
    | transaction, you can seek to the offset stored in the DB
    | after an application restart and continue from there.
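    | 
    | A minimal sketch of that pattern (hypothetical table and
    | topic names; confluent-kafka Python client, SQLite as the
    | transactional store):
    | 
    |     import sqlite3
    |     from confluent_kafka import Consumer, TopicPartition
    | 
    |     db = sqlite3.connect("index.db")
    |     db.execute(
    |         "CREATE TABLE IF NOT EXISTS docs (body BLOB)")
    |     db.execute("CREATE TABLE IF NOT EXISTS offsets "
    |                "(topic TEXT, part INTEGER, next_offset "
    |                "INTEGER, PRIMARY KEY (topic, part))")
    | 
    |     consumer = Consumer({
    |         "bootstrap.servers": "localhost:9092",
    |         "group.id": "indexer",
    |         # offsets live in our DB, not in Kafka
    |         "enable.auto.commit": False,
    |     })
    | 
    |     # Resume from the offset recorded in our own DB.
    |     row = db.execute(
    |         "SELECT next_offset FROM offsets "
    |         "WHERE topic = ? AND part = ?",
    |         ("ingest", 0)).fetchone()
    |     consumer.assign(
    |         [TopicPartition("ingest", 0, row[0] if row else 0)])
    | 
    |     while True:
    |         msg = consumer.poll(1.0)
    |         if msg is None or msg.error():
    |             continue
    |         # One transaction: the record and the offset
    |         # commit together or roll back together.
    |         with db:
    |             db.execute(
    |                 "INSERT INTO docs (body) VALUES (?)",
    |                 (msg.value(),))
    |             db.execute(
    |                 "INSERT OR REPLACE INTO offsets "
    |                 "VALUES (?, ?, ?)",
    |                 (msg.topic(), msg.partition(),
    |                  msg.offset() + 1))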
 
      | viraptor wrote:
      | > after application restart and continue from there.
      | 
      | What if the application doesn't restart before the queue
      | decides the message was lost and resends?
 
        | hashhar wrote:
        | In Kafka the "queue" is dumb: it doesn't lose messages
        | (it's an append-only durable log), nor does it resend
        | anything unless the consumer requests it.
 
      | mirekrusin wrote:
      | You should drop the "(...) and carefully acknowledging them
      | when you are sure data is safely stored in your db (...)"
      | part then, because it's not necessary; you don't rely on
      | it.
      | 
      | At-least-once semantics + local deduplication gives
      | exactly-once semantics.
      | 
      | In this case you're optimising local deduplication with a
      | strictly monotonic index.
      | 
      | One downside is that you leak internals of the other system
      | (partitions).
      | 
      | The other is that it implies serialised processing - you
      | can't process anything in parallel, as you have a single
      | index threshold that defines what has and what has not yet
      | been processed.
 
        | Svenskunganka wrote:
        | I'm not the one who wrote the original comment, so I
        | can't modify it. But one should still commit offsets,
        | because that's the happy path: DB transaction successful?
        | Commit the offset. If the latter fails due to e.g. an
        | application crash, and at startup you seek to the
        | partition offset stored in the DB + 1, you get
        | exactly-once semantics. There are some more details, e.g.
        | you'd have to do the same during a consumer group
        | rebalance, and topic configuration also plays a role: for
        | example, whether the topic is compacted or not, whether
        | you write tombstones, and what its retention policy is.
        | 
        | edit: You added some more to your comment after I posted
        | this one, so I'll try to cover them as well:
        | 
        | > One downside is that you leak internals of other system
        | (partitions).
        | 
        | Yeah, sure.
        | 
        | > The other is that it implies serialised processing -
        | you can't process anything in parallel as you have single
        | index threshold that defines what has been and what has
        | yet not been processed.
        | 
        | It doesn't imply serialised processing; it depends on the
        | use-case. If each record in a topic has to be processed
        | serially, you can't parallelize, full stop: the number of
        | partitions equals 1. But if each record can be processed
        | individually, you get parallelism equal to the number of
        | partitions the topic is configured with. You also get
        | parallelism if only some records in a topic need to be
        | processed serially: give the records that need serial
        | processing the same key and they will end up in the same
        | partition. For example, when recording the coordinates of
        | planes, each plane can be processed in parallel, but an
        | individual plane's coordinates need to be processed
        | serially - just use the plane's unique identifier as the
        | key and the coordinates for the same plane will be
        | appended to the log of the same partition.
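        | 
        | A hypothetical sketch of that keyed-producer pattern
        | (confluent-kafka; topic and plane ids made up):
        | 
        |     import json
        |     from confluent_kafka import Producer
        | 
        |     producer = Producer(
        |         {"bootstrap.servers": "localhost:9092"})
        | 
        |     # Same key -> same partition, so one plane's
        |     # updates stay ordered while different planes
        |     # spread across partitions in parallel.
        |     def record_position(plane_id, lat, lon):
        |         producer.produce(
        |             "plane-positions",
        |             key=plane_id,
        |             value=json.dumps({"lat": lat,
        |                               "lon": lon}))
        | 
        |     record_position("N12345", 51.47, -0.45)
        |     producer.flush()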
 
        | mirekrusin wrote:
        | Yes, it's a good option, but it requires serialised
        | processing at partition scope, which may or may not be
        | desirable.
        | 
        | If exactly-once semantics are needed and processing
        | should be parallel, other methods have to be used.
 
        | francoismassot wrote:
        | Good point. First, you're right: we do the ack on Kafka,
        | but it's not necessary. Second, that's not what I wanted
        | to stress... and I should not have used the verb
        | "acknowledge". What we do is upload the data to S3, then
        | commit the partitions + positions in what we call the
        | metastore. I can't edit my comment, unfortunately.
        | 
        | > One downside is that you leak internals of other system
        | (partitions).
        | 
        | True, but we generalized the concept of partitions to
        | other data sources; it's pretty convenient for
        | distributing indexing tasks.
 
        | fulmicoton wrote:
        | > The other is that it implies serialised processing -
        | > you can't process anything in parallel as you have
        | > single index threshold that defines what has been and
        | > what has yet not been processed.
        | 
        | Fortunately Kafka is partitioned. You can work in
        | parallel across partitions.
        | 
        | Also, you can streamline your process. If you are running
        | your data through operations (A, B, C), then (C on batch
        | N) can run at the same time as (B on batch N+1) and (A on
        | batch N+2).
        | 
        | We do both at Quickwit.
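        | 
        | A toy sketch of that pipelining, with plain Python
        | queues standing in for Kafka batches:
        | 
        |     import queue
        |     import threading
        | 
        |     def stage(work, inq, outq=None):
        |         while True:
        |             batch = inq.get()
        |             out = work(batch)
        |             if outq is not None:
        |                 outq.put(out)
        |             inq.task_done()
        | 
        |     q_ab = queue.Queue(maxsize=1)
        |     q_bc = queue.Queue(maxsize=1)
        |     # While C prints batch N, B can uppercase
        |     # batch N+1 and A can enqueue batch N+2.
        |     threading.Thread(
        |         target=stage, args=(str.upper, q_ab, q_bc),
        |         daemon=True).start()  # stage B
        |     threading.Thread(
        |         target=stage, args=(print, q_bc),
        |         daemon=True).start()  # stage C
        |     for n in range(5):        # stage A
        |         q_ab.put(f"batch-{n}")
        |     q_ab.join()
        |     q_bc.join()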
 
  | foota wrote:
  | Isn't exactly-once delivery the kind of problem, like the CAP
  | theorem, where it's not possible?
  | 
  | You can make the downstream idempotent w.r.t. what the queue is
  | delivering, but the queue might still redeliver things.
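  | 
  | A hedged sketch of that idempotent-consumer idea (table name
  | made up): key the write on a unique event id so a redelivery
  | becomes a no-op.
  | 
  |     import sqlite3
  | 
  |     db = sqlite3.connect("events.db")
  |     db.execute("CREATE TABLE IF NOT EXISTS events "
  |                "(event_id TEXT PRIMARY KEY, body TEXT)")
  | 
  |     def handle(event_id, body):
  |         with db:
  |             # A second delivery of the same event id
  |             # is silently ignored.
  |             db.execute(
  |                 "INSERT OR IGNORE INTO events VALUES (?, ?)",
  |                 (event_id, body))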
 
  | ankitnayan wrote:
  | https://www.confluent.io/blog/exactly-once-semantics-are-pos...
 
| bushbaba wrote:
| Seems like overkill, no? OTel collectors are fairly cheap, so why
| add expensive Kafka into the mix? If you need to buffer, why not
| just dump to S3 or a similar data store as temporary storage?
 
  | francoismassot wrote:
  | I really like this idea. And there is an OTel exporter to AWS
  | S3, still in alpha, but I'm gonna test it soon:
  | https://github.com/open-telemetry/opentelemetry-collector-co...
 
  | prpl wrote:
  | Why not both: dump to S3 and write pointers to Kafka for
  | portable event-based ingestion (since everybody does messages a
  | bit differently)?
 
    | bushbaba wrote:
    | No need, as S3 objects are your dead-letter queue, and the
    | system should be designed to cope with multiple writes of the
    | same event anyway.
    | 
    | The point is to use S3 etc. only in the event of system
    | instability, not as the primary means of data transfer.
 
  | lmm wrote:
  | > If you need to buffer why not just dump to s3 or similar data
  | store as a temporary storage array.
  | 
  | At that point it's very easy to sleepwalk into implementing
  | your own database on top of S3, which is very hard to get good
  | semantics out of - e.g. it offers essentially no ordering
  | guarantees, and forget about atomicity. For telemetry you might
  | well be OK with fuzzy data, but if you want exact traces every
  | time, then Kafka could make sense.
 
    | dikei wrote:
    | Yeah, and to use S3 efficiently you also need to batch your
    | messages into large blobs of at least tens of MB, which
    | further complicates matters, especially if you don't want to
    | lose those message buffers.
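    | 
    | A rough sketch of that batching (boto3; bucket name and
    | flush size are made up). Note that an in-process buffer like
    | this is exactly what you'd lose on a crash:
    | 
    |     import uuid
    |     import boto3
    | 
    |     s3 = boto3.client("s3")
    |     FLUSH_BYTES = 16 * 1024 * 1024
    |     buf, size = [], 0
    | 
    |     def append(msg: bytes):
    |         global size
    |         buf.append(msg)
    |         size += len(msg)
    |         if size >= FLUSH_BYTES:
    |             flush()
    | 
    |     def flush():
    |         global buf, size
    |         if buf:
    |             s3.put_object(
    |                 Bucket="telemetry-buffer",
    |                 Key="batch/%s" % uuid.uuid4(),
    |                 Body=b"\n".join(buf))
    |             buf, size = [], 0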
 
      | bushbaba wrote:
      | If your OTel collector is being overwhelmed, you have a lot
      | of backlogged data that can't be ingested, so you
      | dead-letter-queue it to S3 to free up buffers.
      | 
      | The approach here is to send data to S3 only as a last
      | resort.
 
    | ankitnayan wrote:
    | It's hard to see S3 working as a buffer. Every datastore can
    | work for almost any storage use case (buffer/queue/DB) when
    | the scale is low, but purpose-built buffers and queues were
    | designed to work at scale.
 
  | richieartoul wrote:
  | (WarpStream founder)
  | 
  | This is more or less exactly what WarpStream is:
  | https://www.warpstream.com/blog/minimizing-s3-api-costs-with...
  | 
  | Kafka API, S3 costs and ease of use
 
| daurnimator wrote:
| I expect it would be far cheaper to scale up Tempo/Loki than it
| would be to even run an idle Kafka cluster. This feels like
| spending thousands of dollars to save tens of dollars.
 
  | neetle wrote:
  | Tempo can still buckle under huge bursts of traffic, and you
  | don't need the retention to be in the hours
 
  | pranay01 wrote:
  | Querying in Tempo/Loki doesn't seem to scale particularly well,
  | and Loki has known issues with high-cardinality data, so...
 
  | ankitnayan wrote:
  | When handling surges on the order of 10x, it's much more
  | difficult to scale the different components of Loki than to
  | write the data to Kafka/Redpanda first and consume it at a
  | consistent rate.
 
| blinded wrote:
| This arch is how the big players do it at scale (e.g. Datadog,
| New Relic: the second data passes their edge, it lands in a Kafka
| cluster). Also, OTel components lack rate limiting (1), meaning
| it's super easy to overload your backend storage (S3).
| 
| Grafana has some posts on how they softened the S3 blow with
| memcached (2, 3).
| 
| 1. https://github.com/open-telemetry/opentelemetry-collector-co...
| 2. https://grafana.com/docs/loki/latest/operations/caching/
| 3. https://grafana.com/blog/2023/08/23/how-we-scaled-grafana-cl...
| 
| I know the post is about telemetry data and my comments on
| Grafana are about logs, but the arch bits still apply.
 
  | ankitnayan wrote:
  | Caching is to improve read performance, whereas Kafka is used
  | to handle ingest volume, so I couldn't see how the linked
  | Grafana articles relate.
 
  | wardb wrote:
  | Grafana Labs employee here. On the linked articles: I'm not
  | aware of any caching being used in the write-to-S3 part of the
  | pipeline, other than some time-based/volume-based buffering at
  | the ingester microservices before writing the chunks of data to
  | object storage.
  | 
  | The linked Loki caching docs/articles are for optimising the
  | read access patterns of S3/object storage, not for writes.
 
| chris_armstrong wrote:
| A similar idea [^1] has cropped up in the serverless
| OpenTelemetry world: collate OpenTelemetry spans in a Kinesis
| stream before forwarding them to a third-party service for
| analysis. This obviates the need for a separate collector,
| reduces forwarding latency, and removes the cold-start overhead
| of the AWS Distro for OpenTelemetry Lambda layer.
| 
| [^1] https://x.com/donkersgood/status/1662074303456636929?s=20
 
| Joel_Mckay wrote:
| If you have distributed concurrent data streams that exhibit
| coherent temporal events, then at some point you pretty much have
| to implement a queuing balancer.
| 
| One simply trades latency for capacity and eventual coherent data
| locality.
| 
| It's almost an arbitrary detail whether you use Kafka, RabbitMQ,
| or Erlang channels. If you can add smart client application-layer
| predictive load-balancing, then it is possible to cut burst
| traffic loads by an order of magnitude or two. Cost-optimized
| dynamic host scaling is not a solution that solves every problem.
| 
| Good luck out there =)
 
| nicognaw wrote:
| SigNoz is too good at SEO.
| 
| In the early days, when I looked up OTel and observability stuff,
| I always saw SigNoz articles on the first screen.
 
___________________________________________________________________
(page generated 2023-10-23 09:00 UTC)