Skip to content

Commit

Permalink
Merge pull request #16 from reugn/develop
Browse files Browse the repository at this point in the history
v0.3.6
  • Loading branch information
reugn authored Sep 25, 2021
2 parents 59033d5 + 14e4106 commit 578585b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/reugn/go-quartz)](https://goreportcard.com/report/github.com/reugn/go-quartz)
[![codecov](https://codecov.io/gh/reugn/go-quartz/branch/master/graph/badge.svg)](https://codecov.io/gh/reugn/go-quartz)

Simple, zero-dependency scheduling library for Go.
A minimalistic and zero-dependency scheduling library for Go.

## About
Inspired by the [Quartz](https://github.com/quartz-scheduler/quartz) Java scheduler.
Expand All @@ -27,6 +27,8 @@ Scheduler interface
type Scheduler interface {
// start the scheduler
Start()
// whether the scheduler has been started
IsStarted() bool
// schedule the job with the specified trigger
ScheduleJob(job Job, trigger Trigger) error
// get all scheduled jobs keys
Expand Down
2 changes: 1 addition & 1 deletion quartz/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (parser *CronExpressionParser) nextYear(prev string, field *CronField) stri
}

next, halt := parser.findNextValue(prev, field.values)
if halt != false {
if halt {
panic("Out of expression range error")
}

Expand Down
51 changes: 42 additions & 9 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ScheduledJob struct {
type Scheduler interface {
// start the scheduler
Start()
// whether the scheduler has been started
IsStarted() bool
// schedule the job with the specified trigger
ScheduleJob(job Job, trigger Trigger) error
// get keys of all of the scheduled jobs
Expand All @@ -38,16 +40,17 @@ type Scheduler interface {
type StdScheduler struct {
sync.Mutex
Queue *PriorityQueue
interrupt chan interface{}
exit chan interface{}
interrupt chan struct{}
exit chan struct{}
feeder chan *Item
started bool
}

// NewStdScheduler returns a new StdScheduler.
func NewStdScheduler() *StdScheduler {
return &StdScheduler{
Queue: &PriorityQueue{},
interrupt: make(chan interface{}),
interrupt: make(chan struct{}, 1),
exit: nil,
feeder: make(chan *Item)}
}
Expand All @@ -70,12 +73,26 @@ func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error {

// Start starts the StdScheduler execution loop.
func (sched *StdScheduler) Start() {
sched.Lock()
defer sched.Unlock()

if sched.started {
return
}

// reset the exit channel
sched.exit = make(chan interface{})
sched.exit = make(chan struct{})
// start the feed reader
go sched.startFeedReader()
// start scheduler execution loop
go sched.startExecutionLoop()

sched.started = true
}

// IsStarted states whether the scheduler has been started.
func (sched *StdScheduler) IsStarted() bool {
return sched.started
}

// GetJobKeys returns the keys of all of the scheduled jobs.
Expand Down Expand Up @@ -135,8 +152,17 @@ func (sched *StdScheduler) Clear() {

// Stop exits the StdScheduler execution loop.
func (sched *StdScheduler) Stop() {
sched.Lock()
defer sched.Unlock()

if !sched.started {
return
}

log.Printf("Closing the StdScheduler.")
close(sched.exit)

sched.started = false
}

func (sched *StdScheduler) startExecutionLoop() {
Expand All @@ -145,6 +171,7 @@ func (sched *StdScheduler) startExecutionLoop() {
select {
case <-sched.interrupt:
case <-sched.exit:
log.Printf("Exit the empty execution loop.")
return
}
} else {
Expand All @@ -171,10 +198,13 @@ func (sched *StdScheduler) queueLen() int {

func (sched *StdScheduler) calculateNextTick() <-chan time.Time {
sched.Lock()
ts := sched.Queue.Head().priority
var interval int64
if sched.Queue.Len() > 0 {
interval = parkTime(sched.Queue.Head().priority)
}
sched.Unlock()

return time.After(time.Duration(parkTime(ts)))
return time.After(time.Duration(interval))
}

func (sched *StdScheduler) executeAndReschedule() {
Expand All @@ -195,10 +225,13 @@ func (sched *StdScheduler) executeAndReschedule() {

// reschedule the Job
nextRunTime, err := item.Trigger.NextFireTime(item.priority)
if err == nil {
item.priority = nextRunTime
sched.feeder <- item
if err != nil {
log.Printf("The Job '%s' got out the execution loop.", item.Job.Description())
return
}

item.priority = nextRunTime
sched.feeder <- item
}

func (sched *StdScheduler) startFeedReader() {
Expand Down

0 comments on commit 578585b

Please sign in to comment.