隨筆 - 100  文章 - 50  trackbacks - 0
          <2018年12月>
          2526272829301
          2345678
          9101112131415
          16171819202122
          23242526272829
          303112345

          常用鏈接

          留言簿(3)

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          收藏夾

          我收藏的一些文章!

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

           使用kafka 2.1.0,然后用最新的kafka-manager 1.3.3.18來管理kafka,并寫了一個生產者和消費者程序。
          程序運行后,消費者的group在kafka manager里死活顯示不出來。
          生產者代碼如下:
          package com.kafka.producer;
          import org.apache.commons.lang3.exception.ExceptionUtils;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.clients.producer.Callback;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.Producer;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.clients.producer.RecordMetadata;
          import java.util.Properties;
          import java.util.concurrent.ExecutionException;
          public class ProducerDemo {
              public static void main(String[] args) {
                  int i = 0;
                  while (true) {
                      i++;
                      try {
                          send("test", String.format("test_%d", i), "123");
                          Thread.sleep(2000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (ExecutionException e) {
                          e.printStackTrace();
                      }
                      System.out.println(String.format("Kafka寫入:%d", i));
                  }
              }
          private static Producer<String, Object> producer;
          private static KafkaConsumer<String, Object> consumer;
          private static final String server = "127.0.0.1:9092";
          static {
          Properties props = buildProducerConfig();
          producer = new KafkaProducer<>(props);
          }
          private static Properties buildProducerConfig() {
          Properties props = new Properties();
          // bootstrap.servers是Kafka集群的IP地址,也就是Broker地址
          props.put("bootstrap.servers", server);
          props.put("acks", "all");
          props.put("retries", 0);
          props.put("batch.size", 16384);
          props.put("linger.ms", 1);
          props.put("buffer.memory", 33554432);
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          return props;
          }
          public static RecordMetadata send(String topic, String key, Object Obj) throws InterruptedException, ExecutionException {
          return producer.send(new ProducerRecord<String, Object>(topic, key, Obj)).get();
          }
          public static void sendAsync(String topic,String key,Object obj) {
          producer.send(new ProducerRecord<String, Object>(topic, key, obj), new Callback() {
          @Override
          public void onCompletion(RecordMetadata metadata, Exception e) {
          if(e !=null) {
          System.out.println(ExceptionUtils.getStackTrace(e));
          }
          }
          } );
          }
          }
          消費者程序如下:
          package com.kafka.consumer;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import java.util.Arrays;
          import java.util.Properties;
          public class ConsumerDemo {
              public static void main(String[] args) {
                  KafkaUtils.consume();
              }
              private static KafkaConsumer<String, Object> consumer;
              private static final String server = "127.0.0.1:9092";
              static {
                  Properties props = buildConsumerConfig();
                  consumer = new KafkaConsumer<>(props);
              }
              private static Properties buildConsumerConfig() {
                  Properties props;
                  props = new Properties();
                  props.put("bootstrap.servers", server);
                  // 消費組
                  props.put("group.id", "testGroup");
                  props.put("enable.auto.commit", "true");
                  // 設置多久一次更新被消費消息的偏移量
                  props.put("auto.commit.interval.ms", "1000");
                  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                  return props;
              }
              public static void consume() {
                  consumer.subscribe(Arrays.asList("test"));
                  while (true) {
                      // 每隔100ms拉取一次數據
                      ConsumerRecords<String, Object> records = consumer.poll(100);
                      for (ConsumerRecord<String, Object> record : records) {
                          System.out.printf("partition=%d,offset = %d, key = %s, value = %s\n", record.partition(),
                                  record.offset(), record.key(), record.value());
                      }
                  }
              }
          }
          然后在kafka manager的消費者組顯示不出來,為了查找原因,去看kafka manager日志。發現日志報錯如下:
          [warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster test-Kafka!
          kafka.common.KafkaException: Unknown offset schema version 3
                  at kafka.manager.utils.one10.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:428) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
                  at kafka.manager.utils.one10.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:532) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
                  at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:332) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
                  at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
                  at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.12.jar:na]
                  at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
                  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
          初步診斷是kafka manager的問題,決定具體深入分析下,發現kafka manager是用scala寫的,自己又不了解scala,頓時感覺無從下手,
          但是想想,程序應該都差不多,就去分析分析原因吧,發現錯誤日志指向GroupMetadataManager.scala:428這一行,那錯誤應該也在這邊,
          然后在google找了找,也沒有很好的解決方式,只能在github的kafka manager項目提了個Issue,發現有人修改過源代碼后成功顯示了,按照這位老兄的提示
          修改scala源代碼,然后重新編譯打包,問題終于得到了解決。
          修改的scala源代碼如下:
          git diff origin/master
          diff --git a/app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
          index 85771cd..f16b1a3 100644
          --- a/app/kafka/manager/utils/one10/GroupMetadataManager.scala
          +++ b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
          @@ -368,6 +368,25 @@ object GroupMetadataManager {
               new Field(SUBSCRIPTION_KEY, BYTES),
               new Field(ASSIGNMENT_KEY, BYTES))
           
          +  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
          +
          +  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
          +    new Field("metadata", STRING, "Associated metadata.", ""),
          +    new Field("commit_timestamp", INT64))
          +  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
          +  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
          +  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
          +
          +  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
          +    new Field("offset", INT64),
          +    new Field("leader_epoch", INT32),
          +    new Field("metadata", STRING, "Associated metadata.", ""),
          +    new Field("commit_timestamp", INT64))
          +  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
          +  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
          +  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
          +  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
          +
             private val PROTOCOL_TYPE_KEY = "protocol_type"
             private val GENERATION_KEY = "generation"
             private val PROTOCOL_KEY = "protocol"
          @@ -388,6 +407,12 @@ object GroupMetadataManager {
               new Field(LEADER_KEY, NULLABLE_STRING),
               new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
           
          +  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
          +    new Field(PROTOCOL_TYPE_KEY, STRING),
          +    new Field(GENERATION_KEY, INT32),
          +    new Field(PROTOCOL_KEY, NULLABLE_STRING),
          +    new Field(LEADER_KEY, NULLABLE_STRING),
          +    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
           
             // map of versions to key schemas as data types
             private val MESSAGE_TYPE_SCHEMAS = Map(
          @@ -398,13 +423,18 @@ object GroupMetadataManager {
             // map of version of offset value schemas
             private val OFFSET_VALUE_SCHEMAS = Map(
               0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
          -    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
          +    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
          +    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
          +    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
          +
             private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
           
             // map of version of group metadata value schemas
             private val GROUP_VALUE_SCHEMAS = Map(
               0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
          -    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
          +    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
          +    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
          +
             private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
           
             private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
          @@ -545,6 +575,20 @@ object GroupMetadataManager {
                   val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
           
                   OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
          +      } else if (version == 2) {
          +        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
          +        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
          +        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
          +
          +        OffsetAndMetadata(offset, metadata, commitTimestamp)
          +      } else if (version == 3) {
          +        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
          +        val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
          +        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
          +        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
          +
          +        // val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
          +        OffsetAndMetadata(offset, metadata, commitTimestamp)
                 } else {
                   throw new IllegalStateException("Unknown offset message version")
                 }
          完整的app/kafka/manager/utils/one10/GroupMetadataManager.scala代碼如下:
          /*
           * Licensed to the Apache Software Foundation (ASF) under one or more
           * contributor license agreements. See the NOTICE file distributed with
           * this work for additional information regarding copyright ownership.
           * The ASF licenses this file to You under the Apache License, Version 2.0
           * (the "License"); you may not use this file except in compliance with
           * the License. You may obtain a copy of the License at
           *
           *    http://www.apache.org/licenses/LICENSE-2.0
           *
           * Unless required by applicable law or agreed to in writing, software
           * distributed under the License is distributed on an "AS IS" BASIS,
           * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           * See the License for the specific language governing permissions and
           * limitations under the License.
           
          */

          package kafka.manager.utils.one10

          import java.io.PrintStream
          import java.nio.ByteBuffer
          import java.nio.charset.StandardCharsets
          import java.util.UUID

          import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
          import kafka.utils.{Logging, nonthreadsafe}
          import org.apache.kafka.clients.consumer.ConsumerRecord
          import org.apache.kafka.clients.consumer.internals.{ConsumerProtocol, PartitionAssignor}
          import org.apache.kafka.common.TopicPartition
          import org.apache.kafka.common.protocol.types.Type._
          import org.apache.kafka.common.protocol.types._
          import org.apache.kafka.common.utils.Utils

          import scala.collection.JavaConverters._
          import scala.collection.{Seq, immutable, mutable, _}


          /**
            * Summary of a group used to represent group metadata for the ListGroups API.
            *
            * @param groupId      id of the group
            * @param protocolType protocol type of the group
            */
          case class GroupOverview(groupId: String, protocolType: String)

          /**
            * An offset commit paired with the offset of the commit record it came from.
            *
            * Keeping the commit record offset lets the latest offset commit be materialized when
            * transactional and regular offset commits are mixed; without it, compaction of the offsets
            * topic itself could cause the wrong offset commit to be materialized.
            *
            * @param appendedBatchOffset offset of the appended commit record, if known
            * @param offsetAndMetadata   the committed offset and its metadata
            */
          case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {

            /**
              * Returns true when this commit's record offset is strictly smaller than `that`'s.
              * Both sides must have `appendedBatchOffset` defined; `Option.get` throws otherwise.
              */
            def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = {
              val ownBatchOffset = appendedBatchOffset.get
              val otherBatchOffset = that.appendedBatchOffset.get
              ownBatchOffset < otherBatchOffset
            }
          }

          /**
            * Group contains the following metadata:
            *
            *  Membership metadata:
            *  1. Members registered in this group
            *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
            *  3. Protocol metadata associated with group members
            *
            *  State metadata:
            *  1. group state
            *  2. generation id
            *  3. leader id
            *
            * In addition to membership, an instance keeps per-partition offset bookkeeping: offsets that have
            * been materialized, regular offset commits that are still pending, and transactional offset
            * commits that are still pending per producer id.
            */
          @nonthreadsafe
          class GroupMetadata(val groupId: String
                                             , var protocolType: Option[String]
                                             , var generationId: Int
                                             , var protocol: Option[String]
                                             , var leaderId: Option[String]
                                            ) extends Logging {

            // Members of the group, keyed by member id.
            private val members = new mutable.HashMap[String, MemberMetadata]
            // Materialized offsets per partition, each with the metadata of its commit record.
            private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
            // Regular offset commits that have been prepared but not yet completed or failed.
            private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
            // Transactional offset commits that are still pending, keyed by producer id.
            private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
            // Set once prepareTxnOffsetCommit / prepareOffsetCommit has been called, respectively.
            private var receivedTransactionalOffsetCommits = false
            private var receivedConsumerOffsetCommits = false

            // Not read or written by any method of this class; available for callers to set.
            var newMemberAdded: Boolean = false

            /** Whether a member with this id is registered. */
            def has(memberId: String) = members.contains(memberId)
            /** Returns the member with this id; the map lookup throws if the id is not registered. */
            def get(memberId: String) = members(memberId)

            /** Whether `memberId` is the current leader id. */
            def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
            def leaderOrNull: String = leaderId.orNull
            def protocolOrNull: String = protocol.orNull

            /**
              * Registers a member. The first member added to an empty group sets the group's protocol type,
              * and a member added while there is no leader becomes the leader. Asserts that the member belongs
              * to this group, has the group's protocol type and shares at least one protocol with the group.
              */
            def add(member: MemberMetadata) {
              if (members.isEmpty)
                this.protocolType = Some(member.protocolType)

              assert(groupId == member.groupId)
              assert(this.protocolType.orNull == member.protocolType)
              assert(supportsProtocols(member.protocols))

              if (leaderId.isEmpty)
                leaderId = Some(member.memberId)
              members.put(member.memberId, member)
            }

            /**
              * Removes a member. If the removed member was the leader, the first key of the remaining members
              * becomes the new leader, or the leader is cleared when no members remain.
              */
            def remove(memberId: String) {
              members.remove(memberId)
              if (isLeader(memberId)) {
                leaderId = if (members.isEmpty) {
                  None
                } else {
                  Some(members.keys.head)
                }
              }
            }

            /** Ids of all registered members. */
            def allMembers = members.keySet

            /** Metadata of all registered members as a list. */
            def allMemberMetadata = members.values.toList

            // TODO: decide if ids should be predictable or random
            def generateMemberIdSuffix = UUID.randomUUID().toString

            // Intersection of the protocol sets of all members. reduceLeft throws on an empty list, so this
            // must only be evaluated when the group has members (supportsProtocols checks isEmpty first).
            private def candidateProtocols = {
              // get the set of protocols that are commonly supported by all members
              allMemberMetadata
                .map(_.protocols)
                .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
            }

            /**
              * True when the group has no members, or when `memberProtocols` shares at least one protocol
              * with the protocols common to all current members.
              */
            def supportsProtocols(memberProtocols: Set[String]) = {
              members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
            }

            /** Builds a GroupOverview from the group id and protocol type (empty string when unset). */
            def overview: GroupOverview = {
              GroupOverview(groupId, protocolType.getOrElse(""))
            }

            /** Adds the given materialized offsets and pending transactional offsets to this group's state. */
            def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
                                  pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {
              this.offsets ++= offsets
              this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
            }

            /**
              * Completes a regular offset commit for a partition.
              *
              * If a commit is pending for the partition, the given offset is stored unless an existing entry
              * is not older than it; an IllegalStateException is thrown when the commit record offset is
              * missing. The pending entry is then removed, but only if it equals the offset being completed.
              */
            def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {
              if (pendingOffsetCommits.contains(topicPartition)) {
                if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
                  throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
                    "in the log.")
                if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
                  offsets.put(topicPartition, offsetWithCommitRecordMetadata)
              }

              pendingOffsetCommits.get(topicPartition) match {
                case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
                  pendingOffsetCommits.remove(topicPartition)
                case _ =>
                // The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
                // its entries would be removed from the cache by the `removeOffsets` method.
              }
            }

            /** Drops the pending commit for a partition, but only if it equals the given offset. */
            def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
              pendingOffsetCommits.get(topicPartition) match {
                case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
                case _ =>
              }
            }

            /** Records the given offsets as pending regular commits and marks that consumer commits were received. */
            def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
              receivedConsumerOffsetCommits = true
              pendingOffsetCommits ++= offsets
            }

            /**
              * Records the given offsets as pending transactional commits of `producerId` (without a commit
              * record offset yet) and marks that transactional commits were received.
              */
            def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {
              trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")
              receivedTransactionalOffsetCommits = true
              val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
                mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])

              offsets.foreach { case (topicPartition, offsetAndMetadata) =>
                producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
              }
            }

            /** False only once the group has received both regular and transactional offset commits. */
            def hasReceivedConsistentOffsetCommits : Boolean = {
              !receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits
            }

            /* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.
             * We will return an error and the client will retry the request, potentially to a different coordinator.
             */
            def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {
              pendingTransactionalOffsetCommits.get(producerId) match {
                case Some(pendingOffsets) =>
                  val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
                  trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +
                    s"to be appended to the log")
                  // Drop the producer's entry entirely once it has no pending partitions left.
                  if (pendingOffsets.isEmpty)
                    pendingTransactionalOffsetCommits.remove(producerId)
                case _ =>
                // We may hit this case if the partition in question has emigrated already.
              }
            }

            /**
              * Replaces the producer's pending entry for a partition with the given value (which carries the
              * commit record metadata), but only if a pending entry exists and holds the same offset and metadata.
              */
            def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
                                        commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {
              pendingTransactionalOffsetCommits.get(producerId) match {
                case Some(pendingOffset) =>
                  if (pendingOffset.contains(topicPartition)
                    && pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)
                    pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)
                case _ =>
                // We may hit this case if the partition in question has emigrated.
              }
            }

            /* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written
             * to the log.
             *
             * The producer's pending offsets are removed in either case. On commit, each one is stored unless an
             * existing entry for the partition is not older than it; an IllegalStateException is thrown if a pending
             * offset has no commit record offset. On abort, the pending offsets are only traced.
             */
            def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
              val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
              if (isCommit) {
                pendingOffsetsOpt.foreach { pendingOffsets =>
                  pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
                    if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
                      throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
                        s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")

                    val currentOffsetOpt = offsets.get(topicPartition)
                    // forall is true for None, so a partition without a current offset always takes the new one.
                    if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
                      trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
                        "committed and loaded into the cache.")
                      offsets.put(topicPartition, commitRecordMetadataAndOffset)
                    } else {
                      trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
                        s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")
                    }
                  }
                }
              } else {
                trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")
              }
            }

            /** Producer ids that currently have pending transactional offset commits. */
            def activeProducers = pendingTransactionalOffsetCommits.keySet

            def hasPendingOffsetCommitsFromProducer(producerId: Long) =
              pendingTransactionalOffsetCommits.contains(producerId)

            /**
              * Removes the given partitions from the pending regular commits, from every producer's pending
              * transactional commits and from the materialized offsets.
              *
              * @return the materialized offsets that were removed, keyed by partition
              */
            def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
              topicPartitions.flatMap { topicPartition =>

                pendingOffsetCommits.remove(topicPartition)
                pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
                  pendingOffsets.remove(topicPartition)
                }
                val removedOffset = offsets.remove(topicPartition)
                removedOffset.map(topicPartition -> _.offsetAndMetadata)
              }.toMap
            }

            /**
              * Removes and returns the materialized offsets whose expire timestamp is before `startMs`,
              * skipping partitions that still have a pending regular commit.
              */
            def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
              val expiredOffsets = offsets
                .filter {
                  case (topicPartition, commitRecordMetadataAndOffset) =>
                    commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
                }
                .map {
                  case (topicPartition, commitRecordOffsetAndMetadata) =>
                    (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
                }
              offsets --= expiredOffsets.keySet
              expiredOffsets.toMap
            }

            /** Immutable snapshot of the materialized offsets without their commit record metadata. */
            def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
              (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
            }.toMap

            /** Materialized offset for a partition, if any. */
            def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)

            // visible for testing
            def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)

            /** Number of partitions with a materialized offset. */
            def numOffsets = offsets.size

            /** True if the group holds any materialized or pending (regular or transactional) offsets. */
            def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty


            // Includes group id, generation, protocol type and members.
            override def toString: String = {
              "GroupMetadata(" +
                s"groupId=$groupId, " +
                s"generation=$generationId, " +
                s"protocolType=$protocolType, " +
                s"members=$members)"
            }

          }


          /**
            * Messages stored for the group topic has versions for both the key and value fields. Key
            * version is used to indicate the type of the message (also to differentiate different types
            * of messages from being compacted together if they have the same field values); and value
            * version is used to evolve the messages within their data types:
            *
            * key version 0:       group consumption offset
            *    -> value version 0:       [offset, metadata, timestamp]
            *
            * key version 1:       group consumption offset
            *    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]
            *
            * key version 2:       group metadata
            *     -> value version 0:       [protocol_type, generation, protocol, leader, members]
            
          */
          object GroupMetadataManager {

            private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
            private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
            private val CURRENT_GROUP_KEY_SCHEMA_VERSION2 = 3.toShort

            private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
              new Field("topic", STRING),
              new Field("partition", INT32))
            private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
            private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
            private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")

            private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
              new Field("metadata", STRING, "Associated metadata.", ""),
              new Field("timestamp", INT64))
            private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
            private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
            private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")

            private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
              new Field("metadata", STRING, "Associated metadata.", ""),
              new Field("commit_timestamp", INT64),
              new Field("expire_timestamp", INT64))

            private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
            private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
            private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
            private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")

           // Added: offset commit value schema version 2 -- the V1 fields without expire_timestamp.
            private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                 new Field("metadata", STRING, "Associated metadata.", ""),
                 new Field("commit_timestamp", INT64))
            // Field handles looked up by name from the V2 schema.
            private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
            private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
            private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")

           // Added: offset commit value schema version 3 -- the V2 fields plus an INT32 leader_epoch
           // placed between offset and metadata.
           private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
             new Field("offset", INT64),
             new Field("leader_epoch", INT32),
             new Field("metadata", STRING, "Associated metadata.", ""),
             new Field("commit_timestamp", INT64))
            // Field handles looked up by name from the V3 schema.
            private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
            private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
            private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
            private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")

            // End of the added version 2 / version 3 offset commit value schemas.

            // Key schema of group metadata messages: a single string field holding the group id.
            private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
            // Handle for the "group" field, used to set and get it on key structs.
            private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")

            // Field names of the per-member struct embedded in a group metadata value.
            private val MEMBER_ID_KEY = "member_id"
            private val CLIENT_ID_KEY = "client_id"
            private val CLIENT_HOST_KEY = "client_host"
            private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
            private val SESSION_TIMEOUT_KEY = "session_timeout"
            private val SUBSCRIPTION_KEY = "subscription"
            private val ASSIGNMENT_KEY = "assignment"

            // Member metadata, version 0: no rebalance timeout field.
            private val MEMBER_METADATA_V0 = new Schema(
              new Field(MEMBER_ID_KEY, STRING),
              new Field(CLIENT_ID_KEY, STRING),
              new Field(CLIENT_HOST_KEY, STRING),
              new Field(SESSION_TIMEOUT_KEY, INT32),
              new Field(SUBSCRIPTION_KEY, BYTES),
              new Field(ASSIGNMENT_KEY, BYTES))

            // Member metadata, version 1: the V0 fields plus an INT32 rebalance_timeout placed
            // before session_timeout.
            private val MEMBER_METADATA_V1 = new Schema(
              new Field(MEMBER_ID_KEY, STRING),
              new Field(CLIENT_ID_KEY, STRING),
              new Field(CLIENT_HOST_KEY, STRING),
              new Field(REBALANCE_TIMEOUT_KEY, INT32),
              new Field(SESSION_TIMEOUT_KEY, INT32),
              new Field(SUBSCRIPTION_KEY, BYTES),
              new Field(ASSIGNMENT_KEY, BYTES))

            // Added: member metadata version 2 reuses the V1 schema object unchanged.
            private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1

            private val PROTOCOL_TYPE_KEY = "protocol_type"
            private val GENERATION_KEY = "generation"
            private val PROTOCOL_KEY = "protocol"
            private val LEADER_KEY = "leader"
            private val MEMBERS_KEY = "members"

            private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
              new Field(PROTOCOL_TYPE_KEY, STRING),
              new Field(GENERATION_KEY, INT32),
              new Field(PROTOCOL_KEY, NULLABLE_STRING),
              new Field(LEADER_KEY, NULLABLE_STRING),
              new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))

            private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
              new Field(PROTOCOL_TYPE_KEY, STRING),
              new Field(GENERATION_KEY, INT32),
              new Field(PROTOCOL_KEY, NULLABLE_STRING),
              new Field(LEADER_KEY, NULLABLE_STRING),
              new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))

            private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
              new Field(PROTOCOL_TYPE_KEY, STRING),
              new Field(GENERATION_KEY, INT32),
              new Field(PROTOCOL_KEY, NULLABLE_STRING),
              new Field(LEADER_KEY, NULLABLE_STRING),
              new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))

            // map of versions to key schemas as data types
            private val MESSAGE_TYPE_SCHEMAS = Map(
              0 -> OFFSET_COMMIT_KEY_SCHEMA,
              1 -> OFFSET_COMMIT_KEY_SCHEMA,
              2 -> GROUP_METADATA_KEY_SCHEMA
            )

            // map of version of offset value schemas
            private val OFFSET_VALUE_SCHEMAS = Map(
               1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
               2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
               3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3
              )
            private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort

            // map of version of group metadata value schemas
            private val GROUP_VALUE_SCHEMAS = Map(
              1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
              2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
            private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort

            private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
            private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)

            private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
            private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)

            private def schemaForKey(version: Int) = {
              val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
              schemaOpt match {
                case Some(schema) => schema
                case _ => throw new KafkaException("Unknown offset schema version " + version)
              }
            }

            private def schemaForOffset(version: Int) = {
              val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
              println("version is:"+version+", schemaOpt is: "+schemaOpt)
              schemaOpt match {
                case Some(schema) => schema
                case _ => throw new KafkaException("Unknown offset schema version " + version)
              }
            }

            private def schemaForGroup(version: Int) = {
              val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
              schemaOpt match {
                case Some(schema) => schema
                case _ => throw new KafkaException("Unknown group metadata version " + version)
              }
            }

            /**
              * Generates the key for offset commit message for given (group, topic, partition)
              *
              * 
          @return key for offset commit message
              
          */
            def offsetCommitKey(group: String, topicPartition: TopicPartition,
                                               versionId: Short = 0): Array[Byte] = {
              val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
              key.set(OFFSET_KEY_GROUP_FIELD, group)
              key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
              key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)

              val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
              byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
              key.writeTo(byteBuffer)
              byteBuffer.array()
            }

            /**
              * Generates the key for group metadata message for given group
              *
              * 
          @return key bytes for group metadata message
              
          */
            def groupMetadataKey(group: String): Array[Byte] = {
              val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
              key.set(GROUP_KEY_GROUP_FIELD, group)

              val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
              byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
              key.writeTo(byteBuffer)
              byteBuffer.array()
            }

            /**
              * Generates the payload for offset commit message from given offset and metadata
              *
              * 
          @param offsetAndMetadata consumer's current offset and metadata
              * 
          @return payload for offset commit message
              
          */
            def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
              // generate commit value with schema version 1
              val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
              value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
              value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
              value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
              value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
              val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
              byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
              value.writeTo(byteBuffer)
              byteBuffer.array()
            }

            /**
              * Decodes the offset messages' key
              *
              * 
          @param buffer input byte-buffer
              * 
          @return an GroupTopicPartition object
              
          */
            def readMessageKey(buffer: ByteBuffer): BaseKey = {
              val version = buffer.getShort
              val keySchema = schemaForKey(version)
              val key = keySchema.read(buffer)

              if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
                // version 0 and 1 refer to offset
                val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
                val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
                val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]

                OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))

              } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
                // version 2 refers to offset
                val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

                GroupMetadataKey(version, group)
              } else if(version == CURRENT_GROUP_KEY_SCHEMA_VERSION2) {//new add
               
          // version 3 refers to offset
                val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

                GroupMetadataKey(version, group)
              } else {
                throw new IllegalStateException("Unknown version " + version + " for group metadata message")
              }
            }

            /**
              * Decodes the offset messages' payload and retrieves offset and metadata from it
              *
              * 
          @param buffer input byte-buffer
              * 
          @return an offset-metadata object from the message
              
          */
            def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
              if (buffer == null) { // tombstone
                null
              } else {
                val version = buffer.getShort
                val valueSchema = schemaForOffset(version)
                val value = valueSchema.read(buffer)

                if (version == 0) {
                  val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                  val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                  val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]

                  OffsetAndMetadata(offset, metadata, timestamp)
                } else if (version == 1) {
                  val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                  val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                  val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                  val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]

                  OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
                } else if (version == 2) {
                  val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                  val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                  val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]

                  OffsetAndMetadata(offset, metadata, commitTimestamp)
                } else if (version == 3) {
                  val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                  val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
                  val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                  val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                  OffsetAndMetadata(offset, metadata, commitTimestamp)
                } else {
                  throw new IllegalStateException("Unknown offset message version")
                }
              }
            }

            /**
              * Decodes the group metadata messages' payload and retrieves its member metadatafrom it
              *
              * 
          @param buffer input byte-buffer
              * 
          @return a group metadata object from the message
              
          */
            def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
              if (buffer == null) { // tombstone
                null
              } else {
                val version = buffer.getShort
                val valueSchema = schemaForGroup(version)
                val value = valueSchema.read(buffer)

                if (version == 0 || version == 1) {
                  val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
                  val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
                  val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
                  val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
                  val memberMetadataArray = value.getArray(MEMBERS_KEY)

                  val members = memberMetadataArray.map { memberMetadataObj =>
                    val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
                    val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
                    val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
                    val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
                    val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
                    val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
                    val member = new MemberMetadata(memberId
                      , groupId
                      , clientId
                      , clientHost
                      , protocolType
                      , List((protocol, subscription.topics().asScala.toSet))
                      , assignment.partitions().asScala.map(tp => (tp.topic(), tp.partition())).toSet)
                    member
                  }
                  val finalProtocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
                  val group = new GroupMetadata(groupId = groupId
                    , generationId = generationId
                    , protocolType = finalProtocolType
                    , protocol = Option(protocol)
                    , leaderId = Option(leaderId)
                  )
                  members.foreach(group.add)
                  group
                } else {
                  throw new IllegalStateException("Unknown group metadata message version")
                }
              }
            }

            // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
            
          // (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
            class OffsetsMessageFormatter extends MessageFormatter {
              def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
                Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
                  // Only print if the message is an offset record.
                  
          // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
                  case offsetKey: OffsetKey =>
                    val groupTopicPartition = offsetKey.key
                    val value = consumerRecord.value
                    val formattedValue =
                      if (value == null) "NULL"
                      else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
                    output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
                    output.write("::".getBytes(StandardCharsets.UTF_8))
                    output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
                    output.write("\n".getBytes(StandardCharsets.UTF_8))
                  case _ => // no-op
                }
              }
            }

            // Formatter for use with tools to read group metadata history
            class GroupMetadataMessageFormatter extends MessageFormatter {
              def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
                Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
                  // Only print if the message is a group metadata record.
                  
          // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
                  case groupMetadataKey: GroupMetadataKey =>
                    val groupId = groupMetadataKey.key
                    val value = consumerRecord.value
                    val formattedValue =
                      if (value == null) "NULL"
                      else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
                    output.write(groupId.getBytes(StandardCharsets.UTF_8))
                    output.write("::".getBytes(StandardCharsets.UTF_8))
                    output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
                    output.write("\n".getBytes(StandardCharsets.UTF_8))
                  case _ => // no-op
                }
              }
            }

          }

          /** A consumer group paired with one topic partition. */
          case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {

            /** Convenience constructor that builds the TopicPartition from a topic name and partition number. */
            def this(group: String, topic: String, partition: Int) = {
              this(group, new TopicPartition(topic, partition))
            }

            /** Renders as "[group,topic,partition]". */
            override def toString: String = {
              val topic = topicPartition.topic
              val partition = topicPartition.partition
              "[%s,%s,%d]".format(group, topic, partition)
            }
          }

          /** Common shape of a decoded message key: the schema version it was read with plus the key payload. */
          trait BaseKey{
            /** Schema version read from the serialized key. */
            def version: Short
            /** Decoded key payload; its concrete type depends on the implementing class. */
            def key: Any
          }

          /** Decoded key of an offset commit message: schema version plus the (group, topic, partition) it refers to. */
          case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {

            /** Renders as the string form of the wrapped group/topic/partition. */
            override def toString: String = {
              key.toString
            }
          }

          /** Decoded key of a group metadata message: schema version plus the group id. */
          case class GroupMetadataKey(version: Short, key: String) extends BaseKey {

            /** Renders as the group id itself. */
            override def toString: String = {
              key
            }
          }

          posted on 2018-12-13 15:36 fly 閱讀(1516) 評論(0)  編輯  收藏 所屬分類: J2EE
          主站蜘蛛池模板: 武宁县| 隆回县| 昌平区| 峨眉山市| 富阳市| 汝南县| 太湖县| 苍南县| 榆林市| 富阳市| 什邡市| 柏乡县| 随州市| 达孜县| 商洛市| 文山县| 彭阳县| 景东| 英超| 舒兰市| 浑源县| 阳高县| 宁蒗| 长武县| 嵩明县| 大竹县| 灵武市| 武胜县| 罗山县| 读书| 紫金县| 扎兰屯市| 阿坝| 红安县| 彰武县| 万宁市| 沧州市| 尤溪县| 勐海县| 汾阳市| 虹口区|