本文共 15025 字,大约阅读时间需要 50 分钟。
去年项目需要看了hazelcast源码,当时记录的笔记。
Node是节点的抽象,里面包含节点引擎、客户端引擎、分区服务、集群服务、组播服务、连接管理、命令管理、组播属性、节点配置、本地成员、tcp地址、组播地址、连接者、节点初始化器、管理中心、安全上下文、
Config类,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。 GroupConfig集群用户名及密码DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev"; NetworkConfig网络相关配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。 SecurityConfig客户端登陆相关配置。 WanReplicationConfig集群复制配置,包括WanTargetClusterConfig配置,所有目标节点相关配置。 ClusterServiceImpl用于维护集群各个成员节点,实例化时把本地节点加入成员Map中,MulticastService Node实例化时根据MulticastConfig使用组播加入一个组,MulticastService提供节点基于组播传递的,使用监听模式每当接收到其他节点传播的消息调用监听器的onMessage,传递的参数为JoinMessage,它由序列化模块提供转化,默认NodeMulticastListener监听器,会对集群的节点校验,是否是同个集群用户名密码。 MulticastJoiner用于加入集群节点,创建JoinRequest对象用MulticastService发送给其他成员,不断发送加入集群请求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用这种不断发送的方式选举主节点。MulticastService负责发送申请加入的组播消息和组播消息接收及处理的工作。 找主节点方式:循环发送JoinRequest消息向组内发送,如果已加入状态且是master的节点接收到后会向外组播JoinMessage,告诉其他节点我的组成员信息,还没加入集群的成员则将自己节点的master地址设置为master发出的JoinMessage中的地址。所以JoinMessage只有master才会发出。其他已加入组的成员节点接收到JoinMessage类型消息则直接忽略, JoinMessage包含包版本、地址等信息。 SerializationService序列化转化模块,SerializationServiceBuilder生产者,默认字节存放顺序序列为ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory, Packet为数据包结构,第一字节为版本号,第二字节表示为长度,第三字节表示类型。 SocketConnector会执行连接操作,使用的是阻塞读写。 TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP连接 JoinMessage{ protected byte packetVersion; protected int buildNumber; protected Address address; protected String uuid; protected ConfigCheck configCheck; protected int memberCount; } JoinRequest继承JoinMessage,并添加credentials、tryCount属性。 默认5701作为每个节点对外暴露的端口。 ReadHandler的handle方法会初始化socketReader,集群成员维护的话使用SocketPacketReader读取报文, OperationServiceImpl类内部类RemoteOperationProcessor用于ResponseOperation对象 response.beforeRun(); response.run(); response.afterRun(); 即会调用JoinRequestOperation的run方法,调用ClusterServiceImpl里面的handleJoinRequest方法,完成成员更新工作,接收成功会向源节点返回SetMasterOperation、MemberInfoUpdateOperation等等更新源节点成员及节点joined状态。 里面一切需要处理的都用run方法,会统一处理。 Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(输入即读取选择器,run不断选择可读的对象并获取attachment,调用它的handle方法,这里的attachment是在TcpIpConnection实例化的时候创建readHandle对象并调用start方法把readHandler当做attachment注册到socketChannel里面的,readHandler调用handle方法,如果是集群协议CLUSTER则实例化SocketPacketReader,SocketPacketReader包含一个PacketReader,默认是DefaultPacketReader,用于将套接字读出来的字节转化成Packet包并调用handlePacket方法处理消息包,如果是集群成员消息包则调用handleMemberPacket方法处理,间接调用NodeEngineImpl的handlePacket方法,有三种不同头部类型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作类型,会调用operationService的handleOperation方法处理包消息,OperationThread会一直跑调用OperationRunner处理需要执行的Operation,OperationRunner线程数由hazelcast.operation.thread.count设置,如果-1则为机器cpu个数,接着会执行operation的beforeRun、run、operation如果设置了响应的话还要执行产生响应内容在返回给调用者、afterRun方法。)--OutSelectorImpl.run(输出即写入选择器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)-- private void process(Object task) { processedCount++; if (task instanceof Operation) { processOperation((Operation) task); return; } if (task instanceof Packet) { processPacket((Packet) task); return; } if (task instanceof PartitionSpecificRunnable) { processPartitionSpecificRunnable((PartitionSpecificRunnable) task); return; } throw new IllegalStateException("Unhandled task type for task:" + task); } 设计得不错的是它直接支持各种Operation的传递,并执行,本质也是序列化反序列化,然后调用Operation的beforerun、run、afterrun方法,后面还会自动执行handleResponse方法,此方法用于向其他备份节点同步数据,这部分操作是在operation的afterrun之前完成备份。备份工作由OperationBackupHandler完成,backup方法,备份又分为同步备份和异步备份,相加等于总的需要备份数。map操作默认备份一份数据且是同步的,异步的默认为0.异步备份不会阻塞操作。备份的operation是Backup,它的run会执行PutOperation的run方法,即把数据放到缓存中并修改版本,这里的run不会再执行复制操作。 <hazelcast> <map name="default"> <backup-count>1</backup-count> <async-backup-count>1</async-backup-count> </map> </hazelcast> operation的run同样会在备份节点上执行,putoperation其实就是在本地缓存更新值。备份的过程没有一个ack机制,信息传输的可靠性如何保证? 一个节点调用map的put操作时,会在本节点上缓存这个结果,再把operation传输到对应partition的第一个备份节点(这个节点可能就是自己本地节点)上,第一个节点接收到后备份到第二个节点上,所以默认就只有两个备份数据。所以nearcache缓存是可能存在每个节点上的。 PutOperation的afterRun方法主要是触发一些拦截器,触发各个节点的事件监听器什么的、更新各个备份节点缓存等等。 IOBalancer平衡io读取写入。 packet报文结构,1byte版本+2byte头部+4byte分区+4byte长度+nbyte消息。 主节点一个一个发给其他成员节点关于成员的消息,其他节点进行更新。 ServiceManagerImpl用于管理所有的远程,启动时会注册常用的一些服务,包括如下的服务,什么map、queue什么的 private void registerCoreServices() { Node node = nodeEngine.getNode(); registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService()); registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService()); registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService()); registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService()); registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine); registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService()); } private void registerDefaultServices(ServicesConfig servicesConfig) { registerService(MapService.SERVICE_NAME, createService(MapService.class)); registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine)); registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine)); registerService(TopicService.SERVICE_NAME, new TopicService()); registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine)); registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine)); registerService(ListService.SERVICE_NAME, new ListService(nodeEngine)); registerService(SetService.SERVICE_NAME, new SetService(nodeEngine)); registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService()); registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService()); registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService()); registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService()); registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine)); registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine)); registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine)); registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine)); registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine)); registerService(XAService.SERVICE_NAME, new XAService(nodeEngine)); registerCacheServiceIfAvailable(); } Map通过AbstractMapServiceFactory创建,使用MapRemoteService处理远程操作,RemoteService服务有两个方法createDistributedObject和destroyDistributedObject方法,最终是通过MapProxyImpl。Map的partition策略在MapContainer里面。 PutOperation用于执行集群节点的put操作。一致性哈希根据key使用MurmurHash哈希计算出结果,再根据分区数(默认271)取余。每个节点都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放记录。 成员组MemberGroup一共有若干个组,多少个成员就多少个组,最大的复制节点数为7,如果成员组小于7则使用成员组数量。 address[271][4] Address[271][7] 1、addr0 addr9 addr3 addr4 null null null 2、. . . . . 271、addr1 addr2 addr3 addr4 null null null 对分配的结果尝试重新分配,把过载的组分配一些成员给不足的组,并检测每个成员组内的节点数相差不会超过系数1.1,否则重新分组,尽可能达到均匀。 最终形成一张表,271个分区每个分区都对应着若干个复制的成员地址。 int avgPartitionPerGroup = partitionCount / groupSize; 就是说最多4个组,假如5个结点,分组情况为2,1,1,1,则每个组分到的partition个数为271/4,可能有些组多1个partition。 一共有271条线程处理operation,OperationThread,里面有队列ScheduleQueue,线程会不断处理,ScheduleQueue用于operation执行缓冲队列,里面的有两种队列,normalQueue和priorityQueue,一种用于正常的排队,一种用于设置优先级的队列,take方法会优先从priorityQueue中获取需要优先处理的operation。 有个同步复制、异步复制。 OperationServiceImpl.createInvocationBuilder(此方法有两个,一个用于partition、一个用于指定的address),-> 实例化一个InvocationBuilderImpl对象,调用invoke方法会创建Invocation,Invocation包含PartitionInvocation(针对分区)和TargetInvocation(针对指定节点)两种。 在集群中传输一切以Data形式传输。 Map的服务名为hz:impl:mapService。 假如HazelcastInstance执行instance.getMap("customers")则,通过HazelcastInstanceProxy的getMap方法,代理是调HazelcastInstanceImpl的getMap方法,调用getDistributedObject方法,它会通过ProxyService(它代理了所有服务)代理找到MapService,调用mapservice的createDistributedObject方法创建DistributedObject,间接调用MapRemoteService的createDistributedObject方法创建MapProxyImpl,调用MapProxyImpl的put方法把数据放到集群,主要如下操作 final Data key = toData(k, partitionStrategy); final Data value = toData(v); final Data result = putInternal(key, value, ttl, timeunit); return (V) toObject(result); 分别将key和value转化为Data,其实是序列化,方便网络传输。putInternal方法使用PutOperation,并且要根据key计算出partitionId,接着再完成operation的调用。 SerializationServiceImpl提供各种类型的序列化支持,toData提供由object到Data的转化,toObject提供由Data到object的转化,Data默认是使用DefaultData,DefaultData里面其实就是包含了一个字节数组还有不同的偏移量,例如从几位到几位表示类型,序列化工作其实也跟这种类似,把某一对象的相关信息转化为字节数组,传递到目的地后再根据约定反向组装成指定对象。 OperationThread专门用于执行接收到的请求,process方法,可以有三种请求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的处理逻辑,Operation则直接反序列化后调用beforeRun、run、afterRun等方法。 HealthMonitor是独立一条线程用于监控健康,包括 private class HealthMetrics { private final long memoryFree; private final long memoryTotal; private final long memoryUsed; private final long memoryMax; private final double memoryUsedOfTotalPercentage; private final double memoryUsedOfMaxPercentage; //following three load variables are always between 0 and 100. private final double processCpuLoad; private final double systemLoadAverage; private final double systemCpuLoad; private final int threadCount; private final int peakThreadCount; private final long clusterTimeDiff; private final int asyncExecutorQueueSize; private final int clientExecutorQueueSize; private final int queryExecutorQueueSize; private final int scheduledExecutorQueueSize; private final int systemExecutorQueueSize; private final int eventQueueSize; private final int pendingInvocationsCount; private final double pendingInvocationsPercentage; private final int operationServiceOperationExecutorQueueSize; private final int operationServiceOperationPriorityExecutorQueueSize; private final int operationServiceOperationResponseQueueSize; private final int runningOperationsCount; private final int remoteOperationsCount; private final int proxyCount; private final int clientEndpointCount; private final int activeConnectionCount; private final int currentClientConnectionCount; private final int connectionCount; private final int ioExecutorQueueSize; } PerformanceMonitor表示性能监控,监控的参数包括inSelector的已读取数量,outSelect的已写入事件数量,OperationService相关的性能参数,例如挂起的调用比例、总体调用比例、最大调用数量、271个分区线程已执行数量、271个分区operation线程正在挂起线程(即任务排队队列中的数量),常规operation线程的排队队列数量、常规operation线程已执行数量、响应线程已执行数量、响应线程排队队列数量。 hazelcast核心——Node,包含各种各样重要的基础服务,日志、节点关闭钩子、序列化服务、节点引擎、客户端引擎、分区服务、集群服务、广播服务、连接管理服务、命令服务、配置文件服务、群组属性服务、本节点地址、本地集群成员对象、主节点地址、hazelcast实例引用、日志服务、集群节点加入服务、节点扩展服务、管理中心服务、安全上下文、创建信息服务、版本校对服务、hazelcast线程组。 public class Node { private final ILogger logger; private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread"); private final SerializationService serializationService; public final NodeEngineImpl nodeEngine; public final ClientEngineImpl clientEngine; public final InternalPartitionService partitionService; public final ClusterServiceImpl clusterService; public final MulticastService multicastService; public final ConnectionManager connectionManager; public final TextCommandServiceImpl textCommandService; public final Config config; public final GroupProperties groupProperties; public final Address address; public final MemberImpl localMember; private volatile Address masterAddress = null; public final HazelcastInstanceImpl hazelcastInstance; public final LoggingServiceImpl loggingService; private final Joiner joiner; private final NodeExtension nodeExtension; private ManagementCenterService managementCenterService; public final SecurityContext securityContext; private final ClassLoader configClassLoader; private final BuildInfo buildInfo; private final VersionCheck versionCheck = new VersionCheck(); private final HazelcastThreadGroup hazelcastThreadGroup; } NodeEngineImpl作为节点引擎包含了许多服务,重要的例如,事件服务、operation服务、执行服务、等待通知服务、service(内置许多service,例如Map、Queue,用户自定义的服务可配置到hazelcast.xml,启动时会加载进来)管理服务、事务管理服务、代理服务、wan复制服务、包传输服务、证明人服务。 public class NodeEngineImpl implements NodeEngine { private final Node node; private final ILogger logger; private final EventServiceImpl eventService; private final OperationServiceImpl operationService; private final ExecutionServiceImpl executionService; private final WaitNotifyServiceImpl waitNotifyService; private final ServiceManagerImpl serviceManager; private final TransactionManagerServiceImpl transactionManagerService; private final ProxyServiceImpl proxyService; private final WanReplicationService wanReplicationService; private final PacketTransceiver packetTransceiver; private final QuorumServiceImpl quorumService; } ServiceManagerImpl用于管理所有服务,启动时默认会实例化核心的service、默认的service,如果用户通过配置文件配置了自定义service则也会实例化。启动时注册即将service实例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默认service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允许还将把缓存服务CacheService添加进来。 public final class ServiceManagerImpl implements ServiceManager { private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1); } 节点加入,Joiner负责加入工作,例如广播则使用MulticastJoiner、单播则使用TcpIpJoiner、AWS则使用TcpIpJoinerOverAWS。Node启动时会根据情况启动个线程, multicast只是做节点发现工作,真正的节点加入工作是交由tcpip做,向主节点发送加入请求,主节点把请求节点添加到成员列表中,然后返回请求节点让它把主节点地址设置为本人。 DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,实现StreamSerializer的read和write方法完成序列化和反序列化处理。 Date.class, new DateSerializer()); BigInteger.class, new BigIntegerSerializer()); BigDecimal.class, new BigDecimalSerializer()); Externalizable.class, new Externalizer(enableCompression)); Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression)); Class.class, new ClassSerializer()); Enum.class, new EnumSerializer()); DataSerializable.class, dataSerializerAdapter); Portable.class, portableSerializerAdapter); Byte.class, new ByteSerializer()); Boolean.class, new BooleanSerializer()); Character.class, new CharSerializer()); Short.class, new ShortSerializer()); Integer.class, new IntegerSerializer()); Long.class, new LongSerializer()); Float.class, new FloatSerializer()); Double.class, new DoubleSerializer()); byte[].class, new TheByteArraySerializer()); char[].class, new CharArraySerializer()); short[].class, new ShortArraySerializer()); int[].class, new IntegerArraySerializer()); long[].class, new LongArraySerializer()); float[].class, new FloatArraySerializer()); double[].class, new DoubleArraySerializer()); String.class, new StringSerializer());每个service都有自己的context,例如mapservice的MapServiceContext,它里面是保留了一份partition映射表的副本,在底层完成迁移之前并不会更新,当然底层的map数据也不会一边迁移一边删除,而是复制一份进行删除
==========广告时间==========
鄙人的新书《Tomcat内核设计剖析》已经在京东预售了,有需要的朋友可以到 https://item.jd.com/12185360.html 进行预定。感谢各位朋友。
=========================
转载地址:http://askmx.baihongyu.com/