47 changes: 30 additions & 17 deletions src/main/scala/com/boundary/ordasity/Cluster.scala
@@ -16,8 +16,10 @@

package com.boundary.ordasity

import com.boundary.ordasity.ExceptionUtils.logExceptions
import com.codahale.jerkson.Json._
import com.codahale.logula.Logging
import com.yammer.metrics.core.Gauge
import com.yammer.metrics.scala.{Meter, Instrumented}
import java.lang.management.ManagementFactory
import javax.management.ObjectName
@@ -33,7 +35,7 @@ import com.twitter.common.quantity.{Time, Amount}
import com.twitter.common.zookeeper.{ZooKeeperMap => ZKMap, ZooKeeperClient}

import listeners._
import balancing.{CountBalancingPolicy, MeteredBalancingPolicy}
import balancing.{CountBalancingPolicy, GaugedBalancingPolicy, MeteredBalancingPolicy}
import org.apache.zookeeper.{WatchedEvent, Watcher}
import org.apache.zookeeper.Watcher.Event.KeeperState
import java.util.concurrent.{TimeoutException, TimeUnit, ScheduledFuture, ScheduledThreadPoolExecutor}
@@ -66,11 +68,12 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
var loadMap : Map[String, Double] = null
val workUnitsPeggedToMe = new NonBlockingHashSet[String]

var balancingPolicy = {
if (config.useSmartBalancing)
new MeteredBalancingPolicy(this, config).init()
else
new CountBalancingPolicy(this, config).init()
var balancingPolicy = (config.useSmartBalancing, config.useSmartGaugedBalancing) match {
case (false, false) => new CountBalancingPolicy(this, config).init()
case (true, false) => new MeteredBalancingPolicy(this, config).init()
case (false, true) => new GaugedBalancingPolicy(this, config).init()
case (true, true) => throw new IllegalArgumentException(
"Can't use both smart (meter) balancing and smart gauge balancing. Please pick one.")
}

// Scheduled executions
@@ -268,7 +271,7 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
def scheduleRebalancing() {
val interval = config.autoRebalanceInterval
val runRebalance = new Runnable {
def run() {
def run() = logExceptions(log) {
try {
rebalance()
} catch {
@@ -336,7 +339,7 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
}

// If smart balancing is enabled, watch for changes to the cluster's workload.
if (config.useSmartBalancing)
if (config.useSmartBalancing || config.useSmartGaugedBalancing)
loadMap = ZKMap.create[Double](zk, "/%s/meta/workload".format(name), new DoubleDeserializer)
}

@@ -393,21 +396,31 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
* Starts up a work unit that this node has claimed.
* If "smart rebalancing" is enabled, hand the listener a meter to mark load.
* Otherwise, just call "startWork" on the listener and let the client have at it.
* TODO: Refactor to remove check and cast.
* TODO: Refactor to remove type matching.
*/
def startWork(workUnit: String, meter: Option[Meter] = None) {
log.info("Successfully claimed %s: %s. Starting...", config.workUnitName, workUnit)
val added = myWorkUnits.add(workUnit)

if (added) {
if (balancingPolicy.isInstanceOf[MeteredBalancingPolicy]) {
val mbp = balancingPolicy.asInstanceOf[MeteredBalancingPolicy]
val meter = mbp.persistentMeterCache.getOrElseUpdate(
workUnit, metrics.meter(workUnit, "processing"))
mbp.meters.put(workUnit, meter)
listener.asInstanceOf[SmartListener].startWork(workUnit, meter)
} else {
listener.asInstanceOf[ClusterListener].startWork(workUnit)
(balancingPolicy, listener) match {
case (balancingPolicy: GaugedBalancingPolicy, listener: SmartGaugedListener) =>
listener.startWork(workUnit)
balancingPolicy.gauges.put(workUnit, metrics.gauge(workUnit) {
listener.workload(workUnit)
})
case (balancingPolicy: MeteredBalancingPolicy, listener: SmartListener) =>
val meter = balancingPolicy.persistentMeterCache.getOrElseUpdate(
workUnit, metrics.meter(workUnit, "processing"))
balancingPolicy.gauges.put(workUnit, new Gauge[Double] {
def value = meter.oneMinuteRate
})
listener.startWork(workUnit, meter)
case (balancingPolicy: CountBalancingPolicy, listener: ClusterListener) =>
listener.startWork(workUnit)
case _ =>
throw new IllegalStateException("Illegal combination: balancingPolicy = %s, listener = %s".format(
balancingPolicy, listener))
}
} else {
log.warn("Detected that %s is already a member of my work units; not starting twice!", workUnit)
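
For readers of this hunk: in the gauged case above, Cluster registers a per-work-unit Gauge whose value comes from calling the listener back. A minimal sketch of that polling relationship (illustration only, not part of this change; the object and method names are invented):

import com.boundary.ordasity.SmartGaugedListener
import com.yammer.metrics.core.Gauge

object GaugePollingSketch {
  // The listener's workload(workUnit) is pulled each time the gauge is read
  // (for example by the once-a-minute load reporter in GaugedBalancingPolicy),
  // rather than the listener pushing marks to a Meter.
  def gaugeFor(listener: SmartGaugedListener, workUnit: String): Gauge[Double] =
    new Gauge[Double] {
      def value: Double = listener.workload(workUnit)  // read on demand
    }
}
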
6 changes: 6 additions & 0 deletions src/main/scala/com/boundary/ordasity/ClusterConfig.scala
@@ -27,6 +27,7 @@ class ClusterConfig {
@JsonProperty var autoRebalanceInterval = 60
@JsonProperty var drainTime = 60
@JsonProperty var useSmartBalancing = false
@JsonProperty var useSmartGaugedBalancing = false
@JsonProperty var zkTimeout = 3000
@JsonProperty var workUnitName = "work-units"
@JsonProperty var workUnitShortName = "work"
@@ -59,6 +60,11 @@ class ClusterConfig {
this
}

def useSmartGaugedBalancing(to: Boolean) : ClusterConfig = {
useSmartGaugedBalancing = to
this
}

def setDrainTime(to: Int) : ClusterConfig = {
drainTime = to
this
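
A hedged configuration sketch (not part of this change), using only the setters visible in this file; ZooKeeper hosts and the remaining settings are configured as in any existing Ordasity setup and are omitted here:

import com.boundary.ordasity.ClusterConfig

object GaugedConfigSketch {
  // Illustration only: opting into gauge-based balancing via the new fluent setter.
  val config = new ClusterConfig()
    .useSmartGaugedBalancing(true)  // selects GaugedBalancingPolicy in Cluster
    .setDrainTime(120)              // example value: give handoffs two minutes to drain
}
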
6 changes: 6 additions & 0 deletions src/main/scala/com/boundary/ordasity/ClusterListener.scala
@@ -16,6 +16,7 @@

package com.boundary.ordasity

import com.yammer.metrics.core.Gauge
import com.yammer.metrics.scala.Meter
import com.twitter.common.zookeeper.ZooKeeperClient

@@ -25,6 +26,11 @@ abstract class Listener {
def shutdownWork(workUnit: String)
}

abstract class SmartGaugedListener extends Listener {
def startWork(workUnit: String)
def workload(workUnit: String): Double
}

abstract class SmartListener extends Listener {
def startWork(workUnit: String, meter: Meter)
}
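
A hypothetical SmartGaugedListener sketch (not part of this change). startWork and shutdownWork bracket ownership of a work unit, and workload is polled by the gauge that Cluster.startWork registers. The onJoin/onLeave stubs are assumptions about the base Listener callbacks in the existing Ordasity API, which this diff does not show:

import java.util.concurrent.ConcurrentHashMap
import com.boundary.ordasity.SmartGaugedListener
import com.twitter.common.zookeeper.ZooKeeperClient

class QueueDepthListener extends SmartGaugedListener {
  // Hypothetical per-work-unit load figure; any Double your application can report works.
  private val depths = new ConcurrentHashMap[String, java.lang.Double]()

  def startWork(workUnit: String) {
    depths.put(workUnit, 0.0)            // begin consuming workUnit here
  }

  def workload(workUnit: String): Double = {
    val d = depths.get(workUnit)         // read by the gauge registered in Cluster.startWork
    if (d == null) 0.0 else d.doubleValue
  }

  def shutdownWork(workUnit: String) {
    depths.remove(workUnit)              // stop consuming workUnit here
  }

  // Assumed from the existing Listener API (not shown in this diff):
  def onJoin(client: ZooKeeperClient) { }
  def onLeave() { }
}

From there, wiring follows the existing pattern: new Cluster("example-service", new QueueDepthListener, config) with a config like the sketch above, then joining through the usual Cluster entry point (not shown in this diff).
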
30 changes: 30 additions & 0 deletions src/main/scala/com/boundary/ordasity/ExceptionUtils.scala
@@ -0,0 +1,30 @@
//
// Copyright 2011-2012, Boundary
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package com.boundary.ordasity

import com.codahale.logula.Log
import com.codahale.logula.Logging

object ExceptionUtils extends Logging {

def logExceptions(log: Log)(x: => Any) {
try x catch { case e: Exception =>
log.error(e, "Unexpected exception")
}
}

}
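
A hedged usage sketch for logExceptions (not part of this change). Presumably the motivation is that a task which throws on a ScheduledThreadPoolExecutor has its exception captured silently, and a periodic task stops being rescheduled, so the run() bodies elsewhere in this change wrap themselves in this helper. Names below are invented for illustration:

import com.boundary.ordasity.ExceptionUtils.logExceptions
import com.codahale.logula.Logging

object ReportingTaskSketch extends Logging {
  val task = new Runnable {
    def run() = logExceptions(log) {
      reportSomething()   // hypothetical body; any thrown Exception is logged, and a
                          // periodic schedule keeps running because nothing escapes run()
    }
  }
  def reportSomething() { /* ... */ }
}
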
@@ -20,6 +20,7 @@ import collection.JavaConversions._
import com.codahale.logula.Logging
import com.codahale.jerkson.Json
import com.boundary.ordasity.{ZKUtils, NodeState, ClusterConfig, Cluster}
import com.boundary.ordasity.ExceptionUtils.logExceptions
import com.yammer.metrics.scala.Instrumented
import java.util.{TimerTask, LinkedList}
import java.util.concurrent.TimeUnit
@@ -183,7 +184,7 @@ abstract class BalancingPolicy(cluster: Cluster, config: ClusterConfig)
val drainInterval = ((config.drainTime.toDouble / toHandOff.size) * 1000).intValue()

val handoffTask = new TimerTask {
def run() {
def run(): Unit = logExceptions(log) {
if (toHandOff.isEmpty) {
if (targetCount == 0 && doShutdown) cluster.completeShutdown()
return
@@ -0,0 +1,199 @@
//
// Copyright 2011-2012, Boundary
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package com.boundary.ordasity.balancing

import collection.JavaConversions._
import overlock.atomicmap.AtomicMap
import com.boundary.ordasity._
import com.boundary.ordasity.ExceptionUtils.logExceptions
import com.codahale.jerkson.Json
import java.util.concurrent.{TimeUnit, ScheduledFuture}
import com.yammer.metrics.core.Gauge
import com.yammer.metrics.scala.Instrumented
import java.util.{TimerTask, LinkedList}
import org.apache.zookeeper.CreateMode

/**
* Ordasity's gauge-based load balancing policy distributes work according to the
* workload each work unit reports through its listener-supplied gauge. It must be
* initialized with a SmartGaugedListener.
*/
class GaugedBalancingPolicy(cluster: Cluster, config: ClusterConfig)
extends BalancingPolicy(cluster, config) {

val gauges = AtomicMap.atomicNBHM[String, Gauge[Double]]
val loadGauge = metrics.gauge[Double]("my_load") { myLoad() }
var loadFuture : Option[ScheduledFuture[_]] = None

override def init() : BalancingPolicy = {
if (!cluster.listener.isInstanceOf[SmartGaugedListener]) {
throw new RuntimeException("Ordasity's gauged balancing policy must be initialized with " +
"a SmartGaugeListener, but you provided something else. Please fix that so we can measure " +
"the gauge as your application performs work!")
}

this
}

/**
* Begins by claiming all work units that are pegged to this node.
* Then, continues to claim work from the available pool until we've claimed
* equal to or slightly more than the total desired load.
*/
def claimWork() {
cluster.allWorkUnits.synchronized {
for (workUnit <- getUnclaimed())
if (isPeggedToMe(workUnit))
claimWorkPeggedToMe(workUnit)

val unclaimed = new LinkedList[String](getUnclaimed())
while (myLoad() <= evenDistribution && !unclaimed.isEmpty) {
val workUnit = unclaimed.poll()

if (config.useSoftHandoff && cluster.handoffRequests.contains(workUnit)
&& isFairGame(workUnit) && attemptToClaim(workUnit, claimForHandoff = true))
log.info("Accepted handoff for %s.", workUnit)

else if (isFairGame(workUnit))
attemptToClaim(workUnit)
}
}
}

/**
* Performs a "smart rebalance." The target load is set to (cluster load / node count),
* where "load" is determined by the sum of all work unit gauges in the cluster.
*/
def rebalance() {
val target = evenDistribution()
if (myLoad() > target) {
log.info("Smart Rebalance triggered. Load: %s. Target: %s", myLoad(), target)
drainToLoad(target.longValue)
}
}


/**
* When smart balancing is enabled, calculates the even distribution of load about
* the cluster. This is determined by the total load divided by the number of alive nodes.
*/
def evenDistribution() : Double = {
cluster.loadMap.values.sum / activeNodeSize().doubleValue()
}


/**
* Determines the current load on this instance when smart rebalancing is enabled.
* This load is determined by the sum of all of this node's gauges' values.
*/
def myLoad() : Double = {
var load = 0d
log.debug(cluster.loadMap.toString)
log.debug(cluster.myWorkUnits.toString)
cluster.myWorkUnits.foreach(u => load += cluster.getOrElse(cluster.loadMap, u, 0))
load
}

/**
* Once a minute, pass information about the amount of load generated per
* work unit to ZooKeeper for use in the claiming and rebalancing process.
*/
private def scheduleLoadTicks() {
val sendLoadToZookeeper = new Runnable {
def run() = logExceptions(log) {
try {
gauges.foreach { case(workUnit, gauge) =>
val loadPath = "/%s/meta/workload/%s".format(cluster.name, workUnit)
ZKUtils.setOrCreate(cluster.zk, loadPath, gauge.value.toString, CreateMode.PERSISTENT)
}

val myInfo = new NodeInfo(cluster.getState.toString, cluster.zk.get().getSessionId)
val nodeLoadPath = "/%s/nodes/%s".format(cluster.name, cluster.myNodeID)
ZKUtils.setOrCreate(cluster.zk, nodeLoadPath, Json.generate(myInfo), CreateMode.EPHEMERAL)

log.info("My load: %s", myLoad())
} catch {
case e: Exception => log.error(e, "Error reporting load info to ZooKeeper.")
}
}
}

loadFuture = Some(cluster.pool.get.scheduleAtFixedRate(
sendLoadToZookeeper, 0, 1, TimeUnit.MINUTES))
}


/**
* Drains excess load on this node down to a fraction distributed across the cluster.
* The target load is set to (clusterLoad / # nodes).
*/
def drainToLoad(targetLoad: Long, time: Int = config.drainTime,
useHandoff: Boolean = config.useSoftHandoff) {
val startingLoad = myLoad()
var currentLoad = myLoad()
val drainList = new LinkedList[String]
val eligibleToDrop = new LinkedList[String](cluster.myWorkUnits -- cluster.workUnitsPeggedToMe)

while (currentLoad > targetLoad && !eligibleToDrop.isEmpty) {
val workUnit = eligibleToDrop.poll()
var workUnitLoad : Double = cluster.getOrElse(cluster.loadMap, workUnit, 0)

if (workUnitLoad > 0 && (currentLoad - workUnitLoad) > targetLoad) {
drainList.add(workUnit)
currentLoad -= workUnitLoad
}
}

val drainInterval = ((config.drainTime.toDouble / drainList.size) * 1000).intValue()
val drainTask = buildDrainTask(drainList, drainInterval, useHandoff, currentLoad)

if (!drainList.isEmpty) {
log.info("Releasing work units over %s seconds. Current load: %s. Target: %s. " +
"Releasing: %s", time, startingLoad, targetLoad, drainList.mkString(", "))
cluster.pool.get.schedule(drainTask, 0, TimeUnit.SECONDS)
}
}

def buildDrainTask(drainList: LinkedList[String], drainInterval: Int, useHandoff: Boolean,
currentLoad: Double) : TimerTask = {
new TimerTask {
def run(): Unit = logExceptions(log) {
if (drainList.isEmpty || myLoad <= evenDistribution) {
log.info("Finished the drain list, or my load is now less than an even distribution. " +
"Stopping rebalance. Remaining work units: %s", drainList.mkString(", "))
return
}
else if (useHandoff)
cluster.requestHandoff(drainList.poll)
else
cluster.shutdownWork(drainList.poll)

cluster.pool.get.schedule(this, drainInterval, TimeUnit.MILLISECONDS)
}
}
}

override def onConnect() = scheduleLoadTicks()

override def shutdown() {
if (loadFuture.isDefined)
loadFuture.get.cancel(true)
}

override def onShutdownWork(workUnit: String) =
gauges.remove(workUnit)

}
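
To make the arithmetic above concrete, a worked illustration with made-up numbers (not part of this change): three live nodes reporting a combined load of 120 give an even distribution of 40 per node, and a node whose own work units sum to 70 releases units only while the remaining load stays above that target, mirroring drainToLoad:

import scala.collection.mutable.ListBuffer

object GaugedBalancingMathExample extends App {
  // Per-work-unit loads as they would appear in cluster.loadMap (invented values).
  val loadMap = Map("wu-a" -> 30.0, "wu-b" -> 25.0, "wu-c" -> 45.0, "wu-d" -> 20.0)
  val activeNodes = 3
  val target = loadMap.values.sum / activeNodes        // 120 / 3 = 40.0

  val myWorkUnits = List("wu-c", "wu-b")               // my load: 45 + 25 = 70
  var currentLoad = myWorkUnits.map(loadMap).sum
  val drainList = ListBuffer[String]()

  // Same rule as drainToLoad: release a unit only if doing so keeps the node above
  // the target, so it never undershoots its fair share of the cluster's load.
  for (workUnit <- myWorkUnits) {
    val load = loadMap(workUnit)
    if (load > 0 && currentLoad - load > target) {
      drainList += workUnit
      currentLoad -= load
    }
  }

  // Prints: target 40.0, releasing ListBuffer(wu-b), projected load 45.0
  println("target %s, releasing %s, projected load %s".format(target, drainList, currentLoad))
}

In this sketch the 45-unit work unit is kept, because releasing it would drop the node below the target, while the 25-unit one is handed off.
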