diff --git a/src/main/scala/com/boundary/ordasity/Cluster.scala b/src/main/scala/com/boundary/ordasity/Cluster.scala
index 6654005..3e2b71d 100644
--- a/src/main/scala/com/boundary/ordasity/Cluster.scala
+++ b/src/main/scala/com/boundary/ordasity/Cluster.scala
@@ -18,6 +18,7 @@ package com.boundary.ordasity
 
 import com.codahale.jerkson.Json._
 import com.codahale.logula.Logging
+import com.yammer.metrics.core.Gauge
 import com.yammer.metrics.scala.{Meter, Instrumented}
 import java.lang.management.ManagementFactory
 import javax.management.ObjectName
@@ -33,7 +34,7 @@
 import com.twitter.common.quantity.{Time, Amount}
 import com.twitter.common.zookeeper.{ZooKeeperMap => ZKMap, ZooKeeperClient}
 import listeners._
-import balancing.{CountBalancingPolicy, MeteredBalancingPolicy}
+import balancing.{CountBalancingPolicy, GaugedBalancingPolicy, MeteredBalancingPolicy}
 import org.apache.zookeeper.{WatchedEvent, Watcher}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.concurrent.{TimeoutException, TimeUnit, ScheduledFuture, ScheduledThreadPoolExecutor}
@@ -66,11 +67,12 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
   var loadMap : Map[String, Double] = null
   val workUnitsPeggedToMe = new NonBlockingHashSet[String]
 
-  var balancingPolicy = {
-    if (config.useSmartBalancing)
-      new MeteredBalancingPolicy(this, config).init()
-    else
-      new CountBalancingPolicy(this, config).init()
+  var balancingPolicy = (config.useSmartBalancing, config.useSmartGaugedBalancing) match {
+    case (false, false) => new CountBalancingPolicy(this, config).init()
+    case (true, false) => new MeteredBalancingPolicy(this, config).init()
+    case (false, true) => new GaugedBalancingPolicy(this, config).init()
+    case (true, true) => throw new IllegalArgumentException(
+      "Can't use both smart (meter) balancing and smart gauge balancing. Please pick one.")
   }
 
   // Scheduled executions
@@ -336,7 +338,7 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
     }
 
     // If smart balancing is enabled, watch for changes to the cluster's workload.
-    if (config.useSmartBalancing)
+    if (config.useSmartBalancing || config.useSmartGaugedBalancing)
       loadMap = ZKMap.create[Double](zk, "/%s/meta/workload".format(name), new DoubleDeserializer)
   }
 
@@ -393,21 +395,31 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
    * Starts up a work unit that this node has claimed.
    * If "smart rebalancing" is enabled, hand the listener a meter to mark load.
    * Otherwise, just call "startWork" on the listener and let the client have at it.
-   * TODO: Refactor to remove check and cast.
+   * TODO: Refactor to remove type matching.
    */
   def startWork(workUnit: String, meter: Option[Meter] = None) {
     log.info("Successfully claimed %s: %s. Starting...", config.workUnitName, workUnit)
     val added = myWorkUnits.add(workUnit)
 
     if (added) {
-      if (balancingPolicy.isInstanceOf[MeteredBalancingPolicy]) {
-        val mbp = balancingPolicy.asInstanceOf[MeteredBalancingPolicy]
-        val meter = mbp.persistentMeterCache.getOrElseUpdate(
-          workUnit, metrics.meter(workUnit, "processing"))
-        mbp.meters.put(workUnit, meter)
-        listener.asInstanceOf[SmartListener].startWork(workUnit, meter)
-      } else {
-        listener.asInstanceOf[ClusterListener].startWork(workUnit)
+      (balancingPolicy, listener) match {
+        case (balancingPolicy: GaugedBalancingPolicy, listener: SmartGaugedListener) =>
+          listener.startWork(workUnit)
+          balancingPolicy.gauges.put(workUnit, metrics.gauge(workUnit) {
+            listener.workload(workUnit)
+          })
+        case (balancingPolicy: MeteredBalancingPolicy, listener: SmartListener) =>
+          val meter = balancingPolicy.persistentMeterCache.getOrElseUpdate(
+            workUnit, metrics.meter(workUnit, "processing"))
+          balancingPolicy.gauges.put(workUnit, new Gauge[Double] {
+            def value = meter.oneMinuteRate
+          })
+          listener.startWork(workUnit, meter)
+        case (balancingPolicy: CountBalancingPolicy, listener: ClusterListener) =>
+          listener.startWork(workUnit)
+        case _ =>
+          throw new IllegalStateException("Illegal combination: balancingPolicy = %s, listener = %s".format(
+            balancingPolicy, listener))
       }
     } else {
       log.warn("Detected that %s is already a member of my work units; not starting twice!", workUnit)
@@ -420,6 +432,13 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
    */
   def shutdownWork(workUnit: String, doLog: Boolean = true, deleteZNode: Boolean = true) {
     if (doLog) log.info("Shutting down %s: %s...", config.workUnitName, workUnit)
+    balancingPolicy match {
+      case _: MeteredBalancingPolicy => metrics.metricsRegistry.removeMetric(getClass, workUnit, "processing")
+      case _: GaugedBalancingPolicy => metrics.metricsRegistry.removeMetric(getClass, workUnit)
+      case _: CountBalancingPolicy => // No metric to unregister
+      case _ =>
+        log.error("Unknown balancingPolicy type %s, possible metrics leak", balancingPolicy.getClass)
+    }
     myWorkUnits.remove(workUnit)
     val path = "/%s/claimed-%s/%s".format(name, config.workUnitShortName, workUnit)
     if (deleteZNode) ZKUtils.delete(zk, path)
diff --git a/src/main/scala/com/boundary/ordasity/ClusterConfig.scala b/src/main/scala/com/boundary/ordasity/ClusterConfig.scala
index 5da3ffe..c5655dd 100644
--- a/src/main/scala/com/boundary/ordasity/ClusterConfig.scala
+++ b/src/main/scala/com/boundary/ordasity/ClusterConfig.scala
@@ -27,6 +27,7 @@ class ClusterConfig {
   @JsonProperty var autoRebalanceInterval = 60
   @JsonProperty var drainTime = 60
   @JsonProperty var useSmartBalancing = false
+  @JsonProperty var useSmartGaugedBalancing = false
   @JsonProperty var zkTimeout = 3000
   @JsonProperty var workUnitName = "work-units"
   @JsonProperty var workUnitShortName = "work"
@@ -59,6 +60,11 @@ class ClusterConfig {
     this
   }
 
+  def useSmartGaugedBalancing(to: Boolean) : ClusterConfig = {
+    useSmartGaugedBalancing = to
+    this
+  }
+
   def setDrainTime(to: Int) : ClusterConfig = {
     drainTime = to
     this
diff --git a/src/main/scala/com/boundary/ordasity/ClusterListener.scala b/src/main/scala/com/boundary/ordasity/ClusterListener.scala
index 0ba32cb..bc35189 100644
--- a/src/main/scala/com/boundary/ordasity/ClusterListener.scala
+++ b/src/main/scala/com/boundary/ordasity/ClusterListener.scala
@@ -16,6 +16,7 @@
 
 package com.boundary.ordasity
 
+import com.yammer.metrics.core.Gauge
 import com.yammer.metrics.scala.Meter
 import com.twitter.common.zookeeper.ZooKeeperClient
@@ -25,6 +26,11 @@ abstract class Listener {
   def shutdownWork(workUnit: String)
 }
 
+abstract class SmartGaugedListener extends Listener {
+  def startWork(workUnit: String)
+  def workload(workUnit: String): Double
+}
+
 abstract class SmartListener extends Listener {
   def startWork(workUnit: String, meter: Meter)
 }
diff --git a/src/main/scala/com/boundary/ordasity/balancing/GaugedBalancingPolicy.scala b/src/main/scala/com/boundary/ordasity/balancing/GaugedBalancingPolicy.scala
new file mode 100644
index 0000000..a212d37
--- /dev/null
+++ b/src/main/scala/com/boundary/ordasity/balancing/GaugedBalancingPolicy.scala
@@ -0,0 +1,198 @@
+//
+// Copyright 2011-2012, Boundary
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package com.boundary.ordasity.balancing
+
+import collection.JavaConversions._
+import overlock.atomicmap.AtomicMap
+import com.boundary.ordasity._
+import com.codahale.jerkson.Json
+import java.util.concurrent.{TimeUnit, ScheduledFuture}
+import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.scala.Instrumented
+import java.util.{TimerTask, LinkedList}
+import org.apache.zookeeper.CreateMode
+
+/**
+ * Ordasity's gauge-based load balancing policy weighs each work unit by the load its
+ * listener reports for it via a gauge, and claims or drains work until this node carries
+ * roughly an even share of the cluster's total load. It must be initialized with a
+ * SmartGaugedListener.
+ */
+class GaugedBalancingPolicy(cluster: Cluster, config: ClusterConfig)
+  extends BalancingPolicy(cluster, config) {
+
+  val gauges = AtomicMap.atomicNBHM[String, Gauge[Double]]
+  val loadGauge = metrics.gauge[Double]("my_load") { myLoad() }
+  var loadFuture : Option[ScheduledFuture[_]] = None
+
+  override def init() : BalancingPolicy = {
+    if (!cluster.listener.isInstanceOf[SmartGaugedListener]) {
+      throw new RuntimeException("Ordasity's gauged balancing policy must be initialized with " +
+        "a SmartGaugedListener, but you provided something else. Please fix that so we can read " +
+        "the gauge as your application performs work!")
+    }
+
+    this
+  }
+
+  /**
+   * Begins by claiming all work units that are pegged to this node.
+   * Then, continues to claim work from the available pool until we've claimed
+   * equal to or slightly more than the total desired load.
+   */
+  def claimWork() {
+    cluster.allWorkUnits.synchronized {
+      for (workUnit <- getUnclaimed())
+        if (isPeggedToMe(workUnit))
+          claimWorkPeggedToMe(workUnit)
+
+      val unclaimed = new LinkedList[String](getUnclaimed())
+      while (myLoad() <= evenDistribution && !unclaimed.isEmpty) {
+        val workUnit = unclaimed.poll()
+
+        if (config.useSoftHandoff && cluster.handoffRequests.contains(workUnit)
+            && isFairGame(workUnit) && attemptToClaim(workUnit, claimForHandoff = true))
+          log.info("Accepted handoff for %s.", workUnit)
+
+        else if (isFairGame(workUnit))
+          attemptToClaim(workUnit)
+      }
+    }
+  }
+
+  /**
+   * Performs a "smart rebalance." The target load is set to (cluster load / node count),
+   * where "load" is determined by the sum of all work unit gauges in the cluster.
+   */
+  def rebalance() {
+    val target = evenDistribution()
+    if (myLoad() > target) {
+      log.info("Smart Rebalance triggered. Load: %s. Target: %s", myLoad(), target)
+      drainToLoad(target.longValue)
+    }
+  }
+
+
+  /**
+   * When smart balancing is enabled, calculates the even distribution of load about
+   * the cluster. This is determined by the total load divided by the number of alive nodes.
+   */
+  def evenDistribution() : Double = {
+    cluster.loadMap.values.sum / activeNodeSize().doubleValue()
+  }
+
+
+  /**
+   * Determines the current load on this instance when smart rebalancing is enabled.
+   * This load is determined by the sum of all of this node's gauges' values.
+   */
+  def myLoad() : Double = {
+    var load = 0d
+    log.debug(cluster.loadMap.toString)
+    log.debug(cluster.myWorkUnits.toString)
+    cluster.myWorkUnits.foreach(u => load += cluster.getOrElse(cluster.loadMap, u, 0))
+    load
+  }
+
+  /**
+   * Once a minute, pass information about the amount of load generated per
+   * work unit off to ZooKeeper for use in the claiming and rebalancing process.
+   */
+  private def scheduleLoadTicks() {
+    val sendLoadToZookeeper = new Runnable {
+      def run() {
+        try {
+          gauges.foreach { case(workUnit, gauge) =>
+            val loadPath = "/%s/meta/workload/%s".format(cluster.name, workUnit)
+            ZKUtils.setOrCreate(cluster.zk, loadPath, gauge.value.toString, CreateMode.PERSISTENT)
+          }
+
+          val myInfo = new NodeInfo(cluster.getState.toString, cluster.zk.get().getSessionId)
+          val nodeLoadPath = "/%s/nodes/%s".format(cluster.name, cluster.myNodeID)
+          ZKUtils.setOrCreate(cluster.zk, nodeLoadPath, Json.generate(myInfo), CreateMode.EPHEMERAL)
+
+          log.info("My load: %s", myLoad())
+        } catch {
+          case e: Exception => log.error(e, "Error reporting load info to ZooKeeper.")
+        }
+      }
+    }
+
+    loadFuture = Some(cluster.pool.get.scheduleAtFixedRate(
+      sendLoadToZookeeper, 0, 1, TimeUnit.MINUTES))
+  }
+
+
+  /**
+   * Drains excess load on this node down to a fraction distributed across the cluster.
+   * The target load is set to (clusterLoad / # nodes).
+   */
+  def drainToLoad(targetLoad: Long, time: Int = config.drainTime,
+                  useHandoff: Boolean = config.useSoftHandoff) {
+    val startingLoad = myLoad()
+    var currentLoad = myLoad()
+    val drainList = new LinkedList[String]
+    val eligibleToDrop = new LinkedList[String](cluster.myWorkUnits -- cluster.workUnitsPeggedToMe)
+
+    while (currentLoad > targetLoad && !eligibleToDrop.isEmpty) {
+      val workUnit = eligibleToDrop.poll()
+      var workUnitLoad : Double = cluster.getOrElse(cluster.loadMap, workUnit, 0)
+
+      if (workUnitLoad > 0 && (currentLoad - workUnitLoad) > targetLoad) {
+        drainList.add(workUnit)
+        currentLoad -= workUnitLoad
+      }
+    }
+
+    val drainInterval = ((config.drainTime.toDouble / drainList.size) * 1000).intValue()
+    val drainTask = buildDrainTask(drainList, drainInterval, useHandoff, currentLoad)
+
+    if (!drainList.isEmpty) {
+      log.info("Releasing work units over %s seconds. Current load: %s. Target: %s. " +
+        "Releasing: %s", time, startingLoad, targetLoad, drainList.mkString(", "))
+      cluster.pool.get.schedule(drainTask, 0, TimeUnit.SECONDS)
+    }
+  }
+
+  def buildDrainTask(drainList: LinkedList[String], drainInterval: Int, useHandoff: Boolean,
+                     currentLoad: Double) : TimerTask = {
+    new TimerTask {
+      def run() {
+        if (drainList.isEmpty || myLoad <= evenDistribution) {
+          log.info("Finished the drain list, or my load is now less than an even distribution. " +
+            "Stopping rebalance. Remaining work units: %s", drainList.mkString(", "))
+          return
+        }
+        else if (useHandoff)
+          cluster.requestHandoff(drainList.poll)
+        else
+          cluster.shutdownWork(drainList.poll)
+
+        cluster.pool.get.schedule(this, drainInterval, TimeUnit.MILLISECONDS)
+      }
+    }
+  }
+
+  override def onConnect() = scheduleLoadTicks()
+
+  override def shutdown() {
+    if (loadFuture.isDefined)
+      loadFuture.get.cancel(true)
+  }
+
+  override def onShutdownWork(workUnit: String) =
+    gauges.remove(workUnit)
+
+}
diff --git a/src/main/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicy.scala b/src/main/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicy.scala
index 0d5a539..825afee 100644
--- a/src/main/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicy.scala
+++ b/src/main/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicy.scala
@@ -16,184 +16,28 @@
 
 package com.boundary.ordasity.balancing
 
-import collection.JavaConversions._
 import overlock.atomicmap.AtomicMap
 import com.boundary.ordasity._
-import com.codahale.jerkson.Json
-import java.util.concurrent.{TimeUnit, ScheduledFuture}
-import com.yammer.metrics.scala.{Meter, Instrumented}
-import java.util.{TimerTask, LinkedList}
-import org.apache.zookeeper.CreateMode
+import com.yammer.metrics.scala.Meter
 
 /**
- * Ordasity's count-based load balancing policy is simple. A node in the cluster
- * will attempt to claim ( work work units / nodes + 1) work units. It may
- * be initialized with either a simple ClusterListener or a metered SmartListener.
+ * A metered balancing policy is just a gauged balancing policy where the gauge
+ * value is calculated from the meter's one-minute rate. It must be initialized
+ * with a metered SmartListener.
  */
 class MeteredBalancingPolicy(cluster: Cluster, config: ClusterConfig)
-  extends BalancingPolicy(cluster, config) {
+  extends GaugedBalancingPolicy(cluster, config) {
 
-  val meters = AtomicMap.atomicNBHM[String, Meter]
   val persistentMeterCache = AtomicMap.atomicNBHM[String, Meter]
-  val loadGauge = metrics.gauge[Double]("my_load") { myLoad() }
-  var loadFuture : Option[ScheduledFuture[_]] = None
 
   override def init() : BalancingPolicy = {
     if (!cluster.listener.isInstanceOf[SmartListener]) {
       throw new RuntimeException("Ordasity's metered balancing policy must be initialized with " +
-        "a SmartListener, but you provided a simple listener. Please flip that so we can tick " +
+        "a SmartListener, but you provided something else. Please fix that so we can tick " +
        "the meter as your application performs work!")
     }
 
     this
   }
 
-  /**
-   * Begins by claimng all work units that are pegged to this node.
-   * Then, continues to claim work from the available pool until we've claimed
-   * equal to or slightly more than the total desired load.
-   */
-  def claimWork() {
-    cluster.allWorkUnits.synchronized {
-      for (workUnit <- getUnclaimed())
-        if (isPeggedToMe(workUnit))
-          claimWorkPeggedToMe(workUnit)
-
-      val unclaimed = new LinkedList[String](getUnclaimed())
-      while (myLoad() <= evenDistribution && !unclaimed.isEmpty) {
-        val workUnit = unclaimed.poll()
-
-        if (config.useSoftHandoff && cluster.handoffRequests.contains(workUnit)
-            && isFairGame(workUnit) && attemptToClaim(workUnit, claimForHandoff = true))
-          log.info("Accepted handoff for %s.", workUnit)
-
-        else if (isFairGame(workUnit))
-          attemptToClaim(workUnit)
-      }
-    }
-  }
-
-  /**
-   * Performs a "smart rebalance." The target load is set to (cluster load / node count),
-   * where "load" is determined by the sum of all work unit meters in the cluster.
-   */
-  def rebalance() {
-    val target = evenDistribution()
-    if (myLoad() > target) {
-      log.info("Smart Rebalance triggered. Load: %s. Target: %s", myLoad(), target)
-      drainToLoad(target.longValue)
-    }
-  }
-
-
-  /**
-   * When smart balancing is enabled, calculates the even distribution of load about
-   * the cluster. This is determined by the total load divided by the number of alive nodes.
-   */
-  def evenDistribution() : Double = {
-    cluster.loadMap.values.sum / activeNodeSize().doubleValue()
-  }
-
-
-  /**
-   * Determines the current load on this instance when smart rebalancing is enabled.
-   * This load is determined by the sum of all of this node's meters' one minute rate.
-   */
-  def myLoad() : Double = {
-    var load = 0d
-    log.debug(cluster.loadMap.toString)
-    log.debug(cluster.myWorkUnits.toString)
-    cluster.myWorkUnits.foreach(u => load += cluster.getOrElse(cluster.loadMap, u, 0))
-    load
-  }
-
-  /**
-   * Once a minute, pass off information about the amount of load generated per
-   * work unit off to Zookeeper for use in the claiming and rebalancing process.
-   */
-  private def scheduleLoadTicks() {
-    val sendLoadToZookeeper = new Runnable {
-      def run() {
-        try {
-          meters.foreach { case(workUnit, meter) =>
-            val loadPath = "/%s/meta/workload/%s".format(cluster.name, workUnit)
-            ZKUtils.setOrCreate(cluster.zk, loadPath, meter.oneMinuteRate.toString, CreateMode.PERSISTENT)
-          }
-
-          val myInfo = new NodeInfo(cluster.getState.toString, cluster.zk.get().getSessionId)
-          val nodeLoadPath = "/%s/nodes/%s".format(cluster.name, cluster.myNodeID)
-          ZKUtils.setOrCreate(cluster.zk, nodeLoadPath, Json.generate(myInfo), CreateMode.EPHEMERAL)
-
-          log.info("My load: %s", myLoad())
-        } catch {
-          case e: Exception => log.error(e, "Error reporting load info to ZooKeeper.")
-        }
-      }
-    }
-
-    loadFuture = Some(cluster.pool.get.scheduleAtFixedRate(
-      sendLoadToZookeeper, 0, 1, TimeUnit.MINUTES))
-  }
-
-
-  /**
-   * Drains excess load on this node down to a fraction distributed across the cluster.
-   * The target load is set to (clusterLoad / # nodes).
-   */
-  def drainToLoad(targetLoad: Long, time: Int = config.drainTime,
-                  useHandoff: Boolean = config.useSoftHandoff) {
-    val startingLoad = myLoad()
-    var currentLoad = myLoad()
-    val drainList = new LinkedList[String]
-    val eligibleToDrop = new LinkedList[String](cluster.myWorkUnits -- cluster.workUnitsPeggedToMe)
-
-    while (currentLoad > targetLoad && !eligibleToDrop.isEmpty) {
-      val workUnit = eligibleToDrop.poll()
-      var workUnitLoad : Double = cluster.getOrElse(cluster.loadMap, workUnit, 0)
-
-      if (workUnitLoad > 0 && (currentLoad - workUnitLoad) > targetLoad) {
-        drainList.add(workUnit)
-        currentLoad -= workUnitLoad
-      }
-    }
-
-    val drainInterval = ((config.drainTime.toDouble / drainList.size) * 1000).intValue()
-    val drainTask = buildDrainTask(drainList, drainInterval, useHandoff, currentLoad)
-
-    if (!drainList.isEmpty) {
-      log.info("Releasing work units over %s seconds. Current load: %s. Target: %s. " +
-        "Releasing: %s", time, startingLoad, targetLoad, drainList.mkString(", "))
-      cluster.pool.get.schedule(drainTask, 0, TimeUnit.SECONDS)
-    }
-  }
-
-  def buildDrainTask(drainList: LinkedList[String], drainInterval: Int, useHandoff: Boolean,
-                     currentLoad: Double) : TimerTask = {
-    new TimerTask {
-      def run() {
-        if (drainList.isEmpty || myLoad <= evenDistribution) {
-          log.info("Finished the drain list, or my load is now less than an even distribution. " +
-            "Stopping rebalance. Remaining work units: %s", drainList.mkString(", "))
-          return
-        }
-        else if (useHandoff)
-          cluster.requestHandoff(drainList.poll)
-        else
-          cluster.shutdownWork(drainList.poll)
-
-        cluster.pool.get.schedule(this, drainInterval, TimeUnit.MILLISECONDS)
-      }
-    }
-  }
-
-  override def onConnect() = scheduleLoadTicks()
-
-  override def shutdown() {
-    if (loadFuture.isDefined)
-      loadFuture.get.cancel(true)
-  }
-
-  override def onShutdownWork(workUnit: String) =
-    meters.remove(workUnit)
-
 }
diff --git a/src/test/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicySpec.scala b/src/test/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicySpec.scala
index 0a60928..3ce597a 100644
--- a/src/test/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicySpec.scala
+++ b/src/test/scala/com/boundary/ordasity/balancing/MeteredBalancingPolicySpec.scala
@@ -24,6 +24,7 @@
 import collection.JavaConversions._
 import org.apache.zookeeper.ZooKeeper
 import com.twitter.common.zookeeper.ZooKeeperClient
 import java.util.concurrent.ScheduledFuture
+import com.yammer.metrics.core.Gauge
 import com.yammer.metrics.scala.Meter
 import java.util.{LinkedList, HashMap, UUID}
@@ -107,9 +108,9 @@
     val balancer = new MeteredBalancingPolicy(cluster, config)
     cluster.balancingPolicy = balancer
 
-    balancer.meters.put("foo", mock[Meter])
+    balancer.gauges.put("foo", mock[Gauge[Double]])
     balancer.onShutdownWork("foo")
-    balancer.meters.contains("foo").must(be(false))
+    balancer.gauges.contains("foo").must(be(false))
   }
 
   @Test def `claim work` {
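
Usage sketch (illustrative only, not part of the patch): the snippet below shows how a client might wire up the new gauge-based balancing by implementing SmartGaugedListener and enabling useSmartGaugedBalancing on ClusterConfig. The QueueDepthListener name and its queue-depth bookkeeping are hypothetical, and the onJoin/onLeave hooks, ClusterConfig.setHosts, and Cluster.join() are assumed from the existing Ordasity listener and config API rather than shown in this diff.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import com.twitter.common.zookeeper.ZooKeeperClient
import com.boundary.ordasity.{Cluster, ClusterConfig, SmartGaugedListener}

// Reports each work unit's current queue depth as its load.
class QueueDepthListener extends SmartGaugedListener {
  private val depths = new ConcurrentHashMap[String, AtomicLong]()

  def onJoin(client: ZooKeeperClient) { }   // assumed base Listener hook
  def onLeave() { }                         // assumed base Listener hook

  def startWork(workUnit: String) {
    depths.putIfAbsent(workUnit, new AtomicLong(0))
    // begin consuming the work unit here
  }

  def shutdownWork(workUnit: String) {
    depths.remove(workUnit)
    // stop consuming the work unit here
  }

  // Polled by the gauge that Cluster.startWork registers for this work unit.
  def workload(workUnit: String): Double =
    Option(depths.get(workUnit)).map(_.doubleValue).getOrElse(0d)
}

object GaugedExample {
  def main(args: Array[String]) {
    val config = new ClusterConfig()
      .setHosts("localhost:2181")            // assumed existing setter
      .useSmartGaugedBalancing(true)
    new Cluster("example-service", new QueueDepthListener, config).join()
  }
}

Unlike the metered policy, the listener here reports an absolute load level on demand rather than ticking a meter, which suits work whose cost is better described by a level (queue depth, open connections) than by a throughput rate.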